diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-01-21 17:45:07 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-01-21 17:45:07 +0100 |
commit | 32b38707097f8dc9f7f39c526f67414f24283eca (patch) | |
tree | c390919ff3dcc21a0328367afc05d6592278f5da /src | |
parent | 5391d3d34f3bf7f40f37f9e6038466002f422bb3 (diff) | |
download | gnunet-32b38707097f8dc9f7f39c526f67414f24283eca.tar.gz gnunet-32b38707097f8dc9f7f39c526f67414f24283eca.zip |
handle transmission timeouts
Diffstat (limited to 'src')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 192 |
1 files changed, 163 insertions, 29 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 3673958ec..e205fa3d7 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -440,6 +440,11 @@ struct Neighbour | |||
440 | struct GNUNET_ATS_Session *session_tail; | 440 | struct GNUNET_ATS_Session *session_tail; |
441 | 441 | ||
442 | /** | 442 | /** |
443 | * Task run to cleanup pending messages that have exceeded their timeout. | ||
444 | */ | ||
445 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
446 | |||
447 | /** | ||
443 | * Quota at which CORE is allowed to transmit to this peer | 448 | * Quota at which CORE is allowed to transmit to this peer |
444 | * according to ATS. | 449 | * according to ATS. |
445 | * | 450 | * |
@@ -451,6 +456,11 @@ struct Neighbour | |||
451 | */ | 456 | */ |
452 | struct GNUNET_BANDWIDTH_Value32NBO quota_out; | 457 | struct GNUNET_BANDWIDTH_Value32NBO quota_out; |
453 | 458 | ||
459 | /** | ||
460 | * What is the earliest timeout of any message in @e pending_msg_tail? | ||
461 | */ | ||
462 | struct GNUNET_TIME_Absolute earliest_timeout; | ||
463 | |||
454 | }; | 464 | }; |
455 | 465 | ||
456 | 466 | ||
@@ -490,10 +500,21 @@ struct PendingMessage | |||
490 | struct TransportClient *client; | 500 | struct TransportClient *client; |
491 | 501 | ||
492 | /** | 502 | /** |
503 | * At what time should we give up on the transmission (and no longer retry)? | ||
504 | */ | ||
505 | struct GNUNET_TIME_Absolute timeout; | ||
506 | |||
507 | /** | ||
508 | * What is the earliest time for us to retry transmission of this message? | ||
509 | */ | ||
510 | struct GNUNET_TIME_Absolute next_attempt; | ||
511 | |||
512 | /** | ||
493 | * Size of the original message. | 513 | * Size of the original message. |
494 | */ | 514 | */ |
495 | uint32_t bytes_msg; | 515 | uint32_t bytes_msg; |
496 | 516 | ||
517 | /* Followed by @e bytes_msg to transmit */ | ||
497 | }; | 518 | }; |
498 | 519 | ||
499 | 520 | ||
@@ -592,7 +613,8 @@ struct TransportClient | |||
592 | struct { | 613 | struct { |
593 | 614 | ||
594 | /** | 615 | /** |
595 | * Head of list of messages pending for this client. | 616 | * Head of list of messages pending for this client, sorted by |
617 | * transmission time ("next_attempt" + possibly internal prioritization). | ||
596 | */ | 618 | */ |
597 | struct PendingMessage *pending_msg_head; | 619 | struct PendingMessage *pending_msg_head; |
598 | 620 | ||
@@ -920,6 +942,8 @@ free_neighbour (struct Neighbour *neighbour) | |||
920 | GNUNET_CONTAINER_multipeermap_remove (neighbours, | 942 | GNUNET_CONTAINER_multipeermap_remove (neighbours, |
921 | &neighbour->pid, | 943 | &neighbour->pid, |
922 | neighbour)); | 944 | neighbour)); |
945 | if (NULL != neighbour->timeout_task) | ||
946 | GNUNET_SCHEDULER_cancel (neighbour->timeout_task); | ||
923 | GNUNET_free (neighbour); | 947 | GNUNET_free (neighbour); |
924 | } | 948 | } |
925 | 949 | ||
@@ -1273,6 +1297,50 @@ client_send_response (struct PendingMessage *pm, | |||
1273 | 1297 | ||
1274 | 1298 | ||
1275 | /** | 1299 | /** |
1300 | * Checks the message queue for a neighbour for messages that have timed | ||
1301 | * out and purges them. | ||
1302 | * | ||
1303 | * @param cls a `struct Neighbour` | ||
1304 | */ | ||
1305 | static void | ||
1306 | check_queue_timeouts (void *cls) | ||
1307 | { | ||
1308 | struct Neighbour *n = cls; | ||
1309 | struct PendingMessage *pm; | ||
1310 | struct GNUNET_TIME_Absolute now; | ||
1311 | struct GNUNET_TIME_Absolute earliest_timeout; | ||
1312 | |||
1313 | n->timeout_task = NULL; | ||
1314 | earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; | ||
1315 | now = GNUNET_TIME_absolute_get (); | ||
1316 | for (struct PendingMessage *pos = n->pending_msg_head; | ||
1317 | NULL != pos; | ||
1318 | pos = pm) | ||
1319 | { | ||
1320 | pm = pos->next_neighbour; | ||
1321 | if (pos->timeout.abs_value_us <= now.abs_value_us) | ||
1322 | { | ||
1323 | GNUNET_STATISTICS_update (GST_stats, | ||
1324 | "# messages dropped (timeout before confirmation)", | ||
1325 | 1, | ||
1326 | GNUNET_NO); | ||
1327 | client_send_response (pm, | ||
1328 | GNUNET_NO, | ||
1329 | 0); | ||
1330 | continue; | ||
1331 | } | ||
1332 | earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout, | ||
1333 | pos->timeout); | ||
1334 | } | ||
1335 | n->earliest_timeout = earliest_timeout; | ||
1336 | if (NULL != n->pending_msg_head) | ||
1337 | n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout, | ||
1338 | &check_queue_timeouts, | ||
1339 | n); | ||
1340 | } | ||
1341 | |||
1342 | |||
1343 | /** | ||
1276 | * Client asked for transmission to a peer. Process the request. | 1344 | * Client asked for transmission to a peer. Process the request. |
1277 | * | 1345 | * |
1278 | * @param cls the client | 1346 | * @param cls the client |
@@ -1316,10 +1384,14 @@ handle_client_send (void *cls, | |||
1316 | GNUNET_NO); | 1384 | GNUNET_NO); |
1317 | return; | 1385 | return; |
1318 | } | 1386 | } |
1319 | pm = GNUNET_new (struct PendingMessage); | 1387 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); |
1320 | pm->client = tc; | 1388 | pm->client = tc; |
1321 | pm->target = target; | 1389 | pm->target = target; |
1322 | pm->bytes_msg = bytes_msg; | 1390 | pm->bytes_msg = bytes_msg; |
1391 | pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); | ||
1392 | memcpy (&pm[1], | ||
1393 | &obm[1], | ||
1394 | bytes_msg); | ||
1323 | GNUNET_CONTAINER_MDLL_insert (neighbour, | 1395 | GNUNET_CONTAINER_MDLL_insert (neighbour, |
1324 | target->pending_msg_head, | 1396 | target->pending_msg_head, |
1325 | target->pending_msg_tail, | 1397 | target->pending_msg_tail, |
@@ -1328,10 +1400,16 @@ handle_client_send (void *cls, | |||
1328 | tc->details.core.pending_msg_head, | 1400 | tc->details.core.pending_msg_head, |
1329 | tc->details.core.pending_msg_tail, | 1401 | tc->details.core.pending_msg_tail, |
1330 | pm); | 1402 | pm); |
1331 | // FIXME: do the work, final continuation with call to: | 1403 | if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) |
1332 | client_send_response (pm, | 1404 | { |
1333 | GNUNET_NO, | 1405 | target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; |
1334 | 0); | 1406 | if (NULL != target->timeout_task) |
1407 | GNUNET_SCHEDULER_cancel (target->timeout_task); | ||
1408 | target->timeout_task | ||
1409 | = GNUNET_SCHEDULER_add_at (target->earliest_timeout, | ||
1410 | &check_queue_timeouts, | ||
1411 | target); | ||
1412 | } | ||
1335 | } | 1413 | } |
1336 | 1414 | ||
1337 | 1415 | ||
@@ -1652,45 +1730,37 @@ tracker_update_in_cb (void *cls) | |||
1652 | * @param cls the `struct GNUNET_ATS_Session` to process transmissions for | 1730 | * @param cls the `struct GNUNET_ATS_Session` to process transmissions for |
1653 | */ | 1731 | */ |
1654 | static void | 1732 | static void |
1655 | transmit_on_queue (void *cls) | 1733 | transmit_on_queue (void *cls); |
1656 | { | ||
1657 | struct GNUNET_ATS_Session *queue = cls; | ||
1658 | |||
1659 | queue->transmit_task = NULL; | ||
1660 | // FIXME: check if transmission is really ready | ||
1661 | // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.) | ||
1662 | // FIXME: re-schedule self | ||
1663 | } | ||
1664 | 1734 | ||
1665 | 1735 | ||
1666 | /** | 1736 | /** |
1667 | * Bandwidth tracker informs us that the delay until we | 1737 | * Schedule next run of #transmit_on_queue(). Does NOTHING if |
1668 | * can transmit again changed. | 1738 | * we should run immediately or if the message queue is empty. |
1739 | * Test for no task being added AND queue not being empty to | ||
1740 | * transmit immediately afterwards! This function must only | ||
1741 | * be called if the message queue is non-empty! | ||
1669 | * | 1742 | * |
1670 | * @param cls a `struct GNUNET_ATS_Session` for which the delay changed | 1743 | * @param queue the queue to do scheduling for |
1671 | */ | 1744 | */ |
1672 | static void | 1745 | static void |
1673 | tracker_update_out_cb (void *cls) | 1746 | schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) |
1674 | { | 1747 | { |
1675 | struct GNUNET_ATS_Session *queue = cls; | ||
1676 | struct Neighbour *n = queue->neighbour; | 1748 | struct Neighbour *n = queue->neighbour; |
1677 | struct PendingMessage *pm = n->pending_msg_head; | 1749 | struct PendingMessage *pm = n->pending_msg_head; |
1678 | struct GNUNET_TIME_Relative out_delay; | 1750 | struct GNUNET_TIME_Relative out_delay; |
1679 | unsigned int wsize; | 1751 | unsigned int wsize; |
1680 | 1752 | ||
1681 | if (NULL == pm) | 1753 | GNUNET_assert (NULL != pm); |
1682 | { | ||
1683 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1684 | "Bandwidth allocation updated for empty transmission queue `%s'\n", | ||
1685 | queue->address); | ||
1686 | return; /* no message pending, nothing to do here! */ | ||
1687 | } | ||
1688 | wsize = (0 == queue->mtu) | 1754 | wsize = (0 == queue->mtu) |
1689 | ? pm->bytes_msg /* FIXME: add overheads? */ | 1755 | ? pm->bytes_msg /* FIXME: add overheads? */ |
1690 | : queue->mtu; | 1756 | : queue->mtu; |
1691 | out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, | 1757 | out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, |
1692 | wsize); | 1758 | wsize); |
1693 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | 1759 | out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), |
1760 | out_delay); | ||
1761 | if (0 == out_delay.rel_value_us) | ||
1762 | return; /* we should run immediately! */ | ||
1763 | /* queue has changed since we were scheduled, reschedule again */ | ||
1694 | queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, | 1764 | queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, |
1695 | &transmit_on_queue, | 1765 | &transmit_on_queue, |
1696 | queue); | 1766 | queue); |
@@ -1710,6 +1780,69 @@ tracker_update_out_cb (void *cls) | |||
1710 | 1780 | ||
1711 | 1781 | ||
1712 | /** | 1782 | /** |
1783 | * We believe we are ready to transmit a message on a queue. Double-checks | ||
1784 | * with the queue's "tracker_out" and then gives the message to the | ||
1785 | * communicator for transmission (updating the tracker, and re-scheduling | ||
1786 | * itself if applicable). | ||
1787 | * | ||
1788 | * @param cls the `struct GNUNET_ATS_Session` to process transmissions for | ||
1789 | */ | ||
1790 | static void | ||
1791 | transmit_on_queue (void *cls) | ||
1792 | { | ||
1793 | struct GNUNET_ATS_Session *queue = cls; | ||
1794 | struct Neighbour *n = queue->neighbour; | ||
1795 | struct PendingMessage *pm; | ||
1796 | |||
1797 | queue->transmit_task = NULL; | ||
1798 | if (NULL == (pm = n->pending_msg_head)) | ||
1799 | { | ||
1800 | /* no message pending, nothing to do here! */ | ||
1801 | return; | ||
1802 | } | ||
1803 | schedule_transmit_on_queue (queue); | ||
1804 | if (NULL != queue->transmit_task) | ||
1805 | return; /* do it later */ | ||
1806 | |||
1807 | // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.) | ||
1808 | // FIXME: upon success, do (not here in continuation!) | ||
1809 | if (0) | ||
1810 | { | ||
1811 | client_send_response (pm, | ||
1812 | GNUNET_YES, | ||
1813 | 0); | ||
1814 | } | ||
1815 | /* finally, re-schedule self */ | ||
1816 | schedule_transmit_on_queue (queue); | ||
1817 | } | ||
1818 | |||
1819 | |||
1820 | /** | ||
1821 | * Bandwidth tracker informs us that the delay until we | ||
1822 | * can transmit again changed. | ||
1823 | * | ||
1824 | * @param cls a `struct GNUNET_ATS_Session` for which the delay changed | ||
1825 | */ | ||
1826 | static void | ||
1827 | tracker_update_out_cb (void *cls) | ||
1828 | { | ||
1829 | struct GNUNET_ATS_Session *queue = cls; | ||
1830 | struct Neighbour *n = queue->neighbour; | ||
1831 | |||
1832 | if (NULL == n->pending_msg_head) | ||
1833 | { | ||
1834 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1835 | "Bandwidth allocation updated for empty transmission queue `%s'\n", | ||
1836 | queue->address); | ||
1837 | return; /* no message pending, nothing to do here! */ | ||
1838 | } | ||
1839 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | ||
1840 | queue->transmit_task = NULL; | ||
1841 | schedule_transmit_on_queue (queue); | ||
1842 | } | ||
1843 | |||
1844 | |||
1845 | /** | ||
1713 | * Bandwidth tracker informs us that excessive outbound bandwidth was | 1846 | * Bandwidth tracker informs us that excessive outbound bandwidth was |
1714 | * allocated which is not being used. | 1847 | * allocated which is not being used. |
1715 | * | 1848 | * |
@@ -1768,6 +1901,7 @@ handle_add_queue_message (void *cls, | |||
1768 | if (NULL == neighbour) | 1901 | if (NULL == neighbour) |
1769 | { | 1902 | { |
1770 | neighbour = GNUNET_new (struct Neighbour); | 1903 | neighbour = GNUNET_new (struct Neighbour); |
1904 | neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; | ||
1771 | neighbour->pid = aqm->receiver; | 1905 | neighbour->pid = aqm->receiver; |
1772 | GNUNET_assert (GNUNET_OK == | 1906 | GNUNET_assert (GNUNET_OK == |
1773 | GNUNET_CONTAINER_multipeermap_put (neighbours, | 1907 | GNUNET_CONTAINER_multipeermap_put (neighbours, |