From 6e5be59c1f6f958c8403a3d0d70eaab82fc7908b Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Sat, 14 Apr 2012 14:59:50 +0000 Subject: -stream connection halfclose and test cases --- src/include/gnunet_stream_lib.h | 12 +- src/stream/Makefile.am | 17 +- src/stream/stream_api.c | 415 ++++++++++++++----- src/stream/test_stream_local_halfclose.c | 677 ++++++++++++++++++++++++------- 4 files changed, 872 insertions(+), 249 deletions(-) diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index ac2ce0854..099f37ab2 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -262,7 +262,10 @@ struct GNUNET_STREAM_IOReadHandle; * @param write_cont the function to call upon writing some bytes into the * stream * @param write_cont_cls the closure - * @return handle to cancel the operation; NULL if a previous write is pending + * + * @return handle to cancel the operation; if a previous write is pending or + * the stream has been shutdown for this operation then write_cont is + * immediately called and NULL is returned. */ struct GNUNET_STREAM_IOWriteHandle * GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, @@ -291,13 +294,16 @@ typedef size_t (*GNUNET_STREAM_DataProcessor) (void *cls, /** - * Tries to read data from the stream + * Tries to read data from the stream. * * @param socket the socket representing a stream * @param timeout the timeout period * @param proc function to call with data (once only) * @param proc_cls the closure for proc - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if the stream has been shutdown for + * this type of opeartion then the DataProcessor is immediately + * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned */ struct GNUNET_STREAM_IOReadHandle * GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am index 45617e481..6116937c5 100644 --- a/src/stream/Makefile.am +++ b/src/stream/Makefile.am @@ -21,8 +21,7 @@ libgnunetstream_la_LDFLAGS = \ check_PROGRAMS = \ test_stream_local \ - test_stream_api -# test_stream_halfclose + test_stream_local_halfclose EXTRA_DIST = test_stream_local.conf @@ -37,16 +36,10 @@ test_stream_local_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/testing/libgnunettesting.la -test_stream_api_SOURCES = \ - test_stream_api.c -test_stream_api_LDADD = \ + +test_stream_local_halfclose_SOURCES = \ + test_stream_local_halfclose.c +test_stream_local_halfclose_LDADD = \ $(top_builddir)/src/stream/libgnunetstream.la \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/testing/libgnunettesting.la - -#test_stream_halfclose_SOURCES = \ -# test_stream_halfclose.c -#test_stream_halfclose_LDADD = \ -# $(top_builddir)/src/stream/libgnunetstream.la \ -# $(top_builddir)/src/util/libgnunetutil.la - diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 9ae34752c..ef0065b22 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -696,7 +696,7 @@ ack_task (void *cls, return; } - socket->ack_task_id = 0; + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; /* Create the ACK Message */ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); @@ -1561,6 +1561,141 @@ client_handle_transmit_close (void *cls, } +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @param operation the close operation which is being ACK'ed + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + int operation) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + shutdown_handle = socket->shutdown_handle; + if (NULL == shutdown_handle) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received *CLOSE_ACK when shutdown handle is NULL\n", + socket->our_id); + return GNUNET_OK; + } + + switch (operation) + { + case SHUT_RDWR: + switch (socket->state) + { + case STATE_CLOSE_WAIT: + if (SHUT_RDWR != shutdown_handle->operation) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received CLOSE_ACK when shutdown handle " + "is not for SHUT_RDWR\n", + socket->our_id); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received CLOSE_ACK from %x\n", + socket->our_id, + socket->other_peer); + socket->state = STATE_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received CLOSE_ACK when in it not expected\n", + socket->our_id); + return GNUNET_OK; + } + break; + + case SHUT_RD: + switch (socket->state) + { + case STATE_RECEIVE_CLOSE_WAIT: + if (SHUT_RD != shutdown_handle->operation) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received RECEIVE_CLOSE_ACK when shutdown handle " + "is not for SHUT_RD\n", + socket->our_id); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received RECEIVE_CLOSE_ACK from %x\n", + socket->our_id, + socket->other_peer); + socket->state = STATE_RECEIVE_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received RECEIVE_CLOSE_ACK when in it not expected\n", + socket->our_id); + return GNUNET_OK; + } + + break; + case SHUT_WR: + switch (socket->state) + { + case STATE_TRANSMIT_CLOSE_WAIT: + if (SHUT_WR != shutdown_handle->operation) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received TRANSMIT_CLOSE_ACK when shutdown handle " + "is not for SHUT_WR\n", + socket->our_id); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received TRAMSMIT_CLOSE_ACK from %x\n", + socket->our_id, + socket->other_peer); + socket->state = STATE_TRANSMIT_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received TRANSMIT_CLOSE_ACK when in it not expected\n", + socket->our_id); + + return GNUNET_OK; + } + break; + default: + GNUNET_assert (0); + } + + if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ + shutdown_handle->completion_cb(shutdown_handle->completion_cls, + operation); + GNUNET_free (shutdown_handle); /* Free shutdown handle */ + socket->shutdown_handle = NULL; + if (GNUNET_SCHEDULER_NO_TASK + != shutdown_handle->close_msg_retransmission_task_id) + { + GNUNET_SCHEDULER_cancel + (shutdown_handle->close_msg_retransmission_task_id); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + return GNUNET_OK; +} + + /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK * @@ -1583,6 +1718,67 @@ client_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_receive_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_MessageHeader *receive_close_ack; + + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n", + socket->our_id); + return GNUNET_OK; + default: + break; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received RECEIVE_CLOSE from %x\n", + socket->our_id, + socket->other_peer); + receive_close_ack = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.size = + htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); + queue_message (socket, + receive_close_ack, + &set_state_closed, + NULL); + + /* FIXME: Handle the case where write handle is present; the write operation + should be deemed as finised and the write continuation callback + has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ return GNUNET_OK; } @@ -1609,7 +1805,12 @@ client_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1635,7 +1836,13 @@ client_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); } @@ -1659,6 +1866,19 @@ handle_close (struct GNUNET_STREAM_Socket *socket, { struct GNUNET_STREAM_MessageHeader *close_ack; + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n", + socket->our_id); + return GNUNET_OK; + default: + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%x: Received CLOSE from %x\n", socket->our_id, @@ -1710,69 +1930,6 @@ client_handle_close (void *cls, } -/** - * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK - * - * @param socket the socket - * @param tunnel connection to the other end - * @param sender who sent the message - * @param message the actual message - * @param atsi performance data for the connection - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -static int -handle_close_ack (struct GNUNET_STREAM_Socket *socket, - struct GNUNET_MESH_Tunnel *tunnel, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_STREAM_MessageHeader *message, - const struct GNUNET_ATS_Information *atsi) -{ - struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; - - shutdown_handle = socket->shutdown_handle; - switch (socket->state) - { - case STATE_CLOSE_WAIT: - if ( (NULL == shutdown_handle) || - (SHUT_RDWR != shutdown_handle->operation) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE_ACK when shutdown handle is NULL or " - "not for SHUT_RDWR\n", - socket->our_id); - return GNUNET_OK; - } - - if (GNUNET_SCHEDULER_NO_TASK - != shutdown_handle->close_msg_retransmission_task_id) - { - GNUNET_SCHEDULER_cancel - (shutdown_handle->close_msg_retransmission_task_id); - shutdown_handle->close_msg_retransmission_task_id = - GNUNET_SCHEDULER_NO_TASK; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE_ACK from %x\n", - socket->our_id, - socket->other_peer); - socket->state = STATE_CLOSED; - if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ - shutdown_handle->completion_cb(shutdown_handle->completion_cls, - SHUT_RDWR); - GNUNET_free (shutdown_handle); /* Free shutdown handle */ - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE_ACK when in it not expected\n", - socket->our_id); - break; - } - return GNUNET_OK; -} - - /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK * @@ -1795,12 +1952,13 @@ client_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return handle_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) - message, - atsi); + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /*****************************/ @@ -2027,7 +2185,13 @@ server_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); } @@ -2053,7 +2217,12 @@ server_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -2079,7 +2248,13 @@ server_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); } @@ -2136,16 +2311,18 @@ server_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) message, - atsi); + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param socket the socket through which the ack was received * @param tunnel connection to the other end @@ -2177,6 +2354,8 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, switch (socket->state) { case (STATE_ESTABLISHED): + case (STATE_RECEIVE_CLOSED): + case (STATE_RECEIVE_CLOSE_WAIT): if (NULL == socket->write_handle) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2284,7 +2463,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the 'struct GNUNET_STREAM_Socket' * @param tunnel connection to the other end @@ -2311,7 +2490,7 @@ client_handle_ack (void *cls, /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the server's listen socket * @param tunnel connection to the other end @@ -2855,15 +3034,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) /** - * Tries to write the given data to the stream + * Tries to write the given data to the stream. The maximum size of data that + * can be written as part of a write operation is (64 * (64000 - sizeof (struct + * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API + * violation, however only the said number of maximum bytes will be written. * * @param socket the socket representing a stream * @param data the data buffer from where the data is written into the stream * @param size the number of bytes to be written from the data buffer * @param timeout the timeout period - * @param write_cont the function to call upon writing some bytes into the stream + * @param write_cont the function to call upon writing some bytes into the + * stream * @param write_cont_cls the closure - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if a previous write is pending or + * the stream has been shutdown for this operation then write_cont is + * immediately called and NULL is returned. */ struct GNUNET_STREAM_IOWriteHandle * GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, @@ -2891,16 +3077,33 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, GNUNET_break (0); return NULL; } - if (!((STATE_ESTABLISHED == socket->state) - || (STATE_RECEIVE_CLOSE_WAIT == socket->state) - || (STATE_RECEIVE_CLOSED == socket->state))) + + switch (socket->state) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%x: Attempting to write on a closed (OR) not-yet-established" - "stream\n", - socket->our_id); + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + if (NULL != write_cont) + write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + if (NULL != write_cont) + /* FIXME: GNUNET_STREAM_SYSERR?? */ + write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); return NULL; - } + case STATE_ESTABLISHED: + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + break; + } + if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; @@ -2957,14 +3160,18 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, } + /** - * Tries to read data from the stream + * Tries to read data from the stream. * * @param socket the socket representing a stream * @param timeout the timeout period * @param proc function to call with data (once only) * @param proc_cls the closure for proc - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if the stream has been shutdown for + * this type of opeartion then the DataProcessor is immediately + * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned */ struct GNUNET_STREAM_IOReadHandle * GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, @@ -2985,6 +3192,22 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, GNUNET_assert (NULL != proc); + switch (socket->state) + { + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: %s() END\n", + socket->our_id, + __func__); + return NULL; + default: + break; + } + read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); read_handle->proc = proc; read_handle->proc_cls = proc_cls; diff --git a/src/stream/test_stream_local_halfclose.c b/src/stream/test_stream_local_halfclose.c index 61e531ae8..a9d794800 100644 --- a/src/stream/test_stream_local_halfclose.c +++ b/src/stream/test_stream_local_halfclose.c @@ -19,21 +19,27 @@ */ /** - * @file stream/test_stream_local_halfclose.c - * @brief Stream API testing between local peers with half closed connections + * @file stream/test_stream_local.c + * @brief Testcases for Stream API halfclosed connections * @author Sree Harsha Totakura */ #include -#include /* For SHUT_RD, SHUT_WR */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_mesh_service.h" #include "gnunet_stream_lib.h" +#include "gnunet_testing_lib.h" +#include "gnunet_scheduler_lib.h" #define VERBOSE 1 +/** + * Number of peers + */ +#define NUM_PEERS 2 + /** * Structure for holding peer's sockets and IO Handles */ @@ -45,9 +51,24 @@ struct PeerData struct GNUNET_STREAM_Socket *socket; /** - * Peer's io handle + * Peer's io write handle + */ + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; + + /** + * Peer's shutdown handle + */ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + /** + * Our Peer id */ - struct GNUNET_STREAM_IOHandle *io_handle; + struct GNUNET_PeerIdentity our_id; /** * Bytes the peer has written @@ -58,41 +79,300 @@ struct PeerData * Byte the peer has read */ unsigned int bytes_read; + + /** + * GNUNET_YES if the peer has successfully completed the current test + */ + unsigned int test_ok; + + /** + * The shutdown operation that has to be used by the stream_shutdown_task + */ + int shutdown_operation; }; -static struct GNUNET_OS_Process *arm_pid; +/** + * The current peer group + */ +static struct GNUNET_TESTING_PeerGroup *pg; + +/** + * Peer 1 daemon + */ +static struct GNUNET_TESTING_Daemon *d1; + +/** + * Peer 2 daemon + */ +static struct GNUNET_TESTING_Daemon *d2; + + +/** + * Peer1 writes first and then calls for SHUT_WR + * Peer2 reads first and then calls for SHUT_RD + * Attempt to write again by Peer1 should be rejected + * Attempt to read again by Peer2 should be rejected + * Peer1 then reads from Peer2 which writes + */ static struct PeerData peer1; static struct PeerData peer2; static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; +static struct GNUNET_CONFIGURATION_Handle *config; static GNUNET_SCHEDULER_TaskIdentifier abort_task; -static GNUNET_SCHEDULER_TaskIdentifier test_task; static GNUNET_SCHEDULER_TaskIdentifier read_task; static char *data = "ABCD"; static int result; /** - * Shutdown nicely + * Enumeration for various tests that are to be passed in the same order as + * below + */ +enum Test + { + /** + * Peer1 writing; Peer2 reading + */ + PEER1_WRITE, + + /** + * Peer1 write shutdown; Peer2 should get an error when it tries to read; + */ + PEER1_WRITE_SHUTDOWN, + + /** + * Peer1 reads; Peer2 writes (connection is halfclosed) + */ + PEER1_HALFCLOSE_READ, + + /** + * Peer1 attempts to write; Should fail with stream already shutdown error + */ + PEER1_HALFCLOSE_WRITE_FAIL, + + /** + * Peer1 read shutdown; Peer2 should get stream shutdown error during write + */ + PEER1_READ_SHUTDOWN, + + /** + * All tests successfully finished + */ + SUCCESS + }; + +/** + * Current running test + */ +enum Test current_test; + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + + +/** + * The transition function; responsible for the transitions among tests + */ +static void +transition(); + + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + cls); + switch (current_test) + { + case PEER1_WRITE_SHUTDOWN: + GNUNET_assert (&peer2 == peer); + GNUNET_assert (NULL == peer->io_read_handle); + transition (); /* to PEER1_HALFCLOSE_READ */ + break; + default: + GNUNET_assert (NULL != peer->io_read_handle); + } +} + + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context */ static void -do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_STREAM_close (peer1.socket); - GNUNET_STREAM_close (peer2.socket); + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + switch (current_test) + { + case PEER1_HALFCLOSE_WRITE_FAIL: + GNUNET_assert (&peer1 == peer); + GNUNET_assert (NULL == peer->io_write_handle); + transition(); /* To PEER1_READ_SHUTDOWN */ + break; + case PEER1_READ_SHUTDOWN: + GNUNET_assert (&peer2 == peer); + GNUNET_assert (NULL == peer->io_write_handle); + transition (); /* To SUCCESS */ + break; + default: + GNUNET_assert (NULL != peer->io_write_handle); + } +} + + +/** + * Check whether peers successfully shut down. + */ +static void +peergroup_shutdown_callback (void *cls, const char *emsg) +{ + if (emsg != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Shutdown of peers failed!\n"); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "All peers successfully shut down!\n"); + } + GNUNET_CONFIGURATION_destroy (config); +} + + +/** + * Close sockets and stop testing deamons nicely + */ +static void +do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (NULL != peer1.socket) + GNUNET_STREAM_close (peer1.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); if (0 != abort_task) { GNUNET_SCHEDULER_cancel (abort_task); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: arm\n"); - if (0 != GNUNET_OS_process_kill (arm_pid, SIGTERM)) - { - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); - GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); - GNUNET_OS_process_close (arm_pid); + + GNUNET_TESTING_daemons_stop (pg, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &peergroup_shutdown_callback, + NULL); +} + + +/** + * Completion callback for shutdown + * + * @param cls the closure from GNUNET_STREAM_shutdown call + * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, + * SHUT_RDWR) + */ +void +shutdown_completion (void *cls, + int operation) +{ + switch (current_test) + { + case PEER1_WRITE: + GNUNET_assert (0); + case PEER1_WRITE_SHUTDOWN: + peer1.test_ok = GNUNET_YES; + /* Peer2 should read with error */ + peer2.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); + break; + case PEER1_READ_SHUTDOWN: + peer1.test_ok = GNUNET_YES; + peer2.bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2); + break; + case PEER1_HALFCLOSE_READ: + case PEER1_HALFCLOSE_WRITE_FAIL: + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + } +} + + +/** + * Task for calling STREAM_shutdown + * + * @param cls the peer entity + * @param tc the TaskContext + */ +static void +stream_shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->shutdown_handle = GNUNET_STREAM_shutdown (peer->socket, + peer->shutdown_operation, + &shutdown_completion, + peer); + GNUNET_assert (NULL != peer->shutdown_handle); } @@ -103,20 +383,71 @@ static void do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); - if (0 != test_task) - { - GNUNET_SCHEDULER_cancel (test_task); - } if (0 != read_task) { GNUNET_SCHEDULER_cancel (read_task); } result = GNUNET_SYSERR; abort_task = 0; - do_shutdown (cls, tc); + do_close (cls, tc); } +/** + * The transition function; responsible for the transitions among tests + */ +static void +transition() +{ + if ((GNUNET_YES == peer1.test_ok) && (GNUNET_YES == peer2.test_ok)) + { + peer1.test_ok = GNUNET_NO; + peer2.test_ok = GNUNET_NO; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "TEST %d SUCCESSFULL\n", current_test); + switch (current_test) + { + case PEER1_WRITE: + current_test = PEER1_WRITE_SHUTDOWN; + /* Peer1 should shutdown writing */ + peer1.shutdown_operation = SHUT_WR; + GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1); + break; + case PEER1_WRITE_SHUTDOWN: + current_test = PEER1_HALFCLOSE_READ; + /* Peer2 should be able to write successfully */ + peer2.bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2); + + /* Peer1 should be able to read successfully */ + peer1.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1); + break; + case PEER1_HALFCLOSE_READ: + current_test = PEER1_HALFCLOSE_WRITE_FAIL; + peer1.bytes_wrote = 0; + peer2.bytes_read = 0; + peer2.test_ok = GNUNET_YES; + GNUNET_SCHEDULER_add_now (&stream_write_task, &peer1); + break; + case PEER1_HALFCLOSE_WRITE_FAIL: + current_test = PEER1_READ_SHUTDOWN; + peer1.shutdown_operation = SHUT_RD; + GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1); + break; + case PEER1_READ_SHUTDOWN: + current_test = SUCCESS; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "All tests successful\n"); + GNUNET_SCHEDULER_add_now (&do_close, NULL); + break; + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + + } + } +} + /** * The write completion function; called upon writing some data to stream or * upon error @@ -130,37 +461,54 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - struct PeerData *peer; - - peer = (struct PeerData *) cls; - GNUNET_assert (GNUNET_STREAM_OK == status); - GNUNET_assert (size < strlen (data)); - peer->bytes_wrote += size; + struct PeerData *peer = cls; - if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ - { - peer->io_handle = GNUNET_STREAM_write (peer->socket, - (void *) data, - strlen(data) - peer->bytes_wrote, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); - } - else + switch (current_test) { - if (&peer1 == peer) /* Peer1 has finished writing; should read now */ - { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL!=peer->io_handle); - } - } + case PEER1_WRITE: + case PEER1_HALFCLOSE_READ: + + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + peer->bytes_wrote += size; + + if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ + { + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + + if (&peer1 == peer) + { + peer1.test_ok = GNUNET_YES; + transition (); /* to PEER1_WRITE_SHUTDOWN */ + } + else /* This will happen during PEER1_HALFCLOSE_READ */ + { + peer2.test_ok = GNUNET_YES; + transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */ + } + } + break; + case PEER1_HALFCLOSE_WRITE_FAIL: + GNUNET_assert (peer == &peer1); + GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); + GNUNET_assert (0 == size); + peer1.test_ok = GNUNET_YES; + break; + case PEER1_READ_SHUTDOWN: + GNUNET_assert (peer == &peer2); + GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); + GNUNET_assert (0 == size); + peer2.test_ok = GNUNET_YES; + break; + case PEER1_WRITE_SHUTDOWN: + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + } } @@ -176,18 +524,18 @@ stream_open_cb (void *cls, { struct PeerData *peer; + GNUNET_assert (socket == peer1.socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Stream established from peer1\n", + GNUNET_i2s (&peer1.our_id)); peer = (struct PeerData *) cls; peer->bytes_wrote = 0; GNUNET_assert (socket == peer1.socket); GNUNET_assert (socket == peer->socket); - peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */ - (void *) data, /* data */ - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); + peer1.test_ok = GNUNET_NO; + peer2.test_ok = GNUNET_NO; + current_test = PEER1_WRITE; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } @@ -211,34 +559,54 @@ input_processor (void *cls, peer = (struct PeerData *) cls; - /* Peer1 is expected to read when it first finishes writing */ - if (peer == &peer1) + switch (current_test) { - /* since p2 closed write */ - GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); - /* Test passed; shutdown now */ - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); - return 0; - } + case PEER1_WRITE: + case PEER1_HALFCLOSE_READ: + if (GNUNET_STREAM_TIMEOUT == status) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Read operation timedout - reading again!\n"); + GNUNET_assert (0 == size); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + return 0; + } - GNUNET_assert (GNUNET_STERAM_OK == status); - GNUNET_assert (size < strlen (data)); - GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, - (const char *) input_data, - size)); - peer->bytes_read += size; + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); + peer->bytes_read += size; - if (peer->bytes_read < strlen (data)) - { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_handle); + if (peer->bytes_read < strlen (data)) + { + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + if (&peer2 == peer) /* Peer2 has completed reading; should write */ + { + peer2.test_ok = GNUNET_YES; + transition (); /* Transition to PEER1_WRITE_SHUTDOWN */ + } + else /* Peer1 has completed reading. End of tests */ + { + peer1.test_ok = GNUNET_YES; + transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */ + } + } + break; + case PEER1_WRITE_SHUTDOWN: + GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); + peer2.test_ok = GNUNET_YES; + break; + case PEER1_HALFCLOSE_WRITE_FAIL: + case PEER1_READ_SHUTDOWN: + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ } - + return size; } @@ -253,16 +621,7 @@ stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) read_task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (NULL != cls); peer2.bytes_read = 0; - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ - /* Close the stream for writing */ - GNUNET_STREAM_shutdown ((struct GNUNET_STREAM_Socket *) cls, - SHUT_WR); - peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - (void *) &peer2); - GNUNET_assert (NULL != peer2.io_handle); + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); } @@ -278,40 +637,80 @@ stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) */ static int stream_listen_cb (void *cls, - struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_PeerIdentity *initiator) + struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_PeerIdentity *initiator) { GNUNET_assert (NULL != socket); - GNUNET_assert (NULL == initiator); /* Local peer=NULL? */ + GNUNET_assert (NULL != initiator); GNUNET_assert (socket != peer1.socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer connected: %s\n", + GNUNET_i2s (&peer2.our_id), + GNUNET_i2s(initiator)); + peer2.socket = socket; + /* FIXME: reading should be done right now instead of a scheduled call */ read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); return GNUNET_OK; } /** - * Testing function + * Callback to be called when testing peer group is ready + * + * @param cls NULL + * @param emsg NULL on success */ -static void -test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +void +peergroup_ready (void *cls, const char *emsg) { - test_task = GNUNET_SCHEDULER_NO_TASK; + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Starting peer group failed: %s\n", emsg); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer group is now ready\n"); + + GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg)); + + d1 = GNUNET_TESTING_daemon_get (pg, 0); + GNUNET_assert (NULL != d1); + + d2 = GNUNET_TESTING_daemon_get (pg, 1); + GNUNET_assert (NULL != d2); + + GNUNET_TESTING_get_peer_identity (d1->cfg, + &peer1.our_id); + GNUNET_TESTING_get_peer_identity (d2->cfg, + &peer2.our_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s : %s\n", + GNUNET_i2s (&peer1.our_id), + GNUNET_i2s (&d1->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s : %s\n", + GNUNET_i2s (&peer2.our_id), + GNUNET_i2s (&d2->id)); + + peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg, + 10, /* App port */ + &stream_listen_cb, + NULL); + GNUNET_assert (NULL != peer2_listen_socket); /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */ + peer1.socket = GNUNET_STREAM_open (d1->cfg, + &d2->id, /* Null for local peer? */ 10, /* App port */ &stream_open_cb, - (void *) &peer1); + &peer1); GNUNET_assert (NULL != peer1.socket); - peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */ - &stream_listen_cb, - NULL); - GNUNET_assert (NULL != peer2_listen_socket); - } + /** * Initialize framework and start test */ @@ -319,28 +718,33 @@ static void run (void *cls, char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *cfg) { - GNUNET_log_setup ("test_stream_local", -#if VERBOSE - "DEBUG", -#else - "WARNING", -#endif - NULL); - arm_pid = - GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm", - "gnunet-service-arm", -#if VERBOSE_ARM - "-L", "DEBUG", -#endif - "-c", "test_stream_local.conf", NULL); - - abort_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort, - NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg); + struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */ + + /* GNUNET_log_setup ("test_stream_local", */ + /* "DEBUG", */ + /* NULL); */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting test\n"); + /* Duplicate the configuration */ + config = GNUNET_CONFIGURATION_dup (cfg); + + hosts = GNUNET_TESTING_hosts_load (config); + + pg = GNUNET_TESTING_peergroup_start (config, + 2, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 3), + NULL, + &peergroup_ready, + NULL, + hosts); + GNUNET_assert (NULL != pg); + + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort, + NULL); } /** @@ -350,18 +754,15 @@ int main (int argc, char **argv) { int ret; - char *const argv2[] = { "test-stream-local", - "-c", "test_stream.conf", -#if VERBOSE - "-L", "DEBUG", -#endif - NULL - }; + char *argv2[] = { "test-stream-local", + "-L", "DEBUG", + "-c", "test_stream_local.conf", + NULL}; struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_OPTION_END }; - + ret = GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, "test-stream-local", "nohelp", options, &run, NULL); @@ -377,6 +778,6 @@ int main (int argc, char **argv) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); return 1; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n"); return 0; } -- cgit v1.2.3