aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-11-24 22:17:15 +0000
committerChristian Grothoff <christian@grothoff.org>2012-11-24 22:17:15 +0000
commitc015c83440507c1503c07d39740f13075efd926c (patch)
tree6131a6dcdd64ca175023a3929bf74444c9565b7d /src
parentad16d03db236ff504ec7ed2cbd32c5a6653bc315 (diff)
downloadgnunet-c015c83440507c1503c07d39740f13075efd926c.tar.gz
gnunet-c015c83440507c1503c07d39740f13075efd926c.zip
-ensure that either stream_api calls callbacks last or that we don't destroy a stream handle while it is in use below us on the stack
Diffstat (limited to 'src')
-rw-r--r--src/fs/gnunet-service-fs_stream.c128
-rw-r--r--src/include/gnunet_stream_lib.h2
-rw-r--r--src/stream/stream_api.c26
3 files changed, 121 insertions, 35 deletions
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c
index 856a21a1a..b444e282c 100644
--- a/src/fs/gnunet-service-fs_stream.c
+++ b/src/fs/gnunet-service-fs_stream.c
@@ -25,7 +25,6 @@
25 * 25 *
26 * TODO: 26 * TODO:
27 * - limit # concurrent clients, have timeouts for server-side 27 * - limit # concurrent clients, have timeouts for server-side
28 * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!)
29 */ 28 */
30#include "platform.h" 29#include "platform.h"
31#include "gnunet_constants.h" 30#include "gnunet_constants.h"
@@ -78,6 +77,11 @@ struct StreamClient
78 struct GNUNET_DATASTORE_QueueEntry *qe; 77 struct GNUNET_DATASTORE_QueueEntry *qe;
79 78
80 /** 79 /**
80 * Task that is scheduled to asynchronously terminate the connection.
81 */
82 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
83
84 /**
81 * Size of the last write that was initiated. 85 * Size of the last write that was initiated.
82 */ 86 */
83 size_t reply_size; 87 size_t reply_size;
@@ -248,6 +252,13 @@ struct StreamHandle
248 GNUNET_SCHEDULER_TaskIdentifier timeout_task; 252 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
249 253
250 /** 254 /**
255 * Task to reset streams that had errors (asynchronously,
256 * as we may not be able to do it immediately during a
257 * callback from the stream API).
258 */
259 GNUNET_SCHEDULER_TaskIdentifier reset_task;
260
261 /**
251 * Is this stream ready for transmission? 262 * Is this stream ready for transmission?
252 */ 263 */
253 int is_ready; 264 int is_ready;
@@ -378,6 +389,55 @@ reset_stream (struct StreamHandle *sh)
378 389
379 390
380/** 391/**
392 * Task called when it is time to destroy an inactive stream.
393 *
394 * @param cls the 'struct StreamHandle' to tear down
395 * @param tc scheduler context, unused
396 */
397static void
398stream_timeout (void *cls,
399 const struct GNUNET_SCHEDULER_TaskContext *tc)
400{
401 struct StreamHandle *sh = cls;
402
403 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
404 destroy_stream_handle (sh);
405}
406
407
408/**
409 * Task called when it is time to reset an stream.
410 *
411 * @param cls the 'struct StreamHandle' to tear down
412 * @param tc scheduler context, unused
413 */
414static void
415reset_stream_task (void *cls,
416 const struct GNUNET_SCHEDULER_TaskContext *tc)
417{
418 struct StreamHandle *sh = cls;
419
420 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
421 reset_stream (sh);
422}
423
424
425/**
426 * We had a serious error, tear down and re-create stream from scratch,
427 * but do so asynchronously.
428 *
429 * @param sh stream to reset
430 */
431static void
432reset_stream_async (struct StreamHandle *sh)
433{
434 if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
435 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
436 sh);
437}
438
439
440/**
381 * We got a reply from the stream. Process it. 441 * We got a reply from the stream. Process it.
382 * 442 *
383 * @param cls the struct StreamHandle 443 * @param cls the struct StreamHandle
@@ -403,7 +463,7 @@ handle_stream_reply (void *cls,
403 GNUNET_NO, GNUNET_NO)) 463 GNUNET_NO, GNUNET_NO))
404 { 464 {
405 GNUNET_break_op (0); 465 GNUNET_break_op (0);
406 reset_stream (sh); 466 reset_stream_async (sh);
407 return size; 467 return size;
408 } 468 }
409 sh->rh = GNUNET_STREAM_read (sh->stream, 469 sh->rh = GNUNET_STREAM_read (sh->stream,
@@ -513,6 +573,7 @@ reply_cb (void *cls,
513 if (sizeof (struct StreamReplyMessage) > msize) 573 if (sizeof (struct StreamReplyMessage) > msize)
514 { 574 {
515 GNUNET_break_op (0); 575 GNUNET_break_op (0);
576 reset_stream_async (sh);
516 return GNUNET_SYSERR; 577 return GNUNET_SYSERR;
517 } 578 }
518 srm = (const struct StreamReplyMessage *) message; 579 srm = (const struct StreamReplyMessage *) message;
@@ -523,7 +584,8 @@ reply_cb (void *cls,
523 type, 584 type,
524 &srm[1], msize, &query)) 585 &srm[1], msize, &query))
525 { 586 {
526 GNUNET_break_op (0); 587 GNUNET_break_op (0);
588 reset_stream_async (sh);
527 return GNUNET_SYSERR; 589 return GNUNET_SYSERR;
528 } 590 }
529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 591 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -553,6 +615,7 @@ reply_cb (void *cls,
553 return GNUNET_OK; 615 return GNUNET_OK;
554 default: 616 default:
555 GNUNET_break_op (0); 617 GNUNET_break_op (0);
618 reset_stream_async (sh);
556 return GNUNET_SYSERR; 619 return GNUNET_SYSERR;
557 } 620 }
558} 621}
@@ -633,23 +696,6 @@ GSF_stream_query (const struct GNUNET_PeerIdentity *target,
633 696
634 697
635/** 698/**
636 * Task called when it is time to destroy an inactive stream.
637 *
638 * @param cls the 'struct StreamHandle' to tear down
639 * @param tc scheduler context, unused
640 */
641static void
642stream_timeout (void *cls,
643 const struct GNUNET_SCHEDULER_TaskContext *tc)
644{
645 struct StreamHandle *sh = cls;
646
647 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
648 destroy_stream_handle (sh);
649}
650
651
652/**
653 * Cancel an active request; must not be called after 'proc' 699 * Cancel an active request; must not be called after 'proc'
654 * was calld. 700 * was calld.
655 * 701 *
@@ -691,7 +737,9 @@ terminate_stream (struct StreamClient *sc)
691 GNUNET_STATISTICS_update (GSF_stats, 737 GNUNET_STATISTICS_update (GSF_stats,
692 gettext_noop ("# stream connections active"), -1, 738 gettext_noop ("# stream connections active"), -1,
693 GNUNET_NO); 739 GNUNET_NO);
694 if (NULL != sc->rh) 740 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
741 GNUNET_SCHEDULER_cancel (sc->terminate_task);
742 if (NULL != sc->rh)
695 GNUNET_STREAM_io_read_cancel (sc->rh); 743 GNUNET_STREAM_io_read_cancel (sc->rh);
696 if (NULL != sc->wh) 744 if (NULL != sc->wh)
697 GNUNET_STREAM_io_write_cancel (sc->wh); 745 GNUNET_STREAM_io_write_cancel (sc->wh);
@@ -707,6 +755,38 @@ terminate_stream (struct StreamClient *sc)
707 755
708 756
709/** 757/**
758 * Task run to asynchronously terminate the stream.
759 *
760 * @param cls the 'struct StreamClient'
761 * @param tc scheduler context
762 */
763static void
764terminate_stream_task (void *cls,
765 const struct GNUNET_SCHEDULER_TaskContext *tc)
766{
767 struct StreamClient *sc = cls;
768
769 sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
770 terminate_stream (sc);
771}
772
773
774/**
775 * We had a serious error, termiante stream,
776 * but do so asynchronously.
777 *
778 * @param sc stream to reset
779 */
780static void
781terminate_stream_async (struct StreamClient *sc)
782{
783 if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
784 sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
785 sc);
786}
787
788
789/**
710 * Functions of this signature are called whenever data is available from the 790 * Functions of this signature are called whenever data is available from the
711 * stream. 791 * stream.
712 * 792 *
@@ -782,7 +862,7 @@ process_request (void *cls,
782 if (GNUNET_SYSERR == ret) 862 if (GNUNET_SYSERR == ret)
783 { 863 {
784 GNUNET_break_op (0); 864 GNUNET_break_op (0);
785 terminate_stream (sc); 865 terminate_stream_async (sc);
786 return size; 866 return size;
787 } 867 }
788 break; 868 break;
@@ -790,7 +870,7 @@ process_request (void *cls,
790 case GNUNET_STREAM_SHUTDOWN: 870 case GNUNET_STREAM_SHUTDOWN:
791 case GNUNET_STREAM_SYSERR: 871 case GNUNET_STREAM_SYSERR:
792 case GNUNET_STREAM_BROKEN: 872 case GNUNET_STREAM_BROKEN:
793 terminate_stream (sc); 873 terminate_stream_async (sc);
794 return size; 874 return size;
795 default: 875 default:
796 GNUNET_break (0); 876 GNUNET_break (0);
@@ -922,6 +1002,7 @@ request_cb (void *cls,
922 ntohs (message->size)) 1002 ntohs (message->size))
923 { 1003 {
924 GNUNET_break_op (0); 1004 GNUNET_break_op (0);
1005 terminate_stream_async (sc);
925 return GNUNET_SYSERR; 1006 return GNUNET_SYSERR;
926 } 1007 }
927 sqm = (const struct StreamQueryMessage *) message; 1008 sqm = (const struct StreamQueryMessage *) message;
@@ -944,6 +1025,7 @@ request_cb (void *cls,
944 return GNUNET_OK; 1025 return GNUNET_OK;
945 default: 1026 default:
946 GNUNET_break_op (0); 1027 GNUNET_break_op (0);
1028 terminate_stream_async (sc);
947 return GNUNET_SYSERR; 1029 return GNUNET_SYSERR;
948 } 1030 }
949} 1031}
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h
index b7e3e4ce2..d81218364 100644
--- a/src/include/gnunet_stream_lib.h
+++ b/src/include/gnunet_stream_lib.h
@@ -246,12 +246,10 @@ struct GNUNET_STREAM_ListenSocket;
246 * Listens for stream connections for a specific application ports 246 * Listens for stream connections for a specific application ports
247 * 247 *
248 * @param cfg the configuration to use 248 * @param cfg the configuration to use
249 *
250 * @param app_port the application port for which new streams will be 249 * @param app_port the application port for which new streams will be
251 * accepted. If another stream is listening on the same port the 250 * accepted. If another stream is listening on the same port the
252 * listen_cb will be called to signal binding error and the returned 251 * listen_cb will be called to signal binding error and the returned
253 * ListenSocket will be invalidated. 252 * ListenSocket will be invalidated.
254 *
255 * @param listen_cb this function will be called when a peer tries to establish 253 * @param listen_cb this function will be called when a peer tries to establish
256 * a stream with us 254 * a stream with us
257 * @param listen_cb_cls closure for listen_cb 255 * @param listen_cb_cls closure for listen_cb
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 46f7abb47..f9152749d 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -1917,13 +1917,16 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1917 that that stream has been shutdown */ 1917 that that stream has been shutdown */
1918 if (NULL != socket->write_handle) 1918 if (NULL != socket->write_handle)
1919 { 1919 {
1920 // FIXME: this breaks if 'write_cont' decides to 1920 GNUNET_STREAM_CompletionContinuation wc;
1921 // call SOCKET_close! 1921 void *wc_cls;
1922 if (NULL != socket->write_handle->write_cont) 1922
1923 socket->write_handle->write_cont (socket->write_handle->write_cont_cls, 1923 wc = socket->write_handle->write_cont;
1924 GNUNET_STREAM_SHUTDOWN, 0); 1924 wc_cls = socket->write_handle->write_cont_cls;
1925 GNUNET_STREAM_io_write_cancel (socket->write_handle); 1925 GNUNET_STREAM_io_write_cancel (socket->write_handle);
1926 socket->write_handle = NULL; 1926 socket->write_handle = NULL;
1927 if (NULL != wc)
1928 wc (wc_cls,
1929 GNUNET_STREAM_SHUTDOWN, 0);
1927 } 1930 }
1928 return GNUNET_OK; 1931 return GNUNET_OK;
1929} 1932}
@@ -2041,13 +2044,16 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
2041 that that stream has been shutdown */ 2044 that that stream has been shutdown */
2042 if (NULL != socket->write_handle) 2045 if (NULL != socket->write_handle)
2043 { 2046 {
2044 // FIXME: this breaks if 'write_cont' decides to 2047 GNUNET_STREAM_CompletionContinuation wc;
2045 // call SOCKET_close! 2048 void *wc_cls;
2046 if (NULL != socket->write_handle->write_cont) 2049
2047 socket->write_handle->write_cont (socket->write_handle->write_cont_cls, 2050 wc = socket->write_handle->write_cont;
2048 GNUNET_STREAM_SHUTDOWN, 0); 2051 wc_cls = socket->write_handle->write_cont_cls;
2049 GNUNET_STREAM_io_write_cancel (socket->write_handle); 2052 GNUNET_STREAM_io_write_cancel (socket->write_handle);
2050 socket->write_handle = NULL; 2053 socket->write_handle = NULL;
2054 if (NULL != wc)
2055 wc (wc_cls,
2056 GNUNET_STREAM_SHUTDOWN, 0);
2051 } 2057 }
2052 return GNUNET_OK; 2058 return GNUNET_OK;
2053} 2059}