summaryrefslogtreecommitdiff
path: root/src/transport/transport-testing-communicator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport-testing-communicator.c')
-rw-r--r--src/transport/transport-testing-communicator.c1164
1 files changed, 1164 insertions, 0 deletions
diff --git a/src/transport/transport-testing-communicator.c b/src/transport/transport-testing-communicator.c
new file mode 100644
index 000000000..6d74b12e8
--- /dev/null
+++ b/src/transport/transport-testing-communicator.c
@@ -0,0 +1,1164 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2019 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file transport/transport-testing-communicator.c
+ * @brief functions related to testing-tng
+ * @author Christian Grothoff
+ * @author Julius B√ľnger
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_constants.h"
+#include "transport-testing-communicator.h"
+#include "gnunet_ats_transport_service.h"
+#include "gnunet_hello_lib.h"
+#include "gnunet_signatures.h"
+#include "transport.h"
+#include <inttypes.h>
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
+
+struct MyClient
+{
+ struct MyClient *prev;
+ struct MyClient *next;
+ /**
+ * @brief Handle to the client
+ */
+ struct GNUNET_SERVICE_Client *client;
+
+ /**
+ * @brief Handle to the client
+ */
+ struct GNUNET_MQ_Handle *c_mq;
+
+ /**
+ * The TCH
+ */
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc;
+
+};
+
+/**
+ * @brief Queue of a communicator and some context
+ */
+struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
+{
+ /**
+ * @brief Handle to the TransportCommunicator
+ */
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
+
+ /**
+ * @brief Envelope to a message that requests the opening of the queue.
+ *
+ * If the client already requests queue(s), but the communicator is not yet
+ * connected, we cannot send the request to open the queue. Save it until the
+ * communicator becomes available and send it then.
+ */
+ struct GNUNET_MQ_Envelope *open_queue_env;
+
+ /**
+ * @brief Peer ID of the peer on the other side of the queue
+ */
+ struct GNUNET_PeerIdentity peer_id;
+
+ /**
+ * @brief Queue ID
+ */
+ uint32_t qid;
+
+ /**
+ * @brief Current message id
+ */
+ uint64_t mid;
+
+ /**
+ * An `enum GNUNET_NetworkType` in NBO.
+ */
+ uint32_t nt;
+
+ /**
+ * Maximum transmission unit. UINT32_MAX for unlimited.
+ */
+ uint32_t mtu;
+
+ /**
+ * Queue length. UINT64_MAX for unlimited.
+ */
+ uint64_t q_len;
+
+ /**
+ * Queue prio
+ */
+ uint32_t priority;
+
+ /**
+ * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
+ */
+ uint32_t cs;
+
+ /**
+ * @brief Next element inside a DLL
+ */
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *next;
+
+ /**
+ * @brief Previous element inside a DLL
+ */
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *prev;
+};
+
+
+/**
+ * @brief Handle/Context to a single transmission
+ */
+struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission
+{
+};
+
+
+/**
+ * @brief Check whether incoming msg indicating available communicator is
+ * correct
+ *
+ * @param cls Closure
+ * @param msg Message struct
+ *
+ * @return GNUNET_YES in case message is correct
+ */
+static int
+check_communicator_available (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
+{
+ uint16_t size;
+
+ size = ntohs (msg->header.size) - sizeof(*msg);
+ if (0 == size)
+ return GNUNET_OK; /* receive-only communicator */
+ GNUNET_MQ_check_zero_termination (msg);
+ return GNUNET_OK;
+}
+
+
+/**
+ * @brief Handle new communicator
+ *
+ * Store characteristics of communicator, call respective client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_communicator_available (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ uint16_t size;
+ tc_h->c_mq = client->c_mq;
+
+ size = ntohs (msg->header.size) - sizeof(*msg);
+ if (0 == size)
+ {
+ GNUNET_SERVICE_client_continue (client->client);
+ return; /* receive-only communicator */
+ }
+ tc_h->c_characteristics = ntohl (msg->cc);
+ tc_h->c_addr_prefix = GNUNET_strdup ((const char *) &msg[1]);
+ if (NULL != tc_h->communicator_available_cb)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "calling communicator_available_cb()\n");
+ tc_h->communicator_available_cb (tc_h->cb_cls,
+ tc_h,
+ tc_h->c_characteristics,
+ tc_h->c_addr_prefix);
+ }
+ GNUNET_SERVICE_client_continue (client->client);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "finished communicator_available_cb()\n");
+
+}
+
+
+/**
+ * Incoming message. Test message is well-formed.
+ *
+ * @param cls the client
+ * @param msg the send message that was sent
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_communicator_backchannel (void *cls,
+ const struct
+ GNUNET_TRANSPORT_CommunicatorBackchannel *msg)
+{
+ // struct TransportClient *tc = cls;
+
+ // if (CT_COMMUNICATOR != tc->type)
+ // {
+ // GNUNET_break (0);
+ // return GNUNET_SYSERR;
+ // }
+ // GNUNET_MQ_check_boxed_message (msg);
+ return GNUNET_OK;
+}
+
+
+/**
+ * @brief Receive an incoming message.
+ *
+ * Pass the message to the client.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message
+ */
+static void
+handle_communicator_backchannel (void *cls,
+ const struct
+ GNUNET_TRANSPORT_CommunicatorBackchannel *
+ bc_msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *other_tc_h;
+ struct GNUNET_MessageHeader *msg;
+ msg = (struct GNUNET_MessageHeader *) &bc_msg[1];
+ uint16_t isize = ntohs (msg->size);
+ const char *target_communicator = ((const char *) msg) + isize;
+ struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
+ struct GNUNET_MQ_Envelope *env;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received backchannel message\n");
+ if (tc_h->bc_enabled != GNUNET_YES)
+ {
+ GNUNET_SERVICE_client_continue (client->client);
+ return;
+ }
+ /* Find client providing this communicator */
+ /* Finally, deliver backchannel message to communicator */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Delivering backchannel message of type %u to %s\n",
+ ntohs (msg->type),
+ target_communicator);
+ other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
+ GNUNET_PeerIdentity*) &bc_msg->pid);
+ env = GNUNET_MQ_msg_extra (
+ cbi,
+ isize,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
+ cbi->pid = tc_h->peer_id;
+ memcpy (&cbi[1], msg, isize);
+
+
+ GNUNET_MQ_send (other_tc_h->c_mq, env);
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * Address of our peer added. Test message is well-formed.
+ *
+ * @param cls the client
+ * @param aam the send message that was sent
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_add_address (void *cls,
+ const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
+{
+ // if (CT_COMMUNICATOR != tc->type)
+ // {
+ // GNUNET_break (0);
+ // return GNUNET_SYSERR;
+ // }
+ GNUNET_MQ_check_zero_termination (msg);
+ return GNUNET_OK;
+}
+
+
+/**
+ * @brief The communicator informs about an address.
+ *
+ * Store address and call client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message
+ */
+static void
+handle_add_address (void *cls,
+ const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ uint16_t size;
+ size = ntohs (msg->header.size) - sizeof(*msg);
+ if (0 == size)
+ return; /* receive-only communicator */
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "received add address cb %u\n", size);
+ tc_h->c_address = GNUNET_strdup ((const char *) &msg[1]);
+ if (NULL != tc_h->add_address_cb)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "calling add_address_cb()\n");
+ tc_h->add_address_cb (tc_h->cb_cls,
+ tc_h,
+ tc_h->c_address,
+ GNUNET_TIME_relative_ntoh (msg->expiration),
+ msg->aid,
+ ntohl (msg->nt));
+ }
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * Incoming message. Test message is well-formed.
+ *
+ * @param cls the client
+ * @param msg the send message that was sent
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_incoming_msg (void *cls,
+ const struct GNUNET_TRANSPORT_IncomingMessage *msg)
+{
+ // struct TransportClient *tc = cls;
+
+ // if (CT_COMMUNICATOR != tc->type)
+ // {
+ // GNUNET_break (0);
+ // return GNUNET_SYSERR;
+ // }
+ GNUNET_MQ_check_boxed_message (msg);
+ return GNUNET_OK;
+}
+
+
+/**
+ * @brief Receive an incoming message.
+ *
+ * Pass the message to the client.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message
+ */
+static void
+handle_incoming_msg (void *cls,
+ const struct GNUNET_TRANSPORT_IncomingMessage *inc_msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_MessageHeader *msg;
+ msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
+ size_t payload_len = ntohs (msg->size) - sizeof (struct
+ GNUNET_MessageHeader);
+ if (NULL != tc_h->incoming_msg_cb)
+ {
+ tc_h->incoming_msg_cb (tc_h->cb_cls,
+ tc_h,
+ (char*) &msg[1],
+ payload_len);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Incoming message from communicator but no handler!\n");
+ }
+ if (GNUNET_YES == ntohl (inc_msg->fc_on))
+ {
+ /* send ACK when done to communicator for flow control! */
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+ env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+ GNUNET_assert (NULL != env);
+ ack->reserved = htonl (0);
+ ack->fc_id = inc_msg->fc_id;
+ ack->sender = inc_msg->sender;
+ GNUNET_MQ_send (tc_h->c_mq, env);
+ }
+
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * @brief Communicator informs that it tries to establish requested queue
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message
+ */
+static void
+handle_queue_create_ok (void *cls,
+ const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+
+ if (NULL != tc_h->queue_create_reply_cb)
+ {
+ tc_h->queue_create_reply_cb (tc_h->cb_cls, tc_h, GNUNET_YES);
+ }
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * @brief Communicator informs that it wont try establishing requested queue.
+ *
+ * It will not do so probably because the address is bougus (see comment to
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL)
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message
+ */
+static void
+handle_queue_create_fail (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+
+ if (NULL != tc_h->queue_create_reply_cb)
+ {
+ tc_h->queue_create_reply_cb (tc_h->cb_cls, tc_h, GNUNET_NO);
+ }
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * New queue became available. Check message.
+ *
+ * @param cls the client
+ * @param aqm the send message that was sent
+ */
+static int
+check_add_queue_message (void *cls,
+ const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
+{
+ GNUNET_MQ_check_zero_termination (aqm);
+ return GNUNET_OK;
+}
+
+
+/**
+ * @brief Handle new queue
+ *
+ * Store context and call client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_add_queue_message (void *cls,
+ const struct GNUNET_TRANSPORT_AddQueueMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got queue with ID %u\n", msg->qid);
+ for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
+ {
+ if (tc_queue->qid == msg->qid)
+ break;
+ }
+ if (NULL == tc_queue)
+ {
+ tc_queue =
+ GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
+ tc_queue->tc_h = tc_h;
+ tc_queue->qid = msg->qid;
+ tc_queue->peer_id = msg->receiver;
+ GNUNET_CONTAINER_DLL_insert (tc_h->queue_head, tc_h->queue_tail, tc_queue);
+ }
+ GNUNET_assert (tc_queue->qid == msg->qid);
+ GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
+ tc_queue->nt = msg->nt;
+ tc_queue->mtu = ntohl (msg->mtu);
+ tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len = GNUNET_ntohll (msg->q_len);
+ if (NULL != tc_h->add_queue_cb)
+ {
+ tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
+ }
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * @brief Handle new queue
+ *
+ * Store context and call client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_update_queue_message (void *cls,
+ const struct
+ GNUNET_TRANSPORT_UpdateQueueMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received queue update message for %u with q_len %" PRIu64 "\n",
+ msg->qid, GNUNET_ntohll (msg->q_len));
+ tc_queue = tc_h->queue_head;
+ if (NULL != tc_queue)
+ {
+ while (tc_queue->qid != msg->qid)
+ {
+ tc_queue = tc_queue->next;
+ }
+ }
+ GNUNET_assert (tc_queue->qid == msg->qid);
+ GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
+ tc_queue->nt = msg->nt;
+ tc_queue->mtu = ntohl (msg->mtu);
+ tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ // Uncomment this for alternativ 1 of backchannel functionality
+ tc_queue->q_len += GNUNET_ntohll (msg->q_len);
+ // Until here for alternativ 1
+ // Uncomment this for alternativ 2 of backchannel functionality
+ // tc_queue->q_len = GNUNET_ntohll (msg->q_len);
+ // Until here for alternativ 2
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
+ * @brief Shut down the service
+ *
+ * @param cls Closure - Handle to the service
+ */
+static void
+shutdown_service (void *cls)
+{
+ struct GNUNET_SERVICE_Handle *h = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Shutting down service!\n");
+
+ GNUNET_SERVICE_stop (h);
+}
+
+
+/**
+ * @brief Callback called when new Client (Communicator) connects
+ *
+ * @param cls Closure - TransporCommmunicator Handle
+ * @param client Client
+ * @param mq Messagequeue
+ *
+ * @return TransportCommunicator Handle
+ */
+static void *
+connect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ struct GNUNET_MQ_Handle *mq)
+{
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+ struct MyClient *new_c;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected to %p.\n",
+ client, tc_h);
+ new_c = GNUNET_new (struct MyClient);
+ new_c->client = client;
+ new_c->c_mq = mq;
+ new_c->tc = tc_h;
+ GNUNET_CONTAINER_DLL_insert (tc_h->client_head,
+ tc_h->client_tail,
+ new_c);
+
+ if (NULL == tc_h->queue_head)
+ return new_c;
+ /* Iterate over queues. They are yet to be opened. Request opening. */
+ for (struct
+ GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_iter =
+ tc_h->queue_head;
+ NULL != tc_queue_iter;
+ tc_queue_iter = tc_queue_iter->next)
+ {
+ if (NULL == tc_queue_iter->open_queue_env)
+ continue;
+ /* Send the previously created mq envelope to request the creation of the
+ * queue. */
+ GNUNET_MQ_send (tc_h->c_mq,
+ tc_queue_iter->open_queue_env);
+ tc_queue_iter->open_queue_env = NULL;
+ }
+ return new_c;
+}
+
+
+/**
+ * @brief Callback called when Client disconnects
+ *
+ * @param cls Closure - TransportCommunicator Handle
+ * @param client Client
+ * @param internal_cls TransporCommmunicator Handle
+ */
+static void
+disconnect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ void *internal_cls)
+{
+ struct MyClient *cl = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
+
+ for (cl = tc_h->client_head; NULL != cl; cl = cl->next)
+ {
+ if (cl->client != client)
+ continue;
+ GNUNET_CONTAINER_DLL_remove (tc_h->client_head,
+ tc_h->client_tail,
+ cl);
+ if (cl->c_mq == tc_h->c_mq)
+ tc_h->c_mq = NULL;
+ GNUNET_free (cl);
+ break;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected.\n");
+}
+
+
+/**
+ * Message was transmitted. Process the request.
+ *
+ * @param cls the client
+ * @param sma the send message that was sent
+ */
+static void
+handle_send_message_ack (void *cls,
+ const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+{
+ struct MyClient *client = cls;
+ GNUNET_SERVICE_client_continue (client->client);
+ // NOP
+}
+
+
+/**
+ * @brief Start the communicator part of the transport service
+ *
+ * @param communicator_available Callback to be called when a new communicator
+ * becomes available
+ * @param cfg Configuration
+ */
+static void
+transport_communicator_start (
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
+{
+ struct GNUNET_MQ_MessageHandler mh[] = {
+ GNUNET_MQ_hd_var_size (communicator_available,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
+ struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
+ tc_h),
+ GNUNET_MQ_hd_var_size (communicator_backchannel,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
+ struct GNUNET_TRANSPORT_CommunicatorBackchannel,
+ tc_h),
+ GNUNET_MQ_hd_var_size (add_address,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
+ struct GNUNET_TRANSPORT_AddAddressMessage,
+ tc_h),
+ // GNUNET_MQ_hd_fixed_size (del_address,
+ // GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
+ // struct GNUNET_TRANSPORT_DelAddressMessage,
+ // NULL),
+ GNUNET_MQ_hd_var_size (incoming_msg,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
+ struct GNUNET_TRANSPORT_IncomingMessage,
+ tc_h),
+ GNUNET_MQ_hd_fixed_size (queue_create_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
+ struct GNUNET_TRANSPORT_CreateQueueResponse,
+ tc_h),
+ GNUNET_MQ_hd_fixed_size (queue_create_fail,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
+ struct GNUNET_TRANSPORT_CreateQueueResponse,
+ tc_h),
+ GNUNET_MQ_hd_var_size (add_queue_message,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
+ struct GNUNET_TRANSPORT_AddQueueMessage,
+ tc_h),
+ GNUNET_MQ_hd_fixed_size (update_queue_message,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
+ struct GNUNET_TRANSPORT_UpdateQueueMessage,
+ tc_h),
+ // GNUNET_MQ_hd_fixed_size (del_queue_message,
+ // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
+ // struct GNUNET_TRANSPORT_DelQueueMessage,
+ // NULL),
+ GNUNET_MQ_hd_fixed_size (send_message_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
+ struct GNUNET_TRANSPORT_SendMessageToAck,
+ tc_h),
+ GNUNET_MQ_handler_end ()
+ };
+
+
+ tc_h->sh = GNUNET_SERVICE_start ("transport",
+ tc_h->cfg,
+ &connect_cb,
+ &disconnect_cb,
+ tc_h,
+ mh);
+ GNUNET_assert (NULL != tc_h->sh);
+}
+
+
+/**
+ * @brief Task run at shutdown to kill communicator and clean up
+ *
+ * @param cls Closure - Process of communicator
+ */
+static void
+shutdown_process (struct GNUNET_OS_Process *proc)
+{
+ if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Error shutting down process with SIGERM, trying SIGKILL\n");
+ if (0 != GNUNET_OS_process_kill (proc, SIGKILL))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Error shutting down process with SIGERM and SIGKILL\n");
+ }
+ }
+ GNUNET_OS_process_destroy (proc);
+}
+
+static void
+shutdown_peerstore (void *cls)
+{
+ struct GNUNET_OS_Process *proc = cls;
+ shutdown_process (proc);
+}
+
+static void
+shutdown_communicator (void *cls)
+{
+ struct GNUNET_OS_Process *proc = cls;
+ shutdown_process (proc);
+}
+
+
+/**
+ * @brief Start the communicator
+ *
+ * @param cfgname Name of the communicator
+ */
+static void
+communicator_start (
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
+ const char *binary_name)
+{
+ char *binary;
+ char *loprefix;
+ char *section_name;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "communicator_start\n");
+
+ section_name = strchr (binary_name, '-');
+ section_name++;
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (tc_h->cfg,
+ section_name,
+ "PREFIX",
+ &loprefix))
+ loprefix = GNUNET_strdup ("");
+
+
+ binary = GNUNET_OS_get_libexec_binary_path (binary_name);
+ tc_h->c_proc = GNUNET_OS_start_process_s (GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
+ NULL,
+ loprefix,
+ binary,
+ binary_name,
+ "-c",
+ tc_h->cfg_filename,
+ NULL);
+ if (NULL == tc_h->c_proc)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start communicator!");
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_INFO, "started communicator\n");
+ GNUNET_free (binary);
+}
+
+
+/**
+ * @brief Task run at shutdown to kill communicator and clean up
+ *
+ * @param cls Closure - Process of communicator
+ */
+static void
+shutdown_nat (void *cls)
+{
+ struct GNUNET_OS_Process *proc = cls;
+ shutdown_process (proc);
+}
+
+
+/**
+ * @brief Task run at shutdown to kill the resolver process
+ *
+ * @param cls Closure - Process of communicator
+ */
+static void
+shutdown_resolver (void *cls)
+{
+ struct GNUNET_OS_Process *proc = cls;
+ shutdown_process (proc);
+}
+
+
+static void
+resolver_start (struct
+ GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
+{
+ char *binary;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "resolver_start\n");
+ binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-resolver");
+ tc_h->resolver_proc = GNUNET_OS_start_process (
+ GNUNET_OS_INHERIT_STD_OUT_AND_ERR
+ | GNUNET_OS_USE_PIPE_CONTROL,
+ NULL,
+ NULL,
+ NULL,
+ binary,
+ "gnunet-service-resolver",
+ "-c",
+ tc_h->cfg_filename,
+ NULL);
+ if (NULL == tc_h->resolver_proc)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start resolver service!");
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_INFO, "started resolver service\n");
+ GNUNET_free (binary);
+
+}
+
+
+/**
+ * @brief Start Peerstore
+ *
+ */
+static void
+peerstore_start (
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
+{
+ char *binary;
+
+ binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-peerstore");
+ tc_h->ps_proc = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
+ NULL,
+ NULL,
+ NULL,
+ binary,
+ "gnunet-service-peerstore",
+ "-c",
+ tc_h->cfg_filename,
+ NULL);
+ if (NULL == tc_h->ps_proc)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start Peerstore!");
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_INFO, "started Peerstore\n");
+ GNUNET_free (binary);
+}
+
+/**
+ * @brief Start NAT
+ *
+ */
+static void
+nat_start (
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
+{
+ char *binary;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "nat_start\n");
+ binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-nat");
+ tc_h->nat_proc = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_OUT_AND_ERR
+ | GNUNET_OS_USE_PIPE_CONTROL,
+ NULL,
+ NULL,
+ NULL,
+ binary,
+ "gnunet-service-nat",
+ "-c",
+ tc_h->cfg_filename,
+ NULL);
+ if (NULL == tc_h->nat_proc)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start NAT!");
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_INFO, "started NAT\n");
+ GNUNET_free (binary);
+}
+
+
+/**
+ * @brief Start communicator part of transport service and communicator
+ *
+ * @param service_name Name of the service
+ * @param cfg Configuration handle
+ * @param communicator_available_cb Callback that is called when a new
+ * @param add_address_cb Callback that is called when a new
+ * communicator becomes available
+ * @param cb_cls Closure to @a communicator_available_cb and @a
+ *
+ * @return Handle to the communicator duo
+ */
+struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
+GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
+ const char *service_name,
+ const char *binary_name,
+ const char *cfg_filename,
+ const struct GNUNET_PeerIdentity *peer_id,
+ GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback
+ communicator_available_cb,
+ GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb,
+ GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb,
+ GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb,
+ GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_message_cb,
+ GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb,
+ void *cb_cls)
+{
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting new transport/communicator combo with config %s\n",
+ cfg_filename);
+ tc_h =
+ GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle);
+ tc_h->cfg_filename = GNUNET_strdup (cfg_filename);
+ tc_h->cfg = GNUNET_CONFIGURATION_create ();
+ if ((GNUNET_SYSERR == GNUNET_CONFIGURATION_load (tc_h->cfg, cfg_filename)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _ ("Malformed configuration file `%s', exit ...\n"),
+ cfg_filename);
+ GNUNET_free (tc_h->cfg_filename);
+ GNUNET_CONFIGURATION_destroy (tc_h->cfg);
+ GNUNET_free (tc_h);
+ return NULL;
+ }
+ tc_h->bc_enabled = GNUNET_CONFIGURATION_get_value_yesno (tc_h->cfg,
+ "communicator-test",
+ "BACKCHANNEL_ENABLED");
+ tc_h->communicator_available_cb = communicator_available_cb;
+ tc_h->add_address_cb = add_address_cb;
+ tc_h->queue_create_reply_cb = queue_create_reply_cb;
+ tc_h->add_queue_cb = add_queue_cb;
+ tc_h->incoming_msg_cb = incoming_message_cb;
+ tc_h->bc_cb = bc_cb;
+ tc_h->peer_id = *peer_id;
+ tc_h->cb_cls = cb_cls;
+
+ /* Start communicator part of service */
+ transport_communicator_start (tc_h);
+ /* Start NAT */
+ nat_start (tc_h);
+ /* Start resolver service */
+ resolver_start (tc_h);
+ /* Start peerstore service */
+ peerstore_start (tc_h);
+ /* Schedule start communicator */
+ communicator_start (tc_h,
+ binary_name);
+ return tc_h;
+}
+
+
+void
+GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
+{
+ shutdown_communicator (tc_h->c_proc);
+ shutdown_service (tc_h->sh);
+ shutdown_nat (tc_h->nat_proc);
+ shutdown_resolver (tc_h->resolver_proc);
+ shutdown_peerstore (tc_h->ps_proc);
+ GNUNET_CONFIGURATION_destroy (tc_h->cfg);
+ GNUNET_free (tc_h);
+}
+
+
+/**
+ * @brief Instruct communicator to open a queue
+ *
+ * @param tc_h Handle to communicator which shall open queue
+ * @param peer_id Towards which peer
+ * @param address For which address
+ */
+void
+GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
+ const struct GNUNET_PeerIdentity *peer_id,
+ const char *address)
+{
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+ static uint32_t idgen;
+ char *prefix;
+ struct GNUNET_TRANSPORT_CreateQueue *msg;
+ struct GNUNET_MQ_Envelope *env;
+ size_t alen;
+
+ tc_queue =
+ GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
+ tc_queue->tc_h = tc_h;
+ prefix = GNUNET_HELLO_address_to_prefix (address);
+ if (NULL == prefix)
+ {
+ GNUNET_break (0); /* We got an invalid address!? */
+ GNUNET_free (tc_queue);
+ return;
+ }
+ GNUNET_free (prefix);
+ alen = strlen (address) + 1;
+ env =
+ GNUNET_MQ_msg_extra (msg, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
+ msg->request_id = htonl (idgen++);
+ tc_queue->qid = msg->request_id;
+ msg->receiver = *peer_id;
+ tc_queue->peer_id = *peer_id;
+ memcpy (&msg[1], address, alen);
+ if (NULL != tc_h->c_mq)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending queue create immediately\n");
+ GNUNET_MQ_send (tc_h->c_mq, env);
+ }
+ else
+ {
+ tc_queue->open_queue_env = env;
+ }
+ GNUNET_CONTAINER_DLL_insert (tc_h->queue_head, tc_h->queue_tail, tc_queue);
+}
+
+
+/**
+ * @brief Instruct communicator to send data
+ *
+ * @param tc_queue The queue to use for sending
+ * @param cont function to call when done sending
+ * @param cont_cls closure for @a cont
+ * @param payload Data to send
+ * @param payload_size Size of the @a payload
+ */
+void
+GNUNET_TRANSPORT_TESTING_transport_communicator_send
+ (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
+ GNUNET_SCHEDULER_TaskCallback cont,
+ void *cont_cls,
+ const void *payload,
+ size_t payload_size)
+{
+ struct GNUNET_MessageHeader *mh;
+ struct GNUNET_TRANSPORT_SendMessageTo *msg;
+ struct GNUNET_MQ_Envelope *env;
+ size_t inbox_size;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
+
+ tc_queue = NULL;
+ for (tc_queue_tmp = tc_h->queue_head;
+ NULL != tc_queue_tmp;
+ tc_queue_tmp = tc_queue_tmp->next)
+ {
+ if (tc_queue_tmp->q_len <= 0)
+ continue;
+ if (NULL == tc_queue)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ continue;
+ }
+ if (tc_queue->priority < tc_queue_tmp->priority)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ }
+ }
+ GNUNET_assert (NULL != tc_queue);
+ // Uncomment this for alternativ 1 of backchannel functionality
+ if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
+ tc_queue->q_len--;
+ // Until here for alternativ 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message\n");
+ inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
+ env = GNUNET_MQ_msg_extra (msg,
+ inbox_size,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
+ GNUNET_assert (NULL != env);
+ msg->qid = htonl (tc_queue->qid);
+ msg->mid = tc_queue->mid++;
+ msg->receiver = tc_queue->peer_id;
+ mh = (struct GNUNET_MessageHeader *) &msg[1];
+ mh->size = htons (inbox_size);
+ mh->type = GNUNET_MESSAGE_TYPE_DUMMY;
+ memcpy (&mh[1],
+ payload,
+ payload_size);
+ if (NULL != cont)
+ GNUNET_MQ_notify_sent (env,
+ cont,
+ cont_cls);
+ GNUNET_MQ_send (tc_queue->tc_h->c_mq,
+ env);
+}