diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-19 10:48:54 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-19 10:48:54 +0000 |
commit | a900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch) | |
tree | 52e1a9697b0abf4618cd5684359ec5f0a040898a /src/stream | |
parent | 17353bc0a47c89bda205f23e7995377c9bfe7769 (diff) | |
download | gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip |
- opaque mq structs
- mq for mesh
- faster hashing for IBFs
- mesh replaces stream in set
- new set profiler (work in progress)
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/stream_api.c | 103 |
1 files changed, 32 insertions, 71 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 34f1ea0fa..47ed04117 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -3779,11 +3779,11 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) | |||
3779 | * @param size the number of bytes written | 3779 | * @param size the number of bytes written |
3780 | */ | 3780 | */ |
3781 | static void | 3781 | static void |
3782 | mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 3782 | mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, |
3783 | size_t size) | ||
3783 | { | 3784 | { |
3784 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3785 | struct GNUNET_MQ_Handle *mq = cls; |
3785 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3786 | struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); |
3786 | struct GNUNET_MQ_Message *mqm; | ||
3787 | 3787 | ||
3788 | switch (status) | 3788 | switch (status) |
3789 | { | 3789 | { |
@@ -3793,56 +3793,32 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size | |||
3793 | /* FIXME: call shutdown handler */ | 3793 | /* FIXME: call shutdown handler */ |
3794 | return; | 3794 | return; |
3795 | case GNUNET_STREAM_TIMEOUT: | 3795 | case GNUNET_STREAM_TIMEOUT: |
3796 | if (NULL == mq->error_handler) | 3796 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); |
3797 | LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n"); | ||
3798 | else | ||
3799 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); | ||
3800 | return; | 3797 | return; |
3801 | case GNUNET_STREAM_SYSERR: | 3798 | case GNUNET_STREAM_SYSERR: |
3802 | if (NULL == mq->error_handler) | 3799 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); |
3803 | LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n"); | ||
3804 | else | ||
3805 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE); | ||
3806 | return; | 3800 | return; |
3807 | default: | 3801 | default: |
3808 | GNUNET_assert (0); | 3802 | GNUNET_assert (0); |
3809 | return; | 3803 | return; |
3810 | } | 3804 | } |
3811 | |||
3812 | /* call cb for message we finished sending */ | ||
3813 | mqm = mq->current_msg; | ||
3814 | GNUNET_assert (NULL != mq->current_msg); | ||
3815 | if (NULL != mqm->sent_cb) | ||
3816 | mqm->sent_cb (mqm->sent_cls); | ||
3817 | GNUNET_free (mqm); | ||
3818 | 3805 | ||
3819 | mss->wh = NULL; | 3806 | mss->wh = NULL; |
3820 | 3807 | ||
3821 | mqm = mq->msg_head; | 3808 | GNUNET_MQ_impl_send_continue (mq); |
3822 | mq->current_msg = mqm; | ||
3823 | if (NULL == mqm) | ||
3824 | return; | ||
3825 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | ||
3826 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3827 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3828 | mq_stream_write_queued, mq); | ||
3829 | GNUNET_assert (NULL != mss->wh); | ||
3830 | } | 3809 | } |
3831 | 3810 | ||
3832 | 3811 | ||
3833 | static void | 3812 | static void |
3834 | mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, | 3813 | mq_stream_send_impl (struct GNUNET_MQ_Handle *mq, |
3835 | struct GNUNET_MQ_Message *mqm) | 3814 | const struct GNUNET_MessageHeader *msg, void *impl_state) |
3836 | { | 3815 | { |
3837 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3816 | struct MQStreamState *mss = impl_state; |
3838 | 3817 | ||
3839 | if (NULL != mq->current_msg) | 3818 | /* no way to cancel sending now */ |
3840 | { | 3819 | GNUNET_MQ_impl_send_commit (mq); |
3841 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | 3820 | |
3842 | return; | 3821 | mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size), |
3843 | } | ||
3844 | mq->current_msg = mqm; | ||
3845 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3846 | GNUNET_TIME_UNIT_FOREVER_REL, | 3822 | GNUNET_TIME_UNIT_FOREVER_REL, |
3847 | mq_stream_write_queued, mq); | 3823 | mq_stream_write_queued, mq); |
3848 | } | 3824 | } |
@@ -3862,12 +3838,12 @@ mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, | |||
3862 | */ | 3838 | */ |
3863 | static int | 3839 | static int |
3864 | mq_stream_mst_callback (void *cls, void *client, | 3840 | mq_stream_mst_callback (void *cls, void *client, |
3865 | const struct GNUNET_MessageHeader *message) | 3841 | const struct GNUNET_MessageHeader *message) |
3866 | { | 3842 | { |
3867 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3843 | struct GNUNET_MQ_Handle *mq = cls; |
3868 | 3844 | ||
3869 | GNUNET_assert (NULL != message); | 3845 | GNUNET_assert (NULL != message); |
3870 | GNUNET_MQ_dispatch (mq, message); | 3846 | GNUNET_MQ_inject_message (mq, message); |
3871 | return GNUNET_OK; | 3847 | return GNUNET_OK; |
3872 | } | 3848 | } |
3873 | 3849 | ||
@@ -3889,8 +3865,8 @@ mq_stream_data_processor (void *cls, | |||
3889 | const void *data, | 3865 | const void *data, |
3890 | size_t size) | 3866 | size_t size) |
3891 | { | 3867 | { |
3892 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3868 | struct GNUNET_MQ_Handle *mq = cls; |
3893 | struct MQStreamState *mss; | 3869 | struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); |
3894 | int ret; | 3870 | int ret; |
3895 | 3871 | ||
3896 | switch (status) | 3872 | switch (status) |
@@ -3901,45 +3877,33 @@ mq_stream_data_processor (void *cls, | |||
3901 | /* FIXME: call shutdown handler */ | 3877 | /* FIXME: call shutdown handler */ |
3902 | return 0; | 3878 | return 0; |
3903 | case GNUNET_STREAM_TIMEOUT: | 3879 | case GNUNET_STREAM_TIMEOUT: |
3904 | if (NULL == mq->error_handler) | 3880 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); |
3905 | LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n"); | ||
3906 | else | ||
3907 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); | ||
3908 | return 0; | 3881 | return 0; |
3909 | case GNUNET_STREAM_SYSERR: | 3882 | case GNUNET_STREAM_SYSERR: |
3910 | if (NULL == mq->error_handler) | 3883 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
3911 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n"); | ||
3912 | else | ||
3913 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
3914 | return 0; | 3884 | return 0; |
3915 | default: | 3885 | default: |
3916 | GNUNET_assert (0); | 3886 | GNUNET_assert (0); |
3917 | return 0; | 3887 | return 0; |
3918 | } | 3888 | } |
3919 | 3889 | ||
3920 | mss = (struct MQStreamState *) mq->impl_state; | ||
3921 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
3922 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | 3890 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); |
3923 | if (GNUNET_OK != ret) | 3891 | if (GNUNET_OK != ret) |
3924 | { | 3892 | { |
3925 | if (NULL == mq->error_handler) | 3893 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
3926 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
3927 | "read error (message stream malformed), but no error handler installed for message queue\n"); | ||
3928 | else | ||
3929 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
3930 | return 0; | 3894 | return 0; |
3931 | } | 3895 | } |
3932 | /* we always read all data */ | ||
3933 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 3896 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, |
3934 | mq_stream_data_processor, mq); | 3897 | mq_stream_data_processor, mq); |
3898 | /* we always read all data */ | ||
3935 | return size; | 3899 | return size; |
3936 | } | 3900 | } |
3937 | 3901 | ||
3938 | 3902 | ||
3939 | static void | 3903 | static void |
3940 | mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | 3904 | mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) |
3941 | { | 3905 | { |
3942 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3906 | struct MQStreamState *mss = impl_state; |
3943 | 3907 | ||
3944 | if (NULL != mss->rh) | 3908 | if (NULL != mss->rh) |
3945 | { | 3909 | { |
@@ -3972,24 +3936,21 @@ mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | |||
3972 | * @param error_handler callback for errors | 3936 | * @param error_handler callback for errors |
3973 | * @return the message queue for the socket | 3937 | * @return the message queue for the socket |
3974 | */ | 3938 | */ |
3975 | struct GNUNET_MQ_MessageQueue * | 3939 | struct GNUNET_MQ_Handle * |
3976 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | 3940 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, |
3977 | const struct GNUNET_MQ_Handler *msg_handlers, | 3941 | const struct GNUNET_MQ_MessageHandler *msg_handlers, |
3978 | GNUNET_MQ_ErrorHandler error_handler, | 3942 | GNUNET_MQ_ErrorHandler error_handler, |
3979 | void *cls) | 3943 | void *cls) |
3980 | { | 3944 | { |
3981 | struct GNUNET_MQ_MessageQueue *mq; | 3945 | struct GNUNET_MQ_Handle *mq; |
3982 | struct MQStreamState *mss; | 3946 | struct MQStreamState *mss; |
3983 | 3947 | ||
3984 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
3985 | mss = GNUNET_new (struct MQStreamState); | 3948 | mss = GNUNET_new (struct MQStreamState); |
3986 | mss->socket = socket; | 3949 | mss->socket = socket; |
3987 | mq->impl_state = mss; | 3950 | mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl, |
3988 | mq->send_impl = mq_stream_send_impl; | 3951 | mq_stream_destroy_impl, |
3989 | mq->destroy_impl = mq_stream_destroy_impl; | 3952 | NULL, |
3990 | mq->handlers = msg_handlers; | 3953 | mss, msg_handlers, error_handler, cls); |
3991 | mq->handlers_cls = cls; | ||
3992 | mq->error_handler = error_handler; | ||
3993 | if (NULL != msg_handlers) | 3954 | if (NULL != msg_handlers) |
3994 | { | 3955 | { |
3995 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); | 3956 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); |