aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
commit68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch)
tree3442e4f25de90eab67c4f9813cb6e433c50b7482 /src/stream
parentfae7f583f2e11cac15fefcbefef64287ab6915d3 (diff)
downloadgnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.tar.gz
gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.zip
- conclude for SET
- consensus with SET
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c69
1 files changed, 62 insertions, 7 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index b4a47b53d..34f1ea0fa 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -3785,7 +3785,29 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size
3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
3786 struct GNUNET_MQ_Message *mqm; 3786 struct GNUNET_MQ_Message *mqm;
3787 3787
3788 GNUNET_assert (GNUNET_STREAM_OK == status); 3788 switch (status)
3789 {
3790 case GNUNET_STREAM_OK:
3791 break;
3792 case GNUNET_STREAM_SHUTDOWN:
3793 /* FIXME: call shutdown handler */
3794 return;
3795 case GNUNET_STREAM_TIMEOUT:
3796 if (NULL == mq->error_handler)
3797 LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
3798 else
3799 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3800 return;
3801 case GNUNET_STREAM_SYSERR:
3802 if (NULL == mq->error_handler)
3803 LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
3804 else
3805 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
3806 return;
3807 default:
3808 GNUNET_assert (0);
3809 return;
3810 }
3789 3811
3790 /* call cb for message we finished sending */ 3812 /* call cb for message we finished sending */
3791 mqm = mq->current_msg; 3813 mqm = mq->current_msg;
@@ -3863,21 +3885,53 @@ mq_stream_mst_callback (void *cls, void *client,
3863 */ 3885 */
3864static size_t 3886static size_t
3865mq_stream_data_processor (void *cls, 3887mq_stream_data_processor (void *cls,
3866 enum GNUNET_STREAM_Status status, 3888 enum GNUNET_STREAM_Status status,
3867 const void *data, 3889 const void *data,
3868 size_t size) 3890 size_t size)
3869{ 3891{
3870 struct GNUNET_MQ_MessageQueue *mq = cls; 3892 struct GNUNET_MQ_MessageQueue *mq = cls;
3871 struct MQStreamState *mss; 3893 struct MQStreamState *mss;
3872 int ret; 3894 int ret;
3895
3896 switch (status)
3897 {
3898 case GNUNET_STREAM_OK:
3899 break;
3900 case GNUNET_STREAM_SHUTDOWN:
3901 /* FIXME: call shutdown handler */
3902 return 0;
3903 case GNUNET_STREAM_TIMEOUT:
3904 if (NULL == mq->error_handler)
3905 LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
3906 else
3907 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3908 return 0;
3909 case GNUNET_STREAM_SYSERR:
3910 if (NULL == mq->error_handler)
3911 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
3912 else
3913 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3914 return 0;
3915 default:
3916 GNUNET_assert (0);
3917 return 0;
3918 }
3873 3919
3874 mss = (struct MQStreamState *) mq->impl_state; 3920 mss = (struct MQStreamState *) mq->impl_state;
3875 GNUNET_assert (GNUNET_STREAM_OK == status); 3921 GNUNET_assert (GNUNET_STREAM_OK == status);
3876 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); 3922 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3877 GNUNET_assert (GNUNET_OK == ret); 3923 if (GNUNET_OK != ret)
3924 {
3925 if (NULL == mq->error_handler)
3926 LOG (GNUNET_ERROR_TYPE_WARNING,
3927 "read error (message stream malformed), but no error handler installed for message queue\n");
3928 else
3929 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3930 return 0;
3931 }
3878 /* we always read all data */ 3932 /* we always read all data */
3879 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 3933 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
3880 mq_stream_data_processor, mq); 3934 mq_stream_data_processor, mq);
3881 return size; 3935 return size;
3882} 3936}
3883 3937
@@ -3935,6 +3989,7 @@ GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
3935 mq->destroy_impl = mq_stream_destroy_impl; 3989 mq->destroy_impl = mq_stream_destroy_impl;
3936 mq->handlers = msg_handlers; 3990 mq->handlers = msg_handlers;
3937 mq->handlers_cls = cls; 3991 mq->handlers_cls = cls;
3992 mq->error_handler = error_handler;
3938 if (NULL != msg_handlers) 3993 if (NULL != msg_handlers)
3939 { 3994 {
3940 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); 3995 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);