From b03978816ac35a1123456c2d872d4330bfcc3ae1 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 15 Feb 2020 12:55:24 +0100 Subject: proposed fix for excessive queueing (somehow does not quite work, not sure why) --- src/transport/test_communicator_basic.c | 56 ++++++++++++++----------------- src/transport/transport-testing2.c | 59 +++++++++++++++++---------------- src/transport/transport-testing2.h | 16 ++++++--- 3 files changed, 67 insertions(+), 64 deletions(-) diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index c469a55a1..e99db7cfb 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c @@ -43,8 +43,6 @@ static struct GNUNET_SCHEDULER_Task *to_task; -static struct GNUNET_SCHEDULER_Task *active_task; - static int queue_est = GNUNET_NO; static struct GNUNET_PeerIdentity peer_id[NUM_PEERS]; @@ -233,21 +231,21 @@ size_test (void *cls) { char *payload; - active_task = NULL; GNUNET_assert (TP_SIZE_CHECK == phase); if (ack >= 64000) return; /* Leave some room for our protocol, so not 2^16 exactly */ payload = make_payload (ack); + ack += 5; + num_sent++; GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, + (ack < 64000) + ? &size_test + : NULL, + NULL, payload, ack); GNUNET_free (payload); - ack += 5; - num_sent++; timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); - if (ack < 64000) - active_task = GNUNET_SCHEDULER_add_now (&size_test, - NULL); } @@ -256,18 +254,18 @@ long_test (void *cls) { char *payload; - active_task = NULL; payload = make_payload (LONG_MESSAGE_SIZE); + num_sent++; GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, + (BURST_PACKETS == + num_sent) + ? NULL + : &long_test, + NULL, payload, LONG_MESSAGE_SIZE); - num_sent++; GNUNET_free (payload); timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); - if (num_sent == BURST_PACKETS) - return; - active_task = GNUNET_SCHEDULER_add_now (&long_test, - NULL); } @@ -276,18 +274,18 @@ short_test (void *cls) { char *payload; - active_task = NULL; payload = make_payload (SHORT_MESSAGE_SIZE); + num_sent++; GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, + (BURST_PACKETS == + num_sent) + ? NULL + : &short_test, + NULL, payload, SHORT_MESSAGE_SIZE); - num_sent++; GNUNET_free (payload); timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); - if (num_sent >= BURST_PACKETS) - return; - active_task = GNUNET_SCHEDULER_add_now (&short_test, - NULL); } @@ -320,9 +318,7 @@ add_queue_cb (void *cls, to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &latency_timeout, NULL); - GNUNET_assert (NULL == active_task); - active_task = GNUNET_SCHEDULER_add_now (&short_test, - NULL); + short_test (NULL); } @@ -398,8 +394,7 @@ incoming_message_cb (void *cls, num_sent = 0; avg_latency = 0; num_received = 0; - active_task = GNUNET_SCHEDULER_add_now (&long_test, - NULL); + long_test (NULL); } break; } @@ -436,8 +431,7 @@ incoming_message_cb (void *cls, num_received = 0; num_sent = 0; avg_latency = 0; - active_task = GNUNET_SCHEDULER_add_now (&size_test, - NULL); + size_test (NULL); } break; } @@ -462,8 +456,7 @@ incoming_message_cb (void *cls, { start_short = GNUNET_TIME_absolute_get (); phase = TP_BURST_SHORT; - active_task = GNUNET_SCHEDULER_add_now (&short_test, - NULL); + short_test (NULL); break; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -484,10 +477,9 @@ do_shutdown (void *cls) GNUNET_SCHEDULER_cancel (to_task); to_task = NULL; } - if (NULL != active_task) + for (unsigned int i = 0; i < NUM_PEERS; i++) { - GNUNET_SCHEDULER_cancel (active_task); - active_task = NULL; + GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (tc_hs[i]); } } diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index 75864294b..b087f6976 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -858,16 +858,6 @@ nat_start ( } -static void -do_shutdown (void *cls) -{ - struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; - shutdown_communicator (tc_h->c_proc); - shutdown_service (tc_h->sh); - shutdown_nat (tc_h->nat_proc); -} - - /** * @brief Start communicator part of transport service and communicator * @@ -928,11 +918,22 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( /* Schedule start communicator */ communicator_start (tc_h, binary_name); - GNUNET_SCHEDULER_add_shutdown (&do_shutdown, tc_h); return tc_h; } +void +GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop ( + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h) +{ + shutdown_communicator (tc_h->c_proc); + shutdown_service (tc_h->sh); + shutdown_nat (tc_h->nat_proc); + GNUNET_CONFIGURATION_destroy (tc_h->cfg); + GNUNET_free (tc_h); +} + + /** * @brief Instruct communicator to open a queue * @@ -988,39 +989,41 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue ( * @brief Instruct communicator to send data * * @param tc_queue The queue to use for sending + * @param cont function to call when done sending + * @param cont_cls closure for @a cont * @param payload Data to send - * @param payload_size Size of the payload - * - * @return Handle to the transmission + * @param payload_size Size of the @a payload */ -struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission * +void GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, + GNUNET_SCHEDULER_TaskCallback cont, + void *cont_cls, const void *payload, size_t payload_size) { - struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission *tc_t; struct GNUNET_MessageHeader *mh; struct GNUNET_TRANSPORT_SendMessageTo *msg; struct GNUNET_MQ_Envelope *env; size_t inbox_size; - inbox_size = sizeof(struct GNUNET_MessageHeader) + payload_size; - mh = GNUNET_malloc (inbox_size); - mh->size = htons (inbox_size); - mh->type = GNUNET_MESSAGE_TYPE_DUMMY; - memcpy (&mh[1], - payload, - payload_size); + inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; env = GNUNET_MQ_msg_extra (msg, inbox_size, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); msg->qid = htonl (tc_queue->qid); msg->mid = tc_queue->mid++; msg->receiver = tc_queue->peer_id; - memcpy (&msg[1], mh, inbox_size); - GNUNET_free (mh); - GNUNET_MQ_send (tc_queue->tc_h->c_mq, env); - // GNUNET_assert (0); // FIXME: not iplemented! - return tc_t; + mh = (struct GNUNET_MessageHeader *) &msg[1]; + mh->size = htons (inbox_size); + mh->type = GNUNET_MESSAGE_TYPE_DUMMY; + memcpy (&mh[1], + payload, + payload_size); + if (NULL != cont) + GNUNET_MQ_notify_sent (env, + cont, + cont_cls); + GNUNET_MQ_send (tc_queue->tc_h->c_mq, + env); } diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index e7602e3e2..96a08a193 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h @@ -181,6 +181,11 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( void *cb_cls); +void +GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop ( + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h); + + /** * @brief Instruct communicator to open a queue * @@ -202,14 +207,17 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (struct * @brief Instruct communicator to send data * * @param tc_queue The queue to use for sending + * @param cont function to call when done sending + * @param cont_cls closure for @a cont * @param payload Data to send - * @param payload_size Size of the payload - * - * @return Handle to the transmission + * @param payload_size Size of the @a payload */ -struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission * +void GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, + GNUNET_SCHEDULER_TaskCallback + cont, + void *cont_cls, const void *payload, size_t payload_size); -- cgit v1.2.3