/* This file is part of GNUnet. Copyright (C) 2011, 2017 GNUnet e.V. GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . SPDX-License-Identifier: AGPL3.0-or-later */ /** * @file cadet/test_cadet_flow.c * @author Bart Polot * @author Christian Grothoff * @brief Test for flow control of CADET service */ #include #include "platform.h" #include "cadet_test_lib.h" #include "gnunet_cadet_service.h" #include "gnunet_statistics_service.h" #include /** * Ugly workaround to unify data handlers on incoming and outgoing channels. */ struct CadetTestChannelWrapper { /** * Channel pointer. */ struct GNUNET_CADET_Channel *ch; }; /** * How many messages to send by default. */ #define TOTAL_PACKETS_DEFAULT 500 /** * How long until we give up on connecting the peers? */ #define TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 120) /** * Time to wait by default for stuff that should be rather fast. */ #define SHORT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20) /** * How fast do we send messages? */ #define SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 10) /** * How many packets to send. */ static unsigned int total_packets = TOTAL_PACKETS_DEFAULT; /** * Time to wait for fast operations. */ static struct GNUNET_TIME_Relative short_time; /** * Size of each test packet's payload */ static size_t size_payload = sizeof(uint32_t); /** * Operation to get peer ids. */ static struct GNUNET_TESTBED_Operation *t_op[2]; /** * Peer ids. */ static struct GNUNET_PeerIdentity *p_id[2]; /** * Port ID */ static struct GNUNET_HashCode port; /** * Peer ids counter. */ static unsigned int p_ids; /** * Is the setup initialized? */ static int initialized; /** * Number of payload packes sent. */ static int data_sent; /** * Number of payload packets received. */ static int data_received; /** * Number of payload packed acknowledgements sent. */ static int ack_sent; /** * Number of payload packed explicitly (app level) acknowledged. */ static int ack_received; /** * Total number of peers asked to run. */ static unsigned int peers_requested = 2; /** * Number of currently running peers (should be same as @c peers_requested). */ static unsigned int peers_running; /** * Test context (to shut down). */ struct GNUNET_CADET_TEST_Context *test_ctx; /** * Task called to disconnect peers. */ static struct GNUNET_SCHEDULER_Task *disconnect_task; /** * Task To perform tests */ static struct GNUNET_SCHEDULER_Task *test_task; /** * Task runnining #send_next_msg(). */ static struct GNUNET_SCHEDULER_Task *send_next_msg_task; /** * Cadet handle for the root peer */ static struct GNUNET_CADET_Handle *h1; /** * Cadet handle for the first leaf peer */ static struct GNUNET_CADET_Handle *h2; /** * Channel handle for the root peer */ static struct GNUNET_CADET_Channel *outgoing_ch; /** * Channel handle for the dest peer */ static struct GNUNET_CADET_Channel *incoming_ch; /** * Time we started the data transmission (after channel has been established * and initilized). */ static struct GNUNET_TIME_Absolute start_time; /** * Peers handle. */ static struct GNUNET_TESTBED_Peer **testbed_peers; /** * Statistics operation handle. */ static struct GNUNET_TESTBED_Operation *stats_op; /** * Keepalives sent. */ static unsigned int ka_sent; /** * Keepalives received. */ static unsigned int ka_received; /** * How many messages were dropped by CADET because of full buffers? */ static unsigned int msg_dropped; /** * Show the results of the test (banwidth acheived) and log them to GAUGER */ static void show_end_data(void) { static struct GNUNET_TIME_Absolute end_time; static struct GNUNET_TIME_Relative total_time; end_time = GNUNET_TIME_absolute_get(); total_time = GNUNET_TIME_absolute_get_difference(start_time, end_time); fprintf(stderr, "\nResults of test \"%s\"\n", test_name); fprintf(stderr, "Test time %s\n", GNUNET_STRINGS_relative_time_to_string(total_time, GNUNET_YES)); fprintf(stderr, "Test bandwidth: %f kb/s\n", 4 * total_packets * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms fprintf(stderr, "Test throughput: %f packets/s\n\n", total_packets * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms GAUGER("CADET", test_name, total_packets * 1000.0 / (total_time.rel_value_us / 1000), "packets/s"); } /** * Shut down peergroup, clean up. * * @param cls Closure (unused). * @param tc Task Context. */ static void shutdown_task(void *cls) { GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n"); if (NULL != send_next_msg_task) { GNUNET_SCHEDULER_cancel(send_next_msg_task); send_next_msg_task = NULL; } if (NULL != test_task) { GNUNET_SCHEDULER_cancel(test_task); test_task = NULL; } for (unsigned int i = 0; i < 2; i++) GNUNET_TESTBED_operation_done(t_op[i]); if (NULL != outgoing_ch) { GNUNET_CADET_channel_destroy(outgoing_ch); outgoing_ch = NULL; } if (NULL != incoming_ch) { GNUNET_CADET_channel_destroy(incoming_ch); incoming_ch = NULL; } GNUNET_CADET_TEST_cleanup(test_ctx); } /** * Stats callback. Finish the stats testbed operation and when all stats have * been iterated, shutdown the test. * * @param cls Closure (line number from which termination was requested). * @param op the operation that has been finished * @param emsg error message in case the operation has failed; will be NULL if * operation has executed successfully. */ static void stats_cont(void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg) { GNUNET_log(GNUNET_ERROR_TYPE_INFO, "KA sent: %u, KA received: %u\n", ka_sent, ka_received); if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1))) { GNUNET_break(0); ok--; } GNUNET_TESTBED_operation_done(stats_op); if (NULL != disconnect_task) GNUNET_SCHEDULER_cancel(disconnect_task); disconnect_task = GNUNET_SCHEDULER_add_now(&disconnect_cadet_peers, cls); } /** * Process statistic values. * * @param cls closure (line number, unused) * @param peer the peer the statistic belong to * @param subsystem name of subsystem that created the statistic * @param name the name of the datum * @param value the current value * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration */ static int stats_iterator(void *cls, const struct GNUNET_TESTBED_Peer *peer, const char *subsystem, const char *name, uint64_t value, int is_persistent) { static const char *s_sent = "# keepalives sent"; static const char *s_recv = "# keepalives received"; static const char *rdrops = "# messages dropped due to full buffer"; static const char *cdrops = "# messages dropped due to slow client"; uint32_t i; i = GNUNET_TESTBED_get_index(peer); GNUNET_log(GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i, subsystem, name, (unsigned long long)value); if (0 == strncmp(s_sent, name, strlen(s_sent)) && 0 == i) ka_sent = value; if (0 == strncmp(s_recv, name, strlen(s_recv)) && peers_requested - 1 == i) ka_received = value; if (0 == strncmp(rdrops, name, strlen(rdrops))) msg_dropped += value; if (0 == strncmp(cdrops, name, strlen(cdrops))) msg_dropped += value; return GNUNET_OK; } /** * Task to gather all statistics. * * @param cls Closure (line from which the task was scheduled). */ static void gather_stats_and_exit(void *cls) { long l = (long)cls; disconnect_task = NULL; GNUNET_log(GNUNET_ERROR_TYPE_INFO, "gathering statistics from line %ld\n", l); if (NULL != outgoing_ch) { GNUNET_CADET_channel_destroy(outgoing_ch); outgoing_ch = NULL; } stats_op = GNUNET_TESTBED_get_statistics(peers_running, testbed_peers, "cadet", NULL, &stats_iterator, stats_cont, cls); } /** * Abort test: schedule disconnect and shutdown immediately * * @param line Line in the code the abort is requested from (__LINE__). */ static void abort_test(long line) { if (NULL != disconnect_task) { GNUNET_SCHEDULER_cancel(disconnect_task); GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Aborting test from %ld\n", line); disconnect_task = GNUNET_SCHEDULER_add_now(&disconnect_cadet_peers, (void *)line); } } /** * Send a message on the channel with the appropriate size and payload. * * Update the appropriate *_sent counter. * * @param channel Channel to send the message on. */ static void send_test_message(struct GNUNET_CADET_Channel *channel) { struct GNUNET_MQ_Envelope *env; struct GNUNET_MessageHeader *msg; uint32_t *data; int payload; int size; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending test message on channel %p\n", channel); size = size_payload; if (GNUNET_NO == initialized) { GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n"); size += 1000; payload = data_sent; if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer data_sent++; } else if (SPEED == test || SPEED_ACK == test) { if (get_target_channel() == channel) { payload = ack_sent; size += ack_sent; ack_sent++; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending ACK %u [%d bytes]\n", payload, size); } else { payload = data_sent; size += data_sent; data_sent++; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending DATA %u [%d bytes]\n", data_sent, size); } } else if (FORWARD == test) { payload = ack_sent; } else if (P2P_SIGNAL == test) { payload = data_sent; } else { GNUNET_assert(0); } env = GNUNET_MQ_msg_extra(msg, size, GNUNET_MESSAGE_TYPE_DUMMY); data = (uint32_t *)&msg[1]; *data = htonl(payload); GNUNET_MQ_send(GNUNET_CADET_get_mq(channel), env); } /** * Task to request a new data transmission in a SPEED test, without waiting * for previous messages to be sent/arrrive. * * @param cls Closure (unused). */ static void send_next_msg(void *cls) { struct GNUNET_CADET_Channel *channel; send_next_msg_task = NULL; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending next message: %d\n", data_sent); channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch; GNUNET_assert(NULL != channel); GNUNET_assert(SPEED == test); send_test_message(channel); if (data_sent < total_packets) { /* SPEED test: Send all messages as soon as possible */ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Scheduling message %d\n", data_sent + 1); send_next_msg_task = GNUNET_SCHEDULER_add_delayed(SEND_INTERVAL, &send_next_msg, NULL); } } /** * Check if payload is sane (size contains payload). * * @param cls should match #ch * @param message The actual message. * @return #GNUNET_OK to keep the channel open, * #GNUNET_SYSERR to close it (signal serious error). */ static int check_data(void *cls, const struct GNUNET_MessageHeader *message) { return GNUNET_OK; /* all is well-formed */ } /** * Function is called whenever a message is received. * * @param cls closure (set from GNUNET_CADET_connect(), peer number) * @param message the actual message */ static void handle_data(void *cls, const struct GNUNET_MessageHeader *message) { struct CadetTestChannelWrapper *ch = cls; struct GNUNET_CADET_Channel *channel = ch->ch; uint32_t *data; uint32_t payload; int *counter; GNUNET_CADET_receive_done(channel); counter = get_target_channel() == channel ? &data_received : &ack_received; if (channel == outgoing_ch) { GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Root client got a message.\n"); } else if (channel == incoming_ch) { GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Leaf client got a message.\n"); } else { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unknown channel %p.\n", channel); GNUNET_assert(0); } data = (uint32_t *)&message[1]; payload = ntohl(*data); if (payload == *counter) { GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Payload as expected: %u\n", payload); } else { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Received payload %u, expected: %u\n", payload, *counter); } (*counter)++; if (get_target_channel() == channel) /* Got "data" */ { GNUNET_log(GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received); if (data_received < total_packets) return; } else /* Got "ack" */ { if (SPEED_ACK == test || SPEED == test) { GNUNET_log(GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received); /* Send more data */ send_test_message(channel); if (ack_received < total_packets && SPEED != test) return; if (ok == 2 && SPEED == test) return; show_end_data(); } if (test == P2P_SIGNAL) { GNUNET_CADET_channel_destroy(incoming_ch); incoming_ch = NULL; } else { GNUNET_CADET_channel_destroy(outgoing_ch); outgoing_ch = NULL; } } } /** * Method called whenever a peer connects to a port in MQ-based CADET. * * @param cls Closure from #GNUNET_CADET_open_port (peer # as long). * @param channel New handle to the channel. * @param source Peer that started this channel. * @return Closure for the incoming @a channel. It's given to: * - The #GNUNET_CADET_DisconnectEventHandler (given to * #GNUNET_CADET_open_port) when the channel dies. * - Each the #GNUNET_MQ_MessageCallback handlers for each message * received on the @a channel. */ static void * connect_handler(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source) { struct CadetTestChannelWrapper *ch; long peer = (long)cls; GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Incoming channel from %s to %ld: %p\n", GNUNET_i2s(source), peer, channel); if (peer == peers_requested - 1) { if (NULL != incoming_ch) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Duplicate incoming channel for client %lu\n", (long)cls); GNUNET_assert(0); } incoming_ch = channel; } else { GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Incoming channel for unexpected peer #%lu\n", (long)cls); GNUNET_assert(0); } ch = GNUNET_new(struct CadetTestChannelWrapper); ch->ch = channel; return ch; } /** * Function called whenever an MQ-channel is destroyed, even if the destruction * was requested by #GNUNET_CADET_channel_destroy. * It must NOT call #GNUNET_CADET_channel_destroy on the channel. * * It should clean up any associated state, including cancelling any pending * transmission on this channel. * * @param cls Channel closure (channel wrapper). * @param channel Connection to the other end (henceforth invalid). */ static void disconnect_handler(void *cls, const struct GNUNET_CADET_Channel *channel) { struct CadetTestChannelWrapper *ch_w = cls; GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Channel disconnected at %d\n", ok); GNUNET_assert(ch_w->ch == channel); if (channel == incoming_ch) incoming_ch = NULL; else if (outgoing_ch == channel) outgoing_ch = NULL; else GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Disconnect on unknown channel %p\n", channel); if (NULL != disconnect_task) GNUNET_SCHEDULER_cancel(disconnect_task); disconnect_task = GNUNET_SCHEDULER_add_now(&gather_stats_and_exit, (void *)__LINE__); GNUNET_free(ch_w); } /** * Start the testcase, we know the peers and have handles to CADET. * * Testcase continues when the root receives confirmation of connected peers, * on callback function ch. * * @param cls Closure (unused). */ static void start_test(void *cls) { struct GNUNET_MQ_MessageHandler handlers[] = { GNUNET_MQ_hd_var_size(data, GNUNET_MESSAGE_TYPE_DUMMY, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_handler_end() }; struct CadetTestChannelWrapper *ch; enum GNUNET_CADET_ChannelOption flags; test_task = NULL; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "In start_test\n"); start_time = GNUNET_TIME_absolute_get(); ch = GNUNET_new(struct CadetTestChannelWrapper); outgoing_ch = GNUNET_CADET_channel_create(h1, ch, p_id[1], &port, flags, NULL, &disconnect_handler, handlers); ch->ch = outgoing_ch; GNUNET_assert(NULL == disconnect_task); disconnect_task = GNUNET_SCHEDULER_add_delayed(short_time, &gather_stats_and_exit, (void *)__LINE__); GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Sending data initializer on channel %p...\n", outgoing_ch); send_test_message(outgoing_ch); } /** * Callback to be called when the requested peer information is available * * @param cls the closure from GNUNET_TESTBED_peer_get_information() * @param op the operation this callback corresponds to * @param pinfo the result; will be NULL if the operation has failed * @param emsg error message if the operation has failed; * NULL if the operation is successfull */ static void pi_cb(void *cls, struct GNUNET_TESTBED_Operation *op, const struct GNUNET_TESTBED_PeerInformation *pinfo, const char *emsg) { long i = (long)cls; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "ID callback for %ld\n", i); if ((NULL == pinfo) || (NULL != emsg)) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg); abort_test(__LINE__); return; } p_id[i] = pinfo->result.id; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "id: %s\n", GNUNET_i2s(p_id[i])); p_ids++; if (p_ids < 2) return; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n"); test_task = GNUNET_SCHEDULER_add_now(&start_test, NULL); } /** * test main: start test when all peers are connected * * @param cls Closure. * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end. * @param num_peers Number of peers that are running. * @param peers Array of peers. * @param cadets Handle to each of the CADETs of the peers. */ static void tmain(void *cls, struct GNUNET_CADET_TEST_Context *ctx, unsigned int num_peers, struct GNUNET_TESTBED_Peer **peers, struct GNUNET_CADET_Handle **cadets) { GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "test main\n"); test_ctx = ctx; peers_running = num_peers; GNUNET_assert(peers_running == peers_requested); testbed_peers = peers; h1 = cadets[0]; h2 = cadets[num_peers - 1]; GNUNET_SCHEDULER_add_shutdown(&shutdown_task, NULL); p_ids = 0; t_op[0] = GNUNET_TESTBED_peer_get_information(peers[0], GNUNET_TESTBED_PIT_IDENTITY, &pi_cb, (void *)0L); t_op[1] = GNUNET_TESTBED_peer_get_information(peers[num_peers - 1], GNUNET_TESTBED_PIT_IDENTITY, &pi_cb, (void *)1L); GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n"); } /** * Main: start test */ int main(int argc, char *argv[]) { static const struct GNUNET_HashCode *ports[2]; struct GNUNET_MQ_MessageHandler handlers[] = { GNUNET_MQ_hd_var_size(data, GNUNET_MESSAGE_TYPE_DUMMY, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_handler_end() }; const char *config_file = "test_cadet.conf"; char port_id[] = "test port"; struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_relative_time('t', "time", "short_time", gettext_noop("set short timeout"), &short_time), GNUNET_GETOPT_option_uint('m', "messages", "NUM_MESSAGES", gettext_noop("set number of messages to send"), &total_packets), GNUNET_GETOPT_option_uint('p', "peers", "NUM_PEERS", gettext_noop("number of peers to launch"), &peers_requested), GNUNET_GETOPT_OPTION_END }; GNUNET_log_setup("test-cadet-flow", "DEBUG", NULL); total_packets = TOTAL_PACKETS; short_time = SHORT_TIME; if (-1 == GNUNET_GETOPT_run(argv[0], options, argc, argv)) { fprintf(stderr, "test failed: problem with CLI parameters\n"); return 1; } GNUNET_CRYPTO_hash(port_id, sizeof(port_id), &port); ports[0] = &port; ports[1] = NULL; GNUNET_CADET_TEST_ruN("test_cadet_flow", config_file, peers_requested, &tmain, NULL, /* tmain cls */ &connect_handler, NULL, &disconnect_handler, handlers, ports); return 0; } /* end of test_cadet_flow.c */