aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c237
1 files changed, 223 insertions, 14 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 0a129af80..3673958ec 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -86,6 +86,19 @@
86 86
87 87
88/** 88/**
89 * What is the size we assume for a read operation in the
90 * absence of an MTU for the purpose of flow control?
91 */
92#define IN_PACKET_SIZE_WITHOUT_MTU 128
93
94/**
95 * If a queue delays the next message by more than this number
96 * of seconds we log a warning. Note: this is for testing,
97 * the value chosen here might be too aggressively low!
98 */
99#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
100
101/**
89 * How many messages can we have pending for a given client process 102 * How many messages can we have pending for a given client process
90 * before we start to drop incoming messages? We typically should 103 * before we start to drop incoming messages? We typically should
91 * have only one client and so this would be the primary buffer for 104 * have only one client and so this would be the primary buffer for
@@ -336,6 +349,12 @@ struct GNUNET_ATS_Session
336 * Handle by which we inform ATS about this queue. 349 * Handle by which we inform ATS about this queue.
337 */ 350 */
338 struct GNUNET_ATS_SessionRecord *sr; 351 struct GNUNET_ATS_SessionRecord *sr;
352
353 /**
354 * Task scheduled for the time when this queue can (likely) transmit the
355 * next message. Still needs to check with the @e tracker_out to be sure.
356 */
357 struct GNUNET_SCHEDULER_Task *transmit_task;
339 358
340 /** 359 /**
341 * Our current RTT estimate for this ATS session. 360 * Our current RTT estimate for this ATS session.
@@ -994,6 +1013,11 @@ free_queue (struct GNUNET_ATS_Session *queue)
994 .rtt = GNUNET_TIME_UNIT_FOREVER_REL 1013 .rtt = GNUNET_TIME_UNIT_FOREVER_REL
995 }; 1014 };
996 1015
1016 if (NULL != queue->transmit_task)
1017 {
1018 GNUNET_SCHEDULER_cancel (queue->transmit_task);
1019 queue->transmit_task = NULL;
1020 }
997 GNUNET_CONTAINER_MDLL_remove (neighbour, 1021 GNUNET_CONTAINER_MDLL_remove (neighbour,
998 neighbour->session_head, 1022 neighbour->session_head,
999 neighbour->session_tail, 1023 neighbour->session_tail,
@@ -1600,30 +1624,127 @@ check_add_queue_message (void *cls,
1600 1624
1601 1625
1602/** 1626/**
1627 * Bandwidth tracker informs us that the delay until we should receive
1628 * more has changed.
1629 *
1630 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
1631 */
1632static void
1633tracker_update_in_cb (void *cls)
1634{
1635 struct GNUNET_ATS_Session *queue = cls;
1636 struct GNUNET_TIME_Relative in_delay;
1637 unsigned int rsize;
1638
1639 rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
1640 in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
1641 rsize);
1642 // FIXME: how exactly do we do inbound flow control?
1643}
1644
1645
1646/**
1647 * We believe we are ready to transmit a message on a queue. Double-checks
1648 * with the queue's "tracker_out" and then gives the message to the
1649 * communicator for transmission (updating the tracker, and re-scheduling
1650 * itself if applicable).
1651 *
1652 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1653 */
1654static void
1655transmit_on_queue (void *cls)
1656{
1657 struct GNUNET_ATS_Session *queue = cls;
1658
1659 queue->transmit_task = NULL;
1660 // FIXME: check if transmission is really ready
1661 // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
1662 // FIXME: re-schedule self
1663}
1664
1665
1666/**
1603 * Bandwidth tracker informs us that the delay until we 1667 * Bandwidth tracker informs us that the delay until we
1604 * can transmit again changed. 1668 * can transmit again changed.
1605 * 1669 *
1606 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed 1670 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
1607 */ 1671 */
1608static void 1672static void
1609tracker_update_cb (void *cls) 1673tracker_update_out_cb (void *cls)
1610{ 1674{
1611 struct GNUNET_ATS_Session *queue = cls; 1675 struct GNUNET_ATS_Session *queue = cls;
1676 struct Neighbour *n = queue->neighbour;
1677 struct PendingMessage *pm = n->pending_msg_head;
1678 struct GNUNET_TIME_Relative out_delay;
1679 unsigned int wsize;
1612 1680
1613 // FIXME: re-schedule transmission tasks if applicable! 1681 if (NULL == pm)
1682 {
1683 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1684 "Bandwidth allocation updated for empty transmission queue `%s'\n",
1685 queue->address);
1686 return; /* no message pending, nothing to do here! */
1687 }
1688 wsize = (0 == queue->mtu)
1689 ? pm->bytes_msg /* FIXME: add overheads? */
1690 : queue->mtu;
1691 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1692 wsize);
1693 GNUNET_SCHEDULER_cancel (queue->transmit_task);
1694 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1695 &transmit_on_queue,
1696 queue);
1697 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1698 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1699 "Next transmission on queue `%s' in %s (high delay)\n",
1700 queue->address,
1701 GNUNET_STRINGS_relative_time_to_string (out_delay,
1702 GNUNET_YES));
1703 else
1704 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1705 "Next transmission on queue `%s' in %s\n",
1706 queue->address,
1707 GNUNET_STRINGS_relative_time_to_string (out_delay,
1708 GNUNET_YES));
1709}
1710
1711
1712/**
1713 * Bandwidth tracker informs us that excessive outbound bandwidth was
1714 * allocated which is not being used.
1715 *
1716 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
1717 */
1718static void
1719tracker_excess_out_cb (void *cls)
1720{
1721 /* FIXME: trigger excess bandwidth report to core? Right now,
1722 this is done internally within transport_api2_core already,
1723 but we probably want to change the logic and trigger it
1724 from here via a message instead! */
1725 /* TODO: maybe inform ATS at this point? */
1726 GNUNET_STATISTICS_update (GST_stats,
1727 "# Excess outbound bandwidth reported",
1728 1,
1729 GNUNET_NO);
1614} 1730}
1615 1731
1616 1732
1733
1617/** 1734/**
1618 * Bandwidth tracker informs us that excessive bandwidth was allocated 1735 * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
1619 * which is not being used. 1736 * which is not being used.
1620 * 1737 *
1621 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted 1738 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
1622 */ 1739 */
1623static void 1740static void
1624tracker_excess_cb (void *cls) 1741tracker_excess_in_cb (void *cls)
1625{ 1742{
1626 /* FIXME: what do we do? */ 1743 /* TODO: maybe inform ATS at this point? */
1744 GNUNET_STATISTICS_update (GST_stats,
1745 "# Excess inbound bandwidth reported",
1746 1,
1747 GNUNET_NO);
1627} 1748}
1628 1749
1629 1750
@@ -1669,18 +1790,18 @@ handle_add_queue_message (void *cls,
1669 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); 1790 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
1670 queue->neighbour = neighbour; 1791 queue->neighbour = neighbour;
1671 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in, 1792 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
1672 &tracker_update_cb, 1793 &tracker_update_in_cb,
1673 queue, 1794 queue,
1674 GNUNET_BANDWIDTH_ZERO, 1795 GNUNET_BANDWIDTH_ZERO,
1675 0 /* FIXME: max carry in seconds! */, 1796 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
1676 &tracker_excess_cb, 1797 &tracker_excess_in_cb,
1677 queue); 1798 queue);
1678 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out, 1799 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
1679 &tracker_update_cb, 1800 &tracker_update_out_cb,
1680 queue, 1801 queue,
1681 GNUNET_BANDWIDTH_ZERO, 1802 GNUNET_BANDWIDTH_ZERO,
1682 0 /* FIXME: max carry in seconds! */, 1803 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
1683 &tracker_excess_cb, 1804 &tracker_excess_out_cb,
1684 queue); 1805 queue);
1685 memcpy (&queue[1], 1806 memcpy (&queue[1],
1686 addr, 1807 addr,
@@ -1940,8 +2061,12 @@ ats_suggestion_cb (void *cls,
1940 const struct GNUNET_PeerIdentity *pid, 2061 const struct GNUNET_PeerIdentity *pid,
1941 const char *address) 2062 const char *address)
1942{ 2063{
2064 static uint32_t idgen;
1943 struct TransportClient *tc; 2065 struct TransportClient *tc;
1944 char *prefix; 2066 char *prefix;
2067 struct GNUNET_TRANSPORT_CreateQueue *cqm;
2068 struct GNUNET_MQ_Envelope *env;
2069 size_t alen;
1945 2070
1946 (void) cls; 2071 (void) cls;
1947 prefix = GNUNET_HELLO_address_to_prefix (address); 2072 prefix = GNUNET_HELLO_address_to_prefix (address);
@@ -1956,11 +2081,87 @@ ats_suggestion_cb (void *cls,
1956 GNUNET_STATISTICS_update (GST_stats, 2081 GNUNET_STATISTICS_update (GST_stats,
1957 "# ATS suggestions ignored due to missing communicator", 2082 "# ATS suggestions ignored due to missing communicator",
1958 1, 2083 1,
1959 GNUNET_NO); 2084 GNUNET_NO);
1960 2085 return;
2086 }
2087 /* forward suggestion for queue creation to communicator */
2088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089 "Request #%u for `%s' communicator to create queue to `%s'\n",
2090 (unsigned int) idgen,
2091 prefix,
2092 address);
2093 alen = strlen (address) + 1;
2094 env = GNUNET_MQ_msg_extra (cqm,
2095 alen,
2096 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
2097 cqm->request_id = htonl (idgen++);
2098 cqm->receiver = *pid;
2099 memcpy (&cqm[1],
2100 address,
2101 alen);
2102 GNUNET_MQ_send (tc->mq,
2103 env);
2104}
2105
2106
2107/**
2108 * Communicator tells us that our request to create a queue "worked", that
2109 * is setting up the queue is now in process.
2110 *
2111 * @param cls the `struct TransportClient`
2112 * @param cqr confirmation message
2113 */
2114static void
2115handle_queue_create_ok (void *cls,
2116 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
2117{
2118 struct TransportClient *tc = cls;
2119
2120 if (CT_COMMUNICATOR != tc->type)
2121 {
2122 GNUNET_break (0);
2123 GNUNET_SERVICE_client_drop (tc->client);
1961 return; 2124 return;
1962 } 2125 }
1963 // FIXME: forward suggestion to tc 2126 GNUNET_STATISTICS_update (GST_stats,
2127 "# ATS suggestions succeeded at communicator",
2128 1,
2129 GNUNET_NO);
2130 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2131 "Request #%u for communicator to create queue succeeded\n",
2132 (unsigned int) ntohs (cqr->request_id));
2133 GNUNET_SERVICE_client_continue (tc->client);
2134}
2135
2136
2137/**
2138 * Communicator tells us that our request to create a queue failed. This usually
2139 * indicates that the provided address is simply invalid or that the communicator's
2140 * resources are exhausted.
2141 *
2142 * @param cls the `struct TransportClient`
2143 * @param cqr failure message
2144 */
2145static void
2146handle_queue_create_fail (void *cls,
2147 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
2148{
2149 struct TransportClient *tc = cls;
2150
2151 if (CT_COMMUNICATOR != tc->type)
2152 {
2153 GNUNET_break (0);
2154 GNUNET_SERVICE_client_drop (tc->client);
2155 return;
2156 }
2157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2158 "Request #%u for communicator to create queue failed\n",
2159 (unsigned int) ntohs (cqr->request_id));
2160 GNUNET_STATISTICS_update (GST_stats,
2161 "# ATS suggestions failed in queue creation at communicator",
2162 1,
2163 GNUNET_NO);
2164 GNUNET_SERVICE_client_continue (tc->client);
1964} 2165}
1965 2166
1966 2167
@@ -2152,6 +2353,14 @@ GNUNET_SERVICE_MAIN
2152 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, 2353 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
2153 struct GNUNET_TRANSPORT_IncomingMessage, 2354 struct GNUNET_TRANSPORT_IncomingMessage,
2154 NULL), 2355 NULL),
2356 GNUNET_MQ_hd_fixed_size (queue_create_ok,
2357 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
2358 struct GNUNET_TRANSPORT_CreateQueueResponse,
2359 NULL),
2360 GNUNET_MQ_hd_fixed_size (queue_create_fail,
2361 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
2362 struct GNUNET_TRANSPORT_CreateQueueResponse,
2363 NULL),
2155 GNUNET_MQ_hd_var_size (add_queue_message, 2364 GNUNET_MQ_hd_var_size (add_queue_message,
2156 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, 2365 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
2157 struct GNUNET_TRANSPORT_AddQueueMessage, 2366 struct GNUNET_TRANSPORT_AddQueueMessage,