diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-01-24 21:08:12 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-01-24 21:08:12 +0100 |
commit | 0c8d3480c3cc8bbc1b6bc0a6ae155510aae8f1f3 (patch) | |
tree | 25c42af3f33ed869451e83e46ed88e2adaaae5f3 /src/transport | |
parent | 4aef73f70ec3bd117236b81b777022dc8cca966c (diff) | |
download | gnunet-0c8d3480c3cc8bbc1b6bc0a6ae155510aae8f1f3.tar.gz gnunet-0c8d3480c3cc8bbc1b6bc0a6ae155510aae8f1f3.zip |
tng: towards communicator flow control:
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 407 |
1 files changed, 292 insertions, 115 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 6c3373013..8febbdfff 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -99,18 +99,26 @@ | |||
99 | #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) | 99 | #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) |
100 | 100 | ||
101 | /** | 101 | /** |
102 | * How many messages can we have pending for a given client process | 102 | * How many messages can we have pending for a given communicator |
103 | * before we start to drop incoming messages? We typically should | 103 | * process before we start to throttle that communicator? |
104 | * have only one client and so this would be the primary buffer for | 104 | * |
105 | * messages, so the number should be chosen rather generously. | 105 | * Used if a communicator might be CPU-bound and cannot handle the traffic. |
106 | * | 106 | */ |
107 | * The expectation here is that most of the time the queue is large | 107 | #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512 |
108 | * enough so that a drop is virtually never required. Note that | 108 | |
109 | * this value must be about as large as 'TOTAL_MSGS' in the | 109 | /** |
110 | * 'test_transport_api_reliability.c', otherwise that testcase may | 110 | * How many messages can we have pending for a given session (queue to |
111 | * fail. | 111 | * a particular peer via a communicator) process before we start to |
112 | * throttle that queue? | ||
113 | * | ||
114 | * Used if ATS assigns more bandwidth to a particular transmission | ||
115 | * method than that transmission method can right now handle. (Yes, | ||
116 | * ATS should eventually notice utilization below allocation and | ||
117 | * adjust, but we don't want to queue up tons of messages in the | ||
118 | * meantime). Must be significantly below | ||
119 | * #COMMUNICATOR_TOTAL_QUEUE_LIMIT. | ||
112 | */ | 120 | */ |
113 | #define MAX_PENDING (128 * 1024) | 121 | #define SESSION_QUEUE_LIMIT 32 |
114 | 122 | ||
115 | 123 | ||
116 | GNUNET_NETWORK_STRUCT_BEGIN | 124 | GNUNET_NETWORK_STRUCT_BEGIN |
@@ -555,6 +563,40 @@ struct Neighbour; | |||
555 | 563 | ||
556 | 564 | ||
557 | /** | 565 | /** |
566 | * Entry identifying transmission in one of our `struct | ||
567 | * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to | ||
568 | * ensure we do not overwhelm a communicator and limit the number of | ||
569 | * messages outstanding per communicator (say in case communicator is | ||
570 | * CPU bound) and per queue (in case ATS bandwidth allocation exceeds | ||
571 | * what the communicator can actually provide towards a particular | ||
572 | * peer/target). | ||
573 | */ | ||
574 | struct QueueEntry | ||
575 | { | ||
576 | |||
577 | /** | ||
578 | * Kept as a DLL. | ||
579 | */ | ||
580 | struct QueueEntry *next; | ||
581 | |||
582 | /** | ||
583 | * Kept as a DLL. | ||
584 | */ | ||
585 | struct QueueEntry *prev; | ||
586 | |||
587 | /** | ||
588 | * ATS session this entry is queued with. | ||
589 | */ | ||
590 | struct GNUNET_ATS_Session *session; | ||
591 | |||
592 | /** | ||
593 | * Message ID used for this message with the queue used for transmission. | ||
594 | */ | ||
595 | uint64_t mid; | ||
596 | }; | ||
597 | |||
598 | |||
599 | /** | ||
558 | * An ATS session is a message queue provided by a communicator | 600 | * An ATS session is a message queue provided by a communicator |
559 | * via which we can reach a particular neighbour. | 601 | * via which we can reach a particular neighbour. |
560 | */ | 602 | */ |
@@ -581,6 +623,16 @@ struct GNUNET_ATS_Session | |||
581 | struct GNUNET_ATS_Session *next_client; | 623 | struct GNUNET_ATS_Session *next_client; |
582 | 624 | ||
583 | /** | 625 | /** |
626 | * Head of DLL of unacked transmission requests. | ||
627 | */ | ||
628 | struct QueueEntry *queue_head; | ||
629 | |||
630 | /** | ||
631 | * End of DLL of unacked transmission requests. | ||
632 | */ | ||
633 | struct QueueEntry *queue_tail; | ||
634 | |||
635 | /** | ||
584 | * Which neighbour is this ATS session for? | 636 | * Which neighbour is this ATS session for? |
585 | */ | 637 | */ |
586 | struct Neighbour *neighbour; | 638 | struct Neighbour *neighbour; |
@@ -612,6 +664,11 @@ struct GNUNET_ATS_Session | |||
612 | struct GNUNET_TIME_Relative rtt; | 664 | struct GNUNET_TIME_Relative rtt; |
613 | 665 | ||
614 | /** | 666 | /** |
667 | * Message ID generator for transmissions on this queue. | ||
668 | */ | ||
669 | uint64_t mid_gen; | ||
670 | |||
671 | /** | ||
615 | * Unique identifier of this ATS session with the communicator. | 672 | * Unique identifier of this ATS session with the communicator. |
616 | */ | 673 | */ |
617 | uint32_t qid; | 674 | uint32_t qid; |
@@ -627,24 +684,29 @@ struct GNUNET_ATS_Session | |||
627 | uint32_t distance; | 684 | uint32_t distance; |
628 | 685 | ||
629 | /** | 686 | /** |
630 | * Network type offered by this ATS session. | 687 | * Messages pending. |
631 | */ | 688 | */ |
632 | enum GNUNET_NetworkType nt; | 689 | uint32_t num_msg_pending; |
633 | 690 | ||
634 | /** | 691 | /** |
635 | * Connection status for this ATS session. | 692 | * Bytes pending. |
636 | */ | 693 | */ |
637 | enum GNUNET_TRANSPORT_ConnectionStatus cs; | 694 | uint32_t num_bytes_pending; |
638 | 695 | ||
639 | /** | 696 | /** |
640 | * Messages pending. | 697 | * Length of the DLL starting at @e queue_head. |
641 | */ | 698 | */ |
642 | uint32_t num_msg_pending; | 699 | unsigned int queue_length; |
700 | |||
701 | /** | ||
702 | * Network type offered by this ATS session. | ||
703 | */ | ||
704 | enum GNUNET_NetworkType nt; | ||
643 | 705 | ||
644 | /** | 706 | /** |
645 | * Bytes pending. | 707 | * Connection status for this ATS session. |
646 | */ | 708 | */ |
647 | uint32_t num_bytes_pending; | 709 | enum GNUNET_TRANSPORT_ConnectionStatus cs; |
648 | 710 | ||
649 | /** | 711 | /** |
650 | * How much outbound bandwidth do we have available for this session? | 712 | * How much outbound bandwidth do we have available for this session? |
@@ -842,11 +904,6 @@ struct PendingMessage | |||
842 | * initialized if @e msg_uuid_set is #GNUNET_YES). | 904 | * initialized if @e msg_uuid_set is #GNUNET_YES). |
843 | */ | 905 | */ |
844 | struct GNUNET_ShortHashCode msg_uuid; | 906 | struct GNUNET_ShortHashCode msg_uuid; |
845 | |||
846 | /** | ||
847 | * Message ID used for this message with the queue used for transmission. | ||
848 | */ | ||
849 | uint64_t mid; | ||
850 | 907 | ||
851 | /** | 908 | /** |
852 | * Counter incremented per generated fragment. | 909 | * Counter incremented per generated fragment. |
@@ -1035,6 +1092,13 @@ struct TransportClient | |||
1035 | struct AddressListEntry *addr_tail; | 1092 | struct AddressListEntry *addr_tail; |
1036 | 1093 | ||
1037 | /** | 1094 | /** |
1095 | * Number of queue entries in all queues to this communicator. Used | ||
1096 | * throttle sending to a communicator if we see that the communicator | ||
1097 | * is globally unable to keep up. | ||
1098 | */ | ||
1099 | unsigned int total_queue_length; | ||
1100 | |||
1101 | /** | ||
1038 | * Characteristics of this communicator. | 1102 | * Characteristics of this communicator. |
1039 | */ | 1103 | */ |
1040 | enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; | 1104 | enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; |
@@ -1382,41 +1446,142 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) | |||
1382 | 1446 | ||
1383 | 1447 | ||
1384 | /** | 1448 | /** |
1385 | * Free @a queue. | 1449 | * We believe we are ready to transmit a message on a queue. Double-checks |
1450 | * with the queue's "tracker_out" and then gives the message to the | ||
1451 | * communicator for transmission (updating the tracker, and re-scheduling | ||
1452 | * itself if applicable). | ||
1453 | * | ||
1454 | * @param cls the `struct GNUNET_ATS_Session` to process transmissions for | ||
1455 | */ | ||
1456 | static void | ||
1457 | transmit_on_queue (void *cls); | ||
1458 | |||
1459 | |||
1460 | /** | ||
1461 | * Schedule next run of #transmit_on_queue(). Does NOTHING if | ||
1462 | * we should run immediately or if the message queue is empty. | ||
1463 | * Test for no task being added AND queue not being empty to | ||
1464 | * transmit immediately afterwards! This function must only | ||
1465 | * be called if the message queue is non-empty! | ||
1386 | * | 1466 | * |
1387 | * @param queue the queue to free | 1467 | * @param queue the queue to do scheduling for |
1468 | */ | ||
1469 | static void | ||
1470 | schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) | ||
1471 | { | ||
1472 | struct Neighbour *n = queue->neighbour; | ||
1473 | struct PendingMessage *pm = n->pending_msg_head; | ||
1474 | struct GNUNET_TIME_Relative out_delay; | ||
1475 | unsigned int wsize; | ||
1476 | |||
1477 | GNUNET_assert (NULL != pm); | ||
1478 | if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT) | ||
1479 | { | ||
1480 | GNUNET_STATISTICS_update (GST_stats, | ||
1481 | "# Transmission throttled due to communicator queue limit", | ||
1482 | 1, | ||
1483 | GNUNET_NO); | ||
1484 | return; | ||
1485 | } | ||
1486 | if (queue->queue_length >= SESSION_QUEUE_LIMIT) | ||
1487 | { | ||
1488 | GNUNET_STATISTICS_update (GST_stats, | ||
1489 | "# Transmission throttled due to session queue limit", | ||
1490 | 1, | ||
1491 | GNUNET_NO); | ||
1492 | return; | ||
1493 | } | ||
1494 | |||
1495 | wsize = (0 == queue->mtu) | ||
1496 | ? pm->bytes_msg /* FIXME: add overheads? */ | ||
1497 | : queue->mtu; | ||
1498 | out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, | ||
1499 | wsize); | ||
1500 | out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), | ||
1501 | out_delay); | ||
1502 | if (0 == out_delay.rel_value_us) | ||
1503 | return; /* we should run immediately! */ | ||
1504 | /* queue has changed since we were scheduled, reschedule again */ | ||
1505 | queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, | ||
1506 | &transmit_on_queue, | ||
1507 | queue); | ||
1508 | if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) | ||
1509 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1510 | "Next transmission on queue `%s' in %s (high delay)\n", | ||
1511 | queue->address, | ||
1512 | GNUNET_STRINGS_relative_time_to_string (out_delay, | ||
1513 | GNUNET_YES)); | ||
1514 | else | ||
1515 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1516 | "Next transmission on queue `%s' in %s\n", | ||
1517 | queue->address, | ||
1518 | GNUNET_STRINGS_relative_time_to_string (out_delay, | ||
1519 | GNUNET_YES)); | ||
1520 | } | ||
1521 | |||
1522 | |||
1523 | /** | ||
1524 | * Free @a session. | ||
1525 | * | ||
1526 | * @param session the session to free | ||
1388 | */ | 1527 | */ |
1389 | static void | 1528 | static void |
1390 | free_queue (struct GNUNET_ATS_Session *queue) | 1529 | free_session (struct GNUNET_ATS_Session *session) |
1391 | { | 1530 | { |
1392 | struct Neighbour *neighbour = queue->neighbour; | 1531 | struct Neighbour *neighbour = session->neighbour; |
1393 | struct TransportClient *tc = queue->tc; | 1532 | struct TransportClient *tc = session->tc; |
1394 | struct MonitorEvent me = { | 1533 | struct MonitorEvent me = { |
1395 | .cs = GNUNET_TRANSPORT_CS_DOWN, | 1534 | .cs = GNUNET_TRANSPORT_CS_DOWN, |
1396 | .rtt = GNUNET_TIME_UNIT_FOREVER_REL | 1535 | .rtt = GNUNET_TIME_UNIT_FOREVER_REL |
1397 | }; | 1536 | }; |
1537 | struct QueueEntry *qe; | ||
1538 | int maxxed; | ||
1398 | 1539 | ||
1399 | if (NULL != queue->transmit_task) | 1540 | if (NULL != session->transmit_task) |
1400 | { | 1541 | { |
1401 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | 1542 | GNUNET_SCHEDULER_cancel (session->transmit_task); |
1402 | queue->transmit_task = NULL; | 1543 | session->transmit_task = NULL; |
1403 | } | 1544 | } |
1404 | GNUNET_CONTAINER_MDLL_remove (neighbour, | 1545 | GNUNET_CONTAINER_MDLL_remove (neighbour, |
1405 | neighbour->session_head, | 1546 | neighbour->session_head, |
1406 | neighbour->session_tail, | 1547 | neighbour->session_tail, |
1407 | queue); | 1548 | session); |
1408 | GNUNET_CONTAINER_MDLL_remove (client, | 1549 | GNUNET_CONTAINER_MDLL_remove (client, |
1409 | tc->details.communicator.session_head, | 1550 | tc->details.communicator.session_head, |
1410 | tc->details.communicator.session_tail, | 1551 | tc->details.communicator.session_tail, |
1411 | queue); | 1552 | session); |
1553 | maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length); | ||
1554 | while (NULL != (qe = session->queue_head)) | ||
1555 | { | ||
1556 | GNUNET_CONTAINER_DLL_remove (session->queue_head, | ||
1557 | session->queue_tail, | ||
1558 | qe); | ||
1559 | session->queue_length--; | ||
1560 | tc->details.communicator.total_queue_length--; | ||
1561 | GNUNET_free (qe); | ||
1562 | } | ||
1563 | GNUNET_assert (0 == session->queue_length); | ||
1564 | if ( (maxxed) && | ||
1565 | (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) ) | ||
1566 | { | ||
1567 | /* Communicator dropped below threshold, resume all queues */ | ||
1568 | GNUNET_STATISTICS_update (GST_stats, | ||
1569 | "# Transmission throttled due to communicator queue limit", | ||
1570 | -1, | ||
1571 | GNUNET_NO); | ||
1572 | for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head; | ||
1573 | NULL != s; | ||
1574 | s = s->next_client) | ||
1575 | schedule_transmit_on_queue (s); | ||
1576 | } | ||
1412 | notify_monitors (&neighbour->pid, | 1577 | notify_monitors (&neighbour->pid, |
1413 | queue->address, | 1578 | session->address, |
1414 | queue->nt, | 1579 | session->nt, |
1415 | &me); | 1580 | &me); |
1416 | GNUNET_ATS_session_del (queue->sr); | 1581 | GNUNET_ATS_session_del (session->sr); |
1417 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); | 1582 | GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in); |
1418 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); | 1583 | GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out); |
1419 | GNUNET_free (queue); | 1584 | GNUNET_free (session); |
1420 | if (NULL == neighbour->session_head) | 1585 | if (NULL == neighbour->session_head) |
1421 | { | 1586 | { |
1422 | cores_send_disconnect_info (&neighbour->pid); | 1587 | cores_send_disconnect_info (&neighbour->pid); |
@@ -1499,7 +1664,7 @@ client_disconnect_cb (void *cls, | |||
1499 | struct AddressListEntry *ale; | 1664 | struct AddressListEntry *ale; |
1500 | 1665 | ||
1501 | while (NULL != (q = tc->details.communicator.session_head)) | 1666 | while (NULL != (q = tc->details.communicator.session_head)) |
1502 | free_queue (q); | 1667 | free_session (q); |
1503 | while (NULL != (ale = tc->details.communicator.addr_head)) | 1668 | while (NULL != (ale = tc->details.communicator.addr_head)) |
1504 | free_address_list_entry (ale); | 1669 | free_address_list_entry (ale); |
1505 | GNUNET_free (tc->details.communicator.address_prefix); | 1670 | GNUNET_free (tc->details.communicator.address_prefix); |
@@ -2104,64 +2269,6 @@ tracker_update_in_cb (void *cls) | |||
2104 | 2269 | ||
2105 | 2270 | ||
2106 | /** | 2271 | /** |
2107 | * We believe we are ready to transmit a message on a queue. Double-checks | ||
2108 | * with the queue's "tracker_out" and then gives the message to the | ||
2109 | * communicator for transmission (updating the tracker, and re-scheduling | ||
2110 | * itself if applicable). | ||
2111 | * | ||
2112 | * @param cls the `struct GNUNET_ATS_Session` to process transmissions for | ||
2113 | */ | ||
2114 | static void | ||
2115 | transmit_on_queue (void *cls); | ||
2116 | |||
2117 | |||
2118 | /** | ||
2119 | * Schedule next run of #transmit_on_queue(). Does NOTHING if | ||
2120 | * we should run immediately or if the message queue is empty. | ||
2121 | * Test for no task being added AND queue not being empty to | ||
2122 | * transmit immediately afterwards! This function must only | ||
2123 | * be called if the message queue is non-empty! | ||
2124 | * | ||
2125 | * @param queue the queue to do scheduling for | ||
2126 | */ | ||
2127 | static void | ||
2128 | schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) | ||
2129 | { | ||
2130 | struct Neighbour *n = queue->neighbour; | ||
2131 | struct PendingMessage *pm = n->pending_msg_head; | ||
2132 | struct GNUNET_TIME_Relative out_delay; | ||
2133 | unsigned int wsize; | ||
2134 | |||
2135 | GNUNET_assert (NULL != pm); | ||
2136 | wsize = (0 == queue->mtu) | ||
2137 | ? pm->bytes_msg /* FIXME: add overheads? */ | ||
2138 | : queue->mtu; | ||
2139 | out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, | ||
2140 | wsize); | ||
2141 | out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), | ||
2142 | out_delay); | ||
2143 | if (0 == out_delay.rel_value_us) | ||
2144 | return; /* we should run immediately! */ | ||
2145 | /* queue has changed since we were scheduled, reschedule again */ | ||
2146 | queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, | ||
2147 | &transmit_on_queue, | ||
2148 | queue); | ||
2149 | if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) | ||
2150 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
2151 | "Next transmission on queue `%s' in %s (high delay)\n", | ||
2152 | queue->address, | ||
2153 | GNUNET_STRINGS_relative_time_to_string (out_delay, | ||
2154 | GNUNET_YES)); | ||
2155 | else | ||
2156 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2157 | "Next transmission on queue `%s' in %s\n", | ||
2158 | queue->address, | ||
2159 | GNUNET_STRINGS_relative_time_to_string (out_delay, | ||
2160 | GNUNET_YES)); | ||
2161 | } | ||
2162 | |||
2163 | |||
2164 | /** | ||
2165 | * Fragment the given @a pm to the given @a mtu. Adds | 2272 | * Fragment the given @a pm to the given @a mtu. Adds |
2166 | * additional fragments to the neighbour as well. If the | 2273 | * additional fragments to the neighbour as well. If the |
2167 | * @a mtu is too small, generates and error for the @a pm | 2274 | * @a mtu is too small, generates and error for the @a pm |
@@ -2317,6 +2424,7 @@ transmit_on_queue (void *cls) | |||
2317 | { | 2424 | { |
2318 | struct GNUNET_ATS_Session *queue = cls; | 2425 | struct GNUNET_ATS_Session *queue = cls; |
2319 | struct Neighbour *n = queue->neighbour; | 2426 | struct Neighbour *n = queue->neighbour; |
2427 | struct QueueEntry *qe; | ||
2320 | struct PendingMessage *pm; | 2428 | struct PendingMessage *pm; |
2321 | struct PendingMessage *s; | 2429 | struct PendingMessage *s; |
2322 | uint32_t overhead; | 2430 | uint32_t overhead; |
@@ -2361,19 +2469,29 @@ transmit_on_queue (void *cls) | |||
2361 | return; | 2469 | return; |
2362 | } | 2470 | } |
2363 | 2471 | ||
2364 | // pm->mid = queue->mid_gen++; | 2472 | /* Pass 's' for transission to the communicator */ |
2473 | qe = GNUNET_new (struct QueueEntry); | ||
2474 | qe->mid = queue->mid_gen++; | ||
2475 | qe->session = queue; | ||
2476 | // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'! | ||
2477 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, | ||
2478 | queue->queue_tail, | ||
2479 | qe); | ||
2365 | env = GNUNET_MQ_msg_extra (smt, | 2480 | env = GNUNET_MQ_msg_extra (smt, |
2366 | s->bytes_msg, | 2481 | s->bytes_msg, |
2367 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); | 2482 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); |
2368 | smt->qid = queue->qid; | 2483 | smt->qid = queue->qid; |
2369 | // smt->mid = pm->mid; | 2484 | smt->mid = qe->mid; |
2370 | // smt->receiver = pid; | 2485 | smt->receiver = n->pid; |
2371 | memcpy (&smt[1], | 2486 | memcpy (&smt[1], |
2372 | &s[1], | 2487 | &s[1], |
2373 | s->bytes_msg); | 2488 | s->bytes_msg); |
2489 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); | ||
2490 | queue->queue_length++; | ||
2491 | queue->tc->details.communicator.total_queue_length++; | ||
2492 | GNUNET_MQ_send (queue->tc->mq, | ||
2493 | env); | ||
2374 | 2494 | ||
2375 | // FIXME: actually give 's' to communicator for transmission here! | ||
2376 | |||
2377 | // FIXME: do something similar to the logic below | 2495 | // FIXME: do something similar to the logic below |
2378 | // in defragmentation / reliability ACK handling! | 2496 | // in defragmentation / reliability ACK handling! |
2379 | 2497 | ||
@@ -2653,18 +2771,18 @@ handle_del_queue_message (void *cls, | |||
2653 | GNUNET_SERVICE_client_drop (tc->client); | 2771 | GNUNET_SERVICE_client_drop (tc->client); |
2654 | return; | 2772 | return; |
2655 | } | 2773 | } |
2656 | for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head; | 2774 | for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; |
2657 | NULL != queue; | 2775 | NULL != session; |
2658 | queue = queue->next_client) | 2776 | session = session->next_client) |
2659 | { | 2777 | { |
2660 | struct Neighbour *neighbour = queue->neighbour; | 2778 | struct Neighbour *neighbour = session->neighbour; |
2661 | 2779 | ||
2662 | if ( (dqm->qid != queue->qid) || | 2780 | if ( (dqm->qid != session->qid) || |
2663 | (0 != memcmp (&dqm->receiver, | 2781 | (0 != memcmp (&dqm->receiver, |
2664 | &neighbour->pid, | 2782 | &neighbour->pid, |
2665 | sizeof (struct GNUNET_PeerIdentity))) ) | 2783 | sizeof (struct GNUNET_PeerIdentity))) ) |
2666 | continue; | 2784 | continue; |
2667 | free_queue (queue); | 2785 | free_session (session); |
2668 | GNUNET_SERVICE_client_continue (tc->client); | 2786 | GNUNET_SERVICE_client_continue (tc->client); |
2669 | return; | 2787 | return; |
2670 | } | 2788 | } |
@@ -2684,20 +2802,79 @@ handle_send_message_ack (void *cls, | |||
2684 | const struct GNUNET_TRANSPORT_SendMessageToAck *sma) | 2802 | const struct GNUNET_TRANSPORT_SendMessageToAck *sma) |
2685 | { | 2803 | { |
2686 | struct TransportClient *tc = cls; | 2804 | struct TransportClient *tc = cls; |
2687 | 2805 | struct QueueEntry *queue; | |
2806 | |||
2688 | if (CT_COMMUNICATOR != tc->type) | 2807 | if (CT_COMMUNICATOR != tc->type) |
2689 | { | 2808 | { |
2690 | GNUNET_break (0); | 2809 | GNUNET_break (0); |
2691 | GNUNET_SERVICE_client_drop (tc->client); | 2810 | GNUNET_SERVICE_client_drop (tc->client); |
2692 | return; | 2811 | return; |
2693 | } | 2812 | } |
2813 | |||
2814 | /* find our queue entry matching the ACK */ | ||
2815 | queue = NULL; | ||
2816 | for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; | ||
2817 | NULL != session; | ||
2818 | session = session->next_client) | ||
2819 | { | ||
2820 | if (0 != memcmp (&session->neighbour->pid, | ||
2821 | &sma->receiver, | ||
2822 | sizeof (struct GNUNET_PeerIdentity))) | ||
2823 | continue; | ||
2824 | for (struct QueueEntry *qe = session->queue_head; | ||
2825 | NULL != qe; | ||
2826 | qe = qe->next) | ||
2827 | { | ||
2828 | if (qe->mid != sma->mid) | ||
2829 | continue; | ||
2830 | queue = qe; | ||
2831 | break; | ||
2832 | } | ||
2833 | break; | ||
2834 | } | ||
2835 | if (NULL == queue) | ||
2836 | { | ||
2837 | /* this should never happen */ | ||
2838 | GNUNET_break (0); | ||
2839 | GNUNET_SERVICE_client_drop (tc->client); | ||
2840 | return; | ||
2841 | } | ||
2842 | GNUNET_CONTAINER_DLL_remove (queue->session->queue_head, | ||
2843 | queue->session->queue_tail, | ||
2844 | queue); | ||
2845 | queue->session->queue_length--; | ||
2846 | tc->details.communicator.total_queue_length--; | ||
2847 | GNUNET_SERVICE_client_continue (tc->client); | ||
2848 | |||
2849 | /* if applicable, resume transmissions that waited on ACK */ | ||
2850 | if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length) | ||
2851 | { | ||
2852 | /* Communicator dropped below threshold, resume all queues */ | ||
2853 | GNUNET_STATISTICS_update (GST_stats, | ||
2854 | "# Transmission throttled due to communicator queue limit", | ||
2855 | -1, | ||
2856 | GNUNET_NO); | ||
2857 | for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; | ||
2858 | NULL != session; | ||
2859 | session = session->next_client) | ||
2860 | schedule_transmit_on_queue (session); | ||
2861 | } | ||
2862 | else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length) | ||
2863 | { | ||
2864 | /* queue dropped below threshold; only resume this one queue */ | ||
2865 | GNUNET_STATISTICS_update (GST_stats, | ||
2866 | "# Transmission throttled due to session queue limit", | ||
2867 | -1, | ||
2868 | GNUNET_NO); | ||
2869 | schedule_transmit_on_queue (queue->session); | ||
2870 | } | ||
2871 | |||
2872 | /* TODO: we also should react on the status! */ | ||
2873 | // FIXME: this probably requires queue->pm = s assignment! | ||
2694 | // FIXME: react to communicator status about transmission request. We got: | 2874 | // FIXME: react to communicator status about transmission request. We got: |
2695 | sma->status; // OK success, SYSERR failure | 2875 | sma->status; // OK success, SYSERR failure |
2696 | sma->mid; // message ID of original message | ||
2697 | sma->receiver; // receiver of original message | ||
2698 | 2876 | ||
2699 | 2877 | GNUNET_free (queue); | |
2700 | GNUNET_SERVICE_client_continue (tc->client); | ||
2701 | } | 2878 | } |
2702 | 2879 | ||
2703 | 2880 | ||