diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-01-21 15:09:16 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-01-21 15:09:16 +0100 |
commit | 5391d3d34f3bf7f40f37f9e6038466002f422bb3 (patch) | |
tree | 323db351bbb0ef83955293232c9cca39376c1e99 /src/transport | |
parent | 634aea297cc983c3d70f65a004f698f215abe590 (diff) | |
download | gnunet-5391d3d34f3bf7f40f37f9e6038466002f422bb3.tar.gz gnunet-5391d3d34f3bf7f40f37f9e6038466002f422bb3.zip |
more work on tng
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 237 | ||||
-rw-r--r-- | src/transport/transport.h | 2 |
2 files changed, 224 insertions, 15 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 0a129af80..3673958ec 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -86,6 +86,19 @@ | |||
86 | 86 | ||
87 | 87 | ||
88 | /** | 88 | /** |
89 | * What is the size we assume for a read operation in the | ||
90 | * absence of an MTU for the purpose of flow control? | ||
91 | */ | ||
92 | #define IN_PACKET_SIZE_WITHOUT_MTU 128 | ||
93 | |||
94 | /** | ||
95 | * If a queue delays the next message by more than this number | ||
96 | * of seconds we log a warning. Note: this is for testing, | ||
97 | * the value chosen here might be too aggressively low! | ||
98 | */ | ||
99 | #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) | ||
100 | |||
101 | /** | ||
89 | * How many messages can we have pending for a given client process | 102 | * How many messages can we have pending for a given client process |
90 | * before we start to drop incoming messages? We typically should | 103 | * before we start to drop incoming messages? We typically should |
91 | * have only one client and so this would be the primary buffer for | 104 | * have only one client and so this would be the primary buffer for |
@@ -336,6 +349,12 @@ struct GNUNET_ATS_Session | |||
336 | * Handle by which we inform ATS about this queue. | 349 | * Handle by which we inform ATS about this queue. |
337 | */ | 350 | */ |
338 | struct GNUNET_ATS_SessionRecord *sr; | 351 | struct GNUNET_ATS_SessionRecord *sr; |
352 | |||
353 | /** | ||
354 | * Task scheduled for the time when this queue can (likely) transmit the | ||
355 | * next message. Still needs to check with the @e tracker_out to be sure. | ||
356 | */ | ||
357 | struct GNUNET_SCHEDULER_Task *transmit_task; | ||
339 | 358 | ||
340 | /** | 359 | /** |
341 | * Our current RTT estimate for this ATS session. | 360 | * Our current RTT estimate for this ATS session. |
@@ -994,6 +1013,11 @@ free_queue (struct GNUNET_ATS_Session *queue) | |||
994 | .rtt = GNUNET_TIME_UNIT_FOREVER_REL | 1013 | .rtt = GNUNET_TIME_UNIT_FOREVER_REL |
995 | }; | 1014 | }; |
996 | 1015 | ||
1016 | if (NULL != queue->transmit_task) | ||
1017 | { | ||
1018 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | ||
1019 | queue->transmit_task = NULL; | ||
1020 | } | ||
997 | GNUNET_CONTAINER_MDLL_remove (neighbour, | 1021 | GNUNET_CONTAINER_MDLL_remove (neighbour, |
998 | neighbour->session_head, | 1022 | neighbour->session_head, |
999 | neighbour->session_tail, | 1023 | neighbour->session_tail, |
@@ -1600,30 +1624,127 @@ check_add_queue_message (void *cls, | |||
1600 | 1624 | ||
1601 | 1625 | ||
1602 | /** | 1626 | /** |
1627 | * Bandwidth tracker informs us that the delay until we should receive | ||
1628 | * more has changed. | ||
1629 | * | ||
1630 | * @param cls a `struct GNUNET_ATS_Session` for which the delay changed | ||
1631 | */ | ||
1632 | static void | ||
1633 | tracker_update_in_cb (void *cls) | ||
1634 | { | ||
1635 | struct GNUNET_ATS_Session *queue = cls; | ||
1636 | struct GNUNET_TIME_Relative in_delay; | ||
1637 | unsigned int rsize; | ||
1638 | |||
1639 | rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu; | ||
1640 | in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, | ||
1641 | rsize); | ||
1642 | // FIXME: how exactly do we do inbound flow control? | ||
1643 | } | ||
1644 | |||
1645 | |||
1646 | /** | ||
1647 | * We believe we are ready to transmit a message on a queue. Double-checks | ||
1648 | * with the queue's "tracker_out" and then gives the message to the | ||
1649 | * communicator for transmission (updating the tracker, and re-scheduling | ||
1650 | * itself if applicable). | ||
1651 | * | ||
1652 | * @param cls the `struct GNUNET_ATS_Session` to process transmissions for | ||
1653 | */ | ||
1654 | static void | ||
1655 | transmit_on_queue (void *cls) | ||
1656 | { | ||
1657 | struct GNUNET_ATS_Session *queue = cls; | ||
1658 | |||
1659 | queue->transmit_task = NULL; | ||
1660 | // FIXME: check if transmission is really ready | ||
1661 | // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.) | ||
1662 | // FIXME: re-schedule self | ||
1663 | } | ||
1664 | |||
1665 | |||
1666 | /** | ||
1603 | * Bandwidth tracker informs us that the delay until we | 1667 | * Bandwidth tracker informs us that the delay until we |
1604 | * can transmit again changed. | 1668 | * can transmit again changed. |
1605 | * | 1669 | * |
1606 | * @param cls a `struct GNUNET_ATS_Session` for which the delay changed | 1670 | * @param cls a `struct GNUNET_ATS_Session` for which the delay changed |
1607 | */ | 1671 | */ |
1608 | static void | 1672 | static void |
1609 | tracker_update_cb (void *cls) | 1673 | tracker_update_out_cb (void *cls) |
1610 | { | 1674 | { |
1611 | struct GNUNET_ATS_Session *queue = cls; | 1675 | struct GNUNET_ATS_Session *queue = cls; |
1676 | struct Neighbour *n = queue->neighbour; | ||
1677 | struct PendingMessage *pm = n->pending_msg_head; | ||
1678 | struct GNUNET_TIME_Relative out_delay; | ||
1679 | unsigned int wsize; | ||
1612 | 1680 | ||
1613 | // FIXME: re-schedule transmission tasks if applicable! | 1681 | if (NULL == pm) |
1682 | { | ||
1683 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1684 | "Bandwidth allocation updated for empty transmission queue `%s'\n", | ||
1685 | queue->address); | ||
1686 | return; /* no message pending, nothing to do here! */ | ||
1687 | } | ||
1688 | wsize = (0 == queue->mtu) | ||
1689 | ? pm->bytes_msg /* FIXME: add overheads? */ | ||
1690 | : queue->mtu; | ||
1691 | out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, | ||
1692 | wsize); | ||
1693 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | ||
1694 | queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, | ||
1695 | &transmit_on_queue, | ||
1696 | queue); | ||
1697 | if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) | ||
1698 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1699 | "Next transmission on queue `%s' in %s (high delay)\n", | ||
1700 | queue->address, | ||
1701 | GNUNET_STRINGS_relative_time_to_string (out_delay, | ||
1702 | GNUNET_YES)); | ||
1703 | else | ||
1704 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1705 | "Next transmission on queue `%s' in %s\n", | ||
1706 | queue->address, | ||
1707 | GNUNET_STRINGS_relative_time_to_string (out_delay, | ||
1708 | GNUNET_YES)); | ||
1709 | } | ||
1710 | |||
1711 | |||
1712 | /** | ||
1713 | * Bandwidth tracker informs us that excessive outbound bandwidth was | ||
1714 | * allocated which is not being used. | ||
1715 | * | ||
1716 | * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted | ||
1717 | */ | ||
1718 | static void | ||
1719 | tracker_excess_out_cb (void *cls) | ||
1720 | { | ||
1721 | /* FIXME: trigger excess bandwidth report to core? Right now, | ||
1722 | this is done internally within transport_api2_core already, | ||
1723 | but we probably want to change the logic and trigger it | ||
1724 | from here via a message instead! */ | ||
1725 | /* TODO: maybe inform ATS at this point? */ | ||
1726 | GNUNET_STATISTICS_update (GST_stats, | ||
1727 | "# Excess outbound bandwidth reported", | ||
1728 | 1, | ||
1729 | GNUNET_NO); | ||
1614 | } | 1730 | } |
1615 | 1731 | ||
1616 | 1732 | ||
1733 | |||
1617 | /** | 1734 | /** |
1618 | * Bandwidth tracker informs us that excessive bandwidth was allocated | 1735 | * Bandwidth tracker informs us that excessive inbound bandwidth was allocated |
1619 | * which is not being used. | 1736 | * which is not being used. |
1620 | * | 1737 | * |
1621 | * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted | 1738 | * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted |
1622 | */ | 1739 | */ |
1623 | static void | 1740 | static void |
1624 | tracker_excess_cb (void *cls) | 1741 | tracker_excess_in_cb (void *cls) |
1625 | { | 1742 | { |
1626 | /* FIXME: what do we do? */ | 1743 | /* TODO: maybe inform ATS at this point? */ |
1744 | GNUNET_STATISTICS_update (GST_stats, | ||
1745 | "# Excess inbound bandwidth reported", | ||
1746 | 1, | ||
1747 | GNUNET_NO); | ||
1627 | } | 1748 | } |
1628 | 1749 | ||
1629 | 1750 | ||
@@ -1669,18 +1790,18 @@ handle_add_queue_message (void *cls, | |||
1669 | queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); | 1790 | queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); |
1670 | queue->neighbour = neighbour; | 1791 | queue->neighbour = neighbour; |
1671 | GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in, | 1792 | GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in, |
1672 | &tracker_update_cb, | 1793 | &tracker_update_in_cb, |
1673 | queue, | 1794 | queue, |
1674 | GNUNET_BANDWIDTH_ZERO, | 1795 | GNUNET_BANDWIDTH_ZERO, |
1675 | 0 /* FIXME: max carry in seconds! */, | 1796 | GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, |
1676 | &tracker_excess_cb, | 1797 | &tracker_excess_in_cb, |
1677 | queue); | 1798 | queue); |
1678 | GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out, | 1799 | GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out, |
1679 | &tracker_update_cb, | 1800 | &tracker_update_out_cb, |
1680 | queue, | 1801 | queue, |
1681 | GNUNET_BANDWIDTH_ZERO, | 1802 | GNUNET_BANDWIDTH_ZERO, |
1682 | 0 /* FIXME: max carry in seconds! */, | 1803 | GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, |
1683 | &tracker_excess_cb, | 1804 | &tracker_excess_out_cb, |
1684 | queue); | 1805 | queue); |
1685 | memcpy (&queue[1], | 1806 | memcpy (&queue[1], |
1686 | addr, | 1807 | addr, |
@@ -1940,8 +2061,12 @@ ats_suggestion_cb (void *cls, | |||
1940 | const struct GNUNET_PeerIdentity *pid, | 2061 | const struct GNUNET_PeerIdentity *pid, |
1941 | const char *address) | 2062 | const char *address) |
1942 | { | 2063 | { |
2064 | static uint32_t idgen; | ||
1943 | struct TransportClient *tc; | 2065 | struct TransportClient *tc; |
1944 | char *prefix; | 2066 | char *prefix; |
2067 | struct GNUNET_TRANSPORT_CreateQueue *cqm; | ||
2068 | struct GNUNET_MQ_Envelope *env; | ||
2069 | size_t alen; | ||
1945 | 2070 | ||
1946 | (void) cls; | 2071 | (void) cls; |
1947 | prefix = GNUNET_HELLO_address_to_prefix (address); | 2072 | prefix = GNUNET_HELLO_address_to_prefix (address); |
@@ -1956,11 +2081,87 @@ ats_suggestion_cb (void *cls, | |||
1956 | GNUNET_STATISTICS_update (GST_stats, | 2081 | GNUNET_STATISTICS_update (GST_stats, |
1957 | "# ATS suggestions ignored due to missing communicator", | 2082 | "# ATS suggestions ignored due to missing communicator", |
1958 | 1, | 2083 | 1, |
1959 | GNUNET_NO); | 2084 | GNUNET_NO); |
1960 | 2085 | return; | |
2086 | } | ||
2087 | /* forward suggestion for queue creation to communicator */ | ||
2088 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2089 | "Request #%u for `%s' communicator to create queue to `%s'\n", | ||
2090 | (unsigned int) idgen, | ||
2091 | prefix, | ||
2092 | address); | ||
2093 | alen = strlen (address) + 1; | ||
2094 | env = GNUNET_MQ_msg_extra (cqm, | ||
2095 | alen, | ||
2096 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); | ||
2097 | cqm->request_id = htonl (idgen++); | ||
2098 | cqm->receiver = *pid; | ||
2099 | memcpy (&cqm[1], | ||
2100 | address, | ||
2101 | alen); | ||
2102 | GNUNET_MQ_send (tc->mq, | ||
2103 | env); | ||
2104 | } | ||
2105 | |||
2106 | |||
2107 | /** | ||
2108 | * Communicator tells us that our request to create a queue "worked", that | ||
2109 | * is setting up the queue is now in process. | ||
2110 | * | ||
2111 | * @param cls the `struct TransportClient` | ||
2112 | * @param cqr confirmation message | ||
2113 | */ | ||
2114 | static void | ||
2115 | handle_queue_create_ok (void *cls, | ||
2116 | const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) | ||
2117 | { | ||
2118 | struct TransportClient *tc = cls; | ||
2119 | |||
2120 | if (CT_COMMUNICATOR != tc->type) | ||
2121 | { | ||
2122 | GNUNET_break (0); | ||
2123 | GNUNET_SERVICE_client_drop (tc->client); | ||
1961 | return; | 2124 | return; |
1962 | } | 2125 | } |
1963 | // FIXME: forward suggestion to tc | 2126 | GNUNET_STATISTICS_update (GST_stats, |
2127 | "# ATS suggestions succeeded at communicator", | ||
2128 | 1, | ||
2129 | GNUNET_NO); | ||
2130 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2131 | "Request #%u for communicator to create queue succeeded\n", | ||
2132 | (unsigned int) ntohs (cqr->request_id)); | ||
2133 | GNUNET_SERVICE_client_continue (tc->client); | ||
2134 | } | ||
2135 | |||
2136 | |||
2137 | /** | ||
2138 | * Communicator tells us that our request to create a queue failed. This usually | ||
2139 | * indicates that the provided address is simply invalid or that the communicator's | ||
2140 | * resources are exhausted. | ||
2141 | * | ||
2142 | * @param cls the `struct TransportClient` | ||
2143 | * @param cqr failure message | ||
2144 | */ | ||
2145 | static void | ||
2146 | handle_queue_create_fail (void *cls, | ||
2147 | const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) | ||
2148 | { | ||
2149 | struct TransportClient *tc = cls; | ||
2150 | |||
2151 | if (CT_COMMUNICATOR != tc->type) | ||
2152 | { | ||
2153 | GNUNET_break (0); | ||
2154 | GNUNET_SERVICE_client_drop (tc->client); | ||
2155 | return; | ||
2156 | } | ||
2157 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2158 | "Request #%u for communicator to create queue failed\n", | ||
2159 | (unsigned int) ntohs (cqr->request_id)); | ||
2160 | GNUNET_STATISTICS_update (GST_stats, | ||
2161 | "# ATS suggestions failed in queue creation at communicator", | ||
2162 | 1, | ||
2163 | GNUNET_NO); | ||
2164 | GNUNET_SERVICE_client_continue (tc->client); | ||
1964 | } | 2165 | } |
1965 | 2166 | ||
1966 | 2167 | ||
@@ -2152,6 +2353,14 @@ GNUNET_SERVICE_MAIN | |||
2152 | GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, | 2353 | GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, |
2153 | struct GNUNET_TRANSPORT_IncomingMessage, | 2354 | struct GNUNET_TRANSPORT_IncomingMessage, |
2154 | NULL), | 2355 | NULL), |
2356 | GNUNET_MQ_hd_fixed_size (queue_create_ok, | ||
2357 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, | ||
2358 | struct GNUNET_TRANSPORT_CreateQueueResponse, | ||
2359 | NULL), | ||
2360 | GNUNET_MQ_hd_fixed_size (queue_create_fail, | ||
2361 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, | ||
2362 | struct GNUNET_TRANSPORT_CreateQueueResponse, | ||
2363 | NULL), | ||
2155 | GNUNET_MQ_hd_var_size (add_queue_message, | 2364 | GNUNET_MQ_hd_var_size (add_queue_message, |
2156 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, | 2365 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, |
2157 | struct GNUNET_TRANSPORT_AddQueueMessage, | 2366 | struct GNUNET_TRANSPORT_AddQueueMessage, |
diff --git a/src/transport/transport.h b/src/transport/transport.h index 88656a012..00d475e2b 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h | |||
@@ -871,7 +871,7 @@ struct GNUNET_TRANSPORT_CreateQueue | |||
871 | 871 | ||
872 | 872 | ||
873 | /** | 873 | /** |
874 | * Transport tells communicator that it wants a new queue. | 874 | * Communicator tells transport how queue creation went down. |
875 | */ | 875 | */ |
876 | struct GNUNET_TRANSPORT_CreateQueueResponse | 876 | struct GNUNET_TRANSPORT_CreateQueueResponse |
877 | { | 877 | { |