summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/transport_api_core.c27
-rw-r--r--src/util/client.c2
-rw-r--r--src/util/client_new.c49
-rw-r--r--src/util/mq.c135
4 files changed, 113 insertions, 100 deletions
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index f6ea43db9..de18a140c 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -354,6 +354,25 @@ handle_hello (void *cls,
* @param cls the `struct Neighbour` where the message was sent
*/
static void
+notify_send_done_fin (void *cls)
+{
+ struct Neighbour *n = cls;
+
+ n->timeout_task = NULL;
+ n->is_ready = GNUNET_YES;
+ GNUNET_MQ_impl_send_continue (n->mq);
+}
+
+
+/**
+ * A message from the handler's message queue to a neighbour was
+ * transmitted. Now trigger (possibly delayed) notification of the
+ * neighbour's message queue that we are done and thus ready for
+ * the next message.
+ *
+ * @param cls the `struct Neighbour` where the message was sent
+ */
+static void
notify_send_done (void *cls)
{
struct Neighbour *n = cls;
@@ -364,8 +383,8 @@ notify_send_done (void *cls)
{
GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
n->env_size + n->traffic_overhead);
- n->traffic_overhead = 0;
n->env = NULL;
+ n->traffic_overhead = 0;
}
delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
128);
@@ -375,10 +394,11 @@ notify_send_done (void *cls)
GNUNET_MQ_impl_send_continue (n->mq);
return;
}
+ GNUNET_MQ_impl_send_in_flight (n->mq);
/* cannot send even a small message without violating
- quota, wait a before notifying MQ */
+ quota, wait a before allowing MQ to send next message */
n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- &notify_send_done,
+ &notify_send_done_fin,
n);
}
@@ -411,6 +431,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
GNUNET_MQ_impl_send_continue (mq);
return;
}
+ GNUNET_assert (NULL == n->env);
n->env = GNUNET_MQ_msg_nested_mh (obm,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
msg);
diff --git a/src/util/client.c b/src/util/client.c
index f40d5e6eb..47db91c8e 100644
--- a/src/util/client.c
+++ b/src/util/client.c
@@ -375,7 +375,7 @@ do_connect (const char *service_name,
* @return the message queue, NULL on error
*/
struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *service_name,
const struct GNUNET_MQ_MessageHandler *handlers,
GNUNET_MQ_ErrorHandler error_handler,
diff --git a/src/util/client_new.c b/src/util/client_new.c
index 1e90470fb..593d3a268 100644
--- a/src/util/client_new.c
+++ b/src/util/client_new.c
@@ -213,10 +213,9 @@ start_connect (void *cls);
static void
connect_fail_continuation (struct ClientState *cstate)
{
- LOG (GNUNET_ERROR_TYPE_INFO,
- "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n",
- cstate->hostname,
- cstate->port);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to establish connection to `%s', no further addresses to try.\n",
+ cstate->service_name);
GNUNET_break (NULL == cstate->ap_head);
GNUNET_break (NULL == cstate->ap_tail);
GNUNET_break (NULL == cstate->dns_active);
@@ -245,6 +244,7 @@ transmit_ready (void *cls)
ssize_t ret;
size_t len;
const char *pos;
+ int notify_in_flight;
cstate->send_task = NULL;
pos = (const char *) cstate->msg;
@@ -262,10 +262,7 @@ transmit_ready (void *cls)
GNUNET_MQ_ERROR_WRITE);
return;
}
- if (0 == cstate->msg_off)
- {
- GNUNET_MQ_impl_send_in_flight (cstate->mq);
- }
+ notify_in_flight = (0 == cstate->msg_off);
cstate->msg_off += ret;
if (cstate->msg_off < len)
{
@@ -274,6 +271,8 @@ transmit_ready (void *cls)
cstate->sock,
&transmit_ready,
cstate);
+ if (notify_in_flight)
+ GNUNET_MQ_impl_send_in_flight (cstate->mq);
return;
}
cstate->msg = NULL;
@@ -345,6 +344,7 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
{
/* defer destruction */
cstate->in_destroy = GNUNET_YES;
+ cstate->mq = NULL;
return;
}
if (NULL != cstate->dns_active)
@@ -384,8 +384,12 @@ receive_ready (void *cls)
GNUNET_NO);
if (GNUNET_SYSERR == ret)
{
- GNUNET_MQ_inject_error (cstate->mq,
- GNUNET_MQ_ERROR_READ);
+ if (NULL != cstate->mq)
+ GNUNET_MQ_inject_error (cstate->mq,
+ GNUNET_MQ_ERROR_READ);
+ if (GNUNET_YES == cstate->in_destroy)
+ connection_client_destroy_impl (cstate->mq,
+ cstate);
return;
}
if (GNUNET_YES == cstate->in_destroy)
@@ -723,16 +727,25 @@ start_connect (void *cls)
#endif
if ( (0 == (cstate->attempts++ % 2)) ||
- (0 == cstate->port) )
+ (0 == cstate->port) ||
+ (NULL == cstate->hostname) )
{
- /* on even rounds, try UNIX first */
+ /* on even rounds, try UNIX first, or always
+ if we do not have a DNS name and TCP port. */
cstate->sock = try_unixpath (cstate->service_name,
cstate->cfg);
if (NULL != cstate->sock)
{
connect_success_continuation (cstate);
return;
- }
+ }
+ }
+ if ( (NULL == cstate->hostname) ||
+ (0 == cstate->port) )
+ {
+ /* All options failed. Boo! */
+ connect_fail_continuation (cstate);
+ return;
}
cstate->dns_active
= GNUNET_RESOLVER_ip_get (cstate->hostname,
@@ -807,11 +820,11 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
* @return the message queue, NULL on error
*/
struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const char *service_name,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_ErrorHandler error_handler,
- void *error_handler_cls)
+GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const char *service_name,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
+ void *error_handler_cls)
{
struct ClientState *cstate;
diff --git a/src/util/mq.c b/src/util/mq.c
index 4ba6c5ff8..ba947d5b8 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -128,6 +128,11 @@ struct GNUNET_MQ_Handle
void *error_handler_cls;
/**
+ * Task to asynchronously run #impl_send_continue().
+ */
+ struct GNUNET_SCHEDULER_Task *send_task;
+
+ /**
* Linked list of messages pending to be sent
*/
struct GNUNET_MQ_Envelope *envelope_head;
@@ -145,23 +150,11 @@ struct GNUNET_MQ_Handle
struct GNUNET_MQ_Envelope *current_envelope;
/**
- * GNUNET_YES if the sent notification was called
- * for the current envelope.
- */
- int send_notification_called;
-
- /**
* Map of associations, lazily allocated
*/
struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
/**
- * Task scheduled during #GNUNET_MQ_impl_send_continue
- * or #GNUNET_MQ_impl_send_in_flight
- */
- struct GNUNET_SCHEDULER_Task *send_task;
-
- /**
* Functions to call on queue destruction; kept in a DLL.
*/
struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
@@ -196,9 +189,15 @@ struct GNUNET_MQ_Handle
unsigned int queue_length;
/**
- * GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * FIXME: is this dead?
*/
int evacuate_called;
+
+ /**
+ * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
+ */
+ int in_flight;
};
@@ -364,7 +363,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
unsigned int
GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
{
- return mq->queue_length;
+ return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
}
@@ -385,7 +384,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
mq->queue_length++;
ev->parent_queue = mq;
/* is the implementation busy? queue it! */
- if (NULL != mq->current_envelope)
+ if ( (NULL != mq->current_envelope) ||
+ (NULL != mq->send_task) )
{
GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
mq->envelope_tail,
@@ -428,35 +428,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
/**
- * Task run to call the send notification for the next queued
- * message, if any. Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param cls message queue to send the next message with
- */
-static void
-impl_send_in_flight (void *cls)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
-
- mq->send_task = NULL;
- /* call is only valid if we're actually currently sending
- * a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- /* can't call cancel from now on anymore */
- current_envelope->parent_queue = NULL;
- if ( (GNUNET_NO == mq->send_notification_called) &&
- (NULL != current_envelope->sent_cb) )
- {
- current_envelope->sent_cb (current_envelope->sent_cls);
- }
- mq->send_notification_called = GNUNET_YES;
-}
-
-
-/**
* Task run to call the send implementation for the next queued
* message, if any. Only useful for implementing message queues,
* results in undefined behavior if not used carefully.
@@ -467,32 +438,19 @@ static void
impl_send_continue (void *cls)
{
struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
-
+
mq->send_task = NULL;
/* call is only valid if we're actually currently sending
* a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- impl_send_in_flight (mq);
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
if (NULL == mq->envelope_head)
- {
- mq->current_envelope = NULL;
- }
- else
- {
- mq->current_envelope = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
- mq->envelope_tail,
- mq->current_envelope);
- mq->send_notification_called = GNUNET_NO;
- mq->send_impl (mq,
- mq->current_envelope->mh,
- mq->impl_state);
- }
- GNUNET_free (current_envelope);
+ return;
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq,
+ mq->current_envelope->mh,
+ mq->impl_state);
}
@@ -506,22 +464,32 @@ impl_send_continue (void *cls)
void
GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
{
- /* maybe #GNUNET_MQ_impl_send_in_flight was called? */
- if (NULL != mq->send_task)
- {
- GNUNET_SCHEDULER_cancel (mq->send_task);
- }
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_MQ_NotifyCallback cb;
+
+ GNUNET_assert (0 < mq->queue_length);
+ mq->queue_length--;
+ current_envelope = mq->current_envelope;
+ current_envelope->parent_queue = NULL;
+ mq->current_envelope = NULL;
+ GNUNET_assert (NULL == mq->send_task);
mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
- mq);
+ mq);
+ if (NULL != (cb = current_envelope->sent_cb))
+ {
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
+ GNUNET_free (current_envelope);
}
/**
* Call the send notification for the current message, but do not
- * try to send the next message until #gnunet_mq_impl_send_continue
+ * try to send the next message until #GNUNET_MQ_impl_send_continue
* is called.
*
- * only useful for implementing message queues, results in undefined
+ * Only useful for implementing message queues, results in undefined
* behavior if not used carefully.
*
* @param mq message queue to send the next message with
@@ -529,9 +497,21 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
void
GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
{
- GNUNET_assert (NULL == mq->send_task);
- mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight,
- mq);
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_MQ_NotifyCallback cb;
+
+ mq->in_flight = GNUNET_YES;
+ /* call is only valid if we're actually currently sending
+ * a message */
+ current_envelope = mq->current_envelope;
+ GNUNET_assert (NULL != current_envelope);
+ /* can't call cancel from now on anymore */
+ current_envelope->parent_queue = NULL;
+ if (NULL != (cb = current_envelope->sent_cb))
+ {
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
}
@@ -1187,7 +1167,6 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
mq->envelope_tail,
mq->current_envelope);
- mq->send_notification_called = GNUNET_NO;
mq->send_impl (mq,
mq->current_envelope->mh,
mq->impl_state);