From a94d4a2677b856ab2dc68414c6f7e95a50943b76 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Fri, 6 Apr 2012 19:19:32 +0000 Subject: -added STREAM_shutdown --- src/include/gnunet_stream_lib.h | 8 +- src/stream/stream_api.c | 184 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 180 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index 4a574dfe4..ac2ce0854 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -142,14 +142,14 @@ typedef void (*GNUNET_STREAM_ShutdownCompletion) (void *cls, /** - * Shutdown the stream for reading or writing (man 2 shutdown). + * Shutdown the stream for reading or writing (similar to man 2 shutdown). * * @param socket the stream socket - * @param opertion SHUT_RD, SHUT_WR or SHUT_RDWR + * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR * @param completion_cb the callback that will be called upon successful * shutdown of given operation * @param completion_cls the closure for the completion callback - * @return the shutdown handle + * @return the shutdown handle; NULL in case of any error */ struct GNUNET_STREAM_ShutdownHandle * GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, @@ -161,7 +161,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, /** * Cancels a pending shutdown * - * @param the shutdown handle returned from GNUNET_STREAM_shutdown + * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown */ void GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle); diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 93d9c9fe2..92c1093f7 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -27,6 +27,8 @@ * memory used for PEER interning * * Add code for write io timeout + * + * Include retransmission for control messages **/ /** @@ -826,6 +828,9 @@ call_read_processor (void *cls, if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + if (NULL == socket->receive_buffer) + return; + GNUNET_assert (NULL != socket->read_handle); GNUNET_assert (NULL != socket->read_handle->proc); @@ -1223,6 +1228,39 @@ set_state_hello_wait (void *cls, } +/** + * Callback to set state to CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Attaing CLOSE_WAIT state\n", + socket->our_id); + socket->state = STATE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to CLOSED + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_closed (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + socket->state = STATE_CLOSED; +} + /** * Returns a new HelloAckMessage. Also sets the write sequence number for the * socket @@ -1500,6 +1538,44 @@ client_handle_receive_close_ack (void *cls, } +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param tunnel_ctx this is NULL + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) +{ + struct GNUNET_STREAM_MessageHeader *close_ack; + + close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); + queue_message (socket, + close_ack, + &set_state_closed, + NULL); + if (socket->state == STATE_CLOSED) + return GNUNET_OK; + + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; + return GNUNET_OK; +} + + /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE * @@ -1522,6 +1598,44 @@ client_handle_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK + * + * @param socket the socket + * @param tunnel connection to the other end + * @param tunnel_ctx this is NULL + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_close_ack (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) +{ + switch (socket->state) + { + case STATE_CLOSE_WAIT: + socket->state = STATE_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received CLOSE_ACK when in it not expected\n", + socket->our_id); + break; + } return GNUNET_OK; } @@ -1548,7 +1662,12 @@ client_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi); } /*****************************/ @@ -1834,7 +1953,8 @@ server_handle_receive_close_ack (void *cls, /** * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE * - * @param cls the closure + * @param cls the listen socket (from GNUNET_MESH_connect in + * GNUNET_STREAM_listen) * @param tunnel connection to the other end * @param tunnel_ctx the socket * @param sender who sent the message @@ -1852,8 +1972,12 @@ server_handle_close (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - - return GNUNET_OK; + + return handle_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1879,7 +2003,11 @@ server_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -2373,7 +2501,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, * Shutdown the stream for reading or writing (similar to man 2 shutdown). * * @param socket the stream socket - * @param opertion SHUT_RD, SHUT_WR or SHUT_RDWR + * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR * @param completion_cb the callback that will be called upon successful * shutdown of given operation * @param completion_cls the closure for the completion callback @@ -2385,14 +2513,54 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, GNUNET_STREAM_ShutdownCompletion completion_cb, void *completion_cls) { - return NULL; + struct GNUNET_STREAM_ShutdownHandle *handle; + struct GNUNET_STREAM_MessageHeader *msg; + + handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle)); + handle->socket = socket; + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (operation) + { + case SHUT_RD: + handle->operation = SHUT_RD; + + break; + case SHUT_WR: + handle->operation = SHUT_WR; + + break; + case SHUT_RDWR: + handle->operation = SHUT_RDWR; + if (NULL != socket->write_handle) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + if (NULL != socket->read_handle) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + queue_message (socket, + msg, + &set_state_close_wait, + NULL); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "GNUNET_STREAM_shutdown called with invalid value for " + "parameter operation -- Ignoring\n"); + GNUNET_free (handle); + return NULL; + } + return handle; } /** * Cancels a pending shutdown * - * @param the shutdown handle returned from GNUNET_STREAM_shutdown + * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown */ void GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle) -- cgit v1.2.3