aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
commita900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch)
tree52e1a9697b0abf4618cd5684359ec5f0a040898a /src/stream
parent17353bc0a47c89bda205f23e7995377c9bfe7769 (diff)
downloadgnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz
gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip
- opaque mq structs
- mq for mesh - faster hashing for IBFs - mesh replaces stream in set - new set profiler (work in progress)
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c103
1 files changed, 32 insertions, 71 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 34f1ea0fa..47ed04117 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -3779,11 +3779,11 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3779 * @param size the number of bytes written 3779 * @param size the number of bytes written
3780 */ 3780 */
3781static void 3781static void
3782mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) 3782mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status,
3783 size_t size)
3783{ 3784{
3784 struct GNUNET_MQ_MessageQueue *mq = cls; 3785 struct GNUNET_MQ_Handle *mq = cls;
3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3786 struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
3786 struct GNUNET_MQ_Message *mqm;
3787 3787
3788 switch (status) 3788 switch (status)
3789 { 3789 {
@@ -3793,56 +3793,32 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size
3793 /* FIXME: call shutdown handler */ 3793 /* FIXME: call shutdown handler */
3794 return; 3794 return;
3795 case GNUNET_STREAM_TIMEOUT: 3795 case GNUNET_STREAM_TIMEOUT:
3796 if (NULL == mq->error_handler) 3796 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
3797 LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
3798 else
3799 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3800 return; 3797 return;
3801 case GNUNET_STREAM_SYSERR: 3798 case GNUNET_STREAM_SYSERR:
3802 if (NULL == mq->error_handler) 3799 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
3803 LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
3804 else
3805 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
3806 return; 3800 return;
3807 default: 3801 default:
3808 GNUNET_assert (0); 3802 GNUNET_assert (0);
3809 return; 3803 return;
3810 } 3804 }
3811
3812 /* call cb for message we finished sending */
3813 mqm = mq->current_msg;
3814 GNUNET_assert (NULL != mq->current_msg);
3815 if (NULL != mqm->sent_cb)
3816 mqm->sent_cb (mqm->sent_cls);
3817 GNUNET_free (mqm);
3818 3805
3819 mss->wh = NULL; 3806 mss->wh = NULL;
3820 3807
3821 mqm = mq->msg_head; 3808 GNUNET_MQ_impl_send_continue (mq);
3822 mq->current_msg = mqm;
3823 if (NULL == mqm)
3824 return;
3825 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
3826 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3827 GNUNET_TIME_UNIT_FOREVER_REL,
3828 mq_stream_write_queued, mq);
3829 GNUNET_assert (NULL != mss->wh);
3830} 3809}
3831 3810
3832 3811
3833static void 3812static void
3834mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, 3813mq_stream_send_impl (struct GNUNET_MQ_Handle *mq,
3835 struct GNUNET_MQ_Message *mqm) 3814 const struct GNUNET_MessageHeader *msg, void *impl_state)
3836{ 3815{
3837 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3816 struct MQStreamState *mss = impl_state;
3838 3817
3839 if (NULL != mq->current_msg) 3818 /* no way to cancel sending now */
3840 { 3819 GNUNET_MQ_impl_send_commit (mq);
3841 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); 3820
3842 return; 3821 mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size),
3843 }
3844 mq->current_msg = mqm;
3845 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3846 GNUNET_TIME_UNIT_FOREVER_REL, 3822 GNUNET_TIME_UNIT_FOREVER_REL,
3847 mq_stream_write_queued, mq); 3823 mq_stream_write_queued, mq);
3848} 3824}
@@ -3862,12 +3838,12 @@ mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
3862 */ 3838 */
3863static int 3839static int
3864mq_stream_mst_callback (void *cls, void *client, 3840mq_stream_mst_callback (void *cls, void *client,
3865 const struct GNUNET_MessageHeader *message) 3841 const struct GNUNET_MessageHeader *message)
3866{ 3842{
3867 struct GNUNET_MQ_MessageQueue *mq = cls; 3843 struct GNUNET_MQ_Handle *mq = cls;
3868 3844
3869 GNUNET_assert (NULL != message); 3845 GNUNET_assert (NULL != message);
3870 GNUNET_MQ_dispatch (mq, message); 3846 GNUNET_MQ_inject_message (mq, message);
3871 return GNUNET_OK; 3847 return GNUNET_OK;
3872} 3848}
3873 3849
@@ -3889,8 +3865,8 @@ mq_stream_data_processor (void *cls,
3889 const void *data, 3865 const void *data,
3890 size_t size) 3866 size_t size)
3891{ 3867{
3892 struct GNUNET_MQ_MessageQueue *mq = cls; 3868 struct GNUNET_MQ_Handle *mq = cls;
3893 struct MQStreamState *mss; 3869 struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
3894 int ret; 3870 int ret;
3895 3871
3896 switch (status) 3872 switch (status)
@@ -3901,45 +3877,33 @@ mq_stream_data_processor (void *cls,
3901 /* FIXME: call shutdown handler */ 3877 /* FIXME: call shutdown handler */
3902 return 0; 3878 return 0;
3903 case GNUNET_STREAM_TIMEOUT: 3879 case GNUNET_STREAM_TIMEOUT:
3904 if (NULL == mq->error_handler) 3880 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
3905 LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
3906 else
3907 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3908 return 0; 3881 return 0;
3909 case GNUNET_STREAM_SYSERR: 3882 case GNUNET_STREAM_SYSERR:
3910 if (NULL == mq->error_handler) 3883 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
3911 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
3912 else
3913 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3914 return 0; 3884 return 0;
3915 default: 3885 default:
3916 GNUNET_assert (0); 3886 GNUNET_assert (0);
3917 return 0; 3887 return 0;
3918 } 3888 }
3919 3889
3920 mss = (struct MQStreamState *) mq->impl_state;
3921 GNUNET_assert (GNUNET_STREAM_OK == status);
3922 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); 3890 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3923 if (GNUNET_OK != ret) 3891 if (GNUNET_OK != ret)
3924 { 3892 {
3925 if (NULL == mq->error_handler) 3893 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
3926 LOG (GNUNET_ERROR_TYPE_WARNING,
3927 "read error (message stream malformed), but no error handler installed for message queue\n");
3928 else
3929 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3930 return 0; 3894 return 0;
3931 } 3895 }
3932 /* we always read all data */
3933 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 3896 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
3934 mq_stream_data_processor, mq); 3897 mq_stream_data_processor, mq);
3898 /* we always read all data */
3935 return size; 3899 return size;
3936} 3900}
3937 3901
3938 3902
3939static void 3903static void
3940mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) 3904mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
3941{ 3905{
3942 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3906 struct MQStreamState *mss = impl_state;
3943 3907
3944 if (NULL != mss->rh) 3908 if (NULL != mss->rh)
3945 { 3909 {
@@ -3972,24 +3936,21 @@ mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
3972 * @param error_handler callback for errors 3936 * @param error_handler callback for errors
3973 * @return the message queue for the socket 3937 * @return the message queue for the socket
3974 */ 3938 */
3975struct GNUNET_MQ_MessageQueue * 3939struct GNUNET_MQ_Handle *
3976GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 3940GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
3977 const struct GNUNET_MQ_Handler *msg_handlers, 3941 const struct GNUNET_MQ_MessageHandler *msg_handlers,
3978 GNUNET_MQ_ErrorHandler error_handler, 3942 GNUNET_MQ_ErrorHandler error_handler,
3979 void *cls) 3943 void *cls)
3980{ 3944{
3981 struct GNUNET_MQ_MessageQueue *mq; 3945 struct GNUNET_MQ_Handle *mq;
3982 struct MQStreamState *mss; 3946 struct MQStreamState *mss;
3983 3947
3984 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
3985 mss = GNUNET_new (struct MQStreamState); 3948 mss = GNUNET_new (struct MQStreamState);
3986 mss->socket = socket; 3949 mss->socket = socket;
3987 mq->impl_state = mss; 3950 mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl,
3988 mq->send_impl = mq_stream_send_impl; 3951 mq_stream_destroy_impl,
3989 mq->destroy_impl = mq_stream_destroy_impl; 3952 NULL,
3990 mq->handlers = msg_handlers; 3953 mss, msg_handlers, error_handler, cls);
3991 mq->handlers_cls = cls;
3992 mq->error_handler = error_handler;
3993 if (NULL != msg_handlers) 3954 if (NULL != msg_handlers)
3994 { 3955 {
3995 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); 3956 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);