aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c407
1 files changed, 292 insertions, 115 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 6c3373013..8febbdfff 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -99,18 +99,26 @@
99#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 99#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
100 100
101/** 101/**
102 * How many messages can we have pending for a given client process 102 * How many messages can we have pending for a given communicator
103 * before we start to drop incoming messages? We typically should 103 * process before we start to throttle that communicator?
104 * have only one client and so this would be the primary buffer for 104 *
105 * messages, so the number should be chosen rather generously. 105 * Used if a communicator might be CPU-bound and cannot handle the traffic.
106 * 106 */
107 * The expectation here is that most of the time the queue is large 107#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
108 * enough so that a drop is virtually never required. Note that 108
109 * this value must be about as large as 'TOTAL_MSGS' in the 109/**
110 * 'test_transport_api_reliability.c', otherwise that testcase may 110 * How many messages can we have pending for a given session (queue to
111 * fail. 111 * a particular peer via a communicator) process before we start to
112 * throttle that queue?
113 *
114 * Used if ATS assigns more bandwidth to a particular transmission
115 * method than that transmission method can right now handle. (Yes,
116 * ATS should eventually notice utilization below allocation and
117 * adjust, but we don't want to queue up tons of messages in the
118 * meantime). Must be significantly below
119 * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
112 */ 120 */
113#define MAX_PENDING (128 * 1024) 121#define SESSION_QUEUE_LIMIT 32
114 122
115 123
116GNUNET_NETWORK_STRUCT_BEGIN 124GNUNET_NETWORK_STRUCT_BEGIN
@@ -555,6 +563,40 @@ struct Neighbour;
555 563
556 564
557/** 565/**
566 * Entry identifying transmission in one of our `struct
567 * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
568 * ensure we do not overwhelm a communicator and limit the number of
569 * messages outstanding per communicator (say in case communicator is
570 * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
571 * what the communicator can actually provide towards a particular
572 * peer/target).
573 */
574struct QueueEntry
575{
576
577 /**
578 * Kept as a DLL.
579 */
580 struct QueueEntry *next;
581
582 /**
583 * Kept as a DLL.
584 */
585 struct QueueEntry *prev;
586
587 /**
588 * ATS session this entry is queued with.
589 */
590 struct GNUNET_ATS_Session *session;
591
592 /**
593 * Message ID used for this message with the queue used for transmission.
594 */
595 uint64_t mid;
596};
597
598
599/**
558 * An ATS session is a message queue provided by a communicator 600 * An ATS session is a message queue provided by a communicator
559 * via which we can reach a particular neighbour. 601 * via which we can reach a particular neighbour.
560 */ 602 */
@@ -581,6 +623,16 @@ struct GNUNET_ATS_Session
581 struct GNUNET_ATS_Session *next_client; 623 struct GNUNET_ATS_Session *next_client;
582 624
583 /** 625 /**
626 * Head of DLL of unacked transmission requests.
627 */
628 struct QueueEntry *queue_head;
629
630 /**
631 * End of DLL of unacked transmission requests.
632 */
633 struct QueueEntry *queue_tail;
634
635 /**
584 * Which neighbour is this ATS session for? 636 * Which neighbour is this ATS session for?
585 */ 637 */
586 struct Neighbour *neighbour; 638 struct Neighbour *neighbour;
@@ -612,6 +664,11 @@ struct GNUNET_ATS_Session
612 struct GNUNET_TIME_Relative rtt; 664 struct GNUNET_TIME_Relative rtt;
613 665
614 /** 666 /**
667 * Message ID generator for transmissions on this queue.
668 */
669 uint64_t mid_gen;
670
671 /**
615 * Unique identifier of this ATS session with the communicator. 672 * Unique identifier of this ATS session with the communicator.
616 */ 673 */
617 uint32_t qid; 674 uint32_t qid;
@@ -627,24 +684,29 @@ struct GNUNET_ATS_Session
627 uint32_t distance; 684 uint32_t distance;
628 685
629 /** 686 /**
630 * Network type offered by this ATS session. 687 * Messages pending.
631 */ 688 */
632 enum GNUNET_NetworkType nt; 689 uint32_t num_msg_pending;
633 690
634 /** 691 /**
635 * Connection status for this ATS session. 692 * Bytes pending.
636 */ 693 */
637 enum GNUNET_TRANSPORT_ConnectionStatus cs; 694 uint32_t num_bytes_pending;
638 695
639 /** 696 /**
640 * Messages pending. 697 * Length of the DLL starting at @e queue_head.
641 */ 698 */
642 uint32_t num_msg_pending; 699 unsigned int queue_length;
700
701 /**
702 * Network type offered by this ATS session.
703 */
704 enum GNUNET_NetworkType nt;
643 705
644 /** 706 /**
645 * Bytes pending. 707 * Connection status for this ATS session.
646 */ 708 */
647 uint32_t num_bytes_pending; 709 enum GNUNET_TRANSPORT_ConnectionStatus cs;
648 710
649 /** 711 /**
650 * How much outbound bandwidth do we have available for this session? 712 * How much outbound bandwidth do we have available for this session?
@@ -842,11 +904,6 @@ struct PendingMessage
842 * initialized if @e msg_uuid_set is #GNUNET_YES). 904 * initialized if @e msg_uuid_set is #GNUNET_YES).
843 */ 905 */
844 struct GNUNET_ShortHashCode msg_uuid; 906 struct GNUNET_ShortHashCode msg_uuid;
845
846 /**
847 * Message ID used for this message with the queue used for transmission.
848 */
849 uint64_t mid;
850 907
851 /** 908 /**
852 * Counter incremented per generated fragment. 909 * Counter incremented per generated fragment.
@@ -1035,6 +1092,13 @@ struct TransportClient
1035 struct AddressListEntry *addr_tail; 1092 struct AddressListEntry *addr_tail;
1036 1093
1037 /** 1094 /**
1095 * Number of queue entries in all queues to this communicator. Used
1096 * throttle sending to a communicator if we see that the communicator
1097 * is globally unable to keep up.
1098 */
1099 unsigned int total_queue_length;
1100
1101 /**
1038 * Characteristics of this communicator. 1102 * Characteristics of this communicator.
1039 */ 1103 */
1040 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; 1104 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
@@ -1382,41 +1446,142 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1382 1446
1383 1447
1384/** 1448/**
1385 * Free @a queue. 1449 * We believe we are ready to transmit a message on a queue. Double-checks
1450 * with the queue's "tracker_out" and then gives the message to the
1451 * communicator for transmission (updating the tracker, and re-scheduling
1452 * itself if applicable).
1453 *
1454 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1455 */
1456static void
1457transmit_on_queue (void *cls);
1458
1459
1460/**
1461 * Schedule next run of #transmit_on_queue(). Does NOTHING if
1462 * we should run immediately or if the message queue is empty.
1463 * Test for no task being added AND queue not being empty to
1464 * transmit immediately afterwards! This function must only
1465 * be called if the message queue is non-empty!
1386 * 1466 *
1387 * @param queue the queue to free 1467 * @param queue the queue to do scheduling for
1468 */
1469static void
1470schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1471{
1472 struct Neighbour *n = queue->neighbour;
1473 struct PendingMessage *pm = n->pending_msg_head;
1474 struct GNUNET_TIME_Relative out_delay;
1475 unsigned int wsize;
1476
1477 GNUNET_assert (NULL != pm);
1478 if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1479 {
1480 GNUNET_STATISTICS_update (GST_stats,
1481 "# Transmission throttled due to communicator queue limit",
1482 1,
1483 GNUNET_NO);
1484 return;
1485 }
1486 if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1487 {
1488 GNUNET_STATISTICS_update (GST_stats,
1489 "# Transmission throttled due to session queue limit",
1490 1,
1491 GNUNET_NO);
1492 return;
1493 }
1494
1495 wsize = (0 == queue->mtu)
1496 ? pm->bytes_msg /* FIXME: add overheads? */
1497 : queue->mtu;
1498 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1499 wsize);
1500 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1501 out_delay);
1502 if (0 == out_delay.rel_value_us)
1503 return; /* we should run immediately! */
1504 /* queue has changed since we were scheduled, reschedule again */
1505 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1506 &transmit_on_queue,
1507 queue);
1508 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1509 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510 "Next transmission on queue `%s' in %s (high delay)\n",
1511 queue->address,
1512 GNUNET_STRINGS_relative_time_to_string (out_delay,
1513 GNUNET_YES));
1514 else
1515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516 "Next transmission on queue `%s' in %s\n",
1517 queue->address,
1518 GNUNET_STRINGS_relative_time_to_string (out_delay,
1519 GNUNET_YES));
1520}
1521
1522
1523/**
1524 * Free @a session.
1525 *
1526 * @param session the session to free
1388 */ 1527 */
1389static void 1528static void
1390free_queue (struct GNUNET_ATS_Session *queue) 1529free_session (struct GNUNET_ATS_Session *session)
1391{ 1530{
1392 struct Neighbour *neighbour = queue->neighbour; 1531 struct Neighbour *neighbour = session->neighbour;
1393 struct TransportClient *tc = queue->tc; 1532 struct TransportClient *tc = session->tc;
1394 struct MonitorEvent me = { 1533 struct MonitorEvent me = {
1395 .cs = GNUNET_TRANSPORT_CS_DOWN, 1534 .cs = GNUNET_TRANSPORT_CS_DOWN,
1396 .rtt = GNUNET_TIME_UNIT_FOREVER_REL 1535 .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1397 }; 1536 };
1537 struct QueueEntry *qe;
1538 int maxxed;
1398 1539
1399 if (NULL != queue->transmit_task) 1540 if (NULL != session->transmit_task)
1400 { 1541 {
1401 GNUNET_SCHEDULER_cancel (queue->transmit_task); 1542 GNUNET_SCHEDULER_cancel (session->transmit_task);
1402 queue->transmit_task = NULL; 1543 session->transmit_task = NULL;
1403 } 1544 }
1404 GNUNET_CONTAINER_MDLL_remove (neighbour, 1545 GNUNET_CONTAINER_MDLL_remove (neighbour,
1405 neighbour->session_head, 1546 neighbour->session_head,
1406 neighbour->session_tail, 1547 neighbour->session_tail,
1407 queue); 1548 session);
1408 GNUNET_CONTAINER_MDLL_remove (client, 1549 GNUNET_CONTAINER_MDLL_remove (client,
1409 tc->details.communicator.session_head, 1550 tc->details.communicator.session_head,
1410 tc->details.communicator.session_tail, 1551 tc->details.communicator.session_tail,
1411 queue); 1552 session);
1553 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1554 while (NULL != (qe = session->queue_head))
1555 {
1556 GNUNET_CONTAINER_DLL_remove (session->queue_head,
1557 session->queue_tail,
1558 qe);
1559 session->queue_length--;
1560 tc->details.communicator.total_queue_length--;
1561 GNUNET_free (qe);
1562 }
1563 GNUNET_assert (0 == session->queue_length);
1564 if ( (maxxed) &&
1565 (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
1566 {
1567 /* Communicator dropped below threshold, resume all queues */
1568 GNUNET_STATISTICS_update (GST_stats,
1569 "# Transmission throttled due to communicator queue limit",
1570 -1,
1571 GNUNET_NO);
1572 for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
1573 NULL != s;
1574 s = s->next_client)
1575 schedule_transmit_on_queue (s);
1576 }
1412 notify_monitors (&neighbour->pid, 1577 notify_monitors (&neighbour->pid,
1413 queue->address, 1578 session->address,
1414 queue->nt, 1579 session->nt,
1415 &me); 1580 &me);
1416 GNUNET_ATS_session_del (queue->sr); 1581 GNUNET_ATS_session_del (session->sr);
1417 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); 1582 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
1418 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); 1583 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
1419 GNUNET_free (queue); 1584 GNUNET_free (session);
1420 if (NULL == neighbour->session_head) 1585 if (NULL == neighbour->session_head)
1421 { 1586 {
1422 cores_send_disconnect_info (&neighbour->pid); 1587 cores_send_disconnect_info (&neighbour->pid);
@@ -1499,7 +1664,7 @@ client_disconnect_cb (void *cls,
1499 struct AddressListEntry *ale; 1664 struct AddressListEntry *ale;
1500 1665
1501 while (NULL != (q = tc->details.communicator.session_head)) 1666 while (NULL != (q = tc->details.communicator.session_head))
1502 free_queue (q); 1667 free_session (q);
1503 while (NULL != (ale = tc->details.communicator.addr_head)) 1668 while (NULL != (ale = tc->details.communicator.addr_head))
1504 free_address_list_entry (ale); 1669 free_address_list_entry (ale);
1505 GNUNET_free (tc->details.communicator.address_prefix); 1670 GNUNET_free (tc->details.communicator.address_prefix);
@@ -2104,64 +2269,6 @@ tracker_update_in_cb (void *cls)
2104 2269
2105 2270
2106/** 2271/**
2107 * We believe we are ready to transmit a message on a queue. Double-checks
2108 * with the queue's "tracker_out" and then gives the message to the
2109 * communicator for transmission (updating the tracker, and re-scheduling
2110 * itself if applicable).
2111 *
2112 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
2113 */
2114static void
2115transmit_on_queue (void *cls);
2116
2117
2118/**
2119 * Schedule next run of #transmit_on_queue(). Does NOTHING if
2120 * we should run immediately or if the message queue is empty.
2121 * Test for no task being added AND queue not being empty to
2122 * transmit immediately afterwards! This function must only
2123 * be called if the message queue is non-empty!
2124 *
2125 * @param queue the queue to do scheduling for
2126 */
2127static void
2128schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
2129{
2130 struct Neighbour *n = queue->neighbour;
2131 struct PendingMessage *pm = n->pending_msg_head;
2132 struct GNUNET_TIME_Relative out_delay;
2133 unsigned int wsize;
2134
2135 GNUNET_assert (NULL != pm);
2136 wsize = (0 == queue->mtu)
2137 ? pm->bytes_msg /* FIXME: add overheads? */
2138 : queue->mtu;
2139 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
2140 wsize);
2141 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
2142 out_delay);
2143 if (0 == out_delay.rel_value_us)
2144 return; /* we should run immediately! */
2145 /* queue has changed since we were scheduled, reschedule again */
2146 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
2147 &transmit_on_queue,
2148 queue);
2149 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
2150 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2151 "Next transmission on queue `%s' in %s (high delay)\n",
2152 queue->address,
2153 GNUNET_STRINGS_relative_time_to_string (out_delay,
2154 GNUNET_YES));
2155 else
2156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2157 "Next transmission on queue `%s' in %s\n",
2158 queue->address,
2159 GNUNET_STRINGS_relative_time_to_string (out_delay,
2160 GNUNET_YES));
2161}
2162
2163
2164/**
2165 * Fragment the given @a pm to the given @a mtu. Adds 2272 * Fragment the given @a pm to the given @a mtu. Adds
2166 * additional fragments to the neighbour as well. If the 2273 * additional fragments to the neighbour as well. If the
2167 * @a mtu is too small, generates and error for the @a pm 2274 * @a mtu is too small, generates and error for the @a pm
@@ -2317,6 +2424,7 @@ transmit_on_queue (void *cls)
2317{ 2424{
2318 struct GNUNET_ATS_Session *queue = cls; 2425 struct GNUNET_ATS_Session *queue = cls;
2319 struct Neighbour *n = queue->neighbour; 2426 struct Neighbour *n = queue->neighbour;
2427 struct QueueEntry *qe;
2320 struct PendingMessage *pm; 2428 struct PendingMessage *pm;
2321 struct PendingMessage *s; 2429 struct PendingMessage *s;
2322 uint32_t overhead; 2430 uint32_t overhead;
@@ -2361,19 +2469,29 @@ transmit_on_queue (void *cls)
2361 return; 2469 return;
2362 } 2470 }
2363 2471
2364 // pm->mid = queue->mid_gen++; 2472 /* Pass 's' for transission to the communicator */
2473 qe = GNUNET_new (struct QueueEntry);
2474 qe->mid = queue->mid_gen++;
2475 qe->session = queue;
2476 // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
2477 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
2478 queue->queue_tail,
2479 qe);
2365 env = GNUNET_MQ_msg_extra (smt, 2480 env = GNUNET_MQ_msg_extra (smt,
2366 s->bytes_msg, 2481 s->bytes_msg,
2367 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); 2482 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
2368 smt->qid = queue->qid; 2483 smt->qid = queue->qid;
2369 // smt->mid = pm->mid; 2484 smt->mid = qe->mid;
2370 // smt->receiver = pid; 2485 smt->receiver = n->pid;
2371 memcpy (&smt[1], 2486 memcpy (&smt[1],
2372 &s[1], 2487 &s[1],
2373 s->bytes_msg); 2488 s->bytes_msg);
2489 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
2490 queue->queue_length++;
2491 queue->tc->details.communicator.total_queue_length++;
2492 GNUNET_MQ_send (queue->tc->mq,
2493 env);
2374 2494
2375 // FIXME: actually give 's' to communicator for transmission here!
2376
2377 // FIXME: do something similar to the logic below 2495 // FIXME: do something similar to the logic below
2378 // in defragmentation / reliability ACK handling! 2496 // in defragmentation / reliability ACK handling!
2379 2497
@@ -2653,18 +2771,18 @@ handle_del_queue_message (void *cls,
2653 GNUNET_SERVICE_client_drop (tc->client); 2771 GNUNET_SERVICE_client_drop (tc->client);
2654 return; 2772 return;
2655 } 2773 }
2656 for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head; 2774 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2657 NULL != queue; 2775 NULL != session;
2658 queue = queue->next_client) 2776 session = session->next_client)
2659 { 2777 {
2660 struct Neighbour *neighbour = queue->neighbour; 2778 struct Neighbour *neighbour = session->neighbour;
2661 2779
2662 if ( (dqm->qid != queue->qid) || 2780 if ( (dqm->qid != session->qid) ||
2663 (0 != memcmp (&dqm->receiver, 2781 (0 != memcmp (&dqm->receiver,
2664 &neighbour->pid, 2782 &neighbour->pid,
2665 sizeof (struct GNUNET_PeerIdentity))) ) 2783 sizeof (struct GNUNET_PeerIdentity))) )
2666 continue; 2784 continue;
2667 free_queue (queue); 2785 free_session (session);
2668 GNUNET_SERVICE_client_continue (tc->client); 2786 GNUNET_SERVICE_client_continue (tc->client);
2669 return; 2787 return;
2670 } 2788 }
@@ -2684,20 +2802,79 @@ handle_send_message_ack (void *cls,
2684 const struct GNUNET_TRANSPORT_SendMessageToAck *sma) 2802 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
2685{ 2803{
2686 struct TransportClient *tc = cls; 2804 struct TransportClient *tc = cls;
2687 2805 struct QueueEntry *queue;
2806
2688 if (CT_COMMUNICATOR != tc->type) 2807 if (CT_COMMUNICATOR != tc->type)
2689 { 2808 {
2690 GNUNET_break (0); 2809 GNUNET_break (0);
2691 GNUNET_SERVICE_client_drop (tc->client); 2810 GNUNET_SERVICE_client_drop (tc->client);
2692 return; 2811 return;
2693 } 2812 }
2813
2814 /* find our queue entry matching the ACK */
2815 queue = NULL;
2816 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2817 NULL != session;
2818 session = session->next_client)
2819 {
2820 if (0 != memcmp (&session->neighbour->pid,
2821 &sma->receiver,
2822 sizeof (struct GNUNET_PeerIdentity)))
2823 continue;
2824 for (struct QueueEntry *qe = session->queue_head;
2825 NULL != qe;
2826 qe = qe->next)
2827 {
2828 if (qe->mid != sma->mid)
2829 continue;
2830 queue = qe;
2831 break;
2832 }
2833 break;
2834 }
2835 if (NULL == queue)
2836 {
2837 /* this should never happen */
2838 GNUNET_break (0);
2839 GNUNET_SERVICE_client_drop (tc->client);
2840 return;
2841 }
2842 GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
2843 queue->session->queue_tail,
2844 queue);
2845 queue->session->queue_length--;
2846 tc->details.communicator.total_queue_length--;
2847 GNUNET_SERVICE_client_continue (tc->client);
2848
2849 /* if applicable, resume transmissions that waited on ACK */
2850 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
2851 {
2852 /* Communicator dropped below threshold, resume all queues */
2853 GNUNET_STATISTICS_update (GST_stats,
2854 "# Transmission throttled due to communicator queue limit",
2855 -1,
2856 GNUNET_NO);
2857 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2858 NULL != session;
2859 session = session->next_client)
2860 schedule_transmit_on_queue (session);
2861 }
2862 else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
2863 {
2864 /* queue dropped below threshold; only resume this one queue */
2865 GNUNET_STATISTICS_update (GST_stats,
2866 "# Transmission throttled due to session queue limit",
2867 -1,
2868 GNUNET_NO);
2869 schedule_transmit_on_queue (queue->session);
2870 }
2871
2872 /* TODO: we also should react on the status! */
2873 // FIXME: this probably requires queue->pm = s assignment!
2694 // FIXME: react to communicator status about transmission request. We got: 2874 // FIXME: react to communicator status about transmission request. We got:
2695 sma->status; // OK success, SYSERR failure 2875 sma->status; // OK success, SYSERR failure
2696 sma->mid; // message ID of original message
2697 sma->receiver; // receiver of original message
2698 2876
2699 2877 GNUNET_free (queue);
2700 GNUNET_SERVICE_client_continue (tc->client);
2701} 2878}
2702 2879
2703 2880