From 95f9076a2139f5fb042b944a0658b6cda2fa35db Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 30 Apr 2016 08:17:37 +0000 Subject: implementing new scheduler shutdown semantics --- src/cadet/gnunet-cadet-profiler.c | 224 ++++++++++++++++++++++---------------- 1 file changed, 128 insertions(+), 96 deletions(-) (limited to 'src/cadet/gnunet-cadet-profiler.c') diff --git a/src/cadet/gnunet-cadet-profiler.c b/src/cadet/gnunet-cadet-profiler.c index ffa993f8e..b2a07cb0c 100644 --- a/src/cadet/gnunet-cadet-profiler.c +++ b/src/cadet/gnunet-cadet-profiler.c @@ -147,7 +147,12 @@ struct CadetPeer /** * Task to do the next ping. */ - struct GNUNET_SCHEDULER_Task * ping_task; + struct GNUNET_SCHEDULER_Task *ping_task; + + /** + * NTR operation for the next ping. + */ + struct GNUNET_CADET_TransmitHandle *ping_ntr; float mean[number_rounds]; float var[number_rounds]; @@ -179,7 +184,7 @@ static struct GNUNET_TESTBED_Operation *stats_op; /** * Operation to get peer ids. */ -struct CadetPeer *peers; +static struct CadetPeer *peers; /** * Peer ids counter. @@ -206,20 +211,15 @@ static unsigned long long peers_pinging; */ static struct GNUNET_CADET_TEST_Context *test_ctx; -/** - * Task called to shutdown test. - */ -static struct GNUNET_SCHEDULER_Task * shutdown_handle; - /** * Task called to disconnect peers, before shutdown. */ -static struct GNUNET_SCHEDULER_Task * disconnect_task; +static struct GNUNET_SCHEDULER_Task *disconnect_task; /** * Task to perform tests */ -static struct GNUNET_SCHEDULER_Task * test_task; +static struct GNUNET_SCHEDULER_Task *test_task; /** * Round number. @@ -241,6 +241,11 @@ static unsigned int peers_warmup; */ static int test_finished; +/** + * Task running each round of the benchmark. + */ +static struct GNUNET_SCHEDULER_Task *round_task; + /** * START THE TEST ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES. @@ -309,19 +314,6 @@ show_end_data (void) } -/** - * Shut down peergroup, clean up. - * - * @param cls Closure (unused). - */ -static void -shutdown_task (void *cls) -{ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Ending test.\n"); - shutdown_handle = NULL; -} - - /** * Disconnect from cadet services af all peers, call shutdown. * @@ -334,7 +326,8 @@ disconnect_cadet_peers (void *cls) unsigned int i; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "disconnecting cadet service, called from line %ld\n", line); + "disconnecting cadet service, called from line %ld\n", + line); disconnect_task = NULL; for (i = 0; i < peers_total; i++) { @@ -346,28 +339,56 @@ disconnect_cadet_peers (void *cls) if (NULL != peers[i].ch) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: channel %p\n", i, peers[i].ch); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%u: channel %p\n", i, peers[i].ch); GNUNET_CADET_channel_destroy (peers[i].ch); } if (NULL != peers[i].warmup_ch) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: warmup channel %p\n", + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%u: warmup channel %p\n", i, peers[i].warmup_ch); GNUNET_CADET_channel_destroy (peers[i].warmup_ch); } if (NULL != peers[i].incoming_ch) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: incoming channel %p\n", + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%u: incoming channel %p\n", i, peers[i].incoming_ch); GNUNET_CADET_channel_destroy (peers[i].incoming_ch); } } GNUNET_CADET_TEST_cleanup (test_ctx); - if (NULL != shutdown_handle) + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Shut down peergroup, clean up. + * + * @param cls Closure (unused). + */ +static void +shutdown_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Ending test.\n"); + if (NULL != disconnect_task) { - GNUNET_SCHEDULER_cancel (shutdown_handle); + GNUNET_SCHEDULER_cancel (disconnect_task); + disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, + (void *) __LINE__); + } + if (NULL != round_task) + { + GNUNET_SCHEDULER_cancel (round_task); + round_task = NULL; + } + if (NULL != test_task) + { + GNUNET_SCHEDULER_cancel (test_task); + test_task = NULL; } - shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); } @@ -418,13 +439,16 @@ stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg) * @param subsystem name of subsystem that created the statistic * @param name the name of the datum * @param value the current value - * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not - * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration */ static int -stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer, - const char *subsystem, const char *name, - uint64_t value, int is_persistent) +stats_iterator (void *cls, + const struct GNUNET_TESTBED_Peer *peer, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) { uint32_t i; @@ -444,16 +468,13 @@ stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer, static void collect_stats (void *cls) { - const struct GNUNET_SCHEDULER_TaskContext *tc; - - tc = GNUNET_SCHEDULER_get_task_context (); - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - return; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start collecting statistics...\n"); - stats_op = GNUNET_TESTBED_get_statistics (peers_total, testbed_handles, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Start collecting statistics...\n"); + stats_op = GNUNET_TESTBED_get_statistics (peers_total, + testbed_handles, NULL, NULL, - stats_iterator, stats_cont, NULL); + &stats_iterator, + &stats_cont, NULL); } @@ -465,17 +486,12 @@ collect_stats (void *cls) static void finish_profiler (void *cls) { - const struct GNUNET_SCHEDULER_TaskContext *tc; - - tc = GNUNET_SCHEDULER_get_task_context (); - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - return; - test_finished = GNUNET_YES; - show_end_data(); + show_end_data (); GNUNET_SCHEDULER_add_now (&collect_stats, NULL); } + /** * Set the total number of running peers. * @@ -515,9 +531,15 @@ adjust_running_peers (unsigned int target) run ? "arting" : "opping", r, GNUNET_i2s (&peers[r].id)); if (NULL != peers[r].ping_task) + { GNUNET_SCHEDULER_cancel (peers[r].ping_task); - peers[r].ping_task = NULL; - + peers[r].ping_task = NULL; + } + if (NULL != peers[r].ping_ntr) + { + GNUNET_CADET_notify_transmit_ready_cancel (peers[r].ping_ntr); + peers[r].ping_ntr = NULL; + } peers[r].up = run; if (NULL != peers[r].ch) @@ -547,12 +569,6 @@ adjust_running_peers (unsigned int target) static void next_rnd (void *cls) { - const struct GNUNET_SCHEDULER_TaskContext *tc; - - tc = GNUNET_SCHEDULER_get_task_context (); - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - return; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ROUND %ld\n", current_round); if (0.0 == rounds[current_round]) { @@ -563,7 +579,9 @@ next_rnd (void *cls) adjust_running_peers (rounds[current_round] * peers_total); current_round++; - GNUNET_SCHEDULER_add_delayed (round_time, &next_rnd, NULL); + round_task = GNUNET_SCHEDULER_add_delayed (round_time, + &next_rnd, + NULL); } @@ -616,21 +634,19 @@ static void ping (void *cls) { struct CadetPeer *peer = cls; - const struct GNUNET_SCHEDULER_TaskContext *tc; peer->ping_task = NULL; - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) - || GNUNET_YES == test_finished) + if (GNUNET_YES == test_finished) return; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u -> %u (%u)\n", - get_index (peer), get_index (peer->dest), peer->data_sent); - - GNUNET_CADET_notify_transmit_ready (peer->ch, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - sizeof (struct CadetPingMessage), - &tmt_rdy_ping, peer); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%u -> %u (%u)\n", + get_index (peer), + get_index (peer->dest), + peer->data_sent); + peer->ping_ntr = GNUNET_CADET_notify_transmit_ready (peer->ch, GNUNET_NO, + GNUNET_TIME_UNIT_FOREVER_REL, + sizeof (struct CadetPingMessage), + &tmt_rdy_ping, peer); } /** @@ -640,12 +656,13 @@ ping (void *cls) * @param tc Task context. */ static void -pong (struct GNUNET_CADET_Channel *channel, const struct CadetPingMessage *ping) +pong (struct GNUNET_CADET_Channel *channel, + const struct CadetPingMessage *ping) { struct CadetPingMessage *copy; copy = GNUNET_new (struct CadetPingMessage); - memcpy (copy, ping, sizeof (*ping)); + *copy = *ping; GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO, GNUNET_TIME_UNIT_FOREVER_REL, sizeof (struct CadetPingMessage), @@ -666,7 +683,9 @@ tmt_rdy_ping (void *cls, size_t size, void *buf) struct CadetPeer *peer = (struct CadetPeer *) cls; struct CadetPingMessage *msg = buf; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmt_rdy called, filling buffer\n"); + peer->ping_ntr = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "tmt_rdy called, filling buffer\n"); if (size < sizeof (struct CadetPingMessage) || NULL == buf) { GNUNET_break (GNUNET_YES == test_finished); @@ -827,7 +846,8 @@ incoming_channel (void *cls, struct GNUNET_CADET_Channel *channel, * with the channel is stored */ static void -channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel, +channel_cleaner (void *cls, + const struct GNUNET_CADET_Channel *channel, void *channel_ctx) { long n = (long) cls; @@ -874,13 +894,8 @@ start_test (void *cls) { enum GNUNET_CADET_ChannelOption flags; unsigned long i; - const struct GNUNET_SCHEDULER_TaskContext *tc; test_task = NULL; - tc = GNUNET_SCHEDULER_get_task_context (); - if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) - return; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start profiler\n"); flags = GNUNET_CADET_OPTION_DEFAULT; @@ -909,7 +924,9 @@ start_test (void *cls) number_rounds + 1), &disconnect_cadet_peers, (void *) __LINE__); - GNUNET_SCHEDULER_add_delayed (round_time, &next_rnd, NULL); + round_task = GNUNET_SCHEDULER_add_delayed (round_time, + &next_rnd, + NULL); } @@ -939,6 +956,7 @@ warmup (void) } } + /** * Callback to be called when the requested peer information is available * @@ -950,9 +968,9 @@ warmup (void) */ static void peer_id_cb (void *cls, - struct GNUNET_TESTBED_Operation *op, - const struct GNUNET_TESTBED_PeerInformation *pinfo, - const char *emsg) + struct GNUNET_TESTBED_Operation *op, + const struct GNUNET_TESTBED_PeerInformation *pinfo, + const char *emsg) { long n = (long) cls; @@ -991,6 +1009,7 @@ peer_id_cb (void *cls, &start_test, NULL); } + /** * test main: start test when all peers are connected * @@ -1009,7 +1028,8 @@ tmain (void *cls, { unsigned long i; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "test main\n"); test_ctx = ctx; GNUNET_assert (peers_total == num_peers); peers_running = num_peers; @@ -1017,11 +1037,12 @@ tmain (void *cls, disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME, &disconnect_cadet_peers, (void *) __LINE__); - shutdown_handle = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); for (i = 0; i < peers_total; i++) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requesting id %ld\n", i); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "requesting id %ld\n", + i); peers[i].up = GNUNET_YES; peers[i].cadet = cadetes[i]; peers[i].op = @@ -1047,25 +1068,35 @@ main (int argc, char *argv[]) if (4 > argc) { - fprintf (stderr, "usage: %s ROUND_TIME PEERS PINGS [DO_WARMUP]\n", argv[0]); - fprintf (stderr, "example: %s 30s 16 1 Y\n", argv[0]); + fprintf (stderr, + "usage: %s ROUND_TIME PEERS PINGS [DO_WARMUP]\n", + argv[0]); + fprintf (stderr, + "example: %s 30s 16 1 Y\n", + argv[0]); return 1; } - if (GNUNET_OK != GNUNET_STRINGS_fancy_time_to_relative (argv[1], &round_time)) + if (GNUNET_OK != + GNUNET_STRINGS_fancy_time_to_relative (argv[1], + &round_time)) { - fprintf (stderr, "%s is not a valid time\n", argv[1]); + fprintf (stderr, + "%s is not a valid time\n", + argv[1]); return 1; } peers_total = atoll (argv[2]); if (2 > peers_total) { - fprintf (stderr, "%s peers is not valid (> 2)\n", argv[1]); + fprintf (stderr, + "%s peers is not valid (> 2)\n", + argv[1]); return 1; } - peers = GNUNET_malloc (sizeof (struct CadetPeer) * peers_total); - + peers = GNUNET_new_array (peers_total, + struct CadetPeer); peers_pinging = atoll (argv[3]); if (peers_total < 2 * peers_pinging) @@ -1077,7 +1108,8 @@ main (int argc, char *argv[]) do_warmup = (5 > argc || argv[4][0] != 'N'); - ids = GNUNET_CONTAINER_multipeermap_create (2 * peers_total, GNUNET_YES); + ids = GNUNET_CONTAINER_multipeermap_create (2 * peers_total, + GNUNET_YES); GNUNET_assert (NULL != ids); p_ids = 0; test_finished = GNUNET_NO; -- cgit v1.2.3