diff options
author | Christian Grothoff <christian@grothoff.org> | 2012-11-24 22:17:15 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2012-11-24 22:17:15 +0000 |
commit | c015c83440507c1503c07d39740f13075efd926c (patch) | |
tree | 6131a6dcdd64ca175023a3929bf74444c9565b7d /src | |
parent | ad16d03db236ff504ec7ed2cbd32c5a6653bc315 (diff) | |
download | gnunet-c015c83440507c1503c07d39740f13075efd926c.tar.gz gnunet-c015c83440507c1503c07d39740f13075efd926c.zip |
-ensure that either stream_api calls callbacks last or that we don't destroy a stream handle while it is in use below us on the stack
Diffstat (limited to 'src')
-rw-r--r-- | src/fs/gnunet-service-fs_stream.c | 128 | ||||
-rw-r--r-- | src/include/gnunet_stream_lib.h | 2 | ||||
-rw-r--r-- | src/stream/stream_api.c | 26 |
3 files changed, 121 insertions, 35 deletions
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c index 856a21a1a..b444e282c 100644 --- a/src/fs/gnunet-service-fs_stream.c +++ b/src/fs/gnunet-service-fs_stream.c | |||
@@ -25,7 +25,6 @@ | |||
25 | * | 25 | * |
26 | * TODO: | 26 | * TODO: |
27 | * - limit # concurrent clients, have timeouts for server-side | 27 | * - limit # concurrent clients, have timeouts for server-side |
28 | * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!) | ||
29 | */ | 28 | */ |
30 | #include "platform.h" | 29 | #include "platform.h" |
31 | #include "gnunet_constants.h" | 30 | #include "gnunet_constants.h" |
@@ -78,6 +77,11 @@ struct StreamClient | |||
78 | struct GNUNET_DATASTORE_QueueEntry *qe; | 77 | struct GNUNET_DATASTORE_QueueEntry *qe; |
79 | 78 | ||
80 | /** | 79 | /** |
80 | * Task that is scheduled to asynchronously terminate the connection. | ||
81 | */ | ||
82 | GNUNET_SCHEDULER_TaskIdentifier terminate_task; | ||
83 | |||
84 | /** | ||
81 | * Size of the last write that was initiated. | 85 | * Size of the last write that was initiated. |
82 | */ | 86 | */ |
83 | size_t reply_size; | 87 | size_t reply_size; |
@@ -248,6 +252,13 @@ struct StreamHandle | |||
248 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | 252 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; |
249 | 253 | ||
250 | /** | 254 | /** |
255 | * Task to reset streams that had errors (asynchronously, | ||
256 | * as we may not be able to do it immediately during a | ||
257 | * callback from the stream API). | ||
258 | */ | ||
259 | GNUNET_SCHEDULER_TaskIdentifier reset_task; | ||
260 | |||
261 | /** | ||
251 | * Is this stream ready for transmission? | 262 | * Is this stream ready for transmission? |
252 | */ | 263 | */ |
253 | int is_ready; | 264 | int is_ready; |
@@ -378,6 +389,55 @@ reset_stream (struct StreamHandle *sh) | |||
378 | 389 | ||
379 | 390 | ||
380 | /** | 391 | /** |
392 | * Task called when it is time to destroy an inactive stream. | ||
393 | * | ||
394 | * @param cls the 'struct StreamHandle' to tear down | ||
395 | * @param tc scheduler context, unused | ||
396 | */ | ||
397 | static void | ||
398 | stream_timeout (void *cls, | ||
399 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
400 | { | ||
401 | struct StreamHandle *sh = cls; | ||
402 | |||
403 | sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
404 | destroy_stream_handle (sh); | ||
405 | } | ||
406 | |||
407 | |||
408 | /** | ||
409 | * Task called when it is time to reset an stream. | ||
410 | * | ||
411 | * @param cls the 'struct StreamHandle' to tear down | ||
412 | * @param tc scheduler context, unused | ||
413 | */ | ||
414 | static void | ||
415 | reset_stream_task (void *cls, | ||
416 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
417 | { | ||
418 | struct StreamHandle *sh = cls; | ||
419 | |||
420 | sh->reset_task = GNUNET_SCHEDULER_NO_TASK; | ||
421 | reset_stream (sh); | ||
422 | } | ||
423 | |||
424 | |||
425 | /** | ||
426 | * We had a serious error, tear down and re-create stream from scratch, | ||
427 | * but do so asynchronously. | ||
428 | * | ||
429 | * @param sh stream to reset | ||
430 | */ | ||
431 | static void | ||
432 | reset_stream_async (struct StreamHandle *sh) | ||
433 | { | ||
434 | if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task) | ||
435 | sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task, | ||
436 | sh); | ||
437 | } | ||
438 | |||
439 | |||
440 | /** | ||
381 | * We got a reply from the stream. Process it. | 441 | * We got a reply from the stream. Process it. |
382 | * | 442 | * |
383 | * @param cls the struct StreamHandle | 443 | * @param cls the struct StreamHandle |
@@ -403,7 +463,7 @@ handle_stream_reply (void *cls, | |||
403 | GNUNET_NO, GNUNET_NO)) | 463 | GNUNET_NO, GNUNET_NO)) |
404 | { | 464 | { |
405 | GNUNET_break_op (0); | 465 | GNUNET_break_op (0); |
406 | reset_stream (sh); | 466 | reset_stream_async (sh); |
407 | return size; | 467 | return size; |
408 | } | 468 | } |
409 | sh->rh = GNUNET_STREAM_read (sh->stream, | 469 | sh->rh = GNUNET_STREAM_read (sh->stream, |
@@ -513,6 +573,7 @@ reply_cb (void *cls, | |||
513 | if (sizeof (struct StreamReplyMessage) > msize) | 573 | if (sizeof (struct StreamReplyMessage) > msize) |
514 | { | 574 | { |
515 | GNUNET_break_op (0); | 575 | GNUNET_break_op (0); |
576 | reset_stream_async (sh); | ||
516 | return GNUNET_SYSERR; | 577 | return GNUNET_SYSERR; |
517 | } | 578 | } |
518 | srm = (const struct StreamReplyMessage *) message; | 579 | srm = (const struct StreamReplyMessage *) message; |
@@ -523,7 +584,8 @@ reply_cb (void *cls, | |||
523 | type, | 584 | type, |
524 | &srm[1], msize, &query)) | 585 | &srm[1], msize, &query)) |
525 | { | 586 | { |
526 | GNUNET_break_op (0); | 587 | GNUNET_break_op (0); |
588 | reset_stream_async (sh); | ||
527 | return GNUNET_SYSERR; | 589 | return GNUNET_SYSERR; |
528 | } | 590 | } |
529 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 591 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -553,6 +615,7 @@ reply_cb (void *cls, | |||
553 | return GNUNET_OK; | 615 | return GNUNET_OK; |
554 | default: | 616 | default: |
555 | GNUNET_break_op (0); | 617 | GNUNET_break_op (0); |
618 | reset_stream_async (sh); | ||
556 | return GNUNET_SYSERR; | 619 | return GNUNET_SYSERR; |
557 | } | 620 | } |
558 | } | 621 | } |
@@ -633,23 +696,6 @@ GSF_stream_query (const struct GNUNET_PeerIdentity *target, | |||
633 | 696 | ||
634 | 697 | ||
635 | /** | 698 | /** |
636 | * Task called when it is time to destroy an inactive stream. | ||
637 | * | ||
638 | * @param cls the 'struct StreamHandle' to tear down | ||
639 | * @param tc scheduler context, unused | ||
640 | */ | ||
641 | static void | ||
642 | stream_timeout (void *cls, | ||
643 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
644 | { | ||
645 | struct StreamHandle *sh = cls; | ||
646 | |||
647 | sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
648 | destroy_stream_handle (sh); | ||
649 | } | ||
650 | |||
651 | |||
652 | /** | ||
653 | * Cancel an active request; must not be called after 'proc' | 699 | * Cancel an active request; must not be called after 'proc' |
654 | * was calld. | 700 | * was calld. |
655 | * | 701 | * |
@@ -691,7 +737,9 @@ terminate_stream (struct StreamClient *sc) | |||
691 | GNUNET_STATISTICS_update (GSF_stats, | 737 | GNUNET_STATISTICS_update (GSF_stats, |
692 | gettext_noop ("# stream connections active"), -1, | 738 | gettext_noop ("# stream connections active"), -1, |
693 | GNUNET_NO); | 739 | GNUNET_NO); |
694 | if (NULL != sc->rh) | 740 | if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) |
741 | GNUNET_SCHEDULER_cancel (sc->terminate_task); | ||
742 | if (NULL != sc->rh) | ||
695 | GNUNET_STREAM_io_read_cancel (sc->rh); | 743 | GNUNET_STREAM_io_read_cancel (sc->rh); |
696 | if (NULL != sc->wh) | 744 | if (NULL != sc->wh) |
697 | GNUNET_STREAM_io_write_cancel (sc->wh); | 745 | GNUNET_STREAM_io_write_cancel (sc->wh); |
@@ -707,6 +755,38 @@ terminate_stream (struct StreamClient *sc) | |||
707 | 755 | ||
708 | 756 | ||
709 | /** | 757 | /** |
758 | * Task run to asynchronously terminate the stream. | ||
759 | * | ||
760 | * @param cls the 'struct StreamClient' | ||
761 | * @param tc scheduler context | ||
762 | */ | ||
763 | static void | ||
764 | terminate_stream_task (void *cls, | ||
765 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
766 | { | ||
767 | struct StreamClient *sc = cls; | ||
768 | |||
769 | sc->terminate_task = GNUNET_SCHEDULER_NO_TASK; | ||
770 | terminate_stream (sc); | ||
771 | } | ||
772 | |||
773 | |||
774 | /** | ||
775 | * We had a serious error, termiante stream, | ||
776 | * but do so asynchronously. | ||
777 | * | ||
778 | * @param sc stream to reset | ||
779 | */ | ||
780 | static void | ||
781 | terminate_stream_async (struct StreamClient *sc) | ||
782 | { | ||
783 | if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task) | ||
784 | sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task, | ||
785 | sc); | ||
786 | } | ||
787 | |||
788 | |||
789 | /** | ||
710 | * Functions of this signature are called whenever data is available from the | 790 | * Functions of this signature are called whenever data is available from the |
711 | * stream. | 791 | * stream. |
712 | * | 792 | * |
@@ -782,7 +862,7 @@ process_request (void *cls, | |||
782 | if (GNUNET_SYSERR == ret) | 862 | if (GNUNET_SYSERR == ret) |
783 | { | 863 | { |
784 | GNUNET_break_op (0); | 864 | GNUNET_break_op (0); |
785 | terminate_stream (sc); | 865 | terminate_stream_async (sc); |
786 | return size; | 866 | return size; |
787 | } | 867 | } |
788 | break; | 868 | break; |
@@ -790,7 +870,7 @@ process_request (void *cls, | |||
790 | case GNUNET_STREAM_SHUTDOWN: | 870 | case GNUNET_STREAM_SHUTDOWN: |
791 | case GNUNET_STREAM_SYSERR: | 871 | case GNUNET_STREAM_SYSERR: |
792 | case GNUNET_STREAM_BROKEN: | 872 | case GNUNET_STREAM_BROKEN: |
793 | terminate_stream (sc); | 873 | terminate_stream_async (sc); |
794 | return size; | 874 | return size; |
795 | default: | 875 | default: |
796 | GNUNET_break (0); | 876 | GNUNET_break (0); |
@@ -922,6 +1002,7 @@ request_cb (void *cls, | |||
922 | ntohs (message->size)) | 1002 | ntohs (message->size)) |
923 | { | 1003 | { |
924 | GNUNET_break_op (0); | 1004 | GNUNET_break_op (0); |
1005 | terminate_stream_async (sc); | ||
925 | return GNUNET_SYSERR; | 1006 | return GNUNET_SYSERR; |
926 | } | 1007 | } |
927 | sqm = (const struct StreamQueryMessage *) message; | 1008 | sqm = (const struct StreamQueryMessage *) message; |
@@ -944,6 +1025,7 @@ request_cb (void *cls, | |||
944 | return GNUNET_OK; | 1025 | return GNUNET_OK; |
945 | default: | 1026 | default: |
946 | GNUNET_break_op (0); | 1027 | GNUNET_break_op (0); |
1028 | terminate_stream_async (sc); | ||
947 | return GNUNET_SYSERR; | 1029 | return GNUNET_SYSERR; |
948 | } | 1030 | } |
949 | } | 1031 | } |
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index b7e3e4ce2..d81218364 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h | |||
@@ -246,12 +246,10 @@ struct GNUNET_STREAM_ListenSocket; | |||
246 | * Listens for stream connections for a specific application ports | 246 | * Listens for stream connections for a specific application ports |
247 | * | 247 | * |
248 | * @param cfg the configuration to use | 248 | * @param cfg the configuration to use |
249 | * | ||
250 | * @param app_port the application port for which new streams will be | 249 | * @param app_port the application port for which new streams will be |
251 | * accepted. If another stream is listening on the same port the | 250 | * accepted. If another stream is listening on the same port the |
252 | * listen_cb will be called to signal binding error and the returned | 251 | * listen_cb will be called to signal binding error and the returned |
253 | * ListenSocket will be invalidated. | 252 | * ListenSocket will be invalidated. |
254 | * | ||
255 | * @param listen_cb this function will be called when a peer tries to establish | 253 | * @param listen_cb this function will be called when a peer tries to establish |
256 | * a stream with us | 254 | * a stream with us |
257 | * @param listen_cb_cls closure for listen_cb | 255 | * @param listen_cb_cls closure for listen_cb |
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 46f7abb47..f9152749d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -1917,13 +1917,16 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, | |||
1917 | that that stream has been shutdown */ | 1917 | that that stream has been shutdown */ |
1918 | if (NULL != socket->write_handle) | 1918 | if (NULL != socket->write_handle) |
1919 | { | 1919 | { |
1920 | // FIXME: this breaks if 'write_cont' decides to | 1920 | GNUNET_STREAM_CompletionContinuation wc; |
1921 | // call SOCKET_close! | 1921 | void *wc_cls; |
1922 | if (NULL != socket->write_handle->write_cont) | 1922 | |
1923 | socket->write_handle->write_cont (socket->write_handle->write_cont_cls, | 1923 | wc = socket->write_handle->write_cont; |
1924 | GNUNET_STREAM_SHUTDOWN, 0); | 1924 | wc_cls = socket->write_handle->write_cont_cls; |
1925 | GNUNET_STREAM_io_write_cancel (socket->write_handle); | 1925 | GNUNET_STREAM_io_write_cancel (socket->write_handle); |
1926 | socket->write_handle = NULL; | 1926 | socket->write_handle = NULL; |
1927 | if (NULL != wc) | ||
1928 | wc (wc_cls, | ||
1929 | GNUNET_STREAM_SHUTDOWN, 0); | ||
1927 | } | 1930 | } |
1928 | return GNUNET_OK; | 1931 | return GNUNET_OK; |
1929 | } | 1932 | } |
@@ -2041,13 +2044,16 @@ handle_close (struct GNUNET_STREAM_Socket *socket, | |||
2041 | that that stream has been shutdown */ | 2044 | that that stream has been shutdown */ |
2042 | if (NULL != socket->write_handle) | 2045 | if (NULL != socket->write_handle) |
2043 | { | 2046 | { |
2044 | // FIXME: this breaks if 'write_cont' decides to | 2047 | GNUNET_STREAM_CompletionContinuation wc; |
2045 | // call SOCKET_close! | 2048 | void *wc_cls; |
2046 | if (NULL != socket->write_handle->write_cont) | 2049 | |
2047 | socket->write_handle->write_cont (socket->write_handle->write_cont_cls, | 2050 | wc = socket->write_handle->write_cont; |
2048 | GNUNET_STREAM_SHUTDOWN, 0); | 2051 | wc_cls = socket->write_handle->write_cont_cls; |
2049 | GNUNET_STREAM_io_write_cancel (socket->write_handle); | 2052 | GNUNET_STREAM_io_write_cancel (socket->write_handle); |
2050 | socket->write_handle = NULL; | 2053 | socket->write_handle = NULL; |
2054 | if (NULL != wc) | ||
2055 | wc (wc_cls, | ||
2056 | GNUNET_STREAM_SHUTDOWN, 0); | ||
2051 | } | 2057 | } |
2052 | return GNUNET_OK; | 2058 | return GNUNET_OK; |
2053 | } | 2059 | } |