aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-21 17:45:07 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-21 17:45:07 +0100
commit32b38707097f8dc9f7f39c526f67414f24283eca (patch)
treec390919ff3dcc21a0328367afc05d6592278f5da /src
parent5391d3d34f3bf7f40f37f9e6038466002f422bb3 (diff)
downloadgnunet-32b38707097f8dc9f7f39c526f67414f24283eca.tar.gz
gnunet-32b38707097f8dc9f7f39c526f67414f24283eca.zip
handle transmission timeouts
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-tng.c192
1 files changed, 163 insertions, 29 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3673958ec..e205fa3d7 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -440,6 +440,11 @@ struct Neighbour
440 struct GNUNET_ATS_Session *session_tail; 440 struct GNUNET_ATS_Session *session_tail;
441 441
442 /** 442 /**
443 * Task run to cleanup pending messages that have exceeded their timeout.
444 */
445 struct GNUNET_SCHEDULER_Task *timeout_task;
446
447 /**
443 * Quota at which CORE is allowed to transmit to this peer 448 * Quota at which CORE is allowed to transmit to this peer
444 * according to ATS. 449 * according to ATS.
445 * 450 *
@@ -451,6 +456,11 @@ struct Neighbour
451 */ 456 */
452 struct GNUNET_BANDWIDTH_Value32NBO quota_out; 457 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
453 458
459 /**
460 * What is the earliest timeout of any message in @e pending_msg_tail?
461 */
462 struct GNUNET_TIME_Absolute earliest_timeout;
463
454}; 464};
455 465
456 466
@@ -490,10 +500,21 @@ struct PendingMessage
490 struct TransportClient *client; 500 struct TransportClient *client;
491 501
492 /** 502 /**
503 * At what time should we give up on the transmission (and no longer retry)?
504 */
505 struct GNUNET_TIME_Absolute timeout;
506
507 /**
508 * What is the earliest time for us to retry transmission of this message?
509 */
510 struct GNUNET_TIME_Absolute next_attempt;
511
512 /**
493 * Size of the original message. 513 * Size of the original message.
494 */ 514 */
495 uint32_t bytes_msg; 515 uint32_t bytes_msg;
496 516
517 /* Followed by @e bytes_msg to transmit */
497}; 518};
498 519
499 520
@@ -592,7 +613,8 @@ struct TransportClient
592 struct { 613 struct {
593 614
594 /** 615 /**
595 * Head of list of messages pending for this client. 616 * Head of list of messages pending for this client, sorted by
617 * transmission time ("next_attempt" + possibly internal prioritization).
596 */ 618 */
597 struct PendingMessage *pending_msg_head; 619 struct PendingMessage *pending_msg_head;
598 620
@@ -920,6 +942,8 @@ free_neighbour (struct Neighbour *neighbour)
920 GNUNET_CONTAINER_multipeermap_remove (neighbours, 942 GNUNET_CONTAINER_multipeermap_remove (neighbours,
921 &neighbour->pid, 943 &neighbour->pid,
922 neighbour)); 944 neighbour));
945 if (NULL != neighbour->timeout_task)
946 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
923 GNUNET_free (neighbour); 947 GNUNET_free (neighbour);
924} 948}
925 949
@@ -1273,6 +1297,50 @@ client_send_response (struct PendingMessage *pm,
1273 1297
1274 1298
1275/** 1299/**
1300 * Checks the message queue for a neighbour for messages that have timed
1301 * out and purges them.
1302 *
1303 * @param cls a `struct Neighbour`
1304 */
1305static void
1306check_queue_timeouts (void *cls)
1307{
1308 struct Neighbour *n = cls;
1309 struct PendingMessage *pm;
1310 struct GNUNET_TIME_Absolute now;
1311 struct GNUNET_TIME_Absolute earliest_timeout;
1312
1313 n->timeout_task = NULL;
1314 earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1315 now = GNUNET_TIME_absolute_get ();
1316 for (struct PendingMessage *pos = n->pending_msg_head;
1317 NULL != pos;
1318 pos = pm)
1319 {
1320 pm = pos->next_neighbour;
1321 if (pos->timeout.abs_value_us <= now.abs_value_us)
1322 {
1323 GNUNET_STATISTICS_update (GST_stats,
1324 "# messages dropped (timeout before confirmation)",
1325 1,
1326 GNUNET_NO);
1327 client_send_response (pm,
1328 GNUNET_NO,
1329 0);
1330 continue;
1331 }
1332 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
1333 pos->timeout);
1334 }
1335 n->earliest_timeout = earliest_timeout;
1336 if (NULL != n->pending_msg_head)
1337 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
1338 &check_queue_timeouts,
1339 n);
1340}
1341
1342
1343/**
1276 * Client asked for transmission to a peer. Process the request. 1344 * Client asked for transmission to a peer. Process the request.
1277 * 1345 *
1278 * @param cls the client 1346 * @param cls the client
@@ -1316,10 +1384,14 @@ handle_client_send (void *cls,
1316 GNUNET_NO); 1384 GNUNET_NO);
1317 return; 1385 return;
1318 } 1386 }
1319 pm = GNUNET_new (struct PendingMessage); 1387 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
1320 pm->client = tc; 1388 pm->client = tc;
1321 pm->target = target; 1389 pm->target = target;
1322 pm->bytes_msg = bytes_msg; 1390 pm->bytes_msg = bytes_msg;
1391 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
1392 memcpy (&pm[1],
1393 &obm[1],
1394 bytes_msg);
1323 GNUNET_CONTAINER_MDLL_insert (neighbour, 1395 GNUNET_CONTAINER_MDLL_insert (neighbour,
1324 target->pending_msg_head, 1396 target->pending_msg_head,
1325 target->pending_msg_tail, 1397 target->pending_msg_tail,
@@ -1328,10 +1400,16 @@ handle_client_send (void *cls,
1328 tc->details.core.pending_msg_head, 1400 tc->details.core.pending_msg_head,
1329 tc->details.core.pending_msg_tail, 1401 tc->details.core.pending_msg_tail,
1330 pm); 1402 pm);
1331 // FIXME: do the work, final continuation with call to: 1403 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
1332 client_send_response (pm, 1404 {
1333 GNUNET_NO, 1405 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
1334 0); 1406 if (NULL != target->timeout_task)
1407 GNUNET_SCHEDULER_cancel (target->timeout_task);
1408 target->timeout_task
1409 = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
1410 &check_queue_timeouts,
1411 target);
1412 }
1335} 1413}
1336 1414
1337 1415
@@ -1652,45 +1730,37 @@ tracker_update_in_cb (void *cls)
1652 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for 1730 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1653 */ 1731 */
1654static void 1732static void
1655transmit_on_queue (void *cls) 1733transmit_on_queue (void *cls);
1656{
1657 struct GNUNET_ATS_Session *queue = cls;
1658
1659 queue->transmit_task = NULL;
1660 // FIXME: check if transmission is really ready
1661 // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
1662 // FIXME: re-schedule self
1663}
1664 1734
1665 1735
1666/** 1736/**
1667 * Bandwidth tracker informs us that the delay until we 1737 * Schedule next run of #transmit_on_queue(). Does NOTHING if
1668 * can transmit again changed. 1738 * we should run immediately or if the message queue is empty.
1739 * Test for no task being added AND queue not being empty to
1740 * transmit immediately afterwards! This function must only
1741 * be called if the message queue is non-empty!
1669 * 1742 *
1670 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed 1743 * @param queue the queue to do scheduling for
1671 */ 1744 */
1672static void 1745static void
1673tracker_update_out_cb (void *cls) 1746schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1674{ 1747{
1675 struct GNUNET_ATS_Session *queue = cls;
1676 struct Neighbour *n = queue->neighbour; 1748 struct Neighbour *n = queue->neighbour;
1677 struct PendingMessage *pm = n->pending_msg_head; 1749 struct PendingMessage *pm = n->pending_msg_head;
1678 struct GNUNET_TIME_Relative out_delay; 1750 struct GNUNET_TIME_Relative out_delay;
1679 unsigned int wsize; 1751 unsigned int wsize;
1680 1752
1681 if (NULL == pm) 1753 GNUNET_assert (NULL != pm);
1682 {
1683 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1684 "Bandwidth allocation updated for empty transmission queue `%s'\n",
1685 queue->address);
1686 return; /* no message pending, nothing to do here! */
1687 }
1688 wsize = (0 == queue->mtu) 1754 wsize = (0 == queue->mtu)
1689 ? pm->bytes_msg /* FIXME: add overheads? */ 1755 ? pm->bytes_msg /* FIXME: add overheads? */
1690 : queue->mtu; 1756 : queue->mtu;
1691 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, 1757 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1692 wsize); 1758 wsize);
1693 GNUNET_SCHEDULER_cancel (queue->transmit_task); 1759 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1760 out_delay);
1761 if (0 == out_delay.rel_value_us)
1762 return; /* we should run immediately! */
1763 /* queue has changed since we were scheduled, reschedule again */
1694 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, 1764 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1695 &transmit_on_queue, 1765 &transmit_on_queue,
1696 queue); 1766 queue);
@@ -1710,6 +1780,69 @@ tracker_update_out_cb (void *cls)
1710 1780
1711 1781
1712/** 1782/**
1783 * We believe we are ready to transmit a message on a queue. Double-checks
1784 * with the queue's "tracker_out" and then gives the message to the
1785 * communicator for transmission (updating the tracker, and re-scheduling
1786 * itself if applicable).
1787 *
1788 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1789 */
1790static void
1791transmit_on_queue (void *cls)
1792{
1793 struct GNUNET_ATS_Session *queue = cls;
1794 struct Neighbour *n = queue->neighbour;
1795 struct PendingMessage *pm;
1796
1797 queue->transmit_task = NULL;
1798 if (NULL == (pm = n->pending_msg_head))
1799 {
1800 /* no message pending, nothing to do here! */
1801 return;
1802 }
1803 schedule_transmit_on_queue (queue);
1804 if (NULL != queue->transmit_task)
1805 return; /* do it later */
1806
1807 // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
1808 // FIXME: upon success, do (not here in continuation!)
1809 if (0)
1810 {
1811 client_send_response (pm,
1812 GNUNET_YES,
1813 0);
1814 }
1815 /* finally, re-schedule self */
1816 schedule_transmit_on_queue (queue);
1817}
1818
1819
1820/**
1821 * Bandwidth tracker informs us that the delay until we
1822 * can transmit again changed.
1823 *
1824 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
1825 */
1826static void
1827tracker_update_out_cb (void *cls)
1828{
1829 struct GNUNET_ATS_Session *queue = cls;
1830 struct Neighbour *n = queue->neighbour;
1831
1832 if (NULL == n->pending_msg_head)
1833 {
1834 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1835 "Bandwidth allocation updated for empty transmission queue `%s'\n",
1836 queue->address);
1837 return; /* no message pending, nothing to do here! */
1838 }
1839 GNUNET_SCHEDULER_cancel (queue->transmit_task);
1840 queue->transmit_task = NULL;
1841 schedule_transmit_on_queue (queue);
1842}
1843
1844
1845/**
1713 * Bandwidth tracker informs us that excessive outbound bandwidth was 1846 * Bandwidth tracker informs us that excessive outbound bandwidth was
1714 * allocated which is not being used. 1847 * allocated which is not being used.
1715 * 1848 *
@@ -1768,6 +1901,7 @@ handle_add_queue_message (void *cls,
1768 if (NULL == neighbour) 1901 if (NULL == neighbour)
1769 { 1902 {
1770 neighbour = GNUNET_new (struct Neighbour); 1903 neighbour = GNUNET_new (struct Neighbour);
1904 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1771 neighbour->pid = aqm->receiver; 1905 neighbour->pid = aqm->receiver;
1772 GNUNET_assert (GNUNET_OK == 1906 GNUNET_assert (GNUNET_OK ==
1773 GNUNET_CONTAINER_multipeermap_put (neighbours, 1907 GNUNET_CONTAINER_multipeermap_put (neighbours,