diff options
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r-- | src/stream/stream_api.c | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 8994afc24..b4a47b53d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -579,6 +579,37 @@ struct GNUNET_STREAM_ShutdownHandle | |||
579 | 579 | ||
580 | 580 | ||
581 | /** | 581 | /** |
582 | * Collection of the state necessary to read and write gnunet messages | ||
583 | * to a stream socket. Should be used as closure for stream_data_processor. | ||
584 | */ | ||
585 | struct MQStreamState | ||
586 | { | ||
587 | /** | ||
588 | * Message stream tokenizer for the data received from the | ||
589 | * stream socket. | ||
590 | */ | ||
591 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
592 | |||
593 | /** | ||
594 | * The stream socket to use for receiving and transmitting | ||
595 | * messages with the message queue. | ||
596 | */ | ||
597 | struct GNUNET_STREAM_Socket *socket; | ||
598 | |||
599 | /** | ||
600 | * Current read handle, NULL if no read active. | ||
601 | */ | ||
602 | struct GNUNET_STREAM_ReadHandle *rh; | ||
603 | |||
604 | /** | ||
605 | * Current write handle, NULL if no write active. | ||
606 | */ | ||
607 | struct GNUNET_STREAM_WriteHandle *wh; | ||
608 | }; | ||
609 | |||
610 | |||
611 | |||
612 | /** | ||
582 | * Default value in seconds for various timeouts | 613 | * Default value in seconds for various timeouts |
583 | */ | 614 | */ |
584 | static const unsigned int default_timeout = 10; | 615 | static const unsigned int default_timeout = 10; |
@@ -3731,4 +3762,186 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) | |||
3731 | cleanup_read_handle (socket); | 3762 | cleanup_read_handle (socket); |
3732 | } | 3763 | } |
3733 | 3764 | ||
3765 | |||
3766 | /** | ||
3767 | * Functions of this signature are called whenever writing operations | ||
3768 | * on a stream are executed | ||
3769 | * | ||
3770 | * @param cls the closure from GNUNET_STREAM_write | ||
3771 | * @param status the status of the stream at the time this function is called; | ||
3772 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
3773 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
3774 | * (this doesn't mean that the data is never sent, the receiver may | ||
3775 | * have read the data but its ACKs may have been lost); | ||
3776 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
3777 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
3778 | * be processed. | ||
3779 | * @param size the number of bytes written | ||
3780 | */ | ||
3781 | static void | ||
3782 | mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
3783 | { | ||
3784 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
3785 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | ||
3786 | struct GNUNET_MQ_Message *mqm; | ||
3787 | |||
3788 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
3789 | |||
3790 | /* call cb for message we finished sending */ | ||
3791 | mqm = mq->current_msg; | ||
3792 | GNUNET_assert (NULL != mq->current_msg); | ||
3793 | if (NULL != mqm->sent_cb) | ||
3794 | mqm->sent_cb (mqm->sent_cls); | ||
3795 | GNUNET_free (mqm); | ||
3796 | |||
3797 | mss->wh = NULL; | ||
3798 | |||
3799 | mqm = mq->msg_head; | ||
3800 | mq->current_msg = mqm; | ||
3801 | if (NULL == mqm) | ||
3802 | return; | ||
3803 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | ||
3804 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3805 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3806 | mq_stream_write_queued, mq); | ||
3807 | GNUNET_assert (NULL != mss->wh); | ||
3808 | } | ||
3809 | |||
3810 | |||
3811 | static void | ||
3812 | mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, | ||
3813 | struct GNUNET_MQ_Message *mqm) | ||
3814 | { | ||
3815 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | ||
3816 | |||
3817 | if (NULL != mq->current_msg) | ||
3818 | { | ||
3819 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
3820 | return; | ||
3821 | } | ||
3822 | mq->current_msg = mqm; | ||
3823 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3824 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3825 | mq_stream_write_queued, mq); | ||
3826 | } | ||
3827 | |||
3828 | |||
3829 | /** | ||
3830 | * Functions with this signature are called whenever a | ||
3831 | * complete message is received by the tokenizer. | ||
3832 | * | ||
3833 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
3834 | * | ||
3835 | * @param cls closure | ||
3836 | * @param client identification of the client | ||
3837 | * @param message the actual message | ||
3838 | * | ||
3839 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
3840 | */ | ||
3841 | static int | ||
3842 | mq_stream_mst_callback (void *cls, void *client, | ||
3843 | const struct GNUNET_MessageHeader *message) | ||
3844 | { | ||
3845 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
3846 | |||
3847 | GNUNET_assert (NULL != message); | ||
3848 | GNUNET_MQ_dispatch (mq, message); | ||
3849 | return GNUNET_OK; | ||
3850 | } | ||
3851 | |||
3852 | |||
3853 | /** | ||
3854 | * Functions of this signature are called whenever data is available from the | ||
3855 | * stream. | ||
3856 | * | ||
3857 | * @param cls the closure from GNUNET_STREAM_read | ||
3858 | * @param status the status of the stream at the time this function is called | ||
3859 | * @param data traffic from the other side | ||
3860 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
3861 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
3862 | * given to the next time the read processor is called). | ||
3863 | */ | ||
3864 | static size_t | ||
3865 | mq_stream_data_processor (void *cls, | ||
3866 | enum GNUNET_STREAM_Status status, | ||
3867 | const void *data, | ||
3868 | size_t size) | ||
3869 | { | ||
3870 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
3871 | struct MQStreamState *mss; | ||
3872 | int ret; | ||
3873 | |||
3874 | mss = (struct MQStreamState *) mq->impl_state; | ||
3875 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
3876 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | ||
3877 | GNUNET_assert (GNUNET_OK == ret); | ||
3878 | /* we always read all data */ | ||
3879 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
3880 | mq_stream_data_processor, mq); | ||
3881 | return size; | ||
3882 | } | ||
3883 | |||
3884 | |||
3885 | static void | ||
3886 | mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
3887 | { | ||
3888 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | ||
3889 | |||
3890 | if (NULL != mss->rh) | ||
3891 | { | ||
3892 | GNUNET_STREAM_read_cancel (mss->rh); | ||
3893 | mss->rh = NULL; | ||
3894 | } | ||
3895 | |||
3896 | if (NULL != mss->wh) | ||
3897 | { | ||
3898 | GNUNET_STREAM_write_cancel (mss->wh); | ||
3899 | mss->wh = NULL; | ||
3900 | } | ||
3901 | |||
3902 | if (NULL != mss->mst) | ||
3903 | { | ||
3904 | GNUNET_SERVER_mst_destroy (mss->mst); | ||
3905 | mss->mst = NULL; | ||
3906 | } | ||
3907 | |||
3908 | GNUNET_free (mss); | ||
3909 | } | ||
3910 | |||
3911 | |||
3912 | |||
3913 | /** | ||
3914 | * Create a message queue for a stream socket. | ||
3915 | * | ||
3916 | * @param socket the socket to read/write in the message queue | ||
3917 | * @param msg_handlers message handler array | ||
3918 | * @param error_handler callback for errors | ||
3919 | * @return the message queue for the socket | ||
3920 | */ | ||
3921 | struct GNUNET_MQ_MessageQueue * | ||
3922 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | ||
3923 | const struct GNUNET_MQ_Handler *msg_handlers, | ||
3924 | GNUNET_MQ_ErrorHandler error_handler, | ||
3925 | void *cls) | ||
3926 | { | ||
3927 | struct GNUNET_MQ_MessageQueue *mq; | ||
3928 | struct MQStreamState *mss; | ||
3929 | |||
3930 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
3931 | mss = GNUNET_new (struct MQStreamState); | ||
3932 | mss->socket = socket; | ||
3933 | mq->impl_state = mss; | ||
3934 | mq->send_impl = mq_stream_send_impl; | ||
3935 | mq->destroy_impl = mq_stream_destroy_impl; | ||
3936 | mq->handlers = msg_handlers; | ||
3937 | mq->handlers_cls = cls; | ||
3938 | if (NULL != msg_handlers) | ||
3939 | { | ||
3940 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); | ||
3941 | mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
3942 | mq_stream_data_processor, mq); | ||
3943 | } | ||
3944 | return mq; | ||
3945 | } | ||
3946 | |||
3734 | /* end of stream_api.c */ | 3947 | /* end of stream_api.c */ |