aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-12-12 23:00:05 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-12-12 23:00:05 +0000
commitb19f5f159c18d5d0685af67628ee6f24c86b7c27 (patch)
treef43f63ed97d2fe69fa618283e61ca2ae8cb551bb /src/stream
parenta33a6a8017dfdd912a8b22354d9d4dc97200daff (diff)
downloadgnunet-b19f5f159c18d5d0685af67628ee6f24c86b7c27.tar.gz
gnunet-b19f5f159c18d5d0685af67628ee6f24c86b7c27.zip
- fixes
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c162
-rw-r--r--src/stream/test_stream_2peers_halfclose.c7
2 files changed, 104 insertions, 65 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 498d64d81..4b86bece0 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -296,11 +296,6 @@ struct GNUNET_STREAM_Socket
296 enum State state; 296 enum State state;
297 297
298 /** 298 /**
299 * The status of the socket
300 */
301 enum GNUNET_STREAM_Status status;
302
303 /**
304 * Whether testing mode is active or not 299 * Whether testing mode is active or not
305 */ 300 */
306 int testing_active; 301 int testing_active;
@@ -1022,7 +1017,7 @@ call_read_processor (void *cls,
1022 /* Call the data processor */ 1017 /* Call the data processor */
1023 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", 1018 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
1024 GNUNET_i2s (&socket->other_peer)); 1019 GNUNET_i2s (&socket->other_peer));
1025 read_size = proc (proc_cls, socket->status, 1020 read_size = proc (proc_cls, GNUNET_STREAM_OK,
1026 socket->receive_buffer + socket->copy_offset, 1021 socket->receive_buffer + socket->copy_offset,
1027 valid_read_size); 1022 valid_read_size);
1028 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n", 1023 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
@@ -1660,6 +1655,63 @@ client_handle_reset (void *cls,
1660 1655
1661 1656
1662/** 1657/**
1658 * Frees the socket's receive buffers, marks the socket as receive closed and
1659 * calls the DataProcessor with GNUNET_STREAM_SHUTDOWN status if a read handle
1660 * is present
1661 *
1662 * @param socket the socket
1663 */
1664static void
1665do_receive_shutdown (struct GNUNET_STREAM_Socket *socket)
1666{
1667 socket->receive_closed = GNUNET_YES;
1668 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1669 socket->receive_buffer = NULL;
1670 socket->receive_buffer_size = 0;
1671 if (NULL != socket->read_handle)
1672 {
1673 GNUNET_STREAM_DataProcessor proc;
1674 void *proc_cls;
1675
1676 proc = socket->read_handle->proc;
1677 proc_cls = socket->read_handle->proc_cls;
1678 GNUNET_STREAM_read_cancel (socket->read_handle);
1679 socket->read_handle = NULL;
1680 if (NULL != proc)
1681 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
1682 }
1683}
1684
1685
1686/**
1687 * Marks the socket as transmit closed and calls the CompletionContinuation with
1688 * GNUNET_STREAM_SHUTDOWN status if a write handle is present
1689 *
1690 * @param socket the socket
1691 */
1692static void
1693do_transmit_shutdown (struct GNUNET_STREAM_Socket *socket)
1694{
1695 socket->transmit_closed = GNUNET_YES;
1696 /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1697 that that stream has been shutdown */
1698 if (NULL != socket->write_handle)
1699 {
1700 GNUNET_STREAM_CompletionContinuation wc;
1701 void *wc_cls;
1702
1703 wc = socket->write_handle->write_cont;
1704 wc_cls = socket->write_handle->write_cont_cls;
1705 GNUNET_STREAM_write_cancel (socket->write_handle);
1706 socket->write_handle = NULL;
1707 if (NULL != wc)
1708 wc (wc_cls,
1709 GNUNET_STREAM_SHUTDOWN, 0);
1710 }
1711}
1712
1713
1714/**
1663 * Common message handler for handling TRANSMIT_CLOSE messages 1715 * Common message handler for handling TRANSMIT_CLOSE messages
1664 * 1716 *
1665 * @param socket the socket through which the ack was received 1717 * @param socket the socket through which the ack was received
@@ -1691,19 +1743,29 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1691 default: 1743 default:
1692 break; 1744 break;
1693 } 1745 }
1694 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1695 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1696 socket->receive_closed = GNUNET_YES;
1697 if (GNUNET_YES == socket->transmit_closed)
1698 socket->state = STATE_CLOSED;
1699 else
1700 socket->state = STATE_RECEIVE_CLOSED;
1701 /* Send TRANSMIT_CLOSE_ACK */ 1746 /* Send TRANSMIT_CLOSE_ACK */
1702 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 1747 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1703 reply->header.type = 1748 reply->header.type =
1704 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); 1749 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1705 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1750 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1706 queue_message (socket, reply, NULL, NULL, GNUNET_NO); 1751 queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1752 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
1753 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1754 switch(socket->state)
1755 {
1756 case STATE_RECEIVE_CLOSED:
1757 case STATE_RECEIVE_CLOSE_WAIT:
1758 case STATE_CLOSE_WAIT:
1759 case STATE_CLOSED:
1760 return GNUNET_OK;
1761 default:
1762 break;
1763 }
1764 do_receive_shutdown (socket);
1765 if (GNUNET_YES == socket->transmit_closed)
1766 socket->state = STATE_CLOSED;
1767 else
1768 socket->state = STATE_RECEIVE_CLOSED;
1707 return GNUNET_OK; 1769 return GNUNET_OK;
1708} 1770}
1709 1771
@@ -1942,12 +2004,7 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1942 break; 2004 break;
1943 } 2005 }
1944 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n", 2006 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1945 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); 2007 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1946 socket->transmit_closed = GNUNET_YES;
1947 if (GNUNET_YES == socket->receive_closed)
1948 socket->state = STATE_CLOSED;
1949 else
1950 socket->state = STATE_TRANSMIT_CLOSED;
1951 receive_close_ack = 2008 receive_close_ack =
1952 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 2009 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1953 receive_close_ack->header.size = 2010 receive_close_ack->header.size =
@@ -1955,21 +2012,21 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1955 receive_close_ack->header.type = 2012 receive_close_ack->header.type =
1956 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); 2013 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1957 queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO); 2014 queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1958 /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal 2015 switch (socket->state)
1959 that that stream has been shutdown */
1960 if (NULL != socket->write_handle)
1961 { 2016 {
1962 GNUNET_STREAM_CompletionContinuation wc; 2017 case STATE_TRANSMIT_CLOSED:
1963 void *wc_cls; 2018 case STATE_TRANSMIT_CLOSE_WAIT:
1964 2019 case STATE_CLOSED:
1965 wc = socket->write_handle->write_cont; 2020 case STATE_CLOSE_WAIT:
1966 wc_cls = socket->write_handle->write_cont_cls; 2021 return GNUNET_OK;
1967 GNUNET_STREAM_write_cancel (socket->write_handle); 2022 default:
1968 socket->write_handle = NULL; 2023 break;
1969 if (NULL != wc)
1970 wc (wc_cls,
1971 GNUNET_STREAM_SHUTDOWN, 0);
1972 } 2024 }
2025 do_transmit_shutdown (socket);
2026 if (GNUNET_YES == socket->receive_closed)
2027 socket->state = STATE_CLOSED;
2028 else
2029 socket->state = STATE_TRANSMIT_CLOSED;
1973 return GNUNET_OK; 2030 return GNUNET_OK;
1974} 2031}
1975 2032
@@ -2075,28 +2132,12 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
2075 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 2132 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2076 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); 2133 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2077 queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO); 2134 queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2078 if (STATE_CLOSED == socket->state) 2135 if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state))
2079 return GNUNET_OK; 2136 return GNUNET_OK;
2080 socket->receive_closed = GNUNET_YES; 2137 if (GNUNET_NO == socket->transmit_closed)
2081 socket->transmit_closed = GNUNET_YES; 2138 do_transmit_shutdown (socket);
2082 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ 2139 if (GNUNET_NO == socket->receive_closed)
2083 socket->receive_buffer = NULL; 2140 do_receive_shutdown (socket);
2084 socket->receive_buffer_size = 0;
2085 /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
2086 that that stream has been shutdown */
2087 if (NULL != socket->write_handle)
2088 {
2089 GNUNET_STREAM_CompletionContinuation wc;
2090 void *wc_cls;
2091
2092 wc = socket->write_handle->write_cont;
2093 wc_cls = socket->write_handle->write_cont_cls;
2094 GNUNET_STREAM_write_cancel (socket->write_handle);
2095 socket->write_handle = NULL;
2096 if (NULL != wc)
2097 wc (wc_cls,
2098 GNUNET_STREAM_SHUTDOWN, 0);
2099 }
2100 return GNUNET_OK; 2141 return GNUNET_OK;
2101} 2142}
2102 2143
@@ -2665,7 +2706,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2665 socket->write_handle = NULL; 2706 socket->write_handle = NULL;
2666 if (NULL != write_handle->write_cont) 2707 if (NULL != write_handle->write_cont)
2667 write_handle->write_cont (write_handle->write_cont_cls, 2708 write_handle->write_cont (write_handle->write_cont_cls,
2668 socket->status, 2709 GNUNET_STREAM_OK,
2669 write_handle->size); 2710 write_handle->size);
2670 /* We are done with the write handle - Freeing it */ 2711 /* We are done with the write handle - Freeing it */
2671 GNUNET_free (write_handle); 2712 GNUNET_free (write_handle);
@@ -2941,7 +2982,6 @@ tunnel_cleaner (void *cls,
2941 GNUNET_STATISTICS_update (socket->stat_handle, 2982 GNUNET_STATISTICS_update (socket->stat_handle,
2942 "inbound connections", -1, GNUNET_NO); 2983 "inbound connections", -1, GNUNET_NO);
2943 } 2984 }
2944 socket->status = GNUNET_STREAM_SYSERR;
2945 /* Clear Transmit handles */ 2985 /* Clear Transmit handles */
2946 if (NULL != socket->transmit_handle) 2986 if (NULL != socket->transmit_handle)
2947 { 2987 {
@@ -3632,14 +3672,12 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3632 "%s: %s()\n", 3672 "%s: %s()\n",
3633 GNUNET_i2s (&socket->other_peer), 3673 GNUNET_i2s (&socket->other_peer),
3634 __func__); 3674 __func__);
3635 /* Return NULL if there is already a read handle; the user has to cancel that 3675 /* Only one read handle is permitted at any time; cancel the existing or wait
3636 first before continuing or has to wait until it is completed */ 3676 for it to complete */
3637 if (NULL != socket->read_handle) 3677 GNUNET_assert (NULL == socket->read_handle);
3638 {
3639 GNUNET_assert (0);
3640 return NULL;
3641 }
3642 GNUNET_assert (NULL != proc); 3678 GNUNET_assert (NULL != proc);
3679 if (GNUNET_YES == socket->receive_closed)
3680 return NULL;
3643 switch (socket->state) 3681 switch (socket->state)
3644 { 3682 {
3645 case STATE_RECEIVE_CLOSED: 3683 case STATE_RECEIVE_CLOSED:
diff --git a/src/stream/test_stream_2peers_halfclose.c b/src/stream/test_stream_2peers_halfclose.c
index 58f9f19cf..fa7b8194e 100644
--- a/src/stream/test_stream_2peers_halfclose.c
+++ b/src/stream/test_stream_2peers_halfclose.c
@@ -259,6 +259,7 @@ stream_read_task (void *cls,
259 case PEER1_WRITE_SHUTDOWN: 259 case PEER1_WRITE_SHUTDOWN:
260 GNUNET_assert (&peer2 == peer); 260 GNUNET_assert (&peer2 == peer);
261 GNUNET_assert (NULL == peer->io_read_handle); 261 GNUNET_assert (NULL == peer->io_read_handle);
262 peer2.test_ok = GNUNET_YES;
262 transition (); /* to PEER1_HALFCLOSE_READ */ 263 transition (); /* to PEER1_HALFCLOSE_READ */
263 break; 264 break;
264 default: 265 default:
@@ -614,8 +615,8 @@ input_processor (void *cls,
614 } 615 }
615 break; 616 break;
616 case PEER1_WRITE_SHUTDOWN: 617 case PEER1_WRITE_SHUTDOWN:
617 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); 618 GNUNET_assert (0); /* This callback will not be called when stream
618 peer2.test_ok = GNUNET_YES; 619 is shutdown */
619 break; 620 break;
620 case PEER1_HALFCLOSE_WRITE_FAIL: 621 case PEER1_HALFCLOSE_WRITE_FAIL:
621 case PEER1_READ_SHUTDOWN: 622 case PEER1_READ_SHUTDOWN:
@@ -868,7 +869,7 @@ test_master (void *cls, unsigned int num_peers,
868 setup_state = INIT; 869 setup_state = INIT;
869 abort_task = 870 abort_task =
870 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 871 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
871 (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort, 872 (GNUNET_TIME_UNIT_SECONDS, 1000), &do_abort,
872 NULL); 873 NULL);
873} 874}
874 875