From 4c1a6478b3ae43bb009addd982390a5db949913b Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Sat, 17 May 2014 17:39:50 +0000 Subject: add missing cancel implementation for MQ --- po/POTFILES.in | 1 + src/util/mq.c | 63 +++++++++++++++++++++++++++++++++++++++++++++++ src/util/test_mq_client.c | 12 ++++++++- 3 files changed, 75 insertions(+), 1 deletion(-) diff --git a/po/POTFILES.in b/po/POTFILES.in index d6774cacd..fa710ee58 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -232,6 +232,7 @@ src/peerinfo-tool/gnunet-peerinfo_plugins.c src/peerstore/gnunet-peerstore.c src/peerstore/gnunet-service-peerstore.c src/peerstore/peerstore_api.c +src/peerstore/peerstore_common.c src/peerstore/plugin_peerstore_sqlite.c src/postgres/postgres.c src/psyc/gnunet-service-psyc.c diff --git a/src/util/mq.c b/src/util/mq.c index a4691ff2c..941a5c43e 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -93,6 +93,11 @@ struct GNUNET_MQ_Handle */ GNUNET_MQ_DestroyImpl destroy_impl; + /** + * Implementation-dependent send cancel function + */ + GNUNET_MQ_CancelImpl cancel_impl; + /** * Implementation-specific state */ @@ -242,6 +247,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); + ev->parent_queue = mq; + /* is the implementation busy? queue it! */ if (NULL != mq->current_envelope) { @@ -276,6 +283,7 @@ impl_send_continue (void *cls, * a message */ current_envelope = mq->current_envelope; GNUNET_assert (NULL != current_envelope); + current_envelope->parent_queue = NULL; if (NULL == mq->envelope_head) { mq->current_envelope = NULL; @@ -337,6 +345,7 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, mq = GNUNET_new (struct GNUNET_MQ_Handle); mq->send_impl = send; mq->destroy_impl = destroy; + mq->cancel_impl = cancel; mq->handlers = handlers; mq->handlers_cls = cls; mq->impl_state = impl_state; @@ -613,6 +622,17 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, } +static void +connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct ClientConnectionState *state = impl_state; + GNUNET_assert (NULL != state->th); + GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); + state->th = NULL; +} + + struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, const struct GNUNET_MQ_MessageHandler *handlers, @@ -633,6 +653,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti mq->impl_state = state; mq->send_impl = connection_client_send_impl; mq->destroy_impl = connection_client_destroy_impl; + mq->cancel_impl = connection_client_cancel_impl; if (NULL != handlers) state->receive_requested = GNUNET_YES; @@ -777,3 +798,45 @@ GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t ba } +/** + * Cancel sending the message. Message must have been sent with + * #GNUNET_MQ_send before. May not be called after the notify sent + * callback has been called + * + * @param ev queued envelope to cancel + */ +void +GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) +{ + struct GNUNET_MQ_Handle *mq = ev->parent_queue; + + GNUNET_assert (NULL != mq); + GNUNET_assert (NULL != mq->cancel_impl); + + if (mq->current_envelope == ev) { + // complex case, we already started with transmitting + // the message + mq->cancel_impl (mq, mq->impl_state); + // continue sending the next message, if any + if (NULL == mq->envelope_head) + { + mq->current_envelope = NULL; + } + else + { + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, + mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); + } + } else { + // simple case, message is still waiting in the queue + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); + } + + ev->parent_queue = NULL; + ev->mh = NULL; + GNUNET_free (ev); +} + diff --git a/src/util/test_mq_client.c b/src/util/test_mq_client.c index 1c1bcee86..88113ffd0 100644 --- a/src/util/test_mq_client.c +++ b/src/util/test_mq_client.c @@ -102,6 +102,12 @@ send_cb (void *cls) notify = GNUNET_YES; } +static void +send_trap_cb (void *cls) +{ + GNUNET_abort (); +} + static void test_mq (struct GNUNET_CLIENT_Connection *client) @@ -115,11 +121,15 @@ test_mq (struct GNUNET_CLIENT_Connection *client) mqm = GNUNET_MQ_msg_header (MY_TYPE); GNUNET_MQ_send (mq, mqm); + mqm = GNUNET_MQ_msg_header (MY_TYPE); + GNUNET_MQ_notify_sent (mqm, send_trap_cb, NULL); + GNUNET_MQ_send (mq, mqm); + GNUNET_MQ_send_cancel (mqm); + mqm = GNUNET_MQ_msg_header (MY_TYPE); GNUNET_MQ_notify_sent (mqm, send_cb, NULL); GNUNET_MQ_send (mq, mqm); - /* FIXME: add a message that will be canceled */ } -- cgit v1.2.3