aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-22 16:51:36 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-22 16:51:36 +0200
commitf1834a5f1c8c2b06f5c140c4aaefe27e474d16d6 (patch)
treec87245b4da8ac4e10d177a91c6cdaf4a5f64c810 /src/transport/gnunet-service-tng.c
parent38300a8d3b441cc492adcd72d4a60b861eea0e95 (diff)
downloadgnunet-f1834a5f1c8c2b06f5c140c4aaefe27e474d16d6.tar.gz
gnunet-f1834a5f1c8c2b06f5c140c4aaefe27e474d16d6.zip
massive refactoring to intruce data structure
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c1162
1 files changed, 729 insertions, 433 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index a35357d9b..5c51ed59a 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -117,6 +117,11 @@
117#include "gnunet_signatures.h" 117#include "gnunet_signatures.h"
118#include "transport.h" 118#include "transport.h"
119 119
120/**
121 * Maximum number of messages we acknowledge together in one
122 * cummulative ACK. Larger values may save a bit of bandwidth.
123 */
124#define MAX_CUMMULATIVE_ACKS 64
120 125
121/** 126/**
122 * What is the size we assume for a read operation in the 127 * What is the size we assume for a read operation in the
@@ -212,6 +217,14 @@
212 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1) 217 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
213 218
214/** 219/**
220 * How long until we forget about historic accumulators and thus
221 * reset the ACK counter? Should exceed the maximum time an
222 * active connection experiences without an ACK.
223 */
224#define ACK_CUMMULATOR_TIMEOUT \
225 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
226
227/**
215 * What is the non-randomized base frequency at which we 228 * What is the non-randomized base frequency at which we
216 * would initiate DV learn messages? 229 * would initiate DV learn messages?
217 */ 230 */
@@ -278,13 +291,19 @@ struct MessageUUIDP
278 * Unique value, generated by incrementing the 291 * Unique value, generated by incrementing the
279 * `message_uuid_ctr` of `struct Neighbour`. 292 * `message_uuid_ctr` of `struct Neighbour`.
280 */ 293 */
281 uint32_t uuid GNUNET_PACKED; 294 uint64_t uuid GNUNET_PACKED;
295};
282 296
297
298/**
299 * Unique identifier to map an acknowledgement to a transmission.
300 */
301struct AcknowledgementUUIDP
302{
283 /** 303 /**
284 * UUID of the queue that was used to transmit this message. 304 * The UUID value. Not actually a hash, but a random value.
285 * Used to map acknowledgements back to the respective queue.
286 */ 305 */
287 uint32_t queue_uuid GNUNET_PACKED; 306 struct GNUNET_ShortHashCode value;
288}; 307};
289 308
290 309
@@ -474,7 +493,27 @@ struct TransportReliabilityBoxMessage
474 * messages sent over possibly unreliable channels. Should 493 * messages sent over possibly unreliable channels. Should
475 * be a random. 494 * be a random.
476 */ 495 */
477 struct MessageUUIDP msg_uuid; 496 struct AcknowledgementUUIDP ack_uuid;
497};
498
499
500/**
501 * Acknowledgement payload.
502 */
503struct TransportCummulativeAckPayloadP
504{
505 /**
506 * How long was the ACK delayed for generating cummulative ACKs?
507 * Used to calculate the correct network RTT by taking the receipt
508 * time of the ack minus the transmission time of the sender minus
509 * this value.
510 */
511 struct GNUNET_TIME_RelativeNBO ack_delay;
512
513 /**
514 * UUID of a message being acknowledged.
515 */
516 struct AcknowledgementUUIDP ack_uuid;
478}; 517};
479 518
480 519
@@ -493,19 +532,12 @@ struct TransportReliabilityAckMessage
493 struct GNUNET_MessageHeader header; 532 struct GNUNET_MessageHeader header;
494 533
495 /** 534 /**
496 * Reserved. Zero. 535 * Counter of ACKs transmitted by the sender to us. Incremented
536 * by one for each ACK, used to detect how many ACKs were lost.
497 */ 537 */
498 uint32_t reserved GNUNET_PACKED; 538 uint32_t ack_counter GNUNET_PACKED;
499 539
500 /** 540 /* followed by any number of `struct TransportCummulativeAckPayloadP`
501 * How long was the ACK delayed relative to the average time of
502 * receipt of the messages being acknowledged? Used to calculate
503 * the average RTT by taking the receipt time of the ack minus the
504 * average transmission time of the sender minus this value.
505 */
506 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
507
508 /* followed by any number of `struct MessageUUIDP`
509 messages providing ACKs */ 541 messages providing ACKs */
510}; 542};
511 543
@@ -523,16 +555,15 @@ struct TransportFragmentBoxMessage
523 /** 555 /**
524 * Unique ID of this fragment (and fragment transmission!). Will 556 * Unique ID of this fragment (and fragment transmission!). Will
525 * change even if a fragement is retransmitted to make each 557 * change even if a fragement is retransmitted to make each
526 * transmission attempt unique! Should be incremented by one for 558 * transmission attempt unique! If a client receives a duplicate
527 * each fragment transmission. If a client receives a duplicate 559 * fragment (same @e frag_off for same @a msg_uuid, it must send
528 * fragment (same @e frag_off), it must send 560 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately.
529 * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
530 */ 561 */
531 struct FragmentUUIDP frag_uuid; 562 struct AcknowledgementUUIDP ack_uuid;
532 563
533 /** 564 /**
534 * Original message ID for of the message that all the 565 * Original message ID for of the message that all the fragments
535 * fragments belong to. Must be the same for all fragments. 566 * belong to. Must be the same for all fragments.
536 */ 567 */
537 struct MessageUUIDP msg_uuid; 568 struct MessageUUIDP msg_uuid;
538 569
@@ -549,54 +580,6 @@ struct TransportFragmentBoxMessage
549 580
550 581
551/** 582/**
552 * Outer layer of an fragmented application message sent over a queue
553 * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
554 * received, the receiver has two RTTs or 64 further fragments with
555 * the same basic message time to send an acknowledgement, possibly
556 * acknowledging up to 65 fragments in one ACK. ACKs must also be
557 * sent immediately once all fragments were sent.
558 */
559struct TransportFragmentAckMessage
560{
561 /**
562 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
563 */
564 struct GNUNET_MessageHeader header;
565
566 /**
567 * Unique ID of the lowest fragment UUID being acknowledged.
568 */
569 struct FragmentUUIDP frag_uuid;
570
571 /**
572 * Bitfield of up to 64 additional fragments following the
573 * @e msg_uuid being acknowledged by this message.
574 */
575 uint64_t extra_acks GNUNET_PACKED;
576
577 /**
578 * Original message ID for of the message that all the
579 * fragments belong to.
580 */
581 struct MessageUUIDP msg_uuid;
582
583 /**
584 * How long was the ACK delayed relative to the average time of
585 * receipt of the fragments being acknowledged? Used to calculate
586 * the average RTT by taking the receipt time of the ack minus the
587 * average transmission time of the sender minus this value.
588 */
589 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
590
591 /**
592 * How long until the receiver will stop trying reassembly
593 * of this message?
594 */
595 struct GNUNET_TIME_RelativeNBO reassembly_timeout;
596};
597
598
599/**
600 * Content signed by the initator during DV learning. 583 * Content signed by the initator during DV learning.
601 * 584 *
602 * The signature is required to prevent DDoS attacks. A peer sending out this 585 * The signature is required to prevent DDoS attacks. A peer sending out this
@@ -1025,13 +1008,11 @@ struct EphemeralCacheEntry
1025 */ 1008 */
1026struct TransportClient; 1009struct TransportClient;
1027 1010
1028
1029/** 1011/**
1030 * A neighbour that at least one communicator is connected to. 1012 * A neighbour that at least one communicator is connected to.
1031 */ 1013 */
1032struct Neighbour; 1014struct Neighbour;
1033 1015
1034
1035/** 1016/**
1036 * Entry in our #dv_routes table, representing a (set of) distance 1017 * Entry in our #dv_routes table, representing a (set of) distance
1037 * vector routes to a particular peer. 1018 * vector routes to a particular peer.
@@ -1039,6 +1020,118 @@ struct Neighbour;
1039struct DistanceVector; 1020struct DistanceVector;
1040 1021
1041/** 1022/**
1023 * A queue is a message queue provided by a communicator
1024 * via which we can reach a particular neighbour.
1025 */
1026struct Queue;
1027
1028/**
1029 * Message awaiting transmission. See detailed comments below.
1030 */
1031struct PendingMessage;
1032
1033/**
1034 * One possible hop towards a DV target.
1035 */
1036struct DistanceVectorHop;
1037
1038
1039/**
1040 * Data structure kept when we are waiting for an acknowledgement.
1041 */
1042struct PendingAcknowledgement
1043{
1044
1045 /**
1046 * If @e pm is non-NULL, this is the DLL in which this acknowledgement
1047 * is kept in relation to its pending message.
1048 */
1049 struct PendingAcknowledgement *next_pm;
1050
1051 /**
1052 * If @e pm is non-NULL, this is the DLL in which this acknowledgement
1053 * is kept in relation to its pending message.
1054 */
1055 struct PendingAcknowledgement *prev_pm;
1056
1057 /**
1058 * If @e queue is non-NULL, this is the DLL in which this acknowledgement
1059 * is kept in relation to the queue that was used to transmit the
1060 * @a pm.
1061 */
1062 struct PendingAcknowledgement *next_queue;
1063
1064 /**
1065 * If @e queue is non-NULL, this is the DLL in which this acknowledgement
1066 * is kept in relation to the queue that was used to transmit the
1067 * @a pm.
1068 */
1069 struct PendingAcknowledgement *prev_queue;
1070
1071 /**
1072 * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
1073 * is kept in relation to the DVH that was used to transmit the
1074 * @a pm.
1075 */
1076 struct PendingAcknowledgement *next_dvh;
1077
1078 /**
1079 * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
1080 * is kept in relation to the DVH that was used to transmit the
1081 * @a pm.
1082 */
1083 struct PendingAcknowledgement *prev_dvh;
1084
1085 /**
1086 * Pointers for the DLL of all pending acknowledgements.
1087 * This list is sorted by @e transmission time. If the list gets too
1088 * long, the oldest entries are discarded.
1089 */
1090 struct PendingAcknowledgement *next_pa;
1091
1092 /**
1093 * Pointers for the DLL of all pending acknowledgements.
1094 * This list is sorted by @e transmission time. If the list gets too
1095 * long, the oldest entries are discarded.
1096 */
1097 struct PendingAcknowledgement *prev_pa;
1098
1099 /**
1100 * Unique identifier for this transmission operation.
1101 */
1102 struct AcknowledgementUUIDP ack_uuid;
1103
1104 /**
1105 * Message that was transmitted, may be NULL if the message was ACKed
1106 * via another channel.
1107 */
1108 struct PendingMessage *pm;
1109
1110 /**
1111 * Distance vector path chosen for this transmission, NULL if transmission
1112 * was to a direct neighbour OR if the path was forgotten in the meantime.
1113 */
1114 struct DistanceVectorHop *dvh;
1115
1116 /**
1117 * Queue used for transmission, NULL if the queue has been destroyed
1118 * (which may happen before we get an acknowledgement).
1119 */
1120 struct Queue *queue;
1121
1122 /**
1123 * Time of the transmission, for RTT calculation.
1124 */
1125 struct GNUNET_TIME_Absolute transmission_time;
1126
1127 /**
1128 * Number of bytes of the original message (to calculate bandwidth).
1129 */
1130 uint16_t message_size;
1131};
1132
1133
1134/**
1042 * One possible hop towards a DV target. 1135 * One possible hop towards a DV target.
1043 */ 1136 */
1044struct DistanceVectorHop 1137struct DistanceVectorHop
@@ -1065,6 +1158,16 @@ struct DistanceVectorHop
1065 struct DistanceVectorHop *prev_neighbour; 1158 struct DistanceVectorHop *prev_neighbour;
1066 1159
1067 /** 1160 /**
1161 * Head of DLL of PAs that used our @a path.
1162 */
1163 struct PendingAcknowledgement *pa_head;
1164
1165 /**
1166 * Tail of DLL of PAs that used our @a path.
1167 */
1168 struct PendingAcknowledgement *pa_tail;
1169
1170 /**
1068 * What would be the next hop to @e target? 1171 * What would be the next hop to @e target?
1069 */ 1172 */
1070 struct Neighbour *next_hop; 1173 struct Neighbour *next_hop;
@@ -1162,17 +1265,6 @@ struct DistanceVector
1162 1265
1163 1266
1164/** 1267/**
1165 * A queue is a message queue provided by a communicator
1166 * via which we can reach a particular neighbour.
1167 */
1168struct Queue;
1169
1170/**
1171 * Message awaiting transmission. See detailed comments below.
1172 */
1173struct PendingMessage;
1174
1175/**
1176 * Entry identifying transmission in one of our `struct 1268 * Entry identifying transmission in one of our `struct
1177 * Queue` which still awaits an ACK. This is used to 1269 * Queue` which still awaits an ACK. This is used to
1178 * ensure we do not overwhelm a communicator and limit the number of 1270 * ensure we do not overwhelm a communicator and limit the number of
@@ -1238,6 +1330,16 @@ struct Queue
1238 struct Queue *next_client; 1330 struct Queue *next_client;
1239 1331
1240 /** 1332 /**
1333 * Head of DLL of PAs that used this queue.
1334 */
1335 struct PendingAcknowledgement *pa_head;
1336
1337 /**
1338 * Tail of DLL of PAs that used this queue.
1339 */
1340 struct PendingAcknowledgement *pa_tail;
1341
1342 /**
1241 * Head of DLL of unacked transmission requests. 1343 * Head of DLL of unacked transmission requests.
1242 */ 1344 */
1243 struct QueueEntry *queue_head; 1345 struct QueueEntry *queue_head;
@@ -1300,14 +1402,6 @@ struct Queue
1300 uint32_t qid; 1402 uint32_t qid;
1301 1403
1302 /** 1404 /**
1303 * UUID used to map acknowledgements back to the queue that
1304 * was used for transmission. Note that @e queue_uuid-s are
1305 * only unique per neighbour (generated via `queue_uuid_gen`
1306 * of `struct Neighbour`).
1307 */
1308 uint32_t queue_uuid;
1309
1310 /**
1311 * Maximum transmission unit supported by this queue. 1405 * Maximum transmission unit supported by this queue.
1312 */ 1406 */
1313 uint32_t mtu; 1407 uint32_t mtu;
@@ -1356,8 +1450,8 @@ struct ReassemblyContext
1356{ 1450{
1357 1451
1358 /** 1452 /**
1359 * Original message ID for of the message that all the 1453 * Original message ID for of the message that all the fragments
1360 * fragments belong to. 1454 * belong to.
1361 */ 1455 */
1362 struct MessageUUIDP msg_uuid; 1456 struct MessageUUIDP msg_uuid;
1363 1457
@@ -1393,36 +1487,12 @@ struct ReassemblyContext
1393 struct GNUNET_TIME_Absolute reassembly_timeout; 1487 struct GNUNET_TIME_Absolute reassembly_timeout;
1394 1488
1395 /** 1489 /**
1396 * Average delay of all acks in @e extra_acks and @e frag_uuid.
1397 * Should be reset to zero when @e num_acks is set to 0.
1398 */
1399 struct GNUNET_TIME_Relative avg_ack_delay;
1400
1401 /**
1402 * Time we received the last fragment. @e avg_ack_delay must be 1490 * Time we received the last fragment. @e avg_ack_delay must be
1403 * incremented by now - @e last_frag multiplied by @e num_acks. 1491 * incremented by now - @e last_frag multiplied by @e num_acks.
1404 */ 1492 */
1405 struct GNUNET_TIME_Absolute last_frag; 1493 struct GNUNET_TIME_Absolute last_frag;
1406 1494
1407 /** 1495 /**
1408 * Bitfield of up to 64 additional fragments following @e frag_uuid
1409 * to be acknowledged in the next cummulative ACK.
1410 */
1411 uint64_t extra_acks;
1412
1413 /**
1414 * Unique ID of the lowest fragment UUID to be acknowledged in the
1415 * next cummulative ACK. Only valid if @e num_acks > 0.
1416 */
1417 uint32_t frag_uuid;
1418
1419 /**
1420 * Number of ACKs we have accumulated so far. Reset to 0
1421 * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
1422 */
1423 unsigned int num_acks;
1424
1425 /**
1426 * How big is the message we are reassembling in total? 1496 * How big is the message we are reassembling in total?
1427 */ 1497 */
1428 uint16_t msg_size; 1498 uint16_t msg_size;
@@ -1527,26 +1597,6 @@ struct Neighbour
1527 struct GNUNET_TIME_Absolute earliest_timeout; 1597 struct GNUNET_TIME_Absolute earliest_timeout;
1528 1598
1529 /** 1599 /**
1530 * Incremented by one for each queue to generate
1531 * unique queue identifiers. Initially set to a random value.
1532 *
1533 * FIXME: Deal with wrap around (might be triggered by very
1534 * persistent adversary).
1535 */
1536 uint32_t queue_uuid_gen;
1537
1538 /**
1539 * Incremented by one for each message sent to this neighbour, to
1540 * uniquely identify that message in replies (note that fragments
1541 * use another additional counter). Initially set to a random value.
1542 *
1543 * It should be safe to assume that by the time this value may wrap
1544 * around, the original message is long "gone" and no longer
1545 * relevant.
1546 */
1547 uint32_t message_uuid_ctr;
1548
1549 /**
1550 * Do we have a confirmed working queue and are thus visible to 1600 * Do we have a confirmed working queue and are thus visible to
1551 * CORE? 1601 * CORE?
1552 */ 1602 */
@@ -1682,6 +1732,16 @@ struct PendingMessage
1682 struct PendingMessage *prev_frag; 1732 struct PendingMessage *prev_frag;
1683 1733
1684 /** 1734 /**
1735 * Head of DLL of PAs for this pending message.
1736 */
1737 struct PendingAcknowledgement *pa_head;
1738
1739 /**
1740 * Tail of DLL of PAs for this pending message.
1741 */
1742 struct PendingAcknowledgement *pa_tail;
1743
1744 /**
1685 * This message, reliability boxed. Only possibly available if @e pmt is 1745 * This message, reliability boxed. Only possibly available if @e pmt is
1686 * #PMT_CORE. 1746 * #PMT_CORE.
1687 */ 1747 */
@@ -1739,11 +1799,6 @@ struct PendingMessage
1739 struct MessageUUIDP msg_uuid; 1799 struct MessageUUIDP msg_uuid;
1740 1800
1741 /** 1801 /**
1742 * Counter incremented per generated fragment.
1743 */
1744 uint32_t frag_uuidgen;
1745
1746 /**
1747 * Type of the pending message. 1802 * Type of the pending message.
1748 */ 1803 */
1749 enum PendingMessageType pmt; 1804 enum PendingMessageType pmt;
@@ -1768,6 +1823,66 @@ struct PendingMessage
1768 1823
1769 1824
1770/** 1825/**
1826 * Acknowledgement payload.
1827 */
1828struct TransportCummulativeAckPayload
1829{
1830 /**
1831 * When did we receive the message we are ACKing? Used to calculate
1832 * the delay we introduced by cummulating ACKs.
1833 */
1834 struct GNUNET_TIME_Absolute receive_time;
1835
1836 /**
1837 * UUID of a message being acknowledged.
1838 */
1839 struct AcknowledgementUUIDP ack_uuid;
1840};
1841
1842
1843/**
1844 * Data structure in which we track acknowledgements still to
1845 * be sent to the
1846 */
1847struct AcknowledgementCummulator
1848{
1849 /**
1850 * Target peer for which we are accumulating ACKs here.
1851 */
1852 struct GNUNET_PeerIdentity target;
1853
1854 /**
1855 * ACK data being accumulated. Only @e num_acks slots are valid.
1856 */
1857 struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS];
1858
1859 /**
1860 * Task scheduled either to transmit the cummulative ACK message,
1861 * or to clean up this data structure after extended periods of
1862 * inactivity (if @e num_acks is zero).
1863 */
1864 struct GNUNET_SCHEDULER_Task *task;
1865
1866 /**
1867 * When is @e task run (only used if @e num_acks is non-zero)?
1868 */
1869 struct GNUNET_TIME_Absolute min_transmission_time;
1870
1871 /**
1872 * Counter to produce the `ack_counter` in the `struct
1873 * TransportReliabilityAckMessage`. Allows the receiver to detect
1874 * lost ACK messages. Incremented by @e num_acks upon transmission.
1875 */
1876 uint32_t ack_counter;
1877
1878 /**
1879 * Number of entries used in @e ack_uuids. Reset to 0 upon transmission.
1880 */
1881 unsigned int num_acks;
1882};
1883
1884
1885/**
1771 * One of the addresses of this peer. 1886 * One of the addresses of this peer.
1772 */ 1887 */
1773struct AddressListEntry 1888struct AddressListEntry
@@ -2165,6 +2280,18 @@ static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
2165static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers; 2280static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers;
2166 2281
2167/** 2282/**
2283 * Map from PIDs to `struct AcknowledgementCummulator`s.
2284 * Here we track the cummulative ACKs for transmission.
2285 */
2286static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
2287
2288/**
2289 * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
2290 * a `struct PendingAcknowledgement`.
2291 */
2292static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
2293
2294/**
2168 * Map from PIDs to `struct DistanceVector` entries describing 2295 * Map from PIDs to `struct DistanceVector` entries describing
2169 * known paths to the peer. 2296 * known paths to the peer.
2170 */ 2297 */
@@ -2233,6 +2360,61 @@ static struct GNUNET_SCHEDULER_Task *dvlearn_task;
2233 */ 2360 */
2234static struct GNUNET_SCHEDULER_Task *validation_task; 2361static struct GNUNET_SCHEDULER_Task *validation_task;
2235 2362
2363/**
2364 * The most recent PA we have created, head of DLL.
2365 * The length of the DLL is kept in #pa_count.
2366 */
2367static struct PendingAcknowledgement *pa_head;
2368
2369/**
2370 * The oldest PA we have created, tail of DLL.
2371 * The length of the DLL is kept in #pa_count.
2372 */
2373static struct PendingAcknowledgement *pa_tail;
2374
2375/**
2376 * Number of entries in the #pa_head/#pa_tail DLL. Used to
2377 * limit the size of the data structure.
2378 */
2379static unsigned int pa_count;
2380
2381
2382/**
2383 * Release @a pa data structure.
2384 *
2385 * @param pa data structure to release
2386 */
2387static void
2388free_pending_acknowledgement (struct PendingAcknowledgement *pa)
2389{
2390 struct Queue *q = pa->queue;
2391 struct PendingMessage *pm = pa->pm;
2392 struct DistanceVectorHop *dvh = pa->dvh;
2393
2394 GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
2395 pa_count--;
2396 if (NULL != q)
2397 {
2398 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
2399 pa->queue = NULL;
2400 }
2401 if (NULL != pm)
2402 {
2403 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2404 pa->pm = NULL;
2405 }
2406 if (NULL != dvh)
2407 {
2408 GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
2409 pa->queue = NULL;
2410 }
2411 GNUNET_assert (GNUNET_YES ==
2412 GNUNET_CONTAINER_multishortmap_remove (pending_acks,
2413 &pa->ack_uuid.value,
2414 pa));
2415 GNUNET_free (pa);
2416}
2417
2236 2418
2237/** 2419/**
2238 * Free cached ephemeral key. 2420 * Free cached ephemeral key.
@@ -2329,7 +2511,13 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
2329{ 2511{
2330 struct Neighbour *n = dvh->next_hop; 2512 struct Neighbour *n = dvh->next_hop;
2331 struct DistanceVector *dv = dvh->dv; 2513 struct DistanceVector *dv = dvh->dv;
2514 struct PendingAcknowledgement *pa;
2332 2515
2516 while (NULL != (pa = dvh->pa_head))
2517 {
2518 GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
2519 pa->dvh = NULL;
2520 }
2333 GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh); 2521 GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
2334 GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh); 2522 GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
2335 GNUNET_free (dvh); 2523 GNUNET_free (dvh);
@@ -2733,6 +2921,7 @@ free_queue (struct Queue *queue)
2733 .rtt = GNUNET_TIME_UNIT_FOREVER_REL}; 2921 .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
2734 struct QueueEntry *qe; 2922 struct QueueEntry *qe;
2735 int maxxed; 2923 int maxxed;
2924 struct PendingAcknowledgement *pa;
2736 2925
2737 if (NULL != queue->transmit_task) 2926 if (NULL != queue->transmit_task)
2738 { 2927 {
@@ -2744,6 +2933,12 @@ free_queue (struct Queue *queue)
2744 GNUNET_SCHEDULER_cancel (queue->visibility_task); 2933 GNUNET_SCHEDULER_cancel (queue->visibility_task);
2745 queue->visibility_task = NULL; 2934 queue->visibility_task = NULL;
2746 } 2935 }
2936 while (NULL != (pa = queue->pa_head))
2937 {
2938 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
2939 pa->queue = NULL;
2940 }
2941
2747 GNUNET_CONTAINER_MDLL_remove (neighbour, 2942 GNUNET_CONTAINER_MDLL_remove (neighbour,
2748 neighbour->queue_head, 2943 neighbour->queue_head,
2749 neighbour->queue_tail, 2944 neighbour->queue_tail,
@@ -3006,6 +3201,9 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
3006 3201
3007/** 3202/**
3008 * Free fragment tree below @e root, excluding @e root itself. 3203 * Free fragment tree below @e root, excluding @e root itself.
3204 * FIXME: this does NOT seem to have the intended semantics
3205 * based on how this is called. Seems we generally DO expect
3206 * @a root to be free'ed itself as well!
3009 * 3207 *
3010 * @param root root of the tree to free 3208 * @param root root of the tree to free
3011 */ 3209 */
@@ -3016,7 +3214,14 @@ free_fragment_tree (struct PendingMessage *root)
3016 3214
3017 while (NULL != (frag = root->head_frag)) 3215 while (NULL != (frag = root->head_frag))
3018 { 3216 {
3217 struct PendingAcknowledgement *pa;
3218
3019 free_fragment_tree (frag); 3219 free_fragment_tree (frag);
3220 while (NULL != (pa = frag->pa_head))
3221 {
3222 GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
3223 pa->pm = NULL;
3224 }
3020 GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag); 3225 GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
3021 GNUNET_free (frag); 3226 GNUNET_free (frag);
3022 } 3227 }
@@ -3035,6 +3240,7 @@ free_pending_message (struct PendingMessage *pm)
3035{ 3240{
3036 struct TransportClient *tc = pm->client; 3241 struct TransportClient *tc = pm->client;
3037 struct Neighbour *target = pm->target; 3242 struct Neighbour *target = pm->target;
3243 struct PendingAcknowledgement *pa;
3038 3244
3039 if (NULL != tc) 3245 if (NULL != tc)
3040 { 3246 {
@@ -3047,6 +3253,12 @@ free_pending_message (struct PendingMessage *pm)
3047 target->pending_msg_head, 3253 target->pending_msg_head,
3048 target->pending_msg_tail, 3254 target->pending_msg_tail,
3049 pm); 3255 pm);
3256 while (NULL != (pa = pm->pa_head))
3257 {
3258 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3259 pa->pm = NULL;
3260 }
3261
3050 free_fragment_tree (pm); 3262 free_fragment_tree (pm);
3051 if (NULL != pm->qe) 3263 if (NULL != pm->qe)
3052 { 3264 {
@@ -4260,32 +4472,108 @@ check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
4260 4472
4261 4473
4262/** 4474/**
4263 * Generate a fragment acknowledgement for an @a rc. 4475 * Clean up an idle cummulative acknowledgement data structure.
4264 * 4476 *
4265 * @param rc context to generate ACK for, @a rc ACK state is reset 4477 * @param cls a `struct AcknowledgementCummulator *`
4266 */ 4478 */
4267static void 4479static void
4268send_fragment_ack (struct ReassemblyContext *rc) 4480destroy_ack_cummulator (void *cls)
4269{ 4481{
4270 struct TransportFragmentAckMessage *ack; 4482 struct AcknowledgementCummulator *ac = cls;
4271 4483
4272 ack = GNUNET_new (struct TransportFragmentAckMessage); 4484 ac->task = NULL;
4273 ack->header.size = htons (sizeof (struct TransportFragmentAckMessage)); 4485 GNUNET_assert (0 == ac->num_acks);
4274 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK); 4486 GNUNET_assert (
4275 ack->frag_uuid.uuid = htonl (rc->frag_uuid); 4487 GNUNET_YES ==
4276 ack->extra_acks = GNUNET_htonll (rc->extra_acks); 4488 GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
4277 ack->msg_uuid = rc->msg_uuid; 4489 GNUNET_free (ac);
4278 ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay); 4490}
4279 if (0 == rc->msg_missing) 4491
4280 ack->reassembly_timeout = GNUNET_TIME_relative_hton ( 4492
4281 GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */ 4493/**
4494 * Do the transmission of a cummulative acknowledgement now.
4495 *
4496 * @param cls a `struct AcknowledgementCummulator *`
4497 */
4498static void
4499transmit_cummulative_ack_cb (void *cls)
4500{
4501 struct AcknowledgementCummulator *ac = cls;
4502 struct TransportReliabilityAckMessage *ack;
4503 struct TransportCummulativeAckPayloadP *ap;
4504
4505 ac->task = NULL;
4506 GNUNET_assert (0 < ac->ack_counter);
4507 ack = GNUNET_malloc (sizeof (*ack) +
4508 ac->ack_counter *
4509 sizeof (struct TransportCummulativeAckPayloadP));
4510 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
4511 ack->header.size =
4512 htons (sizeof (*ack) +
4513 ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
4514 ack->ack_counter = htonl (ac->ack_counter++);
4515 ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
4516 for (unsigned int i = 0; i < ac->ack_counter; i++)
4517 {
4518 ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
4519 ap[i].ack_delay = GNUNET_TIME_relative_hton (
4520 GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
4521 }
4522 route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
4523 ac->num_acks = 0;
4524 ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
4525 &destroy_ack_cummulator,
4526 ac);
4527}
4528
4529
4530/**
4531 * Transmit an acknowledgement for @a ack_uuid to @a pid delaying
4532 * transmission by at most @a ack_delay.
4533 *
4534 * @param pid target peer
4535 * @param ack_uuid UUID to ack
4536 * @param max_delay how long can the ACK wait
4537 */
4538static void
4539cummulative_ack (const struct GNUNET_PeerIdentity *pid,
4540 const struct AcknowledgementUUIDP *ack_uuid,
4541 struct GNUNET_TIME_Absolute max_delay)
4542{
4543 struct AcknowledgementCummulator *ac;
4544
4545 ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
4546 if (NULL == ac)
4547 {
4548 ac = GNUNET_new (struct AcknowledgementCummulator);
4549 ac->target = *pid;
4550 ac->min_transmission_time = max_delay;
4551 GNUNET_assert (GNUNET_YES ==
4552 GNUNET_CONTAINER_multipeermap_put (
4553 ack_cummulators,
4554 &ac->target,
4555 ac,
4556 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4557 }
4282 else 4558 else
4283 ack->reassembly_timeout = GNUNET_TIME_relative_hton ( 4559 {
4284 GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)); 4560 if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
4285 route_message (&rc->neighbour->pid, &ack->header, RMO_DV_ALLOWED); 4561 {
4286 rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO; 4562 /* must run immediately, ack buffer full! */
4287 rc->num_acks = 0; 4563 GNUNET_SCHEDULER_cancel (ac->task);
4288 rc->extra_acks = 0LLU; 4564 transmit_cummulative_ack_cb (ac);
4565 }
4566 GNUNET_SCHEDULER_cancel (ac->task);
4567 ac->min_transmission_time =
4568 GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
4569 }
4570 GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
4571 ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
4572 ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
4573 ac->num_acks++;
4574 ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
4575 &transmit_cummulative_ack_cb,
4576 ac);
4289} 4577}
4290 4578
4291 4579
@@ -4348,10 +4636,8 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
4348 uint16_t msize; 4636 uint16_t msize;
4349 uint16_t fsize; 4637 uint16_t fsize;
4350 uint16_t frag_off; 4638 uint16_t frag_off;
4351 uint32_t frag_uuid;
4352 char *target; 4639 char *target;
4353 struct GNUNET_TIME_Relative cdelay; 4640 struct GNUNET_TIME_Relative cdelay;
4354 int ack_now;
4355 struct FindByMessageUuidContext fc; 4641 struct FindByMessageUuidContext fc;
4356 4642
4357 n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); 4643 n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
@@ -4417,6 +4703,12 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
4417 4703
4418 /* reassemble */ 4704 /* reassemble */
4419 fsize = ntohs (fb->header.size) - sizeof (*fb); 4705 fsize = ntohs (fb->header.size) - sizeof (*fb);
4706 if (0 == fsize)
4707 {
4708 GNUNET_break (0);
4709 finish_cmc_handling (cmc);
4710 return;
4711 }
4420 frag_off = ntohs (fb->frag_off); 4712 frag_off = ntohs (fb->frag_off);
4421 memcpy (&target[frag_off], &fb[1], fsize); 4713 memcpy (&target[frag_off], &fb[1], fsize);
4422 /* update bitfield and msg_missing */ 4714 /* update bitfield and msg_missing */
@@ -4430,60 +4722,17 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
4430 } 4722 }
4431 4723
4432 /* Compute cummulative ACK */ 4724 /* Compute cummulative ACK */
4433 frag_uuid = ntohl (fb->frag_uuid.uuid);
4434 cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag); 4725 cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
4435 cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks); 4726 cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
4727 if (0 == rc->msg_missing)
4728 cdelay = GNUNET_TIME_UNIT_ZERO;
4729 cummulative_ack (&cmc->im.sender,
4730 &fb->ack_uuid,
4731 GNUNET_TIME_relative_to_absolute (cdelay));
4436 rc->last_frag = GNUNET_TIME_absolute_get (); 4732 rc->last_frag = GNUNET_TIME_absolute_get ();
4437 rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay);
4438 ack_now = GNUNET_NO;
4439 if (0 == rc->num_acks)
4440 {
4441 /* case one: first ack */
4442 rc->frag_uuid = frag_uuid;
4443 rc->extra_acks = 0LLU;
4444 rc->num_acks = 1;
4445 }
4446 else if ((frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64))
4447 {
4448 /* case two: ack fits after existing min UUID */
4449 if ((frag_uuid == rc->frag_uuid) ||
4450 (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))))
4451 {
4452 /* duplicate fragment, ack now! */
4453 ack_now = GNUNET_YES;
4454 }
4455 else
4456 {
4457 rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
4458 rc->num_acks++;
4459 }
4460 }
4461 else if ((rc->frag_uuid > frag_uuid) &&
4462 (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks)) ||
4463 ((rc->frag_uuid < frag_uuid + 64) &&
4464 (rc->extra_acks ==
4465 (rc->extra_acks &
4466 ~((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))))))
4467 {
4468 /* can fit ack by shifting extra acks and starting at
4469 frag_uid, test above esured that the bits we will
4470 shift 'extra_acks' by are all zero. */
4471 rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
4472 rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
4473 rc->frag_uuid = frag_uuid;
4474 rc->num_acks++;
4475 }
4476 if (65 == rc->num_acks) /* OPTIMIZE-FIXME: maybe use smaller threshold? This
4477 is very aggressive. */
4478 ack_now = GNUNET_YES; /* maximum acks received */
4479 // FIXME: possibly also ACK based on RTT (but for that we'd need to
4480 // determine the queue used for the ACK first!)
4481
4482 /* is reassembly complete? */ 4733 /* is reassembly complete? */
4483 if (0 != rc->msg_missing) 4734 if (0 != rc->msg_missing)
4484 { 4735 {
4485 if (ack_now)
4486 send_fragment_ack (rc);
4487 finish_cmc_handling (cmc); 4736 finish_cmc_handling (cmc);
4488 return; 4737 return;
4489 } 4738 }
@@ -4497,7 +4746,6 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
4497 return; 4746 return;
4498 } 4747 }
4499 /* successful reassembly */ 4748 /* successful reassembly */
4500 send_fragment_ack (rc);
4501 demultiplex_with_cmc (cmc, msg); 4749 demultiplex_with_cmc (cmc, msg);
4502 /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still 4750 /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
4503 en-route and we forget that we finished this reassembly immediately! 4751 en-route and we forget that we finished this reassembly immediately!
@@ -4508,172 +4756,149 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
4508 4756
4509 4757
4510/** 4758/**
4511 * Check the @a fa against the fragments associated with @a pm. 4759 * Communicator gave us a reliability box. Check the message.
4512 * If it matches, remove the matching fragments from the transmission
4513 * list.
4514 * 4760 *
4515 * @param pm pending message to check against the ack 4761 * @param cls a `struct CommunicatorMessageContext`
4516 * @param fa the ack that was received 4762 * @param rb the send message that was sent
4517 * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not 4763 * @return #GNUNET_YES if message is well-formed
4518 */ 4764 */
4519static int 4765static int
4520check_ack_against_pm (struct PendingMessage *pm, 4766check_reliability_box (void *cls,
4521 const struct TransportFragmentAckMessage *fa) 4767 const struct TransportReliabilityBoxMessage *rb)
4522{ 4768{
4523 int match; 4769 GNUNET_MQ_check_boxed_message (rb);
4524 struct PendingMessage *nxt; 4770 return GNUNET_YES;
4525 uint32_t fs = ntohl (fa->frag_uuid.uuid);
4526 uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
4527
4528 match = GNUNET_NO;
4529 for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt)
4530 {
4531 const struct TransportFragmentBoxMessage *tfb =
4532 (const struct TransportFragmentBoxMessage *) &pm[1];
4533 uint32_t fu = ntohl (tfb->frag_uuid.uuid);
4534
4535 GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
4536 nxt = frag->next_frag;
4537 /* Check for exact match or match in the 'xtra' bitmask */
4538 if ((fu == fs) ||
4539 ((fu > fs) && (fu <= fs + 64) && (0 != (1LLU << (fu - fs - 1) & xtra))))
4540 {
4541 match = GNUNET_YES;
4542 free_fragment_tree (frag);
4543 }
4544 }
4545 return match;
4546} 4771}
4547 4772
4548 4773
4549/** 4774/**
4550 * Communicator gave us a fragment acknowledgement. Process the request. 4775 * Communicator gave us a reliability box. Process the request.
4551 * 4776 *
4552 * @param cls a `struct CommunicatorMessageContext` (must call 4777 * @param cls a `struct CommunicatorMessageContext` (must call
4553 * #finish_cmc_handling() when done) 4778 * #finish_cmc_handling() when done)
4554 * @param fa the message that was received 4779 * @param rb the message that was received
4555 */ 4780 */
4556static void 4781static void
4557handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa) 4782handle_reliability_box (void *cls,
4783 const struct TransportReliabilityBoxMessage *rb)
4558{ 4784{
4559 struct CommunicatorMessageContext *cmc = cls; 4785 struct CommunicatorMessageContext *cmc = cls;
4560 struct Neighbour *n; 4786 const struct GNUNET_MessageHeader *inbox =
4561 int matched; 4787 (const struct GNUNET_MessageHeader *) &rb[1];
4562 4788
4563 n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); 4789 // FIXME: call cummulative_ack(), have ack_countdown influence max_delay!
4564 if (NULL == n) 4790 (void) (0 == ntohl (rb->ack_countdown));
4565 { 4791 /* continue with inner message */
4566 struct GNUNET_SERVICE_Client *client = cmc->tc->client; 4792 demultiplex_with_cmc (cmc, inbox);
4793}
4567 4794
4568 GNUNET_break (0); 4795
4569 finish_cmc_handling (cmc); 4796/**
4570 GNUNET_SERVICE_client_drop (client); 4797 * We have successfully transmitted data via @a q, update metrics.
4571 return; 4798 *
4572 } 4799 * @param q queue to update
4573 /* FIXME-OPTIMIZE: maybe use another hash map here? */ 4800 * @param rtt round trip time observed
4574 matched = GNUNET_NO; 4801 * @param bytes_transmitted_ok number of bytes successfully transmitted
4575 for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; 4802 */
4576 pm = pm->prev_neighbour) 4803static void
4577 { 4804update_queue_performance (struct Queue *q,
4578 if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid)) 4805 struct GNUNET_TIME_Relative rtt,
4579 continue; 4806 uint16_t bytes_transmitted_ok)
4580 matched = GNUNET_YES; 4807{
4581 if (GNUNET_YES == check_ack_against_pm (pm, fa)) 4808 // FIXME: implement!
4582 {
4583 struct GNUNET_TIME_Relative avg_ack_delay =
4584 GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
4585 // FIXME: update RTT and other reliability data!
4586 // ISSUE: we don't know which of n's queues the message(s)
4587 // took (and in fact the different messages might have gone
4588 // over different queues and possibly over multiple).
4589 // => track queues with PendingMessages, and update RTT only if
4590 // the queue used is unique?
4591 // -> how can we get loss rates?
4592 // -> or, add extra state to Box and ACK to identify queue?
4593 // IDEA: generate MULTIPLE frag-uuids per fragment and track
4594 // the queue with the fragment! (-> this logic must
4595 // be moved into check_ack_against_pm!)
4596 (void) avg_ack_delay;
4597 }
4598 else
4599 {
4600 GNUNET_STATISTICS_update (GST_stats,
4601 "# FRAGMENT_ACKS dropped, no matching fragment",
4602 1,
4603 GNUNET_NO);
4604 }
4605 if (NULL == pm->head_frag)
4606 {
4607 // if entire message is ACKed, handle that as well.
4608 // => clean up PM, any post actions?
4609 free_pending_message (pm);
4610 }
4611 else
4612 {
4613 struct GNUNET_TIME_Relative reassembly_timeout =
4614 GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
4615 // OPTIMIZE-FIXME: adjust retransmission strategy based on
4616 // reassembly_timeout!
4617 (void) reassembly_timeout;
4618 }
4619 break;
4620 }
4621 if (GNUNET_NO == matched)
4622 {
4623 GNUNET_STATISTICS_update (GST_stats,
4624 "# FRAGMENT_ACKS dropped, no matching pending message",
4625 1,
4626 GNUNET_NO);
4627 }
4628 finish_cmc_handling (cmc);
4629} 4809}
4630 4810
4631 4811
4632/** 4812/**
4633 * Communicator gave us a reliability box. Check the message. 4813 * We have successfully transmitted data via @a dvh, update metrics.
4634 * 4814 *
4635 * @param cls a `struct CommunicatorMessageContext` 4815 * @param dvh distance vector path data to update
4636 * @param rb the send message that was sent 4816 * @param rtt round trip time observed
4637 * @return #GNUNET_YES if message is well-formed 4817 * @param bytes_transmitted_ok number of bytes successfully transmitted
4638 */ 4818 */
4639static int 4819static void
4640check_reliability_box (void *cls, 4820update_dvh_performance (struct DistanceVectorHop *dvh,
4641 const struct TransportReliabilityBoxMessage *rb) 4821 struct GNUNET_TIME_Relative rtt,
4822 uint16_t bytes_transmitted_ok)
4642{ 4823{
4643 GNUNET_MQ_check_boxed_message (rb); 4824 // FIXME: implement!
4644 return GNUNET_YES;
4645} 4825}
4646 4826
4647 4827
4648/** 4828/**
4649 * Communicator gave us a reliability box. Process the request. 4829 * The @a pa was acknowledged, process the acknowledgement.
4650 * 4830 *
4651 * @param cls a `struct CommunicatorMessageContext` (must call 4831 * @param pa the pending acknowledgement that was satisfied
4652 * #finish_cmc_handling() when done) 4832 * @param ack_delay artificial delay from cummulative acks created by the other
4653 * @param rb the message that was received 4833 * peer
4654 */ 4834 */
4655static void 4835static void
4656handle_reliability_box (void *cls, 4836handle_acknowledged (struct PendingAcknowledgement *pa,
4657 const struct TransportReliabilityBoxMessage *rb) 4837 struct GNUNET_TIME_Relative ack_delay)
4658{ 4838{
4659 struct CommunicatorMessageContext *cmc = cls; 4839 struct PendingMessage *pm = pa->pm;
4660 const struct GNUNET_MessageHeader *inbox = 4840 struct GNUNET_TIME_Relative delay;
4661 (const struct GNUNET_MessageHeader *) &rb[1];
4662 4841
4663 if (0 == ntohl (rb->ack_countdown)) 4842 delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
4664 { 4843 if (delay.rel_value_us > ack_delay.rel_value_us)
4665 struct TransportReliabilityAckMessage *ack; 4844 delay = GNUNET_TIME_UNIT_ZERO;
4845 else
4846 delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
4847 if (NULL != pa->queue)
4848 update_queue_performance (pa->queue, delay, pa->message_size);
4849 if (NULL != pa->dvh)
4850 update_dvh_performance (pa->dvh, delay, pa->message_size);
4851 if (NULL != pm)
4852 {
4853 if (NULL != pm->frag_parent)
4854 {
4855 pm = pm->frag_parent;
4856 free_fragment_tree (pa->pm);
4857 }
4858 while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
4859 {
4860 struct PendingMessage *parent = pm->frag_parent;
4666 4861
4667 /* FIXME-OPTIMIZE: implement cummulative ACKs and ack_countdown, 4862 free_fragment_tree (pm);
4668 then setting the avg_ack_delay field below: */ 4863 pm = parent;
4669 ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct MessageUUIDP)); 4864 }
4670 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK); 4865 if (NULL != pm->head_frag)
4671 ack->header.size = htons (sizeof (*ack) + sizeof (struct MessageUUIDP)); 4866 pm = NULL; /* we are done, otherwise free 'pm' below */
4672 memcpy (&ack[1], &rb->msg_uuid, sizeof (struct MessageUUIDP));
4673 route_message (&cmc->im.sender, &ack->header, RMO_DV_ALLOWED);
4674 } 4867 }
4675 /* continue with inner message */ 4868 if (NULL != pm)
4676 demultiplex_with_cmc (cmc, inbox); 4869 free_pending_message (pm);
4870 free_pending_acknowledgement (pa);
4871}
4872
4873
4874/**
4875 * Communicator gave us a reliability ack. Check it is well-formed.
4876 *
4877 * @param cls a `struct CommunicatorMessageContext` (unused)
4878 * @param ra the message that was received
4879 * @return #GNUNET_Ok if @a ra is well-formed
4880 */
4881static int
4882check_reliability_ack (void *cls,
4883 const struct TransportReliabilityAckMessage *ra)
4884{
4885 unsigned int n_acks;
4886
4887 (void) cls;
4888 n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
4889 sizeof (struct TransportCummulativeAckPayloadP);
4890 if (0 == n_acks)
4891 {
4892 GNUNET_break_op (0);
4893 return GNUNET_SYSERR;
4894 }
4895 if ((ntohs (ra->header.size) - sizeof (*ra)) !=
4896 n_acks * sizeof (struct TransportCummulativeAckPayloadP))
4897 {
4898 GNUNET_break_op (0);
4899 return GNUNET_SYSERR;
4900 }
4901 return GNUNET_OK;
4677} 4902}
4678 4903
4679 4904
@@ -4689,70 +4914,33 @@ handle_reliability_ack (void *cls,
4689 const struct TransportReliabilityAckMessage *ra) 4914 const struct TransportReliabilityAckMessage *ra)
4690{ 4915{
4691 struct CommunicatorMessageContext *cmc = cls; 4916 struct CommunicatorMessageContext *cmc = cls;
4692 struct Neighbour *n; 4917 const struct TransportCummulativeAckPayloadP *ack;
4918 struct PendingAcknowledgement *pa;
4693 unsigned int n_acks; 4919 unsigned int n_acks;
4694 const struct MessageUUIDP *msg_uuids; 4920 uint32_t ack_counter;
4695 struct PendingMessage *nxt;
4696 int matched;
4697 4921
4698 n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); 4922 n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
4699 if (NULL == n) 4923 sizeof (struct TransportCummulativeAckPayloadP);
4924 ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
4925 for (unsigned int i = 0; i < n_acks; i++)
4700 { 4926 {
4701 struct GNUNET_SERVICE_Client *client = cmc->tc->client; 4927 pa =
4702 4928 GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
4703 GNUNET_break (0); 4929 if (NULL == pa)
4704 finish_cmc_handling (cmc);
4705 GNUNET_SERVICE_client_drop (client);
4706 return;
4707 }
4708 n_acks =
4709 (ntohs (ra->header.size) - sizeof (*ra)) / sizeof (struct MessageUUIDP);
4710 msg_uuids = (const struct MessageUUIDP *) &ra[1];
4711
4712 /* FIXME-OPTIMIZE: maybe use another hash map here? */
4713 matched = GNUNET_NO;
4714 for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt)
4715 {
4716 int in_list;
4717
4718 nxt = pm->next_neighbour;
4719 in_list = GNUNET_NO;
4720 for (unsigned int i = 0; i < n_acks; i++)
4721 { 4930 {
4722 if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid)) 4931 GNUNET_STATISTICS_update (
4723 continue; 4932 GST_stats,
4724 in_list = GNUNET_YES; 4933 "# FRAGMENT_ACKS dropped, no matching pending message",
4725 break; 4934 1,
4726 } 4935 GNUNET_NO);
4727 if (GNUNET_NO == in_list)
4728 continue; 4936 continue;
4729
4730 /* this pm was acked! */
4731 matched = GNUNET_YES;
4732 free_pending_message (pm);
4733
4734 {
4735 struct GNUNET_TIME_Relative avg_ack_delay =
4736 GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
4737 // FIXME: update RTT and other reliability data!
4738 // ISSUE: we don't know which of n's queues the message(s)
4739 // took (and in fact the different messages might have gone
4740 // over different queues and possibly over multiple).
4741 // => track queues with PendingMessages, and update RTT only if
4742 // the queue used is unique?
4743 // -> how can we get loss rates?
4744 // -> or, add extra state to MSG and ACKs to identify queue?
4745 // -> if we do this, might just do the same for the avg_ack_delay!
4746 (void) avg_ack_delay;
4747 } 4937 }
4938 handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
4748 } 4939 }
4749 if (GNUNET_NO == matched) 4940
4750 { 4941 ack_counter = htonl (ra->ack_counter);
4751 GNUNET_STATISTICS_update (GST_stats, 4942 // FIXME: track ACK losses based on ack_counter somewhere!
4752 "# FRAGMENT_ACKS dropped, no matching pending message", 4943 // (DV and/or Neighbour?)
4753 1,
4754 GNUNET_NO);
4755 }
4756 finish_cmc_handling (cmc); 4944 finish_cmc_handling (cmc);
4757} 4945}
4758 4946
@@ -4968,7 +5156,8 @@ backtalker_monotime_cb (void *cls,
4968 1, 5156 1,
4969 GNUNET_NO); 5157 GNUNET_NO);
4970 b->monotonic_time = mt; 5158 b->monotonic_time = mt;
4971 /* Setting body_size to 0 prevents call to #forward_backchannel_payload() */ 5159 /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
5160 */
4972 b->body_size = 0; 5161 b->body_size = 0;
4973 return; 5162 return;
4974 } 5163 }
@@ -5268,13 +5457,13 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
5268 * non-first hop is in our neighbour list (returning #GNUNET_SYSERR). 5457 * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
5269 * 5458 *
5270 * @param path the path we learned, path[0] should be us, 5459 * @param path the path we learned, path[0] should be us,
5271 * and then path contains a valid path from us to `path[path_len-1]` 5460 * and then path contains a valid path from us to
5272 * path[1] should be a direct neighbour (we should check!) 5461 * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
5273 * @param path_len number of entries on the @a path, at least three! 5462 * @param path_len number of entries on the @a path, at least three!
5274 * @param network_latency how long does the message take from us to 5463 * @param network_latency how long does the message take from us to
5275 * `path[path_len-1]`? set to "forever" if unknown 5464 * `path[path_len-1]`? set to "forever" if unknown
5276 * @param path_valid_until how long is this path considered validated? Maybe be 5465 * @param path_valid_until how long is this path considered validated? Maybe
5277 * zero. 5466 * be zero.
5278 * @return #GNUNET_YES on success, 5467 * @return #GNUNET_YES on success,
5279 * #GNUNET_NO if we have better path(s) to the target 5468 * #GNUNET_NO if we have better path(s) to the target
5280 * #GNUNET_SYSERR if the path is useless and/or invalid 5469 * #GNUNET_SYSERR if the path is useless and/or invalid
@@ -5603,7 +5792,8 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
5603 finish_cmc_handling (cmc); 5792 finish_cmc_handling (cmc);
5604 5793
5605 /* OPTIMIZE-FIXME: Technically, we only need to bother checking 5794 /* OPTIMIZE-FIXME: Technically, we only need to bother checking
5606 the initiator signature if we send the message back to the initiator... */ 5795 the initiator signature if we send the message back to the initiator...
5796 */
5607 if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator, 5797 if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator,
5608 &dvl->challenge, 5798 &dvl->challenge,
5609 &dvl->init_sig)) 5799 &dvl->init_sig))
@@ -6349,18 +6539,14 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
6349 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT, 6539 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
6350 struct TransportFragmentBoxMessage, 6540 struct TransportFragmentBoxMessage,
6351 &cmc), 6541 &cmc),
6352 GNUNET_MQ_hd_fixed_size (fragment_ack,
6353 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
6354 struct TransportFragmentAckMessage,
6355 &cmc),
6356 GNUNET_MQ_hd_var_size (reliability_box, 6542 GNUNET_MQ_hd_var_size (reliability_box,
6357 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX, 6543 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
6358 struct TransportReliabilityBoxMessage, 6544 struct TransportReliabilityBoxMessage,
6359 &cmc), 6545 &cmc),
6360 GNUNET_MQ_hd_fixed_size (reliability_ack, 6546 GNUNET_MQ_hd_var_size (reliability_ack,
6361 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK, 6547 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
6362 struct TransportReliabilityAckMessage, 6548 struct TransportReliabilityAckMessage,
6363 &cmc), 6549 &cmc),
6364 GNUNET_MQ_hd_var_size (backchannel_encapsulation, 6550 GNUNET_MQ_hd_var_size (backchannel_encapsulation,
6365 GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION, 6551 GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
6366 struct TransportBackchannelEncapsulationMessage, 6552 struct TransportBackchannelEncapsulationMessage,
@@ -6461,27 +6647,76 @@ set_pending_message_uuid (struct PendingMessage *pm)
6461 6647
6462 6648
6463/** 6649/**
6650 * Setup data structure waiting for acknowledgements.
6651 *
6652 * @param queue queue the @a pm will be sent over
6653 * @param dvh path the message will take, may be NULL
6654 * @param pm the pending message for transmission
6655 * @return corresponding fresh pending acknowledgement
6656 */
6657static struct PendingAcknowledgement *
6658prepare_pending_acknowledgement (struct Queue *queue,
6659 struct DistanceVectorHop *dvh,
6660 struct PendingMessage *pm)
6661{
6662 struct PendingAcknowledgement *pa;
6663
6664 pa = GNUNET_new (struct PendingAcknowledgement);
6665 pa->queue = queue;
6666 pa->dvh = dvh;
6667 pa->pm = pm;
6668 do
6669 {
6670 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
6671 &pa->ack_uuid,
6672 sizeof (pa->ack_uuid));
6673 } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
6674 pending_acks,
6675 &pa->ack_uuid.value,
6676 pa,
6677 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6678 GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
6679 GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
6680 if (NULL != dvh)
6681 GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
6682 pa->transmission_time = GNUNET_TIME_absolute_get ();
6683 pa->message_size = pm->bytes_msg;
6684 return pa;
6685}
6686
6687
6688/**
6464 * Fragment the given @a pm to the given @a mtu. Adds 6689 * Fragment the given @a pm to the given @a mtu. Adds
6465 * additional fragments to the neighbour as well. If the 6690 * additional fragments to the neighbour as well. If the
6466 * @a mtu is too small, generates and error for the @a pm 6691 * @a mtu is too small, generates and error for the @a pm
6467 * and returns NULL. 6692 * and returns NULL.
6468 * 6693 *
6694 * @param queue which queue to fragment for
6695 * @param dvh path the message will take, or NULL
6469 * @param pm pending message to fragment for transmission 6696 * @param pm pending message to fragment for transmission
6470 * @param mtu MTU to apply
6471 * @return new message to transmit 6697 * @return new message to transmit
6472 */ 6698 */
6473static struct PendingMessage * 6699static struct PendingMessage *
6474fragment_message (struct PendingMessage *pm, uint16_t mtu) 6700fragment_message (struct Queue *queue,
6701 struct DistanceVectorHop *dvh,
6702 struct PendingMessage *pm)
6475{ 6703{
6704 struct PendingAcknowledgement *pa;
6476 struct PendingMessage *ff; 6705 struct PendingMessage *ff;
6706 uint16_t mtu;
6477 6707
6708 pa = prepare_pending_acknowledgement (queue, dvh, pm);
6709 mtu = (0 == queue->mtu)
6710 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
6711 : queue->mtu;
6478 set_pending_message_uuid (pm); 6712 set_pending_message_uuid (pm);
6479 6713
6480 /* This invariant is established in #handle_add_queue_message() */ 6714 /* This invariant is established in #handle_add_queue_message() */
6481 GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage)); 6715 GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
6482 6716
6483 /* select fragment for transmission, descending the tree if it has 6717 /* select fragment for transmission, descending the tree if it has
6484 been expanded until we are at a leaf or at a fragment that is small enough 6718 been expanded until we are at a leaf or at a fragment that is small
6719 enough
6485 */ 6720 */
6486 ff = pm; 6721 ff = pm;
6487 while (((ff->bytes_msg > mtu) || (pm == ff)) && 6722 while (((ff->bytes_msg > mtu) || (pm == ff)) &&
@@ -6527,7 +6762,7 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
6527 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); 6762 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
6528 tfb.header.size = 6763 tfb.header.size =
6529 htons (sizeof (struct TransportFragmentBoxMessage) + fragsize); 6764 htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
6530 tfb.frag_uuid.uuid = htonl (pm->frag_uuidgen++); 6765 tfb.ack_uuid = pa->ack_uuid;
6531 tfb.msg_uuid = pm->msg_uuid; 6766 tfb.msg_uuid = pm->msg_uuid;
6532 tfb.frag_off = htons (ff->frag_off + xoff); 6767 tfb.frag_off = htons (ff->frag_off + xoff);
6533 tfb.msg_size = htons (pm->bytes_msg); 6768 tfb.msg_size = htons (pm->bytes_msg);
@@ -6558,13 +6793,18 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
6558 * @a pm). If the @a pm is already fragmented or reliability boxed, 6793 * @a pm). If the @a pm is already fragmented or reliability boxed,
6559 * or itself an ACK, this function simply returns @a pm. 6794 * or itself an ACK, this function simply returns @a pm.
6560 * 6795 *
6796 * @param queue which queue to prepare transmission for
6797 * @param dvh path the message will take, or NULL
6561 * @param pm pending message to box for transmission over unreliabile queue 6798 * @param pm pending message to box for transmission over unreliabile queue
6562 * @return new message to transmit 6799 * @return new message to transmit
6563 */ 6800 */
6564static struct PendingMessage * 6801static struct PendingMessage *
6565reliability_box_message (struct PendingMessage *pm) 6802reliability_box_message (struct Queue *queue,
6803 struct DistanceVectorHop *dvh,
6804 struct PendingMessage *pm)
6566{ 6805{
6567 struct TransportReliabilityBoxMessage rbox; 6806 struct TransportReliabilityBoxMessage rbox;
6807 struct PendingAcknowledgement *pa;
6568 struct PendingMessage *bpm; 6808 struct PendingMessage *bpm;
6569 char *msg; 6809 char *msg;
6570 6810
@@ -6581,6 +6821,8 @@ reliability_box_message (struct PendingMessage *pm)
6581 client_send_response (pm, GNUNET_NO, 0); 6821 client_send_response (pm, GNUNET_NO, 0);
6582 return NULL; 6822 return NULL;
6583 } 6823 }
6824 pa = prepare_pending_acknowledgement (queue, dvh, pm);
6825
6584 bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) + 6826 bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
6585 pm->bytes_msg); 6827 pm->bytes_msg);
6586 bpm->target = pm->target; 6828 bpm->target = pm->target;
@@ -6593,7 +6835,8 @@ reliability_box_message (struct PendingMessage *pm)
6593 rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX); 6835 rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
6594 rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg); 6836 rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
6595 rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support 6837 rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
6596 rbox.msg_uuid = pm->msg_uuid; 6838
6839 rbox.ack_uuid = pa->ack_uuid;
6597 msg = (char *) &bpm[1]; 6840 msg = (char *) &bpm[1];
6598 memcpy (msg, &rbox, sizeof (rbox)); 6841 memcpy (msg, &rbox, sizeof (rbox));
6599 memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg); 6842 memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
@@ -6698,11 +6941,7 @@ transmit_on_queue (void *cls)
6698 (NULL != pm->head_frag /* fragments already exist, should 6941 (NULL != pm->head_frag /* fragments already exist, should
6699 respect that even if MTU is 0 for 6942 respect that even if MTU is 0 for
6700 this queue */) ) 6943 this queue */) )
6701 s = fragment_message (s, 6944 s = fragment_message (queue, NULL /*FIXME! */, s);
6702 (0 == queue->mtu)
6703 ? UINT16_MAX -
6704 sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
6705 : queue->mtu);
6706 if (NULL == s) 6945 if (NULL == s)
6707 { 6946 {
6708 /* Fragmentation failed, try next message... */ 6947 /* Fragmentation failed, try next message... */
@@ -6710,7 +6949,7 @@ transmit_on_queue (void *cls)
6710 return; 6949 return;
6711 } 6950 }
6712 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) 6951 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
6713 s = reliability_box_message (s); 6952 s = reliability_box_message (queue, NULL /* FIXME! */, s);
6714 if (NULL == s) 6953 if (NULL == s)
6715 { 6954 {
6716 /* Reliability boxing failed, try next message... */ 6955 /* Reliability boxing failed, try next message... */
@@ -7509,8 +7748,8 @@ handle_queue_create_ok (void *cls,
7509 7748
7510 7749
7511/** 7750/**
7512 * Communicator tells us that our request to create a queue failed. This usually 7751 * Communicator tells us that our request to create a queue failed. This
7513 * indicates that the provided address is simply invalid or that the 7752 * usually indicates that the provided address is simply invalid or that the
7514 * communicator's resources are exhausted. 7753 * communicator's resources are exhausted.
7515 * 7754 *
7516 * @param cls the `struct TransportClient` 7755 * @param cls the `struct TransportClient`
@@ -7803,7 +8042,8 @@ handle_address_consider_verify (
7803 (void) cls; 8042 (void) cls;
7804 // OPTIMIZE-FIXME: checking that we know this address already should 8043 // OPTIMIZE-FIXME: checking that we know this address already should
7805 // be done BEFORE checking the signature => HELLO API change! 8044 // be done BEFORE checking the signature => HELLO API change!
7806 // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?! 8045 // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
8046 // validation?!
7807 address = 8047 address =
7808 GNUNET_HELLO_extract_address (&hdr[1], 8048 GNUNET_HELLO_extract_address (&hdr[1],
7809 ntohs (hdr->header.size) - sizeof (*hdr), 8049 ntohs (hdr->header.size) - sizeof (*hdr),
@@ -7951,6 +8191,50 @@ free_validation_state_cb (void *cls,
7951 8191
7952 8192
7953/** 8193/**
8194 * Free pending acknowledgement.
8195 *
8196 * @param cls NULL
8197 * @param key unused
8198 * @param value a `struct PendingAcknowledgement`
8199 * @return #GNUNET_OK (always)
8200 */
8201static int
8202free_pending_ack_cb (void *cls,
8203 const struct GNUNET_ShortHashCode *key,
8204 void *value)
8205{
8206 struct PendingAcknowledgement *pa = value;
8207
8208 (void) cls;
8209 (void) key;
8210 free_pending_acknowledgement (pa);
8211 return GNUNET_OK;
8212}
8213
8214
8215/**
8216 * Free acknowledgement cummulator.
8217 *
8218 * @param cls NULL
8219 * @param pid unused
8220 * @param value a `struct AcknowledgementCummulator`
8221 * @return #GNUNET_OK (always)
8222 */
8223static int
8224free_ack_cummulator_cb (void *cls,
8225 const struct GNUNET_PeerIdentity *pid,
8226 void *value)
8227{
8228 struct AcknowledgementCummulator *ac = value;
8229
8230 (void) cls;
8231 (void) pid;
8232 GNUNET_free (ac);
8233 return GNUNET_OK;
8234}
8235
8236
8237/**
7954 * Function called when the service shuts down. Unloads our plugins 8238 * Function called when the service shuts down. Unloads our plugins
7955 * and cancels pending validations. 8239 * and cancels pending validations.
7956 * 8240 *
@@ -7983,6 +8267,16 @@ do_shutdown (void *cls)
7983 GNUNET_free (GST_my_private_key); 8267 GNUNET_free (GST_my_private_key);
7984 GST_my_private_key = NULL; 8268 GST_my_private_key = NULL;
7985 } 8269 }
8270 GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators,
8271 &free_ack_cummulator_cb,
8272 NULL);
8273 GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
8274 ack_cummulators = NULL;
8275 GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
8276 &free_pending_ack_cb,
8277 NULL);
8278 GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
8279 pending_acks = NULL;
7986 GNUNET_CONTAINER_multipeermap_destroy (neighbours); 8280 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
7987 neighbours = NULL; 8281 neighbours = NULL;
7988 GNUNET_CONTAINER_multipeermap_iterate (backtalkers, 8282 GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
@@ -8034,6 +8328,8 @@ run (void *cls,
8034 /* setup globals */ 8328 /* setup globals */
8035 GST_cfg = c; 8329 GST_cfg = c;
8036 backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); 8330 backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
8331 pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
8332 ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
8037 neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); 8333 neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
8038 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); 8334 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
8039 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); 8335 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);