From c015c83440507c1503c07d39740f13075efd926c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 24 Nov 2012 22:17:15 +0000 Subject: -ensure that either stream_api calls callbacks last or that we don't destroy a stream handle while it is in use below us on the stack --- src/fs/gnunet-service-fs_stream.c | 128 +++++++++++++++++++++++++++++++------- src/include/gnunet_stream_lib.h | 2 - src/stream/stream_api.c | 26 +++++--- 3 files changed, 121 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c index 856a21a1a..b444e282c 100644 --- a/src/fs/gnunet-service-fs_stream.c +++ b/src/fs/gnunet-service-fs_stream.c @@ -25,7 +25,6 @@ * * TODO: * - limit # concurrent clients, have timeouts for server-side - * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!) */ #include "platform.h" #include "gnunet_constants.h" @@ -77,6 +76,11 @@ struct StreamClient */ struct GNUNET_DATASTORE_QueueEntry *qe; + /** + * Task that is scheduled to asynchronously terminate the connection. + */ + GNUNET_SCHEDULER_TaskIdentifier terminate_task; + /** * Size of the last write that was initiated. */ @@ -247,6 +251,13 @@ struct StreamHandle */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** + * Task to reset streams that had errors (asynchronously, + * as we may not be able to do it immediately during a + * callback from the stream API). + */ + GNUNET_SCHEDULER_TaskIdentifier reset_task; + /** * Is this stream ready for transmission? */ @@ -377,6 +388,55 @@ reset_stream (struct StreamHandle *sh) } +/** + * Task called when it is time to destroy an inactive stream. + * + * @param cls the 'struct StreamHandle' to tear down + * @param tc scheduler context, unused + */ +static void +stream_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamHandle *sh = cls; + + sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + destroy_stream_handle (sh); +} + + +/** + * Task called when it is time to reset an stream. + * + * @param cls the 'struct StreamHandle' to tear down + * @param tc scheduler context, unused + */ +static void +reset_stream_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamHandle *sh = cls; + + sh->reset_task = GNUNET_SCHEDULER_NO_TASK; + reset_stream (sh); +} + + +/** + * We had a serious error, tear down and re-create stream from scratch, + * but do so asynchronously. + * + * @param sh stream to reset + */ +static void +reset_stream_async (struct StreamHandle *sh) +{ + if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task) + sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task, + sh); +} + + /** * We got a reply from the stream. Process it. * @@ -403,7 +463,7 @@ handle_stream_reply (void *cls, GNUNET_NO, GNUNET_NO)) { GNUNET_break_op (0); - reset_stream (sh); + reset_stream_async (sh); return size; } sh->rh = GNUNET_STREAM_read (sh->stream, @@ -513,6 +573,7 @@ reply_cb (void *cls, if (sizeof (struct StreamReplyMessage) > msize) { GNUNET_break_op (0); + reset_stream_async (sh); return GNUNET_SYSERR; } srm = (const struct StreamReplyMessage *) message; @@ -523,7 +584,8 @@ reply_cb (void *cls, type, &srm[1], msize, &query)) { - GNUNET_break_op (0); + GNUNET_break_op (0); + reset_stream_async (sh); return GNUNET_SYSERR; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -553,6 +615,7 @@ reply_cb (void *cls, return GNUNET_OK; default: GNUNET_break_op (0); + reset_stream_async (sh); return GNUNET_SYSERR; } } @@ -632,23 +695,6 @@ GSF_stream_query (const struct GNUNET_PeerIdentity *target, } -/** - * Task called when it is time to destroy an inactive stream. - * - * @param cls the 'struct StreamHandle' to tear down - * @param tc scheduler context, unused - */ -static void -stream_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct StreamHandle *sh = cls; - - sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; - destroy_stream_handle (sh); -} - - /** * Cancel an active request; must not be called after 'proc' * was calld. @@ -691,7 +737,9 @@ terminate_stream (struct StreamClient *sc) GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# stream connections active"), -1, GNUNET_NO); - if (NULL != sc->rh) + if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) + GNUNET_SCHEDULER_cancel (sc->terminate_task); + if (NULL != sc->rh) GNUNET_STREAM_io_read_cancel (sc->rh); if (NULL != sc->wh) GNUNET_STREAM_io_write_cancel (sc->wh); @@ -706,6 +754,38 @@ terminate_stream (struct StreamClient *sc) } +/** + * Task run to asynchronously terminate the stream. + * + * @param cls the 'struct StreamClient' + * @param tc scheduler context + */ +static void +terminate_stream_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamClient *sc = cls; + + sc->terminate_task = GNUNET_SCHEDULER_NO_TASK; + terminate_stream (sc); +} + + +/** + * We had a serious error, termiante stream, + * but do so asynchronously. + * + * @param sc stream to reset + */ +static void +terminate_stream_async (struct StreamClient *sc) +{ + if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task) + sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task, + sc); +} + + /** * Functions of this signature are called whenever data is available from the * stream. @@ -782,7 +862,7 @@ process_request (void *cls, if (GNUNET_SYSERR == ret) { GNUNET_break_op (0); - terminate_stream (sc); + terminate_stream_async (sc); return size; } break; @@ -790,7 +870,7 @@ process_request (void *cls, case GNUNET_STREAM_SHUTDOWN: case GNUNET_STREAM_SYSERR: case GNUNET_STREAM_BROKEN: - terminate_stream (sc); + terminate_stream_async (sc); return size; default: GNUNET_break (0); @@ -922,6 +1002,7 @@ request_cb (void *cls, ntohs (message->size)) { GNUNET_break_op (0); + terminate_stream_async (sc); return GNUNET_SYSERR; } sqm = (const struct StreamQueryMessage *) message; @@ -944,6 +1025,7 @@ request_cb (void *cls, return GNUNET_OK; default: GNUNET_break_op (0); + terminate_stream_async (sc); return GNUNET_SYSERR; } } diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index b7e3e4ce2..d81218364 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -246,12 +246,10 @@ struct GNUNET_STREAM_ListenSocket; * Listens for stream connections for a specific application ports * * @param cfg the configuration to use - * * @param app_port the application port for which new streams will be * accepted. If another stream is listening on the same port the * listen_cb will be called to signal binding error and the returned * ListenSocket will be invalidated. - * * @param listen_cb this function will be called when a peer tries to establish * a stream with us * @param listen_cb_cls closure for listen_cb diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 46f7abb47..f9152749d 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -1917,13 +1917,16 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, that that stream has been shutdown */ if (NULL != socket->write_handle) { - // FIXME: this breaks if 'write_cont' decides to - // call SOCKET_close! - if (NULL != socket->write_handle->write_cont) - socket->write_handle->write_cont (socket->write_handle->write_cont_cls, - GNUNET_STREAM_SHUTDOWN, 0); + GNUNET_STREAM_CompletionContinuation wc; + void *wc_cls; + + wc = socket->write_handle->write_cont; + wc_cls = socket->write_handle->write_cont_cls; GNUNET_STREAM_io_write_cancel (socket->write_handle); socket->write_handle = NULL; + if (NULL != wc) + wc (wc_cls, + GNUNET_STREAM_SHUTDOWN, 0); } return GNUNET_OK; } @@ -2041,13 +2044,16 @@ handle_close (struct GNUNET_STREAM_Socket *socket, that that stream has been shutdown */ if (NULL != socket->write_handle) { - // FIXME: this breaks if 'write_cont' decides to - // call SOCKET_close! - if (NULL != socket->write_handle->write_cont) - socket->write_handle->write_cont (socket->write_handle->write_cont_cls, - GNUNET_STREAM_SHUTDOWN, 0); + GNUNET_STREAM_CompletionContinuation wc; + void *wc_cls; + + wc = socket->write_handle->write_cont; + wc_cls = socket->write_handle->write_cont_cls; GNUNET_STREAM_io_write_cancel (socket->write_handle); socket->write_handle = NULL; + if (NULL != wc) + wc (wc_cls, + GNUNET_STREAM_SHUTDOWN, 0); } return GNUNET_OK; } -- cgit v1.2.3