aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-tng.c537
1 files changed, 276 insertions, 261 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 7d7d04375..b64bfb182 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -42,10 +42,10 @@
42 * effective flow control (for uni-directional transports!) 42 * effective flow control (for uni-directional transports!)
43 * #4 UDP broadcasting logic must be extended to use the new API 43 * #4 UDP broadcasting logic must be extended to use the new API
44 * #5 only validated addresses go to ATS for scheduling; that 44 * #5 only validated addresses go to ATS for scheduling; that
45 * also ensures we know the RTT 45 * also ensures we know the RTT
46 * #6 to ensure flow control and RTT are OK, we always do the 46 * #6 to ensure flow control and RTT are OK, we always do the
47 * 'validation', even if address comes from PEERSTORE 47 * 'validation', even if address comes from PEERSTORE
48 * #7 48 * #7
49 * - ACK handling / retransmission 49 * - ACK handling / retransmission
50 * - address verification 50 * - address verification
51 * - track RTT, distance, loss, etc. 51 * - track RTT, distance, loss, etc.
@@ -1497,7 +1497,7 @@ static struct Neighbour *
1497lookup_neighbour (const struct GNUNET_PeerIdentity *pid) 1497lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1498{ 1498{
1499 return GNUNET_CONTAINER_multipeermap_get (neighbours, 1499 return GNUNET_CONTAINER_multipeermap_get (neighbours,
1500 pid); 1500 pid);
1501} 1501}
1502 1502
1503 1503
@@ -1561,9 +1561,9 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
1561 if (NULL == dv->dv_head) 1561 if (NULL == dv->dv_head)
1562 { 1562 {
1563 GNUNET_assert (GNUNET_YES == 1563 GNUNET_assert (GNUNET_YES ==
1564 GNUNET_CONTAINER_multipeermap_remove (dv_routes, 1564 GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1565 &dv->target, 1565 &dv->target,
1566 dv)); 1566 dv));
1567 if (NULL != dv->timeout_task) 1567 if (NULL != dv->timeout_task)
1568 GNUNET_SCHEDULER_cancel (dv->timeout_task); 1568 GNUNET_SCHEDULER_cancel (dv->timeout_task);
1569 GNUNET_free (dv); 1569 GNUNET_free (dv);
@@ -1602,18 +1602,18 @@ free_dv_route (struct DistanceVector *dv)
1602 */ 1602 */
1603static void 1603static void
1604notify_monitor (struct TransportClient *tc, 1604notify_monitor (struct TransportClient *tc,
1605 const struct GNUNET_PeerIdentity *peer, 1605 const struct GNUNET_PeerIdentity *peer,
1606 const char *address, 1606 const char *address,
1607 enum GNUNET_NetworkType nt, 1607 enum GNUNET_NetworkType nt,
1608 const struct MonitorEvent *me) 1608 const struct MonitorEvent *me)
1609{ 1609{
1610 struct GNUNET_MQ_Envelope *env; 1610 struct GNUNET_MQ_Envelope *env;
1611 struct GNUNET_TRANSPORT_MonitorData *md; 1611 struct GNUNET_TRANSPORT_MonitorData *md;
1612 size_t addr_len = strlen (address) + 1; 1612 size_t addr_len = strlen (address) + 1;
1613 1613
1614 env = GNUNET_MQ_msg_extra (md, 1614 env = GNUNET_MQ_msg_extra (md,
1615 addr_len, 1615 addr_len,
1616 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA); 1616 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1617 md->nt = htonl ((uint32_t) nt); 1617 md->nt = htonl ((uint32_t) nt);
1618 md->peer = *peer; 1618 md->peer = *peer;
1619 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation); 1619 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
@@ -1624,10 +1624,10 @@ notify_monitor (struct TransportClient *tc,
1624 md->num_msg_pending = htonl (me->num_msg_pending); 1624 md->num_msg_pending = htonl (me->num_msg_pending);
1625 md->num_bytes_pending = htonl (me->num_bytes_pending); 1625 md->num_bytes_pending = htonl (me->num_bytes_pending);
1626 memcpy (&md[1], 1626 memcpy (&md[1],
1627 address, 1627 address,
1628 addr_len); 1628 addr_len);
1629 GNUNET_MQ_send (tc->mq, 1629 GNUNET_MQ_send (tc->mq,
1630 env); 1630 env);
1631} 1631}
1632 1632
1633 1633
@@ -1642,9 +1642,9 @@ notify_monitor (struct TransportClient *tc,
1642 */ 1642 */
1643static void 1643static void
1644notify_monitors (const struct GNUNET_PeerIdentity *peer, 1644notify_monitors (const struct GNUNET_PeerIdentity *peer,
1645 const char *address, 1645 const char *address,
1646 enum GNUNET_NetworkType nt, 1646 enum GNUNET_NetworkType nt,
1647 const struct MonitorEvent *me) 1647 const struct MonitorEvent *me)
1648{ 1648{
1649 static struct GNUNET_PeerIdentity zero; 1649 static struct GNUNET_PeerIdentity zero;
1650 1650
@@ -1657,17 +1657,17 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
1657 if (tc->details.monitor.one_shot) 1657 if (tc->details.monitor.one_shot)
1658 continue; 1658 continue;
1659 if ( (0 != memcmp (&tc->details.monitor.peer, 1659 if ( (0 != memcmp (&tc->details.monitor.peer,
1660 &zero, 1660 &zero,
1661 sizeof (zero))) && 1661 sizeof (zero))) &&
1662 (0 != memcmp (&tc->details.monitor.peer, 1662 (0 != memcmp (&tc->details.monitor.peer,
1663 peer, 1663 peer,
1664 sizeof (*peer))) ) 1664 sizeof (*peer))) )
1665 continue; 1665 continue;
1666 notify_monitor (tc, 1666 notify_monitor (tc,
1667 peer, 1667 peer,
1668 address, 1668 address,
1669 nt, 1669 nt,
1670 me); 1670 me);
1671 } 1671 }
1672} 1672}
1673 1673
@@ -1683,8 +1683,8 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
1683 */ 1683 */
1684static void * 1684static void *
1685client_connect_cb (void *cls, 1685client_connect_cb (void *cls,
1686 struct GNUNET_SERVICE_Client *client, 1686 struct GNUNET_SERVICE_Client *client,
1687 struct GNUNET_MQ_Handle *mq) 1687 struct GNUNET_MQ_Handle *mq)
1688{ 1688{
1689 struct TransportClient *tc; 1689 struct TransportClient *tc;
1690 1690
@@ -1712,11 +1712,11 @@ free_reassembly_context (struct ReassemblyContext *rc)
1712 struct Neighbour *n = rc->neighbour; 1712 struct Neighbour *n = rc->neighbour;
1713 1713
1714 GNUNET_assert (rc == 1714 GNUNET_assert (rc ==
1715 GNUNET_CONTAINER_heap_remove_node (rc->hn)); 1715 GNUNET_CONTAINER_heap_remove_node (rc->hn));
1716 GNUNET_assert (GNUNET_OK == 1716 GNUNET_assert (GNUNET_OK ==
1717 GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map, 1717 GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1718 &rc->msg_uuid, 1718 &rc->msg_uuid,
1719 rc)); 1719 rc));
1720 GNUNET_free (rc); 1720 GNUNET_free (rc);
1721} 1721}
1722 1722
@@ -1742,8 +1742,8 @@ reassembly_cleanup_task (void *cls)
1742 } 1742 }
1743 GNUNET_assert (NULL == n->reassembly_timeout_task); 1743 GNUNET_assert (NULL == n->reassembly_timeout_task);
1744 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout, 1744 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1745 &reassembly_cleanup_task, 1745 &reassembly_cleanup_task,
1746 n); 1746 n);
1747 return; 1747 return;
1748 } 1748 }
1749} 1749}
@@ -1783,16 +1783,16 @@ free_neighbour (struct Neighbour *neighbour)
1783 1783
1784 GNUNET_assert (NULL == neighbour->session_head); 1784 GNUNET_assert (NULL == neighbour->session_head);
1785 GNUNET_assert (GNUNET_YES == 1785 GNUNET_assert (GNUNET_YES ==
1786 GNUNET_CONTAINER_multipeermap_remove (neighbours, 1786 GNUNET_CONTAINER_multipeermap_remove (neighbours,
1787 &neighbour->pid, 1787 &neighbour->pid,
1788 neighbour)); 1788 neighbour));
1789 if (NULL != neighbour->timeout_task) 1789 if (NULL != neighbour->timeout_task)
1790 GNUNET_SCHEDULER_cancel (neighbour->timeout_task); 1790 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1791 if (NULL != neighbour->reassembly_map) 1791 if (NULL != neighbour->reassembly_map)
1792 { 1792 {
1793 GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map, 1793 GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1794 &free_reassembly_cb, 1794 &free_reassembly_cb,
1795 NULL); 1795 NULL);
1796 GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map); 1796 GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1797 neighbour->reassembly_map = NULL; 1797 neighbour->reassembly_map = NULL;
1798 GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap); 1798 GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
@@ -1815,15 +1815,15 @@ free_neighbour (struct Neighbour *neighbour)
1815 */ 1815 */
1816static void 1816static void
1817core_send_connect_info (struct TransportClient *tc, 1817core_send_connect_info (struct TransportClient *tc,
1818 const struct GNUNET_PeerIdentity *pid, 1818 const struct GNUNET_PeerIdentity *pid,
1819 struct GNUNET_BANDWIDTH_Value32NBO quota_out) 1819 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1820{ 1820{
1821 struct GNUNET_MQ_Envelope *env; 1821 struct GNUNET_MQ_Envelope *env;
1822 struct ConnectInfoMessage *cim; 1822 struct ConnectInfoMessage *cim;
1823 1823
1824 GNUNET_assert (CT_CORE == tc->type); 1824 GNUNET_assert (CT_CORE == tc->type);
1825 env = GNUNET_MQ_msg (cim, 1825 env = GNUNET_MQ_msg (cim,
1826 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 1826 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1827 cim->quota_out = quota_out; 1827 cim->quota_out = quota_out;
1828 cim->id = *pid; 1828 cim->id = *pid;
1829 GNUNET_MQ_send (tc->mq, 1829 GNUNET_MQ_send (tc->mq,
@@ -1839,7 +1839,7 @@ core_send_connect_info (struct TransportClient *tc,
1839 */ 1839 */
1840static void 1840static void
1841cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, 1841cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1842 struct GNUNET_BANDWIDTH_Value32NBO quota_out) 1842 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1843{ 1843{
1844 for (struct TransportClient *tc = clients_head; 1844 for (struct TransportClient *tc = clients_head;
1845 NULL != tc; 1845 NULL != tc;
@@ -1848,8 +1848,8 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1848 if (CT_CORE != tc->type) 1848 if (CT_CORE != tc->type)
1849 continue; 1849 continue;
1850 core_send_connect_info (tc, 1850 core_send_connect_info (tc,
1851 pid, 1851 pid,
1852 quota_out); 1852 quota_out);
1853 } 1853 }
1854} 1854}
1855 1855
@@ -1872,10 +1872,10 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1872 if (CT_CORE != tc->type) 1872 if (CT_CORE != tc->type)
1873 continue; 1873 continue;
1874 env = GNUNET_MQ_msg (dim, 1874 env = GNUNET_MQ_msg (dim,
1875 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); 1875 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1876 dim->peer = *pid; 1876 dim->peer = *pid;
1877 GNUNET_MQ_send (tc->mq, 1877 GNUNET_MQ_send (tc->mq,
1878 env); 1878 env);
1879 } 1879 }
1880} 1880}
1881 1881
@@ -1910,20 +1910,21 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1910 unsigned int wsize; 1910 unsigned int wsize;
1911 1911
1912 GNUNET_assert (NULL != pm); 1912 GNUNET_assert (NULL != pm);
1913 if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT) 1913 if (queue->tc->details.communicator.total_queue_length >=
1914 COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1914 { 1915 {
1915 GNUNET_STATISTICS_update (GST_stats, 1916 GNUNET_STATISTICS_update (GST_stats,
1916 "# Transmission throttled due to communicator queue limit", 1917 "# Transmission throttled due to communicator queue limit",
1917 1, 1918 1,
1918 GNUNET_NO); 1919 GNUNET_NO);
1919 return; 1920 return;
1920 } 1921 }
1921 if (queue->queue_length >= SESSION_QUEUE_LIMIT) 1922 if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1922 { 1923 {
1923 GNUNET_STATISTICS_update (GST_stats, 1924 GNUNET_STATISTICS_update (GST_stats,
1924 "# Transmission throttled due to session queue limit", 1925 "# Transmission throttled due to session queue limit",
1925 1, 1926 1,
1926 GNUNET_NO); 1927 GNUNET_NO);
1927 return; 1928 return;
1928 } 1929 }
1929 1930
@@ -1931,27 +1932,28 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1931 ? pm->bytes_msg /* FIXME: add overheads? */ 1932 ? pm->bytes_msg /* FIXME: add overheads? */
1932 : queue->mtu; 1933 : queue->mtu;
1933 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, 1934 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1934 wsize); 1935 wsize);
1935 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), 1936 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1936 out_delay); 1937 out_delay);
1937 if (0 == out_delay.rel_value_us) 1938 if (0 == out_delay.rel_value_us)
1938 return; /* we should run immediately! */ 1939 return; /* we should run immediately! */
1939 /* queue has changed since we were scheduled, reschedule again */ 1940 /* queue has changed since we were scheduled, reschedule again */
1940 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, 1941 queue->transmit_task
1941 &transmit_on_queue, 1942 = GNUNET_SCHEDULER_add_delayed (out_delay,
1942 queue); 1943 &transmit_on_queue,
1944 queue);
1943 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) 1945 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1944 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1946 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1945 "Next transmission on queue `%s' in %s (high delay)\n", 1947 "Next transmission on queue `%s' in %s (high delay)\n",
1946 queue->address, 1948 queue->address,
1947 GNUNET_STRINGS_relative_time_to_string (out_delay, 1949 GNUNET_STRINGS_relative_time_to_string (out_delay,
1948 GNUNET_YES)); 1950 GNUNET_YES));
1949 else 1951 else
1950 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1951 "Next transmission on queue `%s' in %s\n", 1953 "Next transmission on queue `%s' in %s\n",
1952 queue->address, 1954 queue->address,
1953 GNUNET_STRINGS_relative_time_to_string (out_delay, 1955 GNUNET_STRINGS_relative_time_to_string (out_delay,
1954 GNUNET_YES)); 1956 GNUNET_YES));
1955} 1957}
1956 1958
1957 1959
@@ -1978,19 +1980,19 @@ free_session (struct GNUNET_ATS_Session *session)
1978 session->transmit_task = NULL; 1980 session->transmit_task = NULL;
1979 } 1981 }
1980 GNUNET_CONTAINER_MDLL_remove (neighbour, 1982 GNUNET_CONTAINER_MDLL_remove (neighbour,
1981 neighbour->session_head, 1983 neighbour->session_head,
1982 neighbour->session_tail, 1984 neighbour->session_tail,
1983 session); 1985 session);
1984 GNUNET_CONTAINER_MDLL_remove (client, 1986 GNUNET_CONTAINER_MDLL_remove (client,
1985 tc->details.communicator.session_head, 1987 tc->details.communicator.session_head,
1986 tc->details.communicator.session_tail, 1988 tc->details.communicator.session_tail,
1987 session); 1989 session);
1988 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length); 1990 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1989 while (NULL != (qe = session->queue_head)) 1991 while (NULL != (qe = session->queue_head))
1990 { 1992 {
1991 GNUNET_CONTAINER_DLL_remove (session->queue_head, 1993 GNUNET_CONTAINER_DLL_remove (session->queue_head,
1992 session->queue_tail, 1994 session->queue_tail,
1993 qe); 1995 qe);
1994 session->queue_length--; 1996 session->queue_length--;
1995 tc->details.communicator.total_queue_length--; 1997 tc->details.communicator.total_queue_length--;
1996 GNUNET_free (qe); 1998 GNUNET_free (qe);
@@ -2001,18 +2003,18 @@ free_session (struct GNUNET_ATS_Session *session)
2001 { 2003 {
2002 /* Communicator dropped below threshold, resume all queues */ 2004 /* Communicator dropped below threshold, resume all queues */
2003 GNUNET_STATISTICS_update (GST_stats, 2005 GNUNET_STATISTICS_update (GST_stats,
2004 "# Transmission throttled due to communicator queue limit", 2006 "# Transmission throttled due to communicator queue limit",
2005 -1, 2007 -1,
2006 GNUNET_NO); 2008 GNUNET_NO);
2007 for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head; 2009 for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
2008 NULL != s; 2010 NULL != s;
2009 s = s->next_client) 2011 s = s->next_client)
2010 schedule_transmit_on_queue (s); 2012 schedule_transmit_on_queue (s);
2011 } 2013 }
2012 notify_monitors (&neighbour->pid, 2014 notify_monitors (&neighbour->pid,
2013 session->address, 2015 session->address,
2014 session->nt, 2016 session->nt,
2015 &me); 2017 &me);
2016 GNUNET_ATS_session_del (session->sr); 2018 GNUNET_ATS_session_del (session->sr);
2017 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in); 2019 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
2018 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out); 2020 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
@@ -2036,8 +2038,8 @@ free_address_list_entry (struct AddressListEntry *ale)
2036 struct TransportClient *tc = ale->tc; 2038 struct TransportClient *tc = ale->tc;
2037 2039
2038 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head, 2040 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2039 tc->details.communicator.addr_tail, 2041 tc->details.communicator.addr_tail,
2040 ale); 2042 ale);
2041 if (NULL != ale->sc) 2043 if (NULL != ale->sc)
2042 { 2044 {
2043 GNUNET_PEERSTORE_store_cancel (ale->sc); 2045 GNUNET_PEERSTORE_store_cancel (ale->sc);
@@ -2062,8 +2064,8 @@ free_address_list_entry (struct AddressListEntry *ale)
2062 */ 2064 */
2063static void 2065static void
2064client_disconnect_cb (void *cls, 2066client_disconnect_cb (void *cls,
2065 struct GNUNET_SERVICE_Client *client, 2067 struct GNUNET_SERVICE_Client *client,
2066 void *app_ctx) 2068 void *app_ctx)
2067{ 2069{
2068 struct TransportClient *tc = app_ctx; 2070 struct TransportClient *tc = app_ctx;
2069 2071
@@ -2083,11 +2085,11 @@ client_disconnect_cb (void *cls,
2083 2085
2084 while (NULL != (pm = tc->details.core.pending_msg_head)) 2086 while (NULL != (pm = tc->details.core.pending_msg_head))
2085 { 2087 {
2086 GNUNET_CONTAINER_MDLL_remove (client, 2088 GNUNET_CONTAINER_MDLL_remove (client,
2087 tc->details.core.pending_msg_head, 2089 tc->details.core.pending_msg_head,
2088 tc->details.core.pending_msg_tail, 2090 tc->details.core.pending_msg_tail,
2089 pm); 2091 pm);
2090 pm->client = NULL; 2092 pm->client = NULL;
2091 } 2093 }
2092 } 2094 }
2093 break; 2095 break;
@@ -2121,15 +2123,15 @@ client_disconnect_cb (void *cls,
2121 */ 2123 */
2122static int 2124static int
2123notify_client_connect_info (void *cls, 2125notify_client_connect_info (void *cls,
2124 const struct GNUNET_PeerIdentity *pid, 2126 const struct GNUNET_PeerIdentity *pid,
2125 void *value) 2127 void *value)
2126{ 2128{
2127 struct TransportClient *tc = cls; 2129 struct TransportClient *tc = cls;
2128 struct Neighbour *neighbour = value; 2130 struct Neighbour *neighbour = value;
2129 2131
2130 core_send_connect_info (tc, 2132 core_send_connect_info (tc,
2131 pid, 2133 pid,
2132 neighbour->quota_out); 2134 neighbour->quota_out);
2133 return GNUNET_OK; 2135 return GNUNET_OK;
2134} 2136}
2135 2137
@@ -2144,7 +2146,7 @@ notify_client_connect_info (void *cls,
2144 */ 2146 */
2145static void 2147static void
2146handle_client_start (void *cls, 2148handle_client_start (void *cls,
2147 const struct StartMessage *start) 2149 const struct StartMessage *start)
2148{ 2150{
2149 struct TransportClient *tc = cls; 2151 struct TransportClient *tc = cls;
2150 uint32_t options; 2152 uint32_t options;
@@ -2169,8 +2171,8 @@ handle_client_start (void *cls,
2169 } 2171 }
2170 tc->type = CT_CORE; 2172 tc->type = CT_CORE;
2171 GNUNET_CONTAINER_multipeermap_iterate (neighbours, 2173 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2172 &notify_client_connect_info, 2174 &notify_client_connect_info,
2173 tc); 2175 tc);
2174 GNUNET_SERVICE_client_continue (tc->client); 2176 GNUNET_SERVICE_client_continue (tc->client);
2175} 2177}
2176 2178
@@ -2183,7 +2185,7 @@ handle_client_start (void *cls,
2183 */ 2185 */
2184static int 2186static int
2185check_client_send (void *cls, 2187check_client_send (void *cls,
2186 const struct OutboundMessage *obm) 2188 const struct OutboundMessage *obm)
2187{ 2189{
2188 struct TransportClient *tc = cls; 2190 struct TransportClient *tc = cls;
2189 uint16_t size; 2191 uint16_t size;
@@ -2248,14 +2250,14 @@ free_pending_message (struct PendingMessage *pm)
2248 if (NULL != tc) 2250 if (NULL != tc)
2249 { 2251 {
2250 GNUNET_CONTAINER_MDLL_remove (client, 2252 GNUNET_CONTAINER_MDLL_remove (client,
2251 tc->details.core.pending_msg_head, 2253 tc->details.core.pending_msg_head,
2252 tc->details.core.pending_msg_tail, 2254 tc->details.core.pending_msg_tail,
2253 pm); 2255 pm);
2254 } 2256 }
2255 GNUNET_CONTAINER_MDLL_remove (neighbour, 2257 GNUNET_CONTAINER_MDLL_remove (neighbour,
2256 target->pending_msg_head, 2258 target->pending_msg_head,
2257 target->pending_msg_tail, 2259 target->pending_msg_tail,
2258 pm); 2260 pm);
2259 free_fragment_tree (pm); 2261 free_fragment_tree (pm);
2260 GNUNET_free_non_null (pm->bpm); 2262 GNUNET_free_non_null (pm->bpm);
2261 GNUNET_free (pm); 2263 GNUNET_free (pm);
@@ -2276,8 +2278,8 @@ free_pending_message (struct PendingMessage *pm)
2276 */ 2278 */
2277static void 2279static void
2278client_send_response (struct PendingMessage *pm, 2280client_send_response (struct PendingMessage *pm,
2279 int success, 2281 int success,
2280 uint32_t bytes_physical) 2282 uint32_t bytes_physical)
2281{ 2283{
2282 struct TransportClient *tc = pm->client; 2284 struct TransportClient *tc = pm->client;
2283 struct Neighbour *target = pm->target; 2285 struct Neighbour *target = pm->target;
@@ -2287,7 +2289,7 @@ client_send_response (struct PendingMessage *pm,
2287 if (NULL != tc) 2289 if (NULL != tc)
2288 { 2290 {
2289 env = GNUNET_MQ_msg (som, 2291 env = GNUNET_MQ_msg (som,
2290 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); 2292 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2291 som->success = htonl ((uint32_t) success); 2293 som->success = htonl ((uint32_t) success);
2292 som->bytes_msg = htons (pm->bytes_msg); 2294 som->bytes_msg = htons (pm->bytes_msg);
2293 som->bytes_physical = htonl (bytes_physical); 2295 som->bytes_physical = htonl (bytes_physical);
@@ -2324,22 +2326,22 @@ check_queue_timeouts (void *cls)
2324 if (pos->timeout.abs_value_us <= now.abs_value_us) 2326 if (pos->timeout.abs_value_us <= now.abs_value_us)
2325 { 2327 {
2326 GNUNET_STATISTICS_update (GST_stats, 2328 GNUNET_STATISTICS_update (GST_stats,
2327 "# messages dropped (timeout before confirmation)", 2329 "# messages dropped (timeout before confirmation)",
2328 1, 2330 1,
2329 GNUNET_NO); 2331 GNUNET_NO);
2330 client_send_response (pm, 2332 client_send_response (pm,
2331 GNUNET_NO, 2333 GNUNET_NO,
2332 0); 2334 0);
2333 continue; 2335 continue;
2334 } 2336 }
2335 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout, 2337 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2336 pos->timeout); 2338 pos->timeout);
2337 } 2339 }
2338 n->earliest_timeout = earliest_timeout; 2340 n->earliest_timeout = earliest_timeout;
2339 if (NULL != n->pending_msg_head) 2341 if (NULL != n->pending_msg_head)
2340 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout, 2342 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2341 &check_queue_timeouts, 2343 &check_queue_timeouts,
2342 n); 2344 n);
2343} 2345}
2344 2346
2345 2347
@@ -2351,13 +2353,14 @@ check_queue_timeouts (void *cls)
2351 */ 2353 */
2352static void 2354static void
2353handle_client_send (void *cls, 2355handle_client_send (void *cls,
2354 const struct OutboundMessage *obm) 2356 const struct OutboundMessage *obm)
2355{ 2357{
2356 struct TransportClient *tc = cls; 2358 struct TransportClient *tc = cls;
2357 struct PendingMessage *pm; 2359 struct PendingMessage *pm;
2358 const struct GNUNET_MessageHeader *obmm; 2360 const struct GNUNET_MessageHeader *obmm;
2359 struct Neighbour *target; 2361 struct Neighbour *target;
2360 uint32_t bytes_msg; 2362 uint32_t bytes_msg;
2363 int was_empty;
2361 2364
2362 GNUNET_assert (CT_CORE == tc->type); 2365 GNUNET_assert (CT_CORE == tc->type);
2363 obmm = (const struct GNUNET_MessageHeader *) &obm[1]; 2366 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
@@ -2373,36 +2376,37 @@ handle_client_send (void *cls,
2373 struct SendOkMessage *som; 2376 struct SendOkMessage *som;
2374 2377
2375 env = GNUNET_MQ_msg (som, 2378 env = GNUNET_MQ_msg (som,
2376 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); 2379 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2377 som->success = htonl (GNUNET_SYSERR); 2380 som->success = htonl (GNUNET_SYSERR);
2378 som->bytes_msg = htonl (bytes_msg); 2381 som->bytes_msg = htonl (bytes_msg);
2379 som->bytes_physical = htonl (0); 2382 som->bytes_physical = htonl (0);
2380 som->peer = obm->peer; 2383 som->peer = obm->peer;
2381 GNUNET_MQ_send (tc->mq, 2384 GNUNET_MQ_send (tc->mq,
2382 env); 2385 env);
2383 GNUNET_SERVICE_client_continue (tc->client); 2386 GNUNET_SERVICE_client_continue (tc->client);
2384 GNUNET_STATISTICS_update (GST_stats, 2387 GNUNET_STATISTICS_update (GST_stats,
2385 "# messages dropped (neighbour unknown)", 2388 "# messages dropped (neighbour unknown)",
2386 1, 2389 1,
2387 GNUNET_NO); 2390 GNUNET_NO);
2388 return; 2391 return;
2389 } 2392 }
2393 was_empty = (NULL == target->pending_msg_head);
2390 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); 2394 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2391 pm->client = tc; 2395 pm->client = tc;
2392 pm->target = target; 2396 pm->target = target;
2393 pm->bytes_msg = bytes_msg; 2397 pm->bytes_msg = bytes_msg;
2394 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); 2398 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2395 memcpy (&pm[1], 2399 memcpy (&pm[1],
2396 &obm[1], 2400 &obm[1],
2397 bytes_msg); 2401 bytes_msg);
2398 GNUNET_CONTAINER_MDLL_insert (neighbour, 2402 GNUNET_CONTAINER_MDLL_insert (neighbour,
2399 target->pending_msg_head, 2403 target->pending_msg_head,
2400 target->pending_msg_tail, 2404 target->pending_msg_tail,
2401 pm); 2405 pm);
2402 GNUNET_CONTAINER_MDLL_insert (client, 2406 GNUNET_CONTAINER_MDLL_insert (client,
2403 tc->details.core.pending_msg_head, 2407 tc->details.core.pending_msg_head,
2404 tc->details.core.pending_msg_tail, 2408 tc->details.core.pending_msg_tail,
2405 pm); 2409 pm);
2406 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) 2410 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2407 { 2411 {
2408 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; 2412 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
@@ -2410,8 +2414,19 @@ handle_client_send (void *cls,
2410 GNUNET_SCHEDULER_cancel (target->timeout_task); 2414 GNUNET_SCHEDULER_cancel (target->timeout_task);
2411 target->timeout_task 2415 target->timeout_task
2412 = GNUNET_SCHEDULER_add_at (target->earliest_timeout, 2416 = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2413 &check_queue_timeouts, 2417 &check_queue_timeouts,
2414 target); 2418 target);
2419 }
2420 if (! was_empty)
2421 return; /* all queues must already be busy */
2422 for (struct GNUNET_ATS_Session *queue = target->session_head;
2423 NULL != queue;
2424 queue = queue->next_neighbour)
2425 {
2426 /* try transmission on any queue that is idle */
2427 if (NULL == queue->transmit_task)
2428 queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
2429 queue);
2415 } 2430 }
2416} 2431}
2417 2432
@@ -3835,9 +3850,9 @@ transmit_on_queue (void *cls)
3835 respect that even if MTU is 0 for 3850 respect that even if MTU is 0 for
3836 this queue */) ) 3851 this queue */) )
3837 s = fragment_message (s, 3852 s = fragment_message (s,
3838 (0 == queue->mtu) 3853 (0 == queue->mtu)
3839 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo) 3854 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3840 : queue->mtu); 3855 : queue->mtu);
3841 if (NULL == s) 3856 if (NULL == s)
3842 { 3857 {
3843 /* Fragmentation failed, try next message... */ 3858 /* Fragmentation failed, try next message... */
@@ -3868,13 +3883,13 @@ transmit_on_queue (void *cls)
3868 smt->mid = qe->mid; 3883 smt->mid = qe->mid;
3869 smt->receiver = n->pid; 3884 smt->receiver = n->pid;
3870 memcpy (&smt[1], 3885 memcpy (&smt[1],
3871 &s[1], 3886 &s[1],
3872 s->bytes_msg); 3887 s->bytes_msg);
3873 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); 3888 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3874 queue->queue_length++; 3889 queue->queue_length++;
3875 queue->tc->details.communicator.total_queue_length++; 3890 queue->tc->details.communicator.total_queue_length++;
3876 GNUNET_MQ_send (queue->tc->mq, 3891 GNUNET_MQ_send (queue->tc->mq,
3877 env); 3892 env);
3878 3893
3879 // FIXME: do something similar to the logic below 3894 // FIXME: do something similar to the logic below
3880 // in defragmentation / reliability ACK handling! 3895 // in defragmentation / reliability ACK handling!
@@ -3886,8 +3901,8 @@ transmit_on_queue (void *cls)
3886 { 3901 {
3887 /* Full message sent, and over reliabile channel */ 3902 /* Full message sent, and over reliabile channel */
3888 client_send_response (pm, 3903 client_send_response (pm,
3889 GNUNET_YES, 3904 GNUNET_YES,
3890 pm->bytes_msg); 3905 pm->bytes_msg);
3891 } 3906 }
3892 else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) && 3907 else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3893 (PMT_FRAGMENT_BOX == s->pmt) ) 3908 (PMT_FRAGMENT_BOX == s->pmt) )
@@ -3898,9 +3913,9 @@ transmit_on_queue (void *cls)
3898 free_fragment_tree (s); 3913 free_fragment_tree (s);
3899 pos = s->frag_parent; 3914 pos = s->frag_parent;
3900 GNUNET_CONTAINER_MDLL_remove (frag, 3915 GNUNET_CONTAINER_MDLL_remove (frag,
3901 pos->head_frag, 3916 pos->head_frag,
3902 pos->tail_frag, 3917 pos->tail_frag,
3903 s); 3918 s);
3904 GNUNET_free (s); 3919 GNUNET_free (s);
3905 /* check if subtree is done */ 3920 /* check if subtree is done */
3906 while ( (NULL == pos->head_frag) && 3921 while ( (NULL == pos->head_frag) &&
@@ -3910,9 +3925,9 @@ transmit_on_queue (void *cls)
3910 s = pos; 3925 s = pos;
3911 pos = s->frag_parent; 3926 pos = s->frag_parent;
3912 GNUNET_CONTAINER_MDLL_remove (frag, 3927 GNUNET_CONTAINER_MDLL_remove (frag,
3913 pos->head_frag, 3928 pos->head_frag,
3914 pos->tail_frag, 3929 pos->tail_frag,
3915 s); 3930 s);
3916 GNUNET_free (s); 3931 GNUNET_free (s);
3917 } 3932 }
3918 3933
@@ -3920,8 +3935,8 @@ transmit_on_queue (void *cls)
3920 if ( (NULL == pm->head_frag) && 3935 if ( (NULL == pm->head_frag) &&
3921 (pm->frag_off == pm->bytes_msg) ) 3936 (pm->frag_off == pm->bytes_msg) )
3922 client_send_response (pm, 3937 client_send_response (pm,
3923 GNUNET_YES, 3938 GNUNET_YES,
3924 pm->bytes_msg /* FIXME: calculate and add overheads! */); 3939 pm->bytes_msg /* FIXME: calculate and add overheads! */);
3925 } 3940 }
3926 else if (PMT_CORE != pm->pmt) 3941 else if (PMT_CORE != pm->pmt)
3927 { 3942 {
@@ -3941,25 +3956,25 @@ transmit_on_queue (void *cls)
3941 message urgency and size when delaying ACKs, etc.) */ 3956 message urgency and size when delaying ACKs, etc.) */
3942 s->next_attempt = GNUNET_TIME_relative_to_absolute 3957 s->next_attempt = GNUNET_TIME_relative_to_absolute
3943 (GNUNET_TIME_relative_multiply (queue->rtt, 3958 (GNUNET_TIME_relative_multiply (queue->rtt,
3944 4)); 3959 4));
3945 if (s == pm) 3960 if (s == pm)
3946 { 3961 {
3947 struct PendingMessage *pos; 3962 struct PendingMessage *pos;
3948 3963
3949 /* re-insert sort in neighbour list */ 3964 /* re-insert sort in neighbour list */
3950 GNUNET_CONTAINER_MDLL_remove (neighbour, 3965 GNUNET_CONTAINER_MDLL_remove (neighbour,
3951 neighbour->pending_msg_head, 3966 neighbour->pending_msg_head,
3952 neighbour->pending_msg_tail, 3967 neighbour->pending_msg_tail,
3953 pm); 3968 pm);
3954 pos = neighbour->pending_msg_tail; 3969 pos = neighbour->pending_msg_tail;
3955 while ( (NULL != pos) && 3970 while ( (NULL != pos) &&
3956 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) ) 3971 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3957 pos = pos->prev_neighbour; 3972 pos = pos->prev_neighbour;
3958 GNUNET_CONTAINER_MDLL_insert_after (neighbour, 3973 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
3959 neighbour->pending_msg_head, 3974 neighbour->pending_msg_head,
3960 neighbour->pending_msg_tail, 3975 neighbour->pending_msg_tail,
3961 pos, 3976 pos,
3962 pm); 3977 pm);
3963 } 3978 }
3964 else 3979 else
3965 { 3980 {
@@ -3968,18 +3983,18 @@ transmit_on_queue (void *cls)
3968 struct PendingMessage *pos; 3983 struct PendingMessage *pos;
3969 3984
3970 GNUNET_CONTAINER_MDLL_remove (frag, 3985 GNUNET_CONTAINER_MDLL_remove (frag,
3971 fp->head_frag, 3986 fp->head_frag,
3972 fp->tail_frag, 3987 fp->tail_frag,
3973 s); 3988 s);
3974 pos = fp->tail_frag; 3989 pos = fp->tail_frag;
3975 while ( (NULL != pos) && 3990 while ( (NULL != pos) &&
3976 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) ) 3991 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3977 pos = pos->prev_frag; 3992 pos = pos->prev_frag;
3978 GNUNET_CONTAINER_MDLL_insert_after (frag, 3993 GNUNET_CONTAINER_MDLL_insert_after (frag,
3979 fp->head_frag, 3994 fp->head_frag,
3980 fp->tail_frag, 3995 fp->tail_frag,
3981 pos, 3996 pos,
3982 s); 3997 s);
3983 } 3998 }
3984 } 3999 }
3985 4000
@@ -4028,9 +4043,9 @@ tracker_excess_out_cb (void *cls)
4028 from here via a message instead! */ 4043 from here via a message instead! */
4029 /* TODO: maybe inform ATS at this point? */ 4044 /* TODO: maybe inform ATS at this point? */
4030 GNUNET_STATISTICS_update (GST_stats, 4045 GNUNET_STATISTICS_update (GST_stats,
4031 "# Excess outbound bandwidth reported", 4046 "# Excess outbound bandwidth reported",
4032 1, 4047 1,
4033 GNUNET_NO); 4048 GNUNET_NO);
4034} 4049}
4035 4050
4036 4051
@@ -4046,9 +4061,9 @@ tracker_excess_in_cb (void *cls)
4046{ 4061{
4047 /* TODO: maybe inform ATS at this point? */ 4062 /* TODO: maybe inform ATS at this point? */
4048 GNUNET_STATISTICS_update (GST_stats, 4063 GNUNET_STATISTICS_update (GST_stats,
4049 "# Excess inbound bandwidth reported", 4064 "# Excess inbound bandwidth reported",
4050 1, 4065 1,
4051 GNUNET_NO); 4066 GNUNET_NO);
4052} 4067}
4053 4068
4054 4069
@@ -4083,12 +4098,12 @@ handle_add_queue_message (void *cls,
4083 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; 4098 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4084 neighbour->pid = aqm->receiver; 4099 neighbour->pid = aqm->receiver;
4085 GNUNET_assert (GNUNET_OK == 4100 GNUNET_assert (GNUNET_OK ==
4086 GNUNET_CONTAINER_multipeermap_put (neighbours, 4101 GNUNET_CONTAINER_multipeermap_put (neighbours,
4087 &neighbour->pid, 4102 &neighbour->pid,
4088 neighbour, 4103 neighbour,
4089 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 4104 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4090 cores_send_connect_info (&neighbour->pid, 4105 cores_send_connect_info (&neighbour->pid,
4091 GNUNET_BANDWIDTH_ZERO); 4106 GNUNET_BANDWIDTH_ZERO);
4092 } 4107 }
4093 addr_len = ntohs (aqm->header.size) - sizeof (*aqm); 4108 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4094 addr = (const char *) &aqm[1]; 4109 addr = (const char *) &aqm[1];
@@ -4117,8 +4132,8 @@ handle_add_queue_message (void *cls,
4117 &tracker_excess_out_cb, 4132 &tracker_excess_out_cb,
4118 queue); 4133 queue);
4119 memcpy (&queue[1], 4134 memcpy (&queue[1],
4120 addr, 4135 addr,
4121 addr_len); 4136 addr_len);
4122 /* notify ATS about new queue */ 4137 /* notify ATS about new queue */
4123 { 4138 {
4124 struct GNUNET_ATS_Properties prop = { 4139 struct GNUNET_ATS_Properties prop = {
@@ -4129,10 +4144,10 @@ handle_add_queue_message (void *cls,
4129 }; 4144 };
4130 4145
4131 queue->sr = GNUNET_ATS_session_add (ats, 4146 queue->sr = GNUNET_ATS_session_add (ats,
4132 &neighbour->pid, 4147 &neighbour->pid,
4133 queue->address, 4148 queue->address,
4134 queue, 4149 queue,
4135 &prop); 4150 &prop);
4136 if (NULL == queue->sr) 4151 if (NULL == queue->sr)
4137 { 4152 {
4138 /* This can only happen if the 'address' was way too long for ATS 4153 /* This can only happen if the 'address' was way too long for ATS
@@ -4159,18 +4174,18 @@ handle_add_queue_message (void *cls,
4159 }; 4174 };
4160 4175
4161 notify_monitors (&neighbour->pid, 4176 notify_monitors (&neighbour->pid,
4162 queue->address, 4177 queue->address,
4163 queue->nt, 4178 queue->nt,
4164 &me); 4179 &me);
4165 } 4180 }
4166 GNUNET_CONTAINER_MDLL_insert (neighbour, 4181 GNUNET_CONTAINER_MDLL_insert (neighbour,
4167 neighbour->session_head, 4182 neighbour->session_head,
4168 neighbour->session_tail, 4183 neighbour->session_tail,
4169 queue); 4184 queue);
4170 GNUNET_CONTAINER_MDLL_insert (client, 4185 GNUNET_CONTAINER_MDLL_insert (client,
4171 tc->details.communicator.session_head, 4186 tc->details.communicator.session_head,
4172 tc->details.communicator.session_tail, 4187 tc->details.communicator.session_tail,
4173 queue); 4188 queue);
4174 GNUNET_SERVICE_client_continue (tc->client); 4189 GNUNET_SERVICE_client_continue (tc->client);
4175} 4190}
4176 4191
@@ -4273,21 +4288,21 @@ handle_send_message_ack (void *cls,
4273 { 4288 {
4274 /* Communicator dropped below threshold, resume all queues */ 4289 /* Communicator dropped below threshold, resume all queues */
4275 GNUNET_STATISTICS_update (GST_stats, 4290 GNUNET_STATISTICS_update (GST_stats,
4276 "# Transmission throttled due to communicator queue limit", 4291 "# Transmission throttled due to communicator queue limit",
4277 -1, 4292 -1,
4278 GNUNET_NO); 4293 GNUNET_NO);
4279 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; 4294 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4280 NULL != session; 4295 NULL != session;
4281 session = session->next_client) 4296 session = session->next_client)
4282 schedule_transmit_on_queue (session); 4297 schedule_transmit_on_queue (session);
4283 } 4298 }
4284 else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length) 4299 else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
4285 { 4300 {
4286 /* queue dropped below threshold; only resume this one queue */ 4301 /* queue dropped below threshold; only resume this one queue */
4287 GNUNET_STATISTICS_update (GST_stats, 4302 GNUNET_STATISTICS_update (GST_stats,
4288 "# Transmission throttled due to session queue limit", 4303 "# Transmission throttled due to session queue limit",
4289 -1, 4304 -1,
4290 GNUNET_NO); 4305 GNUNET_NO);
4291 schedule_transmit_on_queue (queue->session); 4306 schedule_transmit_on_queue (queue->session);
4292 } 4307 }
4293 4308
@@ -4361,8 +4376,8 @@ handle_monitor_start (void *cls,
4361 tc->details.monitor.peer = start->peer; 4376 tc->details.monitor.peer = start->peer;
4362 tc->details.monitor.one_shot = ntohl (start->one_shot); 4377 tc->details.monitor.one_shot = ntohl (start->one_shot);
4363 GNUNET_CONTAINER_multipeermap_iterate (neighbours, 4378 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4364 &notify_client_queues, 4379 &notify_client_queues,
4365 tc); 4380 tc);
4366 GNUNET_SERVICE_client_mark_monitor (tc->client); 4381 GNUNET_SERVICE_client_mark_monitor (tc->client);
4367 GNUNET_SERVICE_client_continue (tc->client); 4382 GNUNET_SERVICE_client_continue (tc->client);
4368} 4383}
@@ -4414,8 +4429,8 @@ lookup_communicator (const char *prefix)
4414 return tc; 4429 return tc;
4415 } 4430 }
4416 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 4431 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4417 "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n", 4432 "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
4418 prefix); 4433 prefix);
4419 return NULL; 4434 return NULL;
4420} 4435}
4421 4436
@@ -4451,21 +4466,21 @@ ats_suggestion_cb (void *cls,
4451 if (NULL == tc) 4466 if (NULL == tc)
4452 { 4467 {
4453 GNUNET_STATISTICS_update (GST_stats, 4468 GNUNET_STATISTICS_update (GST_stats,
4454 "# ATS suggestions ignored due to missing communicator", 4469 "# ATS suggestions ignored due to missing communicator",
4455 1, 4470 1,
4456 GNUNET_NO); 4471 GNUNET_NO);
4457 return; 4472 return;
4458 } 4473 }
4459 /* forward suggestion for queue creation to communicator */ 4474 /* forward suggestion for queue creation to communicator */
4460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 4475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4461 "Request #%u for `%s' communicator to create queue to `%s'\n", 4476 "Request #%u for `%s' communicator to create queue to `%s'\n",
4462 (unsigned int) idgen, 4477 (unsigned int) idgen,
4463 prefix, 4478 prefix,
4464 address); 4479 address);
4465 alen = strlen (address) + 1; 4480 alen = strlen (address) + 1;
4466 env = GNUNET_MQ_msg_extra (cqm, 4481 env = GNUNET_MQ_msg_extra (cqm,
4467 alen, 4482 alen,
4468 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); 4483 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
4469 cqm->request_id = htonl (idgen++); 4484 cqm->request_id = htonl (idgen++);
4470 cqm->receiver = *pid; 4485 cqm->receiver = *pid;
4471 memcpy (&cqm[1], 4486 memcpy (&cqm[1],
@@ -4485,7 +4500,7 @@ ats_suggestion_cb (void *cls,
4485 */ 4500 */
4486static void 4501static void
4487handle_queue_create_ok (void *cls, 4502handle_queue_create_ok (void *cls,
4488 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) 4503 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4489{ 4504{
4490 struct TransportClient *tc = cls; 4505 struct TransportClient *tc = cls;
4491 4506
@@ -4496,12 +4511,12 @@ handle_queue_create_ok (void *cls,
4496 return; 4511 return;
4497 } 4512 }
4498 GNUNET_STATISTICS_update (GST_stats, 4513 GNUNET_STATISTICS_update (GST_stats,
4499 "# ATS suggestions succeeded at communicator", 4514 "# ATS suggestions succeeded at communicator",
4500 1, 4515 1,
4501 GNUNET_NO); 4516 GNUNET_NO);
4502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 4517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4503 "Request #%u for communicator to create queue succeeded\n", 4518 "Request #%u for communicator to create queue succeeded\n",
4504 (unsigned int) ntohs (cqr->request_id)); 4519 (unsigned int) ntohs (cqr->request_id));
4505 GNUNET_SERVICE_client_continue (tc->client); 4520 GNUNET_SERVICE_client_continue (tc->client);
4506} 4521}
4507 4522
@@ -4527,12 +4542,12 @@ handle_queue_create_fail (void *cls,
4527 return; 4542 return;
4528 } 4543 }
4529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 4544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4530 "Request #%u for communicator to create queue failed\n", 4545 "Request #%u for communicator to create queue failed\n",
4531 (unsigned int) ntohs (cqr->request_id)); 4546 (unsigned int) ntohs (cqr->request_id));
4532 GNUNET_STATISTICS_update (GST_stats, 4547 GNUNET_STATISTICS_update (GST_stats,
4533 "# ATS suggestions failed in queue creation at communicator", 4548 "# ATS suggestions failed in queue creation at communicator",
4534 1, 4549 1,
4535 GNUNET_NO); 4550 GNUNET_NO);
4536 GNUNET_SERVICE_client_continue (tc->client); 4551 GNUNET_SERVICE_client_continue (tc->client);
4537} 4552}
4538 4553
@@ -4601,8 +4616,8 @@ handle_address_consider_verify (void *cls,
4601 */ 4616 */
4602static int 4617static int
4603free_neighbour_cb (void *cls, 4618free_neighbour_cb (void *cls,
4604 const struct GNUNET_PeerIdentity *pid, 4619 const struct GNUNET_PeerIdentity *pid,
4605 void *value) 4620 void *value)
4606{ 4621{
4607 struct Neighbour *neighbour = value; 4622 struct Neighbour *neighbour = value;
4608 4623
@@ -4625,8 +4640,8 @@ free_neighbour_cb (void *cls,
4625 */ 4640 */
4626static int 4641static int
4627free_dv_routes_cb (void *cls, 4642free_dv_routes_cb (void *cls,
4628 const struct GNUNET_PeerIdentity *pid, 4643 const struct GNUNET_PeerIdentity *pid,
4629 void *value) 4644 void *value)
4630{ 4645{
4631 struct DistanceVector *dv = value; 4646 struct DistanceVector *dv = value;
4632 4647
@@ -4648,8 +4663,8 @@ free_dv_routes_cb (void *cls,
4648 */ 4663 */
4649static int 4664static int
4650free_ephemeral_cb (void *cls, 4665free_ephemeral_cb (void *cls,
4651 const struct GNUNET_PeerIdentity *pid, 4666 const struct GNUNET_PeerIdentity *pid,
4652 void *value) 4667 void *value)
4653{ 4668{
4654 struct EphemeralCacheEntry *ece = value; 4669 struct EphemeralCacheEntry *ece = value;
4655 4670
@@ -4734,9 +4749,9 @@ run (void *cls,
4734 /* setup globals */ 4749 /* setup globals */
4735 GST_cfg = c; 4750 GST_cfg = c;
4736 neighbours = GNUNET_CONTAINER_multipeermap_create (1024, 4751 neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4737 GNUNET_YES); 4752 GNUNET_YES);
4738 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, 4753 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4739 GNUNET_YES); 4754 GNUNET_YES);
4740 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, 4755 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4741 GNUNET_YES); 4756 GNUNET_YES);
4742 ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 4757 ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
@@ -4790,50 +4805,50 @@ GNUNET_SERVICE_MAIN
4790 NULL, 4805 NULL,
4791 /* communication with core */ 4806 /* communication with core */
4792 GNUNET_MQ_hd_fixed_size (client_start, 4807 GNUNET_MQ_hd_fixed_size (client_start,
4793 GNUNET_MESSAGE_TYPE_TRANSPORT_START, 4808 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4794 struct StartMessage, 4809 struct StartMessage,
4795 NULL), 4810 NULL),
4796 GNUNET_MQ_hd_var_size (client_send, 4811 GNUNET_MQ_hd_var_size (client_send,
4797 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 4812 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4798 struct OutboundMessage, 4813 struct OutboundMessage,
4799 NULL), 4814 NULL),
4800 /* communication with communicators */ 4815 /* communication with communicators */
4801 GNUNET_MQ_hd_var_size (communicator_available, 4816 GNUNET_MQ_hd_var_size (communicator_available,
4802 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, 4817 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4803 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage, 4818 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4804 NULL), 4819 NULL),
4805 GNUNET_MQ_hd_var_size (communicator_backchannel, 4820 GNUNET_MQ_hd_var_size (communicator_backchannel,
4806 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL, 4821 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4807 struct GNUNET_TRANSPORT_CommunicatorBackchannel, 4822 struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4808 NULL), 4823 NULL),
4809 GNUNET_MQ_hd_var_size (add_address, 4824 GNUNET_MQ_hd_var_size (add_address,
4810 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS, 4825 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4811 struct GNUNET_TRANSPORT_AddAddressMessage, 4826 struct GNUNET_TRANSPORT_AddAddressMessage,
4812 NULL), 4827 NULL),
4813 GNUNET_MQ_hd_fixed_size (del_address, 4828 GNUNET_MQ_hd_fixed_size (del_address,
4814 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS, 4829 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4815 struct GNUNET_TRANSPORT_DelAddressMessage, 4830 struct GNUNET_TRANSPORT_DelAddressMessage,
4816 NULL), 4831 NULL),
4817 GNUNET_MQ_hd_var_size (incoming_msg, 4832 GNUNET_MQ_hd_var_size (incoming_msg,
4818 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, 4833 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4819 struct GNUNET_TRANSPORT_IncomingMessage, 4834 struct GNUNET_TRANSPORT_IncomingMessage,
4820 NULL), 4835 NULL),
4821 GNUNET_MQ_hd_fixed_size (queue_create_ok, 4836 GNUNET_MQ_hd_fixed_size (queue_create_ok,
4822 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, 4837 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4823 struct GNUNET_TRANSPORT_CreateQueueResponse, 4838 struct GNUNET_TRANSPORT_CreateQueueResponse,
4824 NULL), 4839 NULL),
4825 GNUNET_MQ_hd_fixed_size (queue_create_fail, 4840 GNUNET_MQ_hd_fixed_size (queue_create_fail,
4826 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, 4841 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4827 struct GNUNET_TRANSPORT_CreateQueueResponse, 4842 struct GNUNET_TRANSPORT_CreateQueueResponse,
4828 NULL), 4843 NULL),
4829 GNUNET_MQ_hd_var_size (add_queue_message, 4844 GNUNET_MQ_hd_var_size (add_queue_message,
4830 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, 4845 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4831 struct GNUNET_TRANSPORT_AddQueueMessage, 4846 struct GNUNET_TRANSPORT_AddQueueMessage,
4832 NULL), 4847 NULL),
4833 GNUNET_MQ_hd_var_size (address_consider_verify, 4848 GNUNET_MQ_hd_var_size (address_consider_verify,
4834 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY, 4849 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
4835 struct GNUNET_TRANSPORT_AddressToVerify, 4850 struct GNUNET_TRANSPORT_AddressToVerify,
4836 NULL), 4851 NULL),
4837 GNUNET_MQ_hd_fixed_size (del_queue_message, 4852 GNUNET_MQ_hd_fixed_size (del_queue_message,
4838 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, 4853 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4839 struct GNUNET_TRANSPORT_DelQueueMessage, 4854 struct GNUNET_TRANSPORT_DelQueueMessage,