From 172ab07eeb1215cc9d22dabc589f7529ac2d59ea Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Sat, 9 Nov 2013 23:12:27 +0000 Subject: psyc: handling messages from multicast and passing them to clients; pause/resume fixes --- src/psyc/test_psyc.c | 50 +++++++++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 21 deletions(-) (limited to 'src/psyc/test_psyc.c') diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 41c4ca8e4..2986fdf6a 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -144,10 +144,12 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, return GNUNET_OK; } + struct TransmitClosure { struct GNUNET_PSYC_MasterTransmitHandle *handle; uint8_t n; + uint8_t paused; uint8_t fragment_count; char *fragments[16]; uint16_t fragment_sizes[16]; @@ -157,8 +159,9 @@ struct TransmitClosure static void transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); struct TransmitClosure *tmit = cls; + tmit->paused = GNUNET_NO; GNUNET_PSYC_master_transmit_resume (tmit->handle); } @@ -167,33 +170,36 @@ static int transmit_notify (void *cls, size_t *data_size, void *data) { struct TransmitClosure *tmit = cls; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Transmit notify: %lu bytes\n", *data_size); - - if (tmit->fragment_count <= tmit->n) - return GNUNET_YES; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmit notify: %lu bytes available, " + "processing fragment %u/%u.\n", + *data_size, tmit->n + 1, tmit->fragment_count); GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); - *data_size = tmit->fragment_sizes[tmit->n]; - memcpy (data, tmit->fragments[tmit->n], *data_size); - tmit->n++; - - if (tmit->n == tmit->fragment_count - 1) + if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1) { /* Send last fragment later. */ - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume, - tmit); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); + tmit->paused = GNUNET_YES; + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 3), + &transmit_resume, tmit); *data_size = 0; return GNUNET_NO; } - return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES; + + GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); + *data_size = tmit->fragment_sizes[tmit->n]; + memcpy (data, tmit->fragments[tmit->n], *data_size); + + return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES; } void master_started (void *cls, uint64_t max_message_id) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Master started: %lu\n", max_message_id); struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, @@ -202,11 +208,13 @@ master_started (void *cls, uint64_t max_message_id) "_foo_bar", "foo bar baz", 11); struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); - tmit->fragment_count = 2; - tmit->fragments[0] = "foo bar"; - tmit->fragment_sizes[0] = 7; - tmit->fragments[1] = "baz!"; - tmit->fragment_sizes[1] = 4; + tmit->fragment_count = 3; + tmit->fragments[0] = "foo"; + tmit->fragment_sizes[0] = 4; + tmit->fragments[1] = "foo bar"; + tmit->fragment_sizes[1] = 7; + tmit->fragments[2] = "foo bar baz"; + tmit->fragment_sizes[2] = 11; tmit->handle = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); -- cgit v1.2.3