From 4289fca5aeefd0652ae60bc16f90ed911c7e1c60 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 22 May 2013 10:29:15 +0000 Subject: - moved MQ to util - MQ support for stream - set api - starting to use set for consensus --- src/stream/stream_api.c | 213 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) (limited to 'src/stream') diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 8994afc24..b4a47b53d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -578,6 +578,37 @@ struct GNUNET_STREAM_ShutdownHandle }; +/** + * Collection of the state necessary to read and write gnunet messages + * to a stream socket. Should be used as closure for stream_data_processor. + */ +struct MQStreamState +{ + /** + * Message stream tokenizer for the data received from the + * stream socket. + */ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + + /** + * The stream socket to use for receiving and transmitting + * messages with the message queue. + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Current read handle, NULL if no read active. + */ + struct GNUNET_STREAM_ReadHandle *rh; + + /** + * Current write handle, NULL if no write active. + */ + struct GNUNET_STREAM_WriteHandle *wh; +}; + + + /** * Default value in seconds for various timeouts */ @@ -3731,4 +3762,186 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) cleanup_read_handle (socket); } + +/** + * Functions of this signature are called whenever writing operations + * on a stream are executed + * + * @param cls the closure from GNUNET_STREAM_write + * @param status the status of the stream at the time this function is called; + * GNUNET_STREAM_OK if writing to stream was completed successfully; + * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully + * (this doesn't mean that the data is never sent, the receiver may + * have read the data but its ACKs may have been lost); + * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the + * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot + * be processed. + * @param size the number of bytes written + */ +static void +mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) +{ + struct GNUNET_MQ_MessageQueue *mq = cls; + struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + struct GNUNET_MQ_Message *mqm; + + GNUNET_assert (GNUNET_STREAM_OK == status); + + /* call cb for message we finished sending */ + mqm = mq->current_msg; + GNUNET_assert (NULL != mq->current_msg); + if (NULL != mqm->sent_cb) + mqm->sent_cb (mqm->sent_cls); + GNUNET_free (mqm); + + mss->wh = NULL; + + mqm = mq->msg_head; + mq->current_msg = mqm; + if (NULL == mqm) + return; + GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); + mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), + GNUNET_TIME_UNIT_FOREVER_REL, + mq_stream_write_queued, mq); + GNUNET_assert (NULL != mss->wh); +} + + +static void +mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, + struct GNUNET_MQ_Message *mqm) +{ + struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + + if (NULL != mq->current_msg) + { + GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); + return; + } + mq->current_msg = mqm; + mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), + GNUNET_TIME_UNIT_FOREVER_REL, + mq_stream_write_queued, mq); +} + + +/** + * Functions with this signature are called whenever a + * complete message is received by the tokenizer. + * + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure + * @param client identification of the client + * @param message the actual message + * + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing + */ +static int +mq_stream_mst_callback (void *cls, void *client, + const struct GNUNET_MessageHeader *message) +{ + struct GNUNET_MQ_MessageQueue *mq = cls; + + GNUNET_assert (NULL != message); + GNUNET_MQ_dispatch (mq, message); + return GNUNET_OK; +} + + +/** + * Functions of this signature are called whenever data is available from the + * stream. + * + * @param cls the closure from GNUNET_STREAM_read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +mq_stream_data_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) +{ + struct GNUNET_MQ_MessageQueue *mq = cls; + struct MQStreamState *mss; + int ret; + + mss = (struct MQStreamState *) mq->impl_state; + GNUNET_assert (GNUNET_STREAM_OK == status); + ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); + GNUNET_assert (GNUNET_OK == ret); + /* we always read all data */ + mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, + mq_stream_data_processor, mq); + return size; +} + + +static void +mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +{ + struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + + if (NULL != mss->rh) + { + GNUNET_STREAM_read_cancel (mss->rh); + mss->rh = NULL; + } + + if (NULL != mss->wh) + { + GNUNET_STREAM_write_cancel (mss->wh); + mss->wh = NULL; + } + + if (NULL != mss->mst) + { + GNUNET_SERVER_mst_destroy (mss->mst); + mss->mst = NULL; + } + + GNUNET_free (mss); +} + + + +/** + * Create a message queue for a stream socket. + * + * @param socket the socket to read/write in the message queue + * @param msg_handlers message handler array + * @param error_handler callback for errors + * @return the message queue for the socket + */ +struct GNUNET_MQ_MessageQueue * +GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_MQ_Handler *msg_handlers, + GNUNET_MQ_ErrorHandler error_handler, + void *cls) +{ + struct GNUNET_MQ_MessageQueue *mq; + struct MQStreamState *mss; + + mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); + mss = GNUNET_new (struct MQStreamState); + mss->socket = socket; + mq->impl_state = mss; + mq->send_impl = mq_stream_send_impl; + mq->destroy_impl = mq_stream_destroy_impl; + mq->handlers = msg_handlers; + mq->handlers_cls = cls; + if (NULL != msg_handlers) + { + mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); + mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, + mq_stream_data_processor, mq); + } + return mq; +} + /* end of stream_api.c */ -- cgit v1.2.3