aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-04-14 14:59:50 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-04-14 14:59:50 +0000
commit6e5be59c1f6f958c8403a3d0d70eaab82fc7908b (patch)
tree7dec58e8255150b5390850d97f450514cca34408
parent38f48d5fb82ca04c0c725189c47983c957c56c8f (diff)
downloadgnunet-6e5be59c1f6f958c8403a3d0d70eaab82fc7908b.tar.gz
gnunet-6e5be59c1f6f958c8403a3d0d70eaab82fc7908b.zip
-stream connection halfclose and test cases
-rw-r--r--src/include/gnunet_stream_lib.h12
-rw-r--r--src/stream/Makefile.am17
-rw-r--r--src/stream/stream_api.c415
-rw-r--r--src/stream/test_stream_local_halfclose.c677
4 files changed, 872 insertions, 249 deletions
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h
index ac2ce0854..099f37ab2 100644
--- a/src/include/gnunet_stream_lib.h
+++ b/src/include/gnunet_stream_lib.h
@@ -262,7 +262,10 @@ struct GNUNET_STREAM_IOReadHandle;
262 * @param write_cont the function to call upon writing some bytes into the 262 * @param write_cont the function to call upon writing some bytes into the
263 * stream 263 * stream
264 * @param write_cont_cls the closure 264 * @param write_cont_cls the closure
265 * @return handle to cancel the operation; NULL if a previous write is pending 265 *
266 * @return handle to cancel the operation; if a previous write is pending or
267 * the stream has been shutdown for this operation then write_cont is
268 * immediately called and NULL is returned.
266 */ 269 */
267struct GNUNET_STREAM_IOWriteHandle * 270struct GNUNET_STREAM_IOWriteHandle *
268GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, 271GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
@@ -291,13 +294,16 @@ typedef size_t (*GNUNET_STREAM_DataProcessor) (void *cls,
291 294
292 295
293/** 296/**
294 * Tries to read data from the stream 297 * Tries to read data from the stream.
295 * 298 *
296 * @param socket the socket representing a stream 299 * @param socket the socket representing a stream
297 * @param timeout the timeout period 300 * @param timeout the timeout period
298 * @param proc function to call with data (once only) 301 * @param proc function to call with data (once only)
299 * @param proc_cls the closure for proc 302 * @param proc_cls the closure for proc
300 * @return handle to cancel the operation 303 *
304 * @return handle to cancel the operation; if the stream has been shutdown for
305 * this type of opeartion then the DataProcessor is immediately
306 * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
301 */ 307 */
302struct GNUNET_STREAM_IOReadHandle * 308struct GNUNET_STREAM_IOReadHandle *
303GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, 309GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am
index 45617e481..6116937c5 100644
--- a/src/stream/Makefile.am
+++ b/src/stream/Makefile.am
@@ -21,8 +21,7 @@ libgnunetstream_la_LDFLAGS = \
21 21
22check_PROGRAMS = \ 22check_PROGRAMS = \
23 test_stream_local \ 23 test_stream_local \
24 test_stream_api 24 test_stream_local_halfclose
25# test_stream_halfclose
26 25
27EXTRA_DIST = test_stream_local.conf 26EXTRA_DIST = test_stream_local.conf
28 27
@@ -37,16 +36,10 @@ test_stream_local_LDADD = \
37 $(top_builddir)/src/util/libgnunetutil.la \ 36 $(top_builddir)/src/util/libgnunetutil.la \
38 $(top_builddir)/src/testing/libgnunettesting.la 37 $(top_builddir)/src/testing/libgnunettesting.la
39 38
40test_stream_api_SOURCES = \ 39
41 test_stream_api.c 40test_stream_local_halfclose_SOURCES = \
42test_stream_api_LDADD = \ 41 test_stream_local_halfclose.c
42test_stream_local_halfclose_LDADD = \
43 $(top_builddir)/src/stream/libgnunetstream.la \ 43 $(top_builddir)/src/stream/libgnunetstream.la \
44 $(top_builddir)/src/util/libgnunetutil.la \ 44 $(top_builddir)/src/util/libgnunetutil.la \
45 $(top_builddir)/src/testing/libgnunettesting.la 45 $(top_builddir)/src/testing/libgnunettesting.la
46
47#test_stream_halfclose_SOURCES = \
48# test_stream_halfclose.c
49#test_stream_halfclose_LDADD = \
50# $(top_builddir)/src/stream/libgnunetstream.la \
51# $(top_builddir)/src/util/libgnunetutil.la
52
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 9ae34752c..ef0065b22 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -696,7 +696,7 @@ ack_task (void *cls,
696 return; 696 return;
697 } 697 }
698 698
699 socket->ack_task_id = 0; 699 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
700 700
701 /* Create the ACK Message */ 701 /* Create the ACK Message */
702 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); 702 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
@@ -1562,6 +1562,141 @@ client_handle_transmit_close (void *cls,
1562 1562
1563 1563
1564/** 1564/**
1565 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1566 *
1567 * @param socket the socket
1568 * @param tunnel connection to the other end
1569 * @param sender who sent the message
1570 * @param message the actual message
1571 * @param atsi performance data for the connection
1572 * @param operation the close operation which is being ACK'ed
1573 * @return GNUNET_OK to keep the connection open,
1574 * GNUNET_SYSERR to close it (signal serious error)
1575 */
1576static int
1577handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1578 struct GNUNET_MESH_Tunnel *tunnel,
1579 const struct GNUNET_PeerIdentity *sender,
1580 const struct GNUNET_STREAM_MessageHeader *message,
1581 const struct GNUNET_ATS_Information *atsi,
1582 int operation)
1583{
1584 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1585
1586 shutdown_handle = socket->shutdown_handle;
1587 if (NULL == shutdown_handle)
1588 {
1589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590 "%x: Received *CLOSE_ACK when shutdown handle is NULL\n",
1591 socket->our_id);
1592 return GNUNET_OK;
1593 }
1594
1595 switch (operation)
1596 {
1597 case SHUT_RDWR:
1598 switch (socket->state)
1599 {
1600 case STATE_CLOSE_WAIT:
1601 if (SHUT_RDWR != shutdown_handle->operation)
1602 {
1603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604 "%x: Received CLOSE_ACK when shutdown handle "
1605 "is not for SHUT_RDWR\n",
1606 socket->our_id);
1607 return GNUNET_OK;
1608 }
1609
1610 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1611 "%x: Received CLOSE_ACK from %x\n",
1612 socket->our_id,
1613 socket->other_peer);
1614 socket->state = STATE_CLOSED;
1615 break;
1616 default:
1617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618 "%x: Received CLOSE_ACK when in it not expected\n",
1619 socket->our_id);
1620 return GNUNET_OK;
1621 }
1622 break;
1623
1624 case SHUT_RD:
1625 switch (socket->state)
1626 {
1627 case STATE_RECEIVE_CLOSE_WAIT:
1628 if (SHUT_RD != shutdown_handle->operation)
1629 {
1630 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1631 "%x: Received RECEIVE_CLOSE_ACK when shutdown handle "
1632 "is not for SHUT_RD\n",
1633 socket->our_id);
1634 return GNUNET_OK;
1635 }
1636
1637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638 "%x: Received RECEIVE_CLOSE_ACK from %x\n",
1639 socket->our_id,
1640 socket->other_peer);
1641 socket->state = STATE_RECEIVE_CLOSED;
1642 break;
1643 default:
1644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645 "%x: Received RECEIVE_CLOSE_ACK when in it not expected\n",
1646 socket->our_id);
1647 return GNUNET_OK;
1648 }
1649
1650 break;
1651 case SHUT_WR:
1652 switch (socket->state)
1653 {
1654 case STATE_TRANSMIT_CLOSE_WAIT:
1655 if (SHUT_WR != shutdown_handle->operation)
1656 {
1657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658 "%x: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1659 "is not for SHUT_WR\n",
1660 socket->our_id);
1661 return GNUNET_OK;
1662 }
1663
1664 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665 "%x: Received TRAMSMIT_CLOSE_ACK from %x\n",
1666 socket->our_id,
1667 socket->other_peer);
1668 socket->state = STATE_TRANSMIT_CLOSED;
1669 break;
1670 default:
1671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672 "%x: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1673 socket->our_id);
1674
1675 return GNUNET_OK;
1676 }
1677 break;
1678 default:
1679 GNUNET_assert (0);
1680 }
1681
1682 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1683 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1684 operation);
1685 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1686 socket->shutdown_handle = NULL;
1687 if (GNUNET_SCHEDULER_NO_TASK
1688 != shutdown_handle->close_msg_retransmission_task_id)
1689 {
1690 GNUNET_SCHEDULER_cancel
1691 (shutdown_handle->close_msg_retransmission_task_id);
1692 shutdown_handle->close_msg_retransmission_task_id =
1693 GNUNET_SCHEDULER_NO_TASK;
1694 }
1695 return GNUNET_OK;
1696}
1697
1698
1699/**
1565 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK 1700 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1566 * 1701 *
1567 * @param cls the socket (set from GNUNET_MESH_connect) 1702 * @param cls the socket (set from GNUNET_MESH_connect)
@@ -1583,6 +1718,67 @@ client_handle_transmit_close_ack (void *cls,
1583{ 1718{
1584 struct GNUNET_STREAM_Socket *socket = cls; 1719 struct GNUNET_STREAM_Socket *socket = cls;
1585 1720
1721 return handle_generic_close_ack (socket,
1722 tunnel,
1723 sender,
1724 (const struct GNUNET_STREAM_MessageHeader *)
1725 message,
1726 atsi,
1727 SHUT_WR);
1728}
1729
1730
1731/**
1732 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1733 *
1734 * @param socket the socket
1735 * @param tunnel connection to the other end
1736 * @param sender who sent the message
1737 * @param message the actual message
1738 * @param atsi performance data for the connection
1739 * @return GNUNET_OK to keep the connection open,
1740 * GNUNET_SYSERR to close it (signal serious error)
1741 */
1742static int
1743handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1744 struct GNUNET_MESH_Tunnel *tunnel,
1745 const struct GNUNET_PeerIdentity *sender,
1746 const struct GNUNET_STREAM_MessageHeader *message,
1747 const struct GNUNET_ATS_Information *atsi)
1748{
1749 struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1750
1751 switch (socket->state)
1752 {
1753 case STATE_INIT:
1754 case STATE_LISTEN:
1755 case STATE_HELLO_WAIT:
1756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757 "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1758 socket->our_id);
1759 return GNUNET_OK;
1760 default:
1761 break;
1762 }
1763
1764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765 "%x: Received RECEIVE_CLOSE from %x\n",
1766 socket->our_id,
1767 socket->other_peer);
1768 receive_close_ack =
1769 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1770 receive_close_ack->header.size =
1771 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1772 receive_close_ack->header.type =
1773 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1774 queue_message (socket,
1775 receive_close_ack,
1776 &set_state_closed,
1777 NULL);
1778
1779 /* FIXME: Handle the case where write handle is present; the write operation
1780 should be deemed as finised and the write continuation callback
1781 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1586 return GNUNET_OK; 1782 return GNUNET_OK;
1587} 1783}
1588 1784
@@ -1609,7 +1805,12 @@ client_handle_receive_close (void *cls,
1609{ 1805{
1610 struct GNUNET_STREAM_Socket *socket = cls; 1806 struct GNUNET_STREAM_Socket *socket = cls;
1611 1807
1612 return GNUNET_OK; 1808 return
1809 handle_receive_close (socket,
1810 tunnel,
1811 sender,
1812 (const struct GNUNET_STREAM_MessageHeader *) message,
1813 atsi);
1613} 1814}
1614 1815
1615 1816
@@ -1635,7 +1836,13 @@ client_handle_receive_close_ack (void *cls,
1635{ 1836{
1636 struct GNUNET_STREAM_Socket *socket = cls; 1837 struct GNUNET_STREAM_Socket *socket = cls;
1637 1838
1638 return GNUNET_OK; 1839 return handle_generic_close_ack (socket,
1840 tunnel,
1841 sender,
1842 (const struct GNUNET_STREAM_MessageHeader *)
1843 message,
1844 atsi,
1845 SHUT_RD);
1639} 1846}
1640 1847
1641 1848
@@ -1659,6 +1866,19 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
1659{ 1866{
1660 struct GNUNET_STREAM_MessageHeader *close_ack; 1867 struct GNUNET_STREAM_MessageHeader *close_ack;
1661 1868
1869 switch (socket->state)
1870 {
1871 case STATE_INIT:
1872 case STATE_LISTEN:
1873 case STATE_HELLO_WAIT:
1874 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875 "%x: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1876 socket->our_id);
1877 return GNUNET_OK;
1878 default:
1879 break;
1880 }
1881
1662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1882 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663 "%x: Received CLOSE from %x\n", 1883 "%x: Received CLOSE from %x\n",
1664 socket->our_id, 1884 socket->our_id,
@@ -1711,69 +1931,6 @@ client_handle_close (void *cls,
1711 1931
1712 1932
1713/** 1933/**
1714 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1715 *
1716 * @param socket the socket
1717 * @param tunnel connection to the other end
1718 * @param sender who sent the message
1719 * @param message the actual message
1720 * @param atsi performance data for the connection
1721 * @return GNUNET_OK to keep the connection open,
1722 * GNUNET_SYSERR to close it (signal serious error)
1723 */
1724static int
1725handle_close_ack (struct GNUNET_STREAM_Socket *socket,
1726 struct GNUNET_MESH_Tunnel *tunnel,
1727 const struct GNUNET_PeerIdentity *sender,
1728 const struct GNUNET_STREAM_MessageHeader *message,
1729 const struct GNUNET_ATS_Information *atsi)
1730{
1731 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1732
1733 shutdown_handle = socket->shutdown_handle;
1734 switch (socket->state)
1735 {
1736 case STATE_CLOSE_WAIT:
1737 if ( (NULL == shutdown_handle) ||
1738 (SHUT_RDWR != shutdown_handle->operation) )
1739 {
1740 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1741 "%x: Received CLOSE_ACK when shutdown handle is NULL or "
1742 "not for SHUT_RDWR\n",
1743 socket->our_id);
1744 return GNUNET_OK;
1745 }
1746
1747 if (GNUNET_SCHEDULER_NO_TASK
1748 != shutdown_handle->close_msg_retransmission_task_id)
1749 {
1750 GNUNET_SCHEDULER_cancel
1751 (shutdown_handle->close_msg_retransmission_task_id);
1752 shutdown_handle->close_msg_retransmission_task_id =
1753 GNUNET_SCHEDULER_NO_TASK;
1754 }
1755
1756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757 "%x: Received CLOSE_ACK from %x\n",
1758 socket->our_id,
1759 socket->other_peer);
1760 socket->state = STATE_CLOSED;
1761 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1762 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1763 SHUT_RDWR);
1764 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1765 break;
1766 default:
1767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1768 "%x: Received CLOSE_ACK when in it not expected\n",
1769 socket->our_id);
1770 break;
1771 }
1772 return GNUNET_OK;
1773}
1774
1775
1776/**
1777 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK 1934 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
1778 * 1935 *
1779 * @param cls the socket (set from GNUNET_MESH_connect) 1936 * @param cls the socket (set from GNUNET_MESH_connect)
@@ -1795,12 +1952,13 @@ client_handle_close_ack (void *cls,
1795{ 1952{
1796 struct GNUNET_STREAM_Socket *socket = cls; 1953 struct GNUNET_STREAM_Socket *socket = cls;
1797 1954
1798 return handle_close_ack (socket, 1955 return handle_generic_close_ack (socket,
1799 tunnel, 1956 tunnel,
1800 sender, 1957 sender,
1801 (const struct GNUNET_STREAM_MessageHeader *) 1958 (const struct GNUNET_STREAM_MessageHeader *)
1802 message, 1959 message,
1803 atsi); 1960 atsi,
1961 SHUT_RDWR);
1804} 1962}
1805 1963
1806/*****************************/ 1964/*****************************/
@@ -2027,7 +2185,13 @@ server_handle_transmit_close_ack (void *cls,
2027{ 2185{
2028 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; 2186 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2029 2187
2030 return GNUNET_OK; 2188 return handle_generic_close_ack (socket,
2189 tunnel,
2190 sender,
2191 (const struct GNUNET_STREAM_MessageHeader *)
2192 message,
2193 atsi,
2194 SHUT_WR);
2031} 2195}
2032 2196
2033 2197
@@ -2053,7 +2217,12 @@ server_handle_receive_close (void *cls,
2053{ 2217{
2054 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; 2218 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2055 2219
2056 return GNUNET_OK; 2220 return
2221 handle_receive_close (socket,
2222 tunnel,
2223 sender,
2224 (const struct GNUNET_STREAM_MessageHeader *) message,
2225 atsi);
2057} 2226}
2058 2227
2059 2228
@@ -2079,7 +2248,13 @@ server_handle_receive_close_ack (void *cls,
2079{ 2248{
2080 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; 2249 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2081 2250
2082 return GNUNET_OK; 2251 return handle_generic_close_ack (socket,
2252 tunnel,
2253 sender,
2254 (const struct GNUNET_STREAM_MessageHeader *)
2255 message,
2256 atsi,
2257 SHUT_RD);
2083} 2258}
2084 2259
2085 2260
@@ -2136,16 +2311,18 @@ server_handle_close_ack (void *cls,
2136{ 2311{
2137 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; 2312 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2138 2313
2139 return handle_close_ack (socket, 2314 return handle_generic_close_ack (socket,
2140 tunnel, 2315 tunnel,
2141 sender, 2316 sender,
2142 (const struct GNUNET_STREAM_MessageHeader *) message, 2317 (const struct GNUNET_STREAM_MessageHeader *)
2143 atsi); 2318 message,
2319 atsi,
2320 SHUT_RDWR);
2144} 2321}
2145 2322
2146 2323
2147/** 2324/**
2148 * Message Handler for mesh 2325 * Handler for DATA_ACK messages
2149 * 2326 *
2150 * @param socket the socket through which the ack was received 2327 * @param socket the socket through which the ack was received
2151 * @param tunnel connection to the other end 2328 * @param tunnel connection to the other end
@@ -2177,6 +2354,8 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2177 switch (socket->state) 2354 switch (socket->state)
2178 { 2355 {
2179 case (STATE_ESTABLISHED): 2356 case (STATE_ESTABLISHED):
2357 case (STATE_RECEIVE_CLOSED):
2358 case (STATE_RECEIVE_CLOSE_WAIT):
2180 if (NULL == socket->write_handle) 2359 if (NULL == socket->write_handle)
2181 { 2360 {
2182 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2361 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2284,7 +2463,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2284 2463
2285 2464
2286/** 2465/**
2287 * Message Handler for mesh 2466 * Handler for DATA_ACK messages
2288 * 2467 *
2289 * @param cls the 'struct GNUNET_STREAM_Socket' 2468 * @param cls the 'struct GNUNET_STREAM_Socket'
2290 * @param tunnel connection to the other end 2469 * @param tunnel connection to the other end
@@ -2311,7 +2490,7 @@ client_handle_ack (void *cls,
2311 2490
2312 2491
2313/** 2492/**
2314 * Message Handler for mesh 2493 * Handler for DATA_ACK messages
2315 * 2494 *
2316 * @param cls the server's listen socket 2495 * @param cls the server's listen socket
2317 * @param tunnel connection to the other end 2496 * @param tunnel connection to the other end
@@ -2855,15 +3034,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
2855 3034
2856 3035
2857/** 3036/**
2858 * Tries to write the given data to the stream 3037 * Tries to write the given data to the stream. The maximum size of data that
3038 * can be written as part of a write operation is (64 * (64000 - sizeof (struct
3039 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3040 * violation, however only the said number of maximum bytes will be written.
2859 * 3041 *
2860 * @param socket the socket representing a stream 3042 * @param socket the socket representing a stream
2861 * @param data the data buffer from where the data is written into the stream 3043 * @param data the data buffer from where the data is written into the stream
2862 * @param size the number of bytes to be written from the data buffer 3044 * @param size the number of bytes to be written from the data buffer
2863 * @param timeout the timeout period 3045 * @param timeout the timeout period
2864 * @param write_cont the function to call upon writing some bytes into the stream 3046 * @param write_cont the function to call upon writing some bytes into the
3047 * stream
2865 * @param write_cont_cls the closure 3048 * @param write_cont_cls the closure
2866 * @return handle to cancel the operation 3049 *
3050 * @return handle to cancel the operation; if a previous write is pending or
3051 * the stream has been shutdown for this operation then write_cont is
3052 * immediately called and NULL is returned.
2867 */ 3053 */
2868struct GNUNET_STREAM_IOWriteHandle * 3054struct GNUNET_STREAM_IOWriteHandle *
2869GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, 3055GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
@@ -2891,16 +3077,33 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2891 GNUNET_break (0); 3077 GNUNET_break (0);
2892 return NULL; 3078 return NULL;
2893 } 3079 }
2894 if (!((STATE_ESTABLISHED == socket->state) 3080
2895 || (STATE_RECEIVE_CLOSE_WAIT == socket->state) 3081 switch (socket->state)
2896 || (STATE_RECEIVE_CLOSED == socket->state)))
2897 { 3082 {
2898 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 3083 case STATE_TRANSMIT_CLOSED:
2899 "%x: Attempting to write on a closed (OR) not-yet-established" 3084 case STATE_TRANSMIT_CLOSE_WAIT:
2900 "stream\n", 3085 case STATE_CLOSED:
2901 socket->our_id); 3086 case STATE_CLOSE_WAIT:
3087 if (NULL != write_cont)
3088 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3090 "%s() END\n", __func__);
3091 return NULL;
3092 case STATE_INIT:
3093 case STATE_LISTEN:
3094 case STATE_HELLO_WAIT:
3095 if (NULL != write_cont)
3096 /* FIXME: GNUNET_STREAM_SYSERR?? */
3097 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3098 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3099 "%s() END\n", __func__);
2902 return NULL; 3100 return NULL;
2903 } 3101 case STATE_ESTABLISHED:
3102 case STATE_RECEIVE_CLOSED:
3103 case STATE_RECEIVE_CLOSE_WAIT:
3104 break;
3105 }
3106
2904 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) 3107 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
2905 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; 3108 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
2906 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; 3109 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
@@ -2957,14 +3160,18 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2957} 3160}
2958 3161
2959 3162
3163
2960/** 3164/**
2961 * Tries to read data from the stream 3165 * Tries to read data from the stream.
2962 * 3166 *
2963 * @param socket the socket representing a stream 3167 * @param socket the socket representing a stream
2964 * @param timeout the timeout period 3168 * @param timeout the timeout period
2965 * @param proc function to call with data (once only) 3169 * @param proc function to call with data (once only)
2966 * @param proc_cls the closure for proc 3170 * @param proc_cls the closure for proc
2967 * @return handle to cancel the operation 3171 *
3172 * @return handle to cancel the operation; if the stream has been shutdown for
3173 * this type of opeartion then the DataProcessor is immediately
3174 * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
2968 */ 3175 */
2969struct GNUNET_STREAM_IOReadHandle * 3176struct GNUNET_STREAM_IOReadHandle *
2970GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, 3177GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -2985,6 +3192,22 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2985 3192
2986 GNUNET_assert (NULL != proc); 3193 GNUNET_assert (NULL != proc);
2987 3194
3195 switch (socket->state)
3196 {
3197 case STATE_RECEIVE_CLOSED:
3198 case STATE_RECEIVE_CLOSE_WAIT:
3199 case STATE_CLOSED:
3200 case STATE_CLOSE_WAIT:
3201 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3202 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3203 "%x: %s() END\n",
3204 socket->our_id,
3205 __func__);
3206 return NULL;
3207 default:
3208 break;
3209 }
3210
2988 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); 3211 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2989 read_handle->proc = proc; 3212 read_handle->proc = proc;
2990 read_handle->proc_cls = proc_cls; 3213 read_handle->proc_cls = proc_cls;
diff --git a/src/stream/test_stream_local_halfclose.c b/src/stream/test_stream_local_halfclose.c
index 61e531ae8..a9d794800 100644
--- a/src/stream/test_stream_local_halfclose.c
+++ b/src/stream/test_stream_local_halfclose.c
@@ -19,22 +19,28 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file stream/test_stream_local_halfclose.c 22 * @file stream/test_stream_local.c
23 * @brief Stream API testing between local peers with half closed connections 23 * @brief Testcases for Stream API halfclosed connections
24 * @author Sree Harsha Totakura 24 * @author Sree Harsha Totakura
25 */ 25 */
26 26
27#include <string.h> 27#include <string.h>
28#include <sys/socket.h> /* For SHUT_RD, SHUT_WR */
29 28
30#include "platform.h" 29#include "platform.h"
31#include "gnunet_util_lib.h" 30#include "gnunet_util_lib.h"
32#include "gnunet_mesh_service.h" 31#include "gnunet_mesh_service.h"
33#include "gnunet_stream_lib.h" 32#include "gnunet_stream_lib.h"
33#include "gnunet_testing_lib.h"
34#include "gnunet_scheduler_lib.h"
34 35
35#define VERBOSE 1 36#define VERBOSE 1
36 37
37/** 38/**
39 * Number of peers
40 */
41#define NUM_PEERS 2
42
43/**
38 * Structure for holding peer's sockets and IO Handles 44 * Structure for holding peer's sockets and IO Handles
39 */ 45 */
40struct PeerData 46struct PeerData
@@ -45,9 +51,24 @@ struct PeerData
45 struct GNUNET_STREAM_Socket *socket; 51 struct GNUNET_STREAM_Socket *socket;
46 52
47 /** 53 /**
48 * Peer's io handle 54 * Peer's io write handle
55 */
56 struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
57
58 /**
59 * Peer's io read handle
60 */
61 struct GNUNET_STREAM_IOReadHandle *io_read_handle;
62
63 /**
64 * Peer's shutdown handle
65 */
66 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
67
68 /**
69 * Our Peer id
49 */ 70 */
50 struct GNUNET_STREAM_IOHandle *io_handle; 71 struct GNUNET_PeerIdentity our_id;
51 72
52 /** 73 /**
53 * Bytes the peer has written 74 * Bytes the peer has written
@@ -58,41 +79,300 @@ struct PeerData
58 * Byte the peer has read 79 * Byte the peer has read
59 */ 80 */
60 unsigned int bytes_read; 81 unsigned int bytes_read;
82
83 /**
84 * GNUNET_YES if the peer has successfully completed the current test
85 */
86 unsigned int test_ok;
87
88 /**
89 * The shutdown operation that has to be used by the stream_shutdown_task
90 */
91 int shutdown_operation;
61}; 92};
62 93
63static struct GNUNET_OS_Process *arm_pid; 94/**
95 * The current peer group
96 */
97static struct GNUNET_TESTING_PeerGroup *pg;
98
99/**
100 * Peer 1 daemon
101 */
102static struct GNUNET_TESTING_Daemon *d1;
103
104/**
105 * Peer 2 daemon
106 */
107static struct GNUNET_TESTING_Daemon *d2;
108
109
110/**
111 * Peer1 writes first and then calls for SHUT_WR
112 * Peer2 reads first and then calls for SHUT_RD
113 * Attempt to write again by Peer1 should be rejected
114 * Attempt to read again by Peer2 should be rejected
115 * Peer1 then reads from Peer2 which writes
116 */
64static struct PeerData peer1; 117static struct PeerData peer1;
65static struct PeerData peer2; 118static struct PeerData peer2;
66static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; 119static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
120static struct GNUNET_CONFIGURATION_Handle *config;
67 121
68static GNUNET_SCHEDULER_TaskIdentifier abort_task; 122static GNUNET_SCHEDULER_TaskIdentifier abort_task;
69static GNUNET_SCHEDULER_TaskIdentifier test_task;
70static GNUNET_SCHEDULER_TaskIdentifier read_task; 123static GNUNET_SCHEDULER_TaskIdentifier read_task;
71 124
72static char *data = "ABCD"; 125static char *data = "ABCD";
73static int result; 126static int result;
74 127
75/** 128/**
76 * Shutdown nicely 129 * Enumeration for various tests that are to be passed in the same order as
130 * below
131 */
132enum Test
133 {
134 /**
135 * Peer1 writing; Peer2 reading
136 */
137 PEER1_WRITE,
138
139 /**
140 * Peer1 write shutdown; Peer2 should get an error when it tries to read;
141 */
142 PEER1_WRITE_SHUTDOWN,
143
144 /**
145 * Peer1 reads; Peer2 writes (connection is halfclosed)
146 */
147 PEER1_HALFCLOSE_READ,
148
149 /**
150 * Peer1 attempts to write; Should fail with stream already shutdown error
151 */
152 PEER1_HALFCLOSE_WRITE_FAIL,
153
154 /**
155 * Peer1 read shutdown; Peer2 should get stream shutdown error during write
156 */
157 PEER1_READ_SHUTDOWN,
158
159 /**
160 * All tests successfully finished
161 */
162 SUCCESS
163 };
164
165/**
166 * Current running test
167 */
168enum Test current_test;
169
170/**
171 * Input processor
172 *
173 * @param cls the closure from GNUNET_STREAM_write/read
174 * @param status the status of the stream at the time this function is called
175 * @param data traffic from the other side
176 * @param size the number of bytes available in data read
177 * @return number of bytes of processed from 'data' (any data remaining should be
178 * given to the next time the read processor is called).
179 */
180static size_t
181input_processor (void *cls,
182 enum GNUNET_STREAM_Status status,
183 const void *input_data,
184 size_t size);
185
186
187/**
188 * The transition function; responsible for the transitions among tests
189 */
190static void
191transition();
192
193
194/**
195 * Task for calling STREAM_read
196 *
197 * @param cls the peer data entity
198 * @param tc the task context
199 */
200static void
201stream_read_task (void *cls,
202 const struct GNUNET_SCHEDULER_TaskContext *tc)
203{
204 struct PeerData *peer = cls;
205
206 peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
207 GNUNET_TIME_relative_multiply
208 (GNUNET_TIME_UNIT_SECONDS, 5),
209 &input_processor,
210 cls);
211 switch (current_test)
212 {
213 case PEER1_WRITE_SHUTDOWN:
214 GNUNET_assert (&peer2 == peer);
215 GNUNET_assert (NULL == peer->io_read_handle);
216 transition (); /* to PEER1_HALFCLOSE_READ */
217 break;
218 default:
219 GNUNET_assert (NULL != peer->io_read_handle);
220 }
221}
222
223
224/**
225 * The write completion function; called upon writing some data to stream or
226 * upon error
227 *
228 * @param cls the closure from GNUNET_STREAM_write/read
229 * @param status the status of the stream at the time this function is called
230 * @param size the number of bytes read or written
231 */
232static void
233write_completion (void *cls,
234 enum GNUNET_STREAM_Status status,
235 size_t size);
236
237
238/**
239 * Task for calling STREAM_write
240 *
241 * @param cls the peer data entity
242 * @param tc the task context
77 */ 243 */
78static void 244static void
79do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 245stream_write_task (void *cls,
246 const struct GNUNET_SCHEDULER_TaskContext *tc)
80{ 247{
81 GNUNET_STREAM_close (peer1.socket); 248 struct PeerData *peer = cls;
82 GNUNET_STREAM_close (peer2.socket); 249
250 peer->io_write_handle =
251 GNUNET_STREAM_write (peer->socket,
252 (void *) data,
253 strlen(data) - peer->bytes_wrote,
254 GNUNET_TIME_relative_multiply
255 (GNUNET_TIME_UNIT_SECONDS, 5),
256 &write_completion,
257 peer);
258 switch (current_test)
259 {
260 case PEER1_HALFCLOSE_WRITE_FAIL:
261 GNUNET_assert (&peer1 == peer);
262 GNUNET_assert (NULL == peer->io_write_handle);
263 transition(); /* To PEER1_READ_SHUTDOWN */
264 break;
265 case PEER1_READ_SHUTDOWN:
266 GNUNET_assert (&peer2 == peer);
267 GNUNET_assert (NULL == peer->io_write_handle);
268 transition (); /* To SUCCESS */
269 break;
270 default:
271 GNUNET_assert (NULL != peer->io_write_handle);
272 }
273}
274
275
276/**
277 * Check whether peers successfully shut down.
278 */
279static void
280peergroup_shutdown_callback (void *cls, const char *emsg)
281{
282 if (emsg != NULL)
283 {
284 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
285 "Shutdown of peers failed!\n");
286 }
287 else
288 {
289 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
290 "All peers successfully shut down!\n");
291 }
292 GNUNET_CONFIGURATION_destroy (config);
293}
294
295
296/**
297 * Close sockets and stop testing deamons nicely
298 */
299static void
300do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
301{
302 if (NULL != peer1.socket)
303 GNUNET_STREAM_close (peer1.socket);
304 if (NULL != peer2.socket)
305 GNUNET_STREAM_close (peer2.socket);
306 if (NULL != peer2_listen_socket)
307 GNUNET_STREAM_listen_close (peer2_listen_socket);
308
83 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); 309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
84 if (0 != abort_task) 310 if (0 != abort_task)
85 { 311 {
86 GNUNET_SCHEDULER_cancel (abort_task); 312 GNUNET_SCHEDULER_cancel (abort_task);
87 } 313 }
88 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: arm\n"); 314
89 if (0 != GNUNET_OS_process_kill (arm_pid, SIGTERM))
90 {
91 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
92 }
93 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); 315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
94 GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); 316
95 GNUNET_OS_process_close (arm_pid); 317 GNUNET_TESTING_daemons_stop (pg,
318 GNUNET_TIME_relative_multiply
319 (GNUNET_TIME_UNIT_SECONDS, 5),
320 &peergroup_shutdown_callback,
321 NULL);
322}
323
324
325/**
326 * Completion callback for shutdown
327 *
328 * @param cls the closure from GNUNET_STREAM_shutdown call
329 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
330 * SHUT_RDWR)
331 */
332void
333shutdown_completion (void *cls,
334 int operation)
335{
336 switch (current_test)
337 {
338 case PEER1_WRITE:
339 GNUNET_assert (0);
340 case PEER1_WRITE_SHUTDOWN:
341 peer1.test_ok = GNUNET_YES;
342 /* Peer2 should read with error */
343 peer2.bytes_read = 0;
344 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
345 break;
346 case PEER1_READ_SHUTDOWN:
347 peer1.test_ok = GNUNET_YES;
348 peer2.bytes_wrote = 0;
349 GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
350 break;
351 case PEER1_HALFCLOSE_READ:
352 case PEER1_HALFCLOSE_WRITE_FAIL:
353 case SUCCESS:
354 GNUNET_assert (0); /* We shouldn't reach here */
355 }
356}
357
358
359/**
360 * Task for calling STREAM_shutdown
361 *
362 * @param cls the peer entity
363 * @param tc the TaskContext
364 */
365static void
366stream_shutdown_task (void *cls,
367 const struct GNUNET_SCHEDULER_TaskContext *tc)
368{
369 struct PeerData *peer = cls;
370
371 peer->shutdown_handle = GNUNET_STREAM_shutdown (peer->socket,
372 peer->shutdown_operation,
373 &shutdown_completion,
374 peer);
375 GNUNET_assert (NULL != peer->shutdown_handle);
96} 376}
97 377
98 378
@@ -103,21 +383,72 @@ static void
103do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 383do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
104{ 384{
105 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); 385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
106 if (0 != test_task)
107 {
108 GNUNET_SCHEDULER_cancel (test_task);
109 }
110 if (0 != read_task) 386 if (0 != read_task)
111 { 387 {
112 GNUNET_SCHEDULER_cancel (read_task); 388 GNUNET_SCHEDULER_cancel (read_task);
113 } 389 }
114 result = GNUNET_SYSERR; 390 result = GNUNET_SYSERR;
115 abort_task = 0; 391 abort_task = 0;
116 do_shutdown (cls, tc); 392 do_close (cls, tc);
117} 393}
118 394
119 395
120/** 396/**
397 * The transition function; responsible for the transitions among tests
398 */
399static void
400transition()
401{
402 if ((GNUNET_YES == peer1.test_ok) && (GNUNET_YES == peer2.test_ok))
403 {
404 peer1.test_ok = GNUNET_NO;
405 peer2.test_ok = GNUNET_NO;
406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407 "TEST %d SUCCESSFULL\n", current_test);
408 switch (current_test)
409 {
410 case PEER1_WRITE:
411 current_test = PEER1_WRITE_SHUTDOWN;
412 /* Peer1 should shutdown writing */
413 peer1.shutdown_operation = SHUT_WR;
414 GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
415 break;
416 case PEER1_WRITE_SHUTDOWN:
417 current_test = PEER1_HALFCLOSE_READ;
418 /* Peer2 should be able to write successfully */
419 peer2.bytes_wrote = 0;
420 GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
421
422 /* Peer1 should be able to read successfully */
423 peer1.bytes_read = 0;
424 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
425 break;
426 case PEER1_HALFCLOSE_READ:
427 current_test = PEER1_HALFCLOSE_WRITE_FAIL;
428 peer1.bytes_wrote = 0;
429 peer2.bytes_read = 0;
430 peer2.test_ok = GNUNET_YES;
431 GNUNET_SCHEDULER_add_now (&stream_write_task, &peer1);
432 break;
433 case PEER1_HALFCLOSE_WRITE_FAIL:
434 current_test = PEER1_READ_SHUTDOWN;
435 peer1.shutdown_operation = SHUT_RD;
436 GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
437 break;
438 case PEER1_READ_SHUTDOWN:
439 current_test = SUCCESS;
440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441 "All tests successful\n");
442 GNUNET_SCHEDULER_add_now (&do_close, NULL);
443 break;
444 case SUCCESS:
445 GNUNET_assert (0); /* We shouldn't reach here */
446
447 }
448 }
449}
450
451/**
121 * The write completion function; called upon writing some data to stream or 452 * The write completion function; called upon writing some data to stream or
122 * upon error 453 * upon error
123 * 454 *
@@ -130,37 +461,54 @@ write_completion (void *cls,
130 enum GNUNET_STREAM_Status status, 461 enum GNUNET_STREAM_Status status,
131 size_t size) 462 size_t size)
132{ 463{
133 struct PeerData *peer; 464 struct PeerData *peer = cls;
134
135 peer = (struct PeerData *) cls;
136 GNUNET_assert (GNUNET_STREAM_OK == status);
137 GNUNET_assert (size < strlen (data));
138 peer->bytes_wrote += size;
139 465
140 if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ 466 switch (current_test)
141 {
142 peer->io_handle = GNUNET_STREAM_write (peer->socket,
143 (void *) data,
144 strlen(data) - peer->bytes_wrote,
145 GNUNET_TIME_relative_multiply
146 (GNUNET_TIME_UNIT_SECONDS, 5),
147 &write_completion,
148 cls);
149 GNUNET_assert (NULL != peer->io_handle);
150 }
151 else
152 { 467 {
153 if (&peer1 == peer) /* Peer1 has finished writing; should read now */ 468 case PEER1_WRITE:
154 { 469 case PEER1_HALFCLOSE_READ:
155 peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) 470
156 peer->socket, 471 GNUNET_assert (GNUNET_STREAM_OK == status);
157 GNUNET_TIME_relative_multiply 472 GNUNET_assert (size <= strlen (data));
158 (GNUNET_TIME_UNIT_SECONDS, 5), 473 peer->bytes_wrote += size;
159 &input_processor, 474
160 cls); 475 if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
161 GNUNET_assert (NULL!=peer->io_handle); 476 {
162 } 477 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
163 } 478 }
479 else
480 {
481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482 "Writing completed\n");
483
484 if (&peer1 == peer)
485 {
486 peer1.test_ok = GNUNET_YES;
487 transition (); /* to PEER1_WRITE_SHUTDOWN */
488 }
489 else /* This will happen during PEER1_HALFCLOSE_READ */
490 {
491 peer2.test_ok = GNUNET_YES;
492 transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */
493 }
494 }
495 break;
496 case PEER1_HALFCLOSE_WRITE_FAIL:
497 GNUNET_assert (peer == &peer1);
498 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
499 GNUNET_assert (0 == size);
500 peer1.test_ok = GNUNET_YES;
501 break;
502 case PEER1_READ_SHUTDOWN:
503 GNUNET_assert (peer == &peer2);
504 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
505 GNUNET_assert (0 == size);
506 peer2.test_ok = GNUNET_YES;
507 break;
508 case PEER1_WRITE_SHUTDOWN:
509 case SUCCESS:
510 GNUNET_assert (0); /* We shouldn't reach here */
511 }
164} 512}
165 513
166 514
@@ -176,18 +524,18 @@ stream_open_cb (void *cls,
176{ 524{
177 struct PeerData *peer; 525 struct PeerData *peer;
178 526
527 GNUNET_assert (socket == peer1.socket);
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529 "%s: Stream established from peer1\n",
530 GNUNET_i2s (&peer1.our_id));
179 peer = (struct PeerData *) cls; 531 peer = (struct PeerData *) cls;
180 peer->bytes_wrote = 0; 532 peer->bytes_wrote = 0;
181 GNUNET_assert (socket == peer1.socket); 533 GNUNET_assert (socket == peer1.socket);
182 GNUNET_assert (socket == peer->socket); 534 GNUNET_assert (socket == peer->socket);
183 peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */ 535 peer1.test_ok = GNUNET_NO;
184 (void *) data, /* data */ 536 peer2.test_ok = GNUNET_NO;
185 strlen(data), 537 current_test = PEER1_WRITE;
186 GNUNET_TIME_relative_multiply 538 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
187 (GNUNET_TIME_UNIT_SECONDS, 5),
188 &write_completion,
189 cls);
190 GNUNET_assert (NULL != peer->io_handle);
191} 539}
192 540
193 541
@@ -211,34 +559,54 @@ input_processor (void *cls,
211 559
212 peer = (struct PeerData *) cls; 560 peer = (struct PeerData *) cls;
213 561
214 /* Peer1 is expected to read when it first finishes writing */ 562 switch (current_test)
215 if (peer == &peer1)
216 { 563 {
217 /* since p2 closed write */ 564 case PEER1_WRITE:
218 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); 565 case PEER1_HALFCLOSE_READ:
219 /* Test passed; shutdown now */ 566 if (GNUNET_STREAM_TIMEOUT == status)
220 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); 567 {
221 return 0; 568 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
222 } 569 "Read operation timedout - reading again!\n");
570 GNUNET_assert (0 == size);
571 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
572 return 0;
573 }
223 574
224 GNUNET_assert (GNUNET_STERAM_OK == status); 575 GNUNET_assert (GNUNET_STREAM_OK == status);
225 GNUNET_assert (size < strlen (data)); 576 GNUNET_assert (size <= strlen (data));
226 GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, 577 GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
227 (const char *) input_data, 578 (const char *) input_data,
228 size)); 579 size));
229 peer->bytes_read += size; 580 peer->bytes_read += size;
230 581
231 if (peer->bytes_read < strlen (data)) 582 if (peer->bytes_read < strlen (data))
232 { 583 {
233 peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) 584 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
234 peer->socket, 585 }
235 GNUNET_TIME_relative_multiply 586 else
236 (GNUNET_TIME_UNIT_SECONDS, 5), 587 {
237 &input_processor, 588 if (&peer2 == peer) /* Peer2 has completed reading; should write */
238 cls); 589 {
239 GNUNET_assert (NULL != peer->io_handle); 590 peer2.test_ok = GNUNET_YES;
591 transition (); /* Transition to PEER1_WRITE_SHUTDOWN */
592 }
593 else /* Peer1 has completed reading. End of tests */
594 {
595 peer1.test_ok = GNUNET_YES;
596 transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */
597 }
598 }
599 break;
600 case PEER1_WRITE_SHUTDOWN:
601 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
602 peer2.test_ok = GNUNET_YES;
603 break;
604 case PEER1_HALFCLOSE_WRITE_FAIL:
605 case PEER1_READ_SHUTDOWN:
606 case SUCCESS:
607 GNUNET_assert (0); /* We shouldn't reach here */
240 } 608 }
241 609
242 return size; 610 return size;
243} 611}
244 612
@@ -253,16 +621,7 @@ stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
253 read_task = GNUNET_SCHEDULER_NO_TASK; 621 read_task = GNUNET_SCHEDULER_NO_TASK;
254 GNUNET_assert (NULL != cls); 622 GNUNET_assert (NULL != cls);
255 peer2.bytes_read = 0; 623 peer2.bytes_read = 0;
256 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ 624 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
257 /* Close the stream for writing */
258 GNUNET_STREAM_shutdown ((struct GNUNET_STREAM_Socket *) cls,
259 SHUT_WR);
260 peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
261 GNUNET_TIME_relative_multiply
262 (GNUNET_TIME_UNIT_SECONDS, 5),
263 &input_processor,
264 (void *) &peer2);
265 GNUNET_assert (NULL != peer2.io_handle);
266} 625}
267 626
268 627
@@ -278,40 +637,80 @@ stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
278 */ 637 */
279static int 638static int
280stream_listen_cb (void *cls, 639stream_listen_cb (void *cls,
281 struct GNUNET_STREAM_Socket *socket, 640 struct GNUNET_STREAM_Socket *socket,
282 const struct GNUNET_PeerIdentity *initiator) 641 const struct GNUNET_PeerIdentity *initiator)
283{ 642{
284 GNUNET_assert (NULL != socket); 643 GNUNET_assert (NULL != socket);
285 GNUNET_assert (NULL == initiator); /* Local peer=NULL? */ 644 GNUNET_assert (NULL != initiator);
286 GNUNET_assert (socket != peer1.socket); 645 GNUNET_assert (socket != peer1.socket);
287 646
647 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
648 "%s: Peer connected: %s\n",
649 GNUNET_i2s (&peer2.our_id),
650 GNUNET_i2s(initiator));
651
288 peer2.socket = socket; 652 peer2.socket = socket;
653 /* FIXME: reading should be done right now instead of a scheduled call */
289 read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); 654 read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
290 return GNUNET_OK; 655 return GNUNET_OK;
291} 656}
292 657
293 658
294/** 659/**
295 * Testing function 660 * Callback to be called when testing peer group is ready
661 *
662 * @param cls NULL
663 * @param emsg NULL on success
296 */ 664 */
297static void 665void
298test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 666peergroup_ready (void *cls, const char *emsg)
299{ 667{
300 test_task = GNUNET_SCHEDULER_NO_TASK; 668 if (NULL != emsg)
669 {
670 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
671 "Starting peer group failed: %s\n", emsg);
672 return;
673 }
674 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
675 "Peer group is now ready\n");
676
677 GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
678
679 d1 = GNUNET_TESTING_daemon_get (pg, 0);
680 GNUNET_assert (NULL != d1);
681
682 d2 = GNUNET_TESTING_daemon_get (pg, 1);
683 GNUNET_assert (NULL != d2);
684
685 GNUNET_TESTING_get_peer_identity (d1->cfg,
686 &peer1.our_id);
687 GNUNET_TESTING_get_peer_identity (d2->cfg,
688 &peer2.our_id);
689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
690 "%s : %s\n",
691 GNUNET_i2s (&peer1.our_id),
692 GNUNET_i2s (&d1->id));
693 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
694 "%s : %s\n",
695 GNUNET_i2s (&peer2.our_id),
696 GNUNET_i2s (&d2->id));
697
698 peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
699 10, /* App port */
700 &stream_listen_cb,
701 NULL);
702 GNUNET_assert (NULL != peer2_listen_socket);
301 703
302 /* Connect to stream library */ 704 /* Connect to stream library */
303 peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */ 705 peer1.socket = GNUNET_STREAM_open (d1->cfg,
706 &d2->id, /* Null for local peer? */
304 10, /* App port */ 707 10, /* App port */
305 &stream_open_cb, 708 &stream_open_cb,
306 (void *) &peer1); 709 &peer1);
307 GNUNET_assert (NULL != peer1.socket); 710 GNUNET_assert (NULL != peer1.socket);
308 peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
309 &stream_listen_cb,
310 NULL);
311 GNUNET_assert (NULL != peer2_listen_socket);
312
313} 711}
314 712
713
315/** 714/**
316 * Initialize framework and start test 715 * Initialize framework and start test
317 */ 716 */
@@ -319,28 +718,33 @@ static void
319run (void *cls, char *const *args, const char *cfgfile, 718run (void *cls, char *const *args, const char *cfgfile,
320 const struct GNUNET_CONFIGURATION_Handle *cfg) 719 const struct GNUNET_CONFIGURATION_Handle *cfg)
321{ 720{
322 GNUNET_log_setup ("test_stream_local", 721 struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
323#if VERBOSE 722
324 "DEBUG", 723 /* GNUNET_log_setup ("test_stream_local", */
325#else 724 /* "DEBUG", */
326 "WARNING", 725 /* NULL); */
327#endif
328 NULL);
329 arm_pid =
330 GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
331 "gnunet-service-arm",
332#if VERBOSE_ARM
333 "-L", "DEBUG",
334#endif
335 "-c", "test_stream_local.conf", NULL);
336
337 abort_task =
338 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
339 (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
340 NULL);
341
342 test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
343 726
727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728 "Starting test\n");
729 /* Duplicate the configuration */
730 config = GNUNET_CONFIGURATION_dup (cfg);
731
732 hosts = GNUNET_TESTING_hosts_load (config);
733
734 pg = GNUNET_TESTING_peergroup_start (config,
735 2,
736 GNUNET_TIME_relative_multiply
737 (GNUNET_TIME_UNIT_SECONDS, 3),
738 NULL,
739 &peergroup_ready,
740 NULL,
741 hosts);
742 GNUNET_assert (NULL != pg);
743
744 abort_task =
745 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
746 (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
747 NULL);
344} 748}
345 749
346/** 750/**
@@ -350,18 +754,15 @@ int main (int argc, char **argv)
350{ 754{
351 int ret; 755 int ret;
352 756
353 char *const argv2[] = { "test-stream-local", 757 char *argv2[] = { "test-stream-local",
354 "-c", "test_stream.conf", 758 "-L", "DEBUG",
355#if VERBOSE 759 "-c", "test_stream_local.conf",
356 "-L", "DEBUG", 760 NULL};
357#endif
358 NULL
359 };
360 761
361 struct GNUNET_GETOPT_CommandLineOption options[] = { 762 struct GNUNET_GETOPT_CommandLineOption options[] = {
362 GNUNET_GETOPT_OPTION_END 763 GNUNET_GETOPT_OPTION_END
363 }; 764 };
364 765
365 ret = 766 ret =
366 GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, 767 GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
367 "test-stream-local", "nohelp", options, &run, NULL); 768 "test-stream-local", "nohelp", options, &run, NULL);
@@ -377,6 +778,6 @@ int main (int argc, char **argv)
377 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); 778 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
378 return 1; 779 return 1;
379 } 780 }
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n"); 781 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
381 return 0; 782 return 0;
382} 783}