From 30deabaf6460ef605637dfd1b4b1cf79839e5c9f Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 21 Oct 2016 16:04:46 +0000 Subject: activating client_new implementation, seems to mostly work fine, or better than the old one --- src/transport/transport_api_core.c | 27 +++++++- src/util/client.c | 2 +- src/util/client_new.c | 49 +++++++++----- src/util/mq.c | 135 ++++++++++++++++--------------------- 4 files changed, 113 insertions(+), 100 deletions(-) (limited to 'src') diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c index f6ea43db9..de18a140c 100644 --- a/src/transport/transport_api_core.c +++ b/src/transport/transport_api_core.c @@ -345,6 +345,25 @@ handle_hello (void *cls, } +/** + * A message from the handler's message queue to a neighbour was + * transmitted. Now trigger (possibly delayed) notification of the + * neighbour's message queue that we are done and thus ready for + * the next message. + * + * @param cls the `struct Neighbour` where the message was sent + */ +static void +notify_send_done_fin (void *cls) +{ + struct Neighbour *n = cls; + + n->timeout_task = NULL; + n->is_ready = GNUNET_YES; + GNUNET_MQ_impl_send_continue (n->mq); +} + + /** * A message from the handler's message queue to a neighbour was * transmitted. Now trigger (possibly delayed) notification of the @@ -364,8 +383,8 @@ notify_send_done (void *cls) { GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, n->env_size + n->traffic_overhead); - n->traffic_overhead = 0; n->env = NULL; + n->traffic_overhead = 0; } delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128); @@ -375,10 +394,11 @@ notify_send_done (void *cls) GNUNET_MQ_impl_send_continue (n->mq); return; } + GNUNET_MQ_impl_send_in_flight (n->mq); /* cannot send even a small message without violating - quota, wait a before notifying MQ */ + quota, wait a before allowing MQ to send next message */ n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done, + ¬ify_send_done_fin, n); } @@ -411,6 +431,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, GNUNET_MQ_impl_send_continue (mq); return; } + GNUNET_assert (NULL == n->env); n->env = GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg); diff --git a/src/util/client.c b/src/util/client.c index f40d5e6eb..47db91c8e 100644 --- a/src/util/client.c +++ b/src/util/client.c @@ -375,7 +375,7 @@ do_connect (const char *service_name, * @return the message queue, NULL on error */ struct GNUNET_MQ_Handle * -GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, +GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, diff --git a/src/util/client_new.c b/src/util/client_new.c index 1e90470fb..593d3a268 100644 --- a/src/util/client_new.c +++ b/src/util/client_new.c @@ -213,10 +213,9 @@ start_connect (void *cls); static void connect_fail_continuation (struct ClientState *cstate) { - LOG (GNUNET_ERROR_TYPE_INFO, - "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n", - cstate->hostname, - cstate->port); + LOG (GNUNET_ERROR_TYPE_WARNING, + "Failed to establish connection to `%s', no further addresses to try.\n", + cstate->service_name); GNUNET_break (NULL == cstate->ap_head); GNUNET_break (NULL == cstate->ap_tail); GNUNET_break (NULL == cstate->dns_active); @@ -245,6 +244,7 @@ transmit_ready (void *cls) ssize_t ret; size_t len; const char *pos; + int notify_in_flight; cstate->send_task = NULL; pos = (const char *) cstate->msg; @@ -262,10 +262,7 @@ transmit_ready (void *cls) GNUNET_MQ_ERROR_WRITE); return; } - if (0 == cstate->msg_off) - { - GNUNET_MQ_impl_send_in_flight (cstate->mq); - } + notify_in_flight = (0 == cstate->msg_off); cstate->msg_off += ret; if (cstate->msg_off < len) { @@ -274,6 +271,8 @@ transmit_ready (void *cls) cstate->sock, &transmit_ready, cstate); + if (notify_in_flight) + GNUNET_MQ_impl_send_in_flight (cstate->mq); return; } cstate->msg = NULL; @@ -345,6 +344,7 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, { /* defer destruction */ cstate->in_destroy = GNUNET_YES; + cstate->mq = NULL; return; } if (NULL != cstate->dns_active) @@ -384,8 +384,12 @@ receive_ready (void *cls) GNUNET_NO); if (GNUNET_SYSERR == ret) { - GNUNET_MQ_inject_error (cstate->mq, - GNUNET_MQ_ERROR_READ); + if (NULL != cstate->mq) + GNUNET_MQ_inject_error (cstate->mq, + GNUNET_MQ_ERROR_READ); + if (GNUNET_YES == cstate->in_destroy) + connection_client_destroy_impl (cstate->mq, + cstate); return; } if (GNUNET_YES == cstate->in_destroy) @@ -723,16 +727,25 @@ start_connect (void *cls) #endif if ( (0 == (cstate->attempts++ % 2)) || - (0 == cstate->port) ) + (0 == cstate->port) || + (NULL == cstate->hostname) ) { - /* on even rounds, try UNIX first */ + /* on even rounds, try UNIX first, or always + if we do not have a DNS name and TCP port. */ cstate->sock = try_unixpath (cstate->service_name, cstate->cfg); if (NULL != cstate->sock) { connect_success_continuation (cstate); return; - } + } + } + if ( (NULL == cstate->hostname) || + (0 == cstate->port) ) + { + /* All options failed. Boo! */ + connect_fail_continuation (cstate); + return; } cstate->dns_active = GNUNET_RESOLVER_ip_get (cstate->hostname, @@ -807,11 +820,11 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, * @return the message queue, NULL on error */ struct GNUNET_MQ_Handle * -GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg, - const char *service_name, - const struct GNUNET_MQ_MessageHandler *handlers, - GNUNET_MQ_ErrorHandler error_handler, - void *error_handler_cls) +GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, + const char *service_name, + const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_MQ_ErrorHandler error_handler, + void *error_handler_cls) { struct ClientState *cstate; diff --git a/src/util/mq.c b/src/util/mq.c index 4ba6c5ff8..ba947d5b8 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -127,6 +127,11 @@ struct GNUNET_MQ_Handle */ void *error_handler_cls; + /** + * Task to asynchronously run #impl_send_continue(). + */ + struct GNUNET_SCHEDULER_Task *send_task; + /** * Linked list of messages pending to be sent */ @@ -144,23 +149,11 @@ struct GNUNET_MQ_Handle */ struct GNUNET_MQ_Envelope *current_envelope; - /** - * GNUNET_YES if the sent notification was called - * for the current envelope. - */ - int send_notification_called; - /** * Map of associations, lazily allocated */ struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; - /** - * Task scheduled during #GNUNET_MQ_impl_send_continue - * or #GNUNET_MQ_impl_send_in_flight - */ - struct GNUNET_SCHEDULER_Task *send_task; - /** * Functions to call on queue destruction; kept in a DLL. */ @@ -196,9 +189,15 @@ struct GNUNET_MQ_Handle unsigned int queue_length; /** - * GNUNET_YES if GNUNET_MQ_impl_evacuate was called. + * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called. + * FIXME: is this dead? */ int evacuate_called; + + /** + * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called. + */ + int in_flight; }; @@ -364,7 +363,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) unsigned int GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) { - return mq->queue_length; + return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0; } @@ -385,7 +384,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, mq->queue_length++; ev->parent_queue = mq; /* is the implementation busy? queue it! */ - if (NULL != mq->current_envelope) + if ( (NULL != mq->current_envelope) || + (NULL != mq->send_task) ) { GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, @@ -427,35 +427,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, } -/** - * Task run to call the send notification for the next queued - * message, if any. Only useful for implementing message queues, - * results in undefined behavior if not used carefully. - * - * @param cls message queue to send the next message with - */ -static void -impl_send_in_flight (void *cls) -{ - struct GNUNET_MQ_Handle *mq = cls; - struct GNUNET_MQ_Envelope *current_envelope; - - mq->send_task = NULL; - /* call is only valid if we're actually currently sending - * a message */ - current_envelope = mq->current_envelope; - GNUNET_assert (NULL != current_envelope); - /* can't call cancel from now on anymore */ - current_envelope->parent_queue = NULL; - if ( (GNUNET_NO == mq->send_notification_called) && - (NULL != current_envelope->sent_cb) ) - { - current_envelope->sent_cb (current_envelope->sent_cls); - } - mq->send_notification_called = GNUNET_YES; -} - - /** * Task run to call the send implementation for the next queued * message, if any. Only useful for implementing message queues, @@ -467,32 +438,19 @@ static void impl_send_continue (void *cls) { struct GNUNET_MQ_Handle *mq = cls; - struct GNUNET_MQ_Envelope *current_envelope; - + mq->send_task = NULL; /* call is only valid if we're actually currently sending * a message */ - current_envelope = mq->current_envelope; - GNUNET_assert (NULL != current_envelope); - impl_send_in_flight (mq); - GNUNET_assert (0 < mq->queue_length); - mq->queue_length--; if (NULL == mq->envelope_head) - { - mq->current_envelope = NULL; - } - else - { - mq->current_envelope = mq->envelope_head; - GNUNET_CONTAINER_DLL_remove (mq->envelope_head, - mq->envelope_tail, - mq->current_envelope); - mq->send_notification_called = GNUNET_NO; - mq->send_impl (mq, - mq->current_envelope->mh, - mq->impl_state); - } - GNUNET_free (current_envelope); + return; + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, + mq->current_envelope->mh, + mq->impl_state); } @@ -506,22 +464,32 @@ impl_send_continue (void *cls) void GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) { - /* maybe #GNUNET_MQ_impl_send_in_flight was called? */ - if (NULL != mq->send_task) - { - GNUNET_SCHEDULER_cancel (mq->send_task); - } + struct GNUNET_MQ_Envelope *current_envelope; + GNUNET_MQ_NotifyCallback cb; + + GNUNET_assert (0 < mq->queue_length); + mq->queue_length--; + current_envelope = mq->current_envelope; + current_envelope->parent_queue = NULL; + mq->current_envelope = NULL; + GNUNET_assert (NULL == mq->send_task); mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, - mq); + mq); + if (NULL != (cb = current_envelope->sent_cb)) + { + current_envelope->sent_cb = NULL; + cb (current_envelope->sent_cls); + } + GNUNET_free (current_envelope); } /** * Call the send notification for the current message, but do not - * try to send the next message until #gnunet_mq_impl_send_continue + * try to send the next message until #GNUNET_MQ_impl_send_continue * is called. * - * only useful for implementing message queues, results in undefined + * Only useful for implementing message queues, results in undefined * behavior if not used carefully. * * @param mq message queue to send the next message with @@ -529,9 +497,21 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) void GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) { - GNUNET_assert (NULL == mq->send_task); - mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight, - mq); + struct GNUNET_MQ_Envelope *current_envelope; + GNUNET_MQ_NotifyCallback cb; + + mq->in_flight = GNUNET_YES; + /* call is only valid if we're actually currently sending + * a message */ + current_envelope = mq->current_envelope; + GNUNET_assert (NULL != current_envelope); + /* can't call cancel from now on anymore */ + current_envelope->parent_queue = NULL; + if (NULL != (cb = current_envelope->sent_cb)) + { + current_envelope->sent_cb = NULL; + cb (current_envelope->sent_cls); + } } @@ -1187,7 +1167,6 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); - mq->send_notification_called = GNUNET_NO; mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); -- cgit v1.2.3