aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-11-22 21:42:57 +0100
committerChristian Grothoff <christian@grothoff.org>2018-11-22 21:42:57 +0100
commitdd7379052f3749d87d8c35969ec94b4580e998b5 (patch)
tree2f4babf46193a8eb6058be4713fadb4e0ac24ef7
parenta3a300f43a354c786d74c1952aba6a7816157c97 (diff)
downloadgnunet-dd7379052f3749d87d8c35969ec94b4580e998b5.tar.gz
gnunet-dd7379052f3749d87d8c35969ec94b4580e998b5.zip
implement monitor start logic
-rw-r--r--src/transport/gnunet-service-tng.c116
1 files changed, 93 insertions, 23 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index efbaf6fc9..ca8838380 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -21,11 +21,18 @@
21 * @author Christian Grothoff 21 * @author Christian Grothoff
22 * 22 *
23 * TODO: 23 * TODO:
24 * - monitor start: iterate to inform monitor about all existing queues! 24 * - design ATS-NG API
25 * - figure out how to transmit (selective) ACKs in case of uni-directional
26 * communicators (with/without core? DV-only?) When do we use ACKs?
27 * How/where do we distinguish between TCP/HTTP and unreliable communicators?
28 * => Should communicator provide reliable/unreliable ("flags") information?
25 * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc. 29 * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
26 * - inform ATS about RTT, goodput/loss, overheads, etc. 30 * - inform ATS about RTT, goodput/loss, overheads, etc.
27 * - ask ATS about bandwidth allocation! 31 * - ask ATS about bandwidth allocation!
28 * - 32 * - change transport-core API to provide proper flow control in both
33 * directions, allow multiple messages per peer simultaneously (tag
34 * confirmations with unique message ID), and replace quota-out with
35 * proper flow control;
29 */ 36 */
30#include "platform.h" 37#include "platform.h"
31#include "gnunet_util_lib.h" 38#include "gnunet_util_lib.h"
@@ -133,6 +140,11 @@ struct Queue
133 const char *address; 140 const char *address;
134 141
135 /** 142 /**
143 * Our current RTT estimate for this queue.
144 */
145 struct GNUNET_TIME_Relative rtt;
146
147 /**
136 * Unique identifier of this queue with the communicator. 148 * Unique identifier of this queue with the communicator.
137 */ 149 */
138 uint32_t qid; 150 uint32_t qid;
@@ -141,12 +153,27 @@ struct Queue
141 * Maximum transmission unit supported by this queue. 153 * Maximum transmission unit supported by this queue.
142 */ 154 */
143 uint32_t mtu; 155 uint32_t mtu;
144 156
145 /** 157 /**
146 * Network type offered by this queue. 158 * Network type offered by this queue.
147 */ 159 */
148 enum GNUNET_ATS_Network_Type nt; 160 enum GNUNET_ATS_Network_Type nt;
149 161
162 /**
163 * Connection status for this queue.
164 */
165 enum GNUNET_TRANSPORT_ConnectionStatus cs;
166
167 /**
168 * Messages pending.
169 */
170 uint32_t num_msg_pending;
171
172 /**
173 * Bytes pending.
174 */
175 uint32_t num_bytes_pending;
176
150 // FIXME: add ATS-specific fields here! 177 // FIXME: add ATS-specific fields here!
151}; 178};
152 179
@@ -516,7 +543,7 @@ notify_monitor (struct TransportClient *tc,
516 const struct MonitorEvent *me) 543 const struct MonitorEvent *me)
517{ 544{
518 struct GNUNET_MQ_Envelope *env; 545 struct GNUNET_MQ_Envelope *env;
519 struct GNUNET_TRANSPORT_MonitorData *md; 546 struct GNUNET_TRANSPORT_MonitorData *md;
520 size_t addr_len = strlen (address) + 1; 547 size_t addr_len = strlen (address) + 1;
521 548
522 env = GNUNET_MQ_msg_extra (md, 549 env = GNUNET_MQ_msg_extra (md,
@@ -577,7 +604,7 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
577 nt, 604 nt,
578 me); 605 me);
579 } 606 }
580} 607}
581 608
582 609
583/** 610/**
@@ -640,7 +667,7 @@ core_send_connect_info (struct TransportClient *tc,
640{ 667{
641 struct GNUNET_MQ_Envelope *env; 668 struct GNUNET_MQ_Envelope *env;
642 struct ConnectInfoMessage *cim; 669 struct ConnectInfoMessage *cim;
643 670
644 GNUNET_assert (CT_CORE == tc->type); 671 GNUNET_assert (CT_CORE == tc->type);
645 env = GNUNET_MQ_msg (cim, 672 env = GNUNET_MQ_msg (cim,
646 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 673 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
@@ -664,7 +691,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
664 for (struct TransportClient *tc = clients_head; 691 for (struct TransportClient *tc = clients_head;
665 NULL != tc; 692 NULL != tc;
666 tc = tc->next) 693 tc = tc->next)
667 { 694 {
668 if (CT_CORE != tc->type) 695 if (CT_CORE != tc->type)
669 continue; 696 continue;
670 core_send_connect_info (tc, 697 core_send_connect_info (tc,
@@ -688,7 +715,7 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
688 { 715 {
689 struct GNUNET_MQ_Envelope *env; 716 struct GNUNET_MQ_Envelope *env;
690 struct DisconnectInfoMessage *dim; 717 struct DisconnectInfoMessage *dim;
691 718
692 if (CT_CORE != tc->type) 719 if (CT_CORE != tc->type)
693 continue; 720 continue;
694 env = GNUNET_MQ_msg (dim, 721 env = GNUNET_MQ_msg (dim,
@@ -739,14 +766,14 @@ free_queue (struct Queue *queue)
739 766
740/** 767/**
741 * Free @a ale 768 * Free @a ale
742 * 769 *
743 * @param ale address list entry to free 770 * @param ale address list entry to free
744 */ 771 */
745static void 772static void
746free_address_list_entry (struct AddressListEntry *ale) 773free_address_list_entry (struct AddressListEntry *ale)
747{ 774{
748 struct TransportClient *tc = ale->tc; 775 struct TransportClient *tc = ale->tc;
749 776
750 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head, 777 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
751 tc->details.communicator.addr_tail, 778 tc->details.communicator.addr_tail,
752 ale); 779 ale);
@@ -838,7 +865,7 @@ notify_client_connect_info (void *cls,
838{ 865{
839 struct TransportClient *tc = cls; 866 struct TransportClient *tc = cls;
840 struct Neighbour *neighbour = value; 867 struct Neighbour *neighbour = value;
841 868
842 core_send_connect_info (tc, 869 core_send_connect_info (tc,
843 pid, 870 pid,
844 neighbour->quota_out); 871 neighbour->quota_out);
@@ -1142,7 +1169,7 @@ peerstore_store_cb (void *cls,
1142 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1169 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1143 "Failed to store our own address `%s' in peerstore!\n", 1170 "Failed to store our own address `%s' in peerstore!\n",
1144 ale->address); 1171 ale->address);
1145 /* refresh period is 1/4 of expiration time, that should be plenty 1172 /* refresh period is 1/4 of expiration time, that should be plenty
1146 without being excessive. */ 1173 without being excessive. */
1147 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration, 1174 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
1148 4ULL), 1175 4ULL),
@@ -1163,7 +1190,7 @@ store_pi (void *cls)
1163 void *addr; 1190 void *addr;
1164 size_t addr_len; 1191 size_t addr_len;
1165 struct GNUNET_TIME_Absolute expiration; 1192 struct GNUNET_TIME_Absolute expiration;
1166 1193
1167 ale->st = NULL; 1194 ale->st = NULL;
1168 expiration = GNUNET_TIME_relative_to_absolute (ale->expiration); 1195 expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
1169 GNUNET_HELLO_sign_address (ale->address, 1196 GNUNET_HELLO_sign_address (ale->address,
@@ -1171,7 +1198,7 @@ store_pi (void *cls)
1171 expiration, 1198 expiration,
1172 GST_my_private_key, 1199 GST_my_private_key,
1173 &addr, 1200 &addr,
1174 &addr_len); 1201 &addr_len);
1175 ale->sc = GNUNET_PEERSTORE_store (peerstore, 1202 ale->sc = GNUNET_PEERSTORE_store (peerstore,
1176 "transport", 1203 "transport",
1177 &GST_my_identity, 1204 &GST_my_identity,
@@ -1361,7 +1388,7 @@ handle_add_queue_message (void *cls,
1361 struct Queue *queue; 1388 struct Queue *queue;
1362 struct Neighbour *neighbour; 1389 struct Neighbour *neighbour;
1363 const char *addr; 1390 const char *addr;
1364 uint16_t addr_len; 1391 uint16_t addr_len;
1365 1392
1366 neighbour = lookup_neighbour (&aqm->receiver); 1393 neighbour = lookup_neighbour (&aqm->receiver);
1367 if (NULL == neighbour) 1394 if (NULL == neighbour)
@@ -1381,21 +1408,24 @@ handle_add_queue_message (void *cls,
1381 addr = (const char *) &aqm[1]; 1408 addr = (const char *) &aqm[1];
1382 1409
1383 queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); 1410 queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
1384 queue->mtu = ntohl (aqm->mtu); 1411 queue->tc = tc;
1412 queue->address = (const char *) &queue[1];
1413 queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
1385 queue->qid = aqm->qid; 1414 queue->qid = aqm->qid;
1415 queue->mtu = ntohl (aqm->mtu);
1386 queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); 1416 queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt);
1387 queue->tc = tc; 1417 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
1388 queue->neighbour = neighbour; 1418 queue->neighbour = neighbour;
1389 queue->address = (const char *) &queue[1];
1390 memcpy (&queue[1], 1419 memcpy (&queue[1],
1391 addr, 1420 addr,
1392 addr_len); 1421 addr_len);
1393 /* notify monitors about new queue */ 1422 /* notify monitors about new queue */
1394 { 1423 {
1395 struct MonitorEvent me = { 1424 struct MonitorEvent me = {
1396 .cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs) 1425 .rtt = queue->rtt,
1426 .cs = queue->cs
1397 }; 1427 };
1398 1428
1399 notify_monitors (&neighbour->pid, 1429 notify_monitors (&neighbour->pid,
1400 queue->address, 1430 queue->address,
1401 queue->nt, 1431 queue->nt,
@@ -1475,6 +1505,45 @@ handle_send_message_ack (void *cls,
1475 1505
1476 1506
1477/** 1507/**
1508 * Iterator telling new MONITOR client about all existing
1509 * queues to peers.
1510 *
1511 * @param cls the new `struct TransportClient`
1512 * @param pid a connected peer
1513 * @param value the `struct Neighbour` with more information
1514 * @return #GNUNET_OK (continue to iterate)
1515 */
1516static int
1517notify_client_queues (void *cls,
1518 const struct GNUNET_PeerIdentity *pid,
1519 void *value)
1520{
1521 struct TransportClient *tc = cls;
1522 struct Neighbour *neighbour = value;
1523
1524 GNUNET_assert (CT_MONITOR == tc->type);
1525 for (struct Queue *q = neighbour->queue_head;
1526 NULL != q;
1527 q = q->next_neighbour)
1528 {
1529 struct MonitorEvent me = {
1530 .rtt = q->rtt,
1531 .cs = q->cs,
1532 .num_msg_pending = q->num_msg_pending,
1533 .num_bytes_pending = q->num_bytes_pending
1534 };
1535
1536 notify_monitor (tc,
1537 pid,
1538 q->address,
1539 q->nt,
1540 &me);
1541 }
1542 return GNUNET_OK;
1543}
1544
1545
1546/**
1478 * Initialize a monitor client. 1547 * Initialize a monitor client.
1479 * 1548 *
1480 * @param cls the client 1549 * @param cls the client
@@ -1495,8 +1564,9 @@ handle_monitor_start (void *cls,
1495 tc->type = CT_MONITOR; 1564 tc->type = CT_MONITOR;
1496 tc->details.monitor.peer = start->peer; 1565 tc->details.monitor.peer = start->peer;
1497 tc->details.monitor.one_shot = ntohl (start->one_shot); 1566 tc->details.monitor.one_shot = ntohl (start->one_shot);
1498 // FIXME: do work! 1567 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1499 1568 &notify_client_queues,
1569 tc);
1500 GNUNET_SERVICE_client_mark_monitor (tc->client); 1570 GNUNET_SERVICE_client_mark_monitor (tc->client);
1501 GNUNET_SERVICE_client_continue (tc->client); 1571 GNUNET_SERVICE_client_continue (tc->client);
1502} 1572}
@@ -1551,7 +1621,7 @@ do_shutdown (void *cls)
1551 GNUNET_STATISTICS_destroy (GST_stats, 1621 GNUNET_STATISTICS_destroy (GST_stats,
1552 GNUNET_NO); 1622 GNUNET_NO);
1553 GST_stats = NULL; 1623 GST_stats = NULL;
1554 } 1624 }
1555 if (NULL != GST_my_private_key) 1625 if (NULL != GST_my_private_key)
1556 { 1626 {
1557 GNUNET_free (GST_my_private_key); 1627 GNUNET_free (GST_my_private_key);