From a900b29ddaa9ea46c731b054b5e3ef3e725b95a8 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 19 Jun 2013 10:48:54 +0000 Subject: - opaque mq structs - mq for mesh - faster hashing for IBFs - mesh replaces stream in set - new set profiler (work in progress) --- src/stream/stream_api.c | 103 +++++++++++++++--------------------------------- 1 file changed, 32 insertions(+), 71 deletions(-) (limited to 'src/stream') diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 34f1ea0fa..47ed04117 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -3779,11 +3779,11 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) * @param size the number of bytes written */ static void -mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) +mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, + size_t size) { - struct GNUNET_MQ_MessageQueue *mq = cls; - struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Handle *mq = cls; + struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); switch (status) { @@ -3793,56 +3793,32 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size /* FIXME: call shutdown handler */ return; case GNUNET_STREAM_TIMEOUT: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); return; case GNUNET_STREAM_SYSERR: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); return; default: GNUNET_assert (0); return; } - - /* call cb for message we finished sending */ - mqm = mq->current_msg; - GNUNET_assert (NULL != mq->current_msg); - if (NULL != mqm->sent_cb) - mqm->sent_cb (mqm->sent_cls); - GNUNET_free (mqm); mss->wh = NULL; - mqm = mq->msg_head; - mq->current_msg = mqm; - if (NULL == mqm) - return; - GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); - mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), - GNUNET_TIME_UNIT_FOREVER_REL, - mq_stream_write_queued, mq); - GNUNET_assert (NULL != mss->wh); + GNUNET_MQ_impl_send_continue (mq); } static void -mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, - struct GNUNET_MQ_Message *mqm) +mq_stream_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, void *impl_state) { - struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + struct MQStreamState *mss = impl_state; - if (NULL != mq->current_msg) - { - GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); - return; - } - mq->current_msg = mqm; - mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), + /* no way to cancel sending now */ + GNUNET_MQ_impl_send_commit (mq); + + mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, mq_stream_write_queued, mq); } @@ -3862,12 +3838,12 @@ mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, */ static int mq_stream_mst_callback (void *cls, void *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *message) { - struct GNUNET_MQ_MessageQueue *mq = cls; + struct GNUNET_MQ_Handle *mq = cls; GNUNET_assert (NULL != message); - GNUNET_MQ_dispatch (mq, message); + GNUNET_MQ_inject_message (mq, message); return GNUNET_OK; } @@ -3889,8 +3865,8 @@ mq_stream_data_processor (void *cls, const void *data, size_t size) { - struct GNUNET_MQ_MessageQueue *mq = cls; - struct MQStreamState *mss; + struct GNUNET_MQ_Handle *mq = cls; + struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); int ret; switch (status) @@ -3901,45 +3877,33 @@ mq_stream_data_processor (void *cls, /* FIXME: call shutdown handler */ return 0; case GNUNET_STREAM_TIMEOUT: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); return 0; case GNUNET_STREAM_SYSERR: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); return 0; default: GNUNET_assert (0); return 0; } - mss = (struct MQStreamState *) mq->impl_state; - GNUNET_assert (GNUNET_STREAM_OK == status); ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); if (GNUNET_OK != ret) { - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, - "read error (message stream malformed), but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); return 0; } - /* we always read all data */ mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, mq_stream_data_processor, mq); + /* we always read all data */ return size; } static void -mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { - struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + struct MQStreamState *mss = impl_state; if (NULL != mss->rh) { @@ -3972,24 +3936,21 @@ mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) * @param error_handler callback for errors * @return the message queue for the socket */ -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_MQ_Handler *msg_handlers, + const struct GNUNET_MQ_MessageHandler *msg_handlers, GNUNET_MQ_ErrorHandler error_handler, void *cls) { - struct GNUNET_MQ_MessageQueue *mq; + struct GNUNET_MQ_Handle *mq; struct MQStreamState *mss; - mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); mss = GNUNET_new (struct MQStreamState); mss->socket = socket; - mq->impl_state = mss; - mq->send_impl = mq_stream_send_impl; - mq->destroy_impl = mq_stream_destroy_impl; - mq->handlers = msg_handlers; - mq->handlers_cls = cls; - mq->error_handler = error_handler; + mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl, + mq_stream_destroy_impl, + NULL, + mss, msg_handlers, error_handler, cls); if (NULL != msg_handlers) { mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); -- cgit v1.2.3