aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r--src/stream/stream_api.c213
1 files changed, 213 insertions, 0 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 8994afc24..b4a47b53d 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -579,6 +579,37 @@ struct GNUNET_STREAM_ShutdownHandle
579 579
580 580
581/** 581/**
582 * Collection of the state necessary to read and write gnunet messages
583 * to a stream socket. Should be used as closure for stream_data_processor.
584 */
585struct MQStreamState
586{
587 /**
588 * Message stream tokenizer for the data received from the
589 * stream socket.
590 */
591 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
592
593 /**
594 * The stream socket to use for receiving and transmitting
595 * messages with the message queue.
596 */
597 struct GNUNET_STREAM_Socket *socket;
598
599 /**
600 * Current read handle, NULL if no read active.
601 */
602 struct GNUNET_STREAM_ReadHandle *rh;
603
604 /**
605 * Current write handle, NULL if no write active.
606 */
607 struct GNUNET_STREAM_WriteHandle *wh;
608};
609
610
611
612/**
582 * Default value in seconds for various timeouts 613 * Default value in seconds for various timeouts
583 */ 614 */
584static const unsigned int default_timeout = 10; 615static const unsigned int default_timeout = 10;
@@ -3731,4 +3762,186 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3731 cleanup_read_handle (socket); 3762 cleanup_read_handle (socket);
3732} 3763}
3733 3764
3765
3766/**
3767 * Functions of this signature are called whenever writing operations
3768 * on a stream are executed
3769 *
3770 * @param cls the closure from GNUNET_STREAM_write
3771 * @param status the status of the stream at the time this function is called;
3772 * GNUNET_STREAM_OK if writing to stream was completed successfully;
3773 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
3774 * (this doesn't mean that the data is never sent, the receiver may
3775 * have read the data but its ACKs may have been lost);
3776 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
3777 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
3778 * be processed.
3779 * @param size the number of bytes written
3780 */
3781static void
3782mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
3783{
3784 struct GNUNET_MQ_MessageQueue *mq = cls;
3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3786 struct GNUNET_MQ_Message *mqm;
3787
3788 GNUNET_assert (GNUNET_STREAM_OK == status);
3789
3790 /* call cb for message we finished sending */
3791 mqm = mq->current_msg;
3792 GNUNET_assert (NULL != mq->current_msg);
3793 if (NULL != mqm->sent_cb)
3794 mqm->sent_cb (mqm->sent_cls);
3795 GNUNET_free (mqm);
3796
3797 mss->wh = NULL;
3798
3799 mqm = mq->msg_head;
3800 mq->current_msg = mqm;
3801 if (NULL == mqm)
3802 return;
3803 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
3804 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3805 GNUNET_TIME_UNIT_FOREVER_REL,
3806 mq_stream_write_queued, mq);
3807 GNUNET_assert (NULL != mss->wh);
3808}
3809
3810
3811static void
3812mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
3813 struct GNUNET_MQ_Message *mqm)
3814{
3815 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3816
3817 if (NULL != mq->current_msg)
3818 {
3819 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
3820 return;
3821 }
3822 mq->current_msg = mqm;
3823 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3824 GNUNET_TIME_UNIT_FOREVER_REL,
3825 mq_stream_write_queued, mq);
3826}
3827
3828
3829/**
3830 * Functions with this signature are called whenever a
3831 * complete message is received by the tokenizer.
3832 *
3833 * Do not call GNUNET_SERVER_mst_destroy in callback
3834 *
3835 * @param cls closure
3836 * @param client identification of the client
3837 * @param message the actual message
3838 *
3839 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
3840 */
3841static int
3842mq_stream_mst_callback (void *cls, void *client,
3843 const struct GNUNET_MessageHeader *message)
3844{
3845 struct GNUNET_MQ_MessageQueue *mq = cls;
3846
3847 GNUNET_assert (NULL != message);
3848 GNUNET_MQ_dispatch (mq, message);
3849 return GNUNET_OK;
3850}
3851
3852
3853/**
3854 * Functions of this signature are called whenever data is available from the
3855 * stream.
3856 *
3857 * @param cls the closure from GNUNET_STREAM_read
3858 * @param status the status of the stream at the time this function is called
3859 * @param data traffic from the other side
3860 * @param size the number of bytes available in data read; will be 0 on timeout
3861 * @return number of bytes of processed from 'data' (any data remaining should be
3862 * given to the next time the read processor is called).
3863 */
3864static size_t
3865mq_stream_data_processor (void *cls,
3866 enum GNUNET_STREAM_Status status,
3867 const void *data,
3868 size_t size)
3869{
3870 struct GNUNET_MQ_MessageQueue *mq = cls;
3871 struct MQStreamState *mss;
3872 int ret;
3873
3874 mss = (struct MQStreamState *) mq->impl_state;
3875 GNUNET_assert (GNUNET_STREAM_OK == status);
3876 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3877 GNUNET_assert (GNUNET_OK == ret);
3878 /* we always read all data */
3879 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
3880 mq_stream_data_processor, mq);
3881 return size;
3882}
3883
3884
3885static void
3886mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
3887{
3888 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3889
3890 if (NULL != mss->rh)
3891 {
3892 GNUNET_STREAM_read_cancel (mss->rh);
3893 mss->rh = NULL;
3894 }
3895
3896 if (NULL != mss->wh)
3897 {
3898 GNUNET_STREAM_write_cancel (mss->wh);
3899 mss->wh = NULL;
3900 }
3901
3902 if (NULL != mss->mst)
3903 {
3904 GNUNET_SERVER_mst_destroy (mss->mst);
3905 mss->mst = NULL;
3906 }
3907
3908 GNUNET_free (mss);
3909}
3910
3911
3912
3913/**
3914 * Create a message queue for a stream socket.
3915 *
3916 * @param socket the socket to read/write in the message queue
3917 * @param msg_handlers message handler array
3918 * @param error_handler callback for errors
3919 * @return the message queue for the socket
3920 */
3921struct GNUNET_MQ_MessageQueue *
3922GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
3923 const struct GNUNET_MQ_Handler *msg_handlers,
3924 GNUNET_MQ_ErrorHandler error_handler,
3925 void *cls)
3926{
3927 struct GNUNET_MQ_MessageQueue *mq;
3928 struct MQStreamState *mss;
3929
3930 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
3931 mss = GNUNET_new (struct MQStreamState);
3932 mss->socket = socket;
3933 mq->impl_state = mss;
3934 mq->send_impl = mq_stream_send_impl;
3935 mq->destroy_impl = mq_stream_destroy_impl;
3936 mq->handlers = msg_handlers;
3937 mq->handlers_cls = cls;
3938 if (NULL != msg_handlers)
3939 {
3940 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
3941 mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
3942 mq_stream_data_processor, mq);
3943 }
3944 return mq;
3945}
3946
3734/* end of stream_api.c */ 3947/* end of stream_api.c */