diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-01-25 19:29:45 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-01-25 19:29:45 +0100 |
commit | 80d6555ec30182b9a8a59778339f5cbe7929ce60 (patch) | |
tree | d79b713b80a4fef3edc1f0483c738f61aabdcb88 /src/cadet/gnunet-service-cadet-new_channel.c | |
parent | 3071beacd58c57edba1cb8b392afe6873560c676 (diff) | |
download | gnunet-80d6555ec30182b9a8a59778339f5cbe7929ce60.tar.gz gnunet-80d6555ec30182b9a8a59778339f5cbe7929ce60.zip |
towards proper DATA_ACK handling
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_channel.c | 263 |
1 files changed, 150 insertions, 113 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index dc3d4352c..e561f1992 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c | |||
@@ -25,6 +25,7 @@ | |||
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | * | 26 | * |
27 | * TODO: | 27 | * TODO: |
28 | * - Optimize ACKs by using 'mid_futures' properly! | ||
28 | * - introduce shutdown so we can have half-closed channels, modify | 29 | * - introduce shutdown so we can have half-closed channels, modify |
29 | * destroy to include MID to have FIN-ACK equivalents, etc. | 30 | * destroy to include MID to have FIN-ACK equivalents, etc. |
30 | * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! | 31 | * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! |
@@ -287,6 +288,8 @@ struct CadetChannel | |||
287 | 288 | ||
288 | /** | 289 | /** |
289 | * Bitfield of already-received messages past @e mid_recv. | 290 | * Bitfield of already-received messages past @e mid_recv. |
291 | * | ||
292 | * FIXME: not yet properly used (bits here are never set!) | ||
290 | */ | 293 | */ |
291 | uint64_t mid_futures; | 294 | uint64_t mid_futures; |
292 | 295 | ||
@@ -769,6 +772,11 @@ send_channel_data_ack (struct CadetChannel *ch) | |||
769 | msg.futures = GNUNET_htonll (ch->mid_futures); | 772 | msg.futures = GNUNET_htonll (ch->mid_futures); |
770 | if (NULL != ch->last_control_qe) | 773 | if (NULL != ch->last_control_qe) |
771 | GCT_send_cancel (ch->last_control_qe); | 774 | GCT_send_cancel (ch->last_control_qe); |
775 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
776 | "Sending DATA_ACK %u:%llX via %s\n", | ||
777 | (unsigned int) ntohl (msg.mid.mid), | ||
778 | (unsigned long long) ch->mid_futures, | ||
779 | GCCH_2s (ch)); | ||
772 | ch->last_control_qe = GCT_send (ch->t, | 780 | ch->last_control_qe = GCT_send (ch->t, |
773 | &msg.header, | 781 | &msg.header, |
774 | &send_ack_cb, | 782 | &send_ack_cb, |
@@ -1076,23 +1084,11 @@ is_before (void *cls, | |||
1076 | if (delta > (uint32_t) INT_MAX) | 1084 | if (delta > (uint32_t) INT_MAX) |
1077 | { | 1085 | { |
1078 | /* in overflow range, we can safely assume we wrapped around */ | 1086 | /* in overflow range, we can safely assume we wrapped around */ |
1079 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1080 | "%u > %u => %p > %p\n", | ||
1081 | (unsigned int) v1, | ||
1082 | (unsigned int) v2, | ||
1083 | m1, | ||
1084 | m2); | ||
1085 | return GNUNET_NO; | 1087 | return GNUNET_NO; |
1086 | } | 1088 | } |
1087 | else | 1089 | else |
1088 | { | 1090 | { |
1089 | /* result is small, thus v2 > v1, thus e1 < e2 */ | 1091 | /* result is small, thus v2 > v1, thus e1 < e2 */ |
1090 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1091 | "%u < %u => %p < %p\n", | ||
1092 | (unsigned int) v1, | ||
1093 | (unsigned int) v2, | ||
1094 | m1, | ||
1095 | m2); | ||
1096 | return GNUNET_YES; | 1092 | return GNUNET_YES; |
1097 | } | 1093 | } |
1098 | } | 1094 | } |
@@ -1113,6 +1109,11 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1113 | struct GNUNET_CADET_LocalData *ld; | 1109 | struct GNUNET_CADET_LocalData *ld; |
1114 | struct CadetChannelClient *ccc; | 1110 | struct CadetChannelClient *ccc; |
1115 | size_t payload_size; | 1111 | size_t payload_size; |
1112 | struct CadetOutOfOrderMessage *com; | ||
1113 | int duplicate; | ||
1114 | uint32_t mid_min; | ||
1115 | uint32_t mid_max; | ||
1116 | uint32_t mid_msg; | ||
1116 | 1117 | ||
1117 | GNUNET_assert (GNUNET_NO == ch->is_loopback); | 1118 | GNUNET_assert (GNUNET_NO == ch->is_loopback); |
1118 | if ( (GNUNET_YES == ch->destroy) && | 1119 | if ( (GNUNET_YES == ch->destroy) && |
@@ -1153,72 +1154,75 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1153 | env); | 1154 | env); |
1154 | ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); | 1155 | ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); |
1155 | ch->mid_futures >>= 1; | 1156 | ch->mid_futures >>= 1; |
1157 | if (GNUNET_YES == ch->reliable) | ||
1158 | send_channel_data_ack (ch); | ||
1159 | return; | ||
1156 | } | 1160 | } |
1157 | else | 1161 | |
1162 | /* check if message ought to be dropped because it is anicent/too distant/duplicate */ | ||
1163 | mid_min = ntohl (ch->mid_recv.mid); | ||
1164 | mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE; | ||
1165 | mid_msg = ntohl (msg->mid.mid); | ||
1166 | if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) || | ||
1167 | ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) ) | ||
1158 | { | 1168 | { |
1159 | struct CadetOutOfOrderMessage *com; | 1169 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1160 | int duplicate; | 1170 | "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n", |
1161 | uint32_t mid_min; | 1171 | (unsigned int) payload_size, |
1162 | uint32_t mid_max; | 1172 | GCCH_2s (ch), |
1163 | uint32_t mid_msg; | 1173 | ntohl (msg->mid.mid)); |
1164 | 1174 | GNUNET_STATISTICS_update (stats, | |
1165 | mid_min = ntohl (ch->mid_recv.mid); | 1175 | "# duplicate DATA (ancient or future)", |
1166 | mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE; | 1176 | 1, |
1167 | mid_msg = ntohl (msg->mid.mid); | 1177 | GNUNET_NO); |
1168 | if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) || | 1178 | GNUNET_MQ_discard (env); |
1169 | ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) ) | 1179 | return; |
1170 | { | 1180 | } |
1171 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1172 | "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n", | ||
1173 | (unsigned int) payload_size, | ||
1174 | GCCH_2s (ch), | ||
1175 | ntohl (msg->mid.mid)); | ||
1176 | GNUNET_STATISTICS_update (stats, | ||
1177 | "# duplicate DATA (ancient or future)", | ||
1178 | 1, | ||
1179 | GNUNET_NO); | ||
1180 | GNUNET_MQ_discard (env); | ||
1181 | return; | ||
1182 | } | ||
1183 | 1181 | ||
1184 | com = GNUNET_new (struct CadetOutOfOrderMessage); | 1182 | /* Insert message into sorted out-of-order queue */ |
1185 | com->mid = msg->mid; | 1183 | com = GNUNET_new (struct CadetOutOfOrderMessage); |
1186 | com->env = env; | 1184 | com->mid = msg->mid; |
1187 | duplicate = GNUNET_NO; | 1185 | com->env = env; |
1188 | GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, | 1186 | duplicate = GNUNET_NO; |
1189 | is_before, | 1187 | GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, |
1190 | &duplicate, | 1188 | is_before, |
1191 | ccc->head_recv, | 1189 | &duplicate, |
1192 | ccc->tail_recv, | 1190 | ccc->head_recv, |
1193 | com); | 1191 | ccc->tail_recv, |
1194 | if (GNUNET_YES == duplicate) | 1192 | com); |
1195 | { | 1193 | if (GNUNET_YES == duplicate) |
1196 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1194 | { |
1197 | "Duplicate payload of %u bytes on %s (mid %u) dropped\n", | 1195 | /* Duplicate within the queue, drop also */ |
1198 | (unsigned int) payload_size, | ||
1199 | GCCH_2s (ch), | ||
1200 | ntohl (msg->mid.mid)); | ||
1201 | GNUNET_STATISTICS_update (stats, | ||
1202 | "# duplicate DATA", | ||
1203 | 1, | ||
1204 | GNUNET_NO); | ||
1205 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, | ||
1206 | ccc->tail_recv, | ||
1207 | com); | ||
1208 | GNUNET_MQ_discard (com->env); | ||
1209 | GNUNET_free (com); | ||
1210 | return; | ||
1211 | } | ||
1212 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1196 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1213 | "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n", | 1197 | "Duplicate payload of %u bytes on %s (mid %u) dropped\n", |
1214 | (GNUNET_YES == ccc->client_ready) | ||
1215 | ? "out-of-order" | ||
1216 | : "client-not-ready", | ||
1217 | (unsigned int) payload_size, | 1198 | (unsigned int) payload_size, |
1218 | GCCH_2s (ch), | 1199 | GCCH_2s (ch), |
1219 | ntohl (msg->mid.mid), | 1200 | ntohl (msg->mid.mid)); |
1220 | ntohl (ch->mid_recv.mid)); | 1201 | GNUNET_STATISTICS_update (stats, |
1202 | "# duplicate DATA", | ||
1203 | 1, | ||
1204 | GNUNET_NO); | ||
1205 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, | ||
1206 | ccc->tail_recv, | ||
1207 | com); | ||
1208 | GNUNET_MQ_discard (com->env); | ||
1209 | GNUNET_free (com); | ||
1210 | if (GNUNET_YES == ch->reliable) | ||
1211 | send_channel_data_ack (ch); | ||
1212 | return; | ||
1221 | } | 1213 | } |
1214 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1215 | "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n", | ||
1216 | (GNUNET_YES == ccc->client_ready) | ||
1217 | ? "out-of-order" | ||
1218 | : "client-not-ready", | ||
1219 | (unsigned int) payload_size, | ||
1220 | GCCH_2s (ch), | ||
1221 | ntohl (ccc->ccn.channel_of_client), | ||
1222 | ccc, | ||
1223 | ntohl (msg->mid.mid), | ||
1224 | ntohl (ch->mid_recv.mid)); | ||
1225 | send_channel_data_ack (ch); | ||
1222 | } | 1226 | } |
1223 | 1227 | ||
1224 | 1228 | ||
@@ -1261,6 +1265,36 @@ retry_transmission (void *cls) | |||
1261 | 1265 | ||
1262 | 1266 | ||
1263 | /** | 1267 | /** |
1268 | * We got an ACK for a message in our queue, remove it from | ||
1269 | * the queue and tell our client that it can send more. | ||
1270 | * | ||
1271 | * @param ch the channel that got the ACK | ||
1272 | * @param crm the message that got acknowledged | ||
1273 | */ | ||
1274 | static void | ||
1275 | handle_matching_ack (struct CadetChannel *ch, | ||
1276 | struct CadetReliableMessage *crm) | ||
1277 | { | ||
1278 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, | ||
1279 | ch->tail_sent, | ||
1280 | crm); | ||
1281 | ch->pending_messages--; | ||
1282 | GNUNET_assert (ch->pending_messages < ch->max_pending_messages); | ||
1283 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1284 | "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", | ||
1285 | GCCH_2s (ch), | ||
1286 | (unsigned int) ntohl (crm->data_message->mid.mid), | ||
1287 | ch->pending_messages); | ||
1288 | GNUNET_free (crm->data_message); | ||
1289 | GNUNET_free (crm); | ||
1290 | send_ack_to_client (ch, | ||
1291 | (NULL == ch->owner) | ||
1292 | ? GNUNET_NO | ||
1293 | : GNUNET_YES); | ||
1294 | } | ||
1295 | |||
1296 | |||
1297 | /** | ||
1264 | * We got an acknowledgement for payload data for a channel. | 1298 | * We got an acknowledgement for payload data for a channel. |
1265 | * Possibly resume transmissions. | 1299 | * Possibly resume transmissions. |
1266 | * | 1300 | * |
@@ -1272,7 +1306,11 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, | |||
1272 | const struct GNUNET_CADET_ChannelDataAckMessage *ack) | 1306 | const struct GNUNET_CADET_ChannelDataAckMessage *ack) |
1273 | { | 1307 | { |
1274 | struct CadetReliableMessage *crm; | 1308 | struct CadetReliableMessage *crm; |
1275 | int was_head; | 1309 | struct CadetReliableMessage *crmn; |
1310 | int found; | ||
1311 | uint32_t mid_base; | ||
1312 | uint64_t mid_mask; | ||
1313 | unsigned int delta; | ||
1276 | 1314 | ||
1277 | GNUNET_break (GNUNET_NO == ch->is_loopback); | 1315 | GNUNET_break (GNUNET_NO == ch->is_loopback); |
1278 | if (GNUNET_NO == ch->reliable) | 1316 | if (GNUNET_NO == ch->reliable) |
@@ -1281,12 +1319,28 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, | |||
1281 | GNUNET_break_op (0); | 1319 | GNUNET_break_op (0); |
1282 | return; | 1320 | return; |
1283 | } | 1321 | } |
1322 | mid_base = ntohl (ack->mid.mid); | ||
1323 | mid_mask = GNUNET_htonll (ack->futures); | ||
1324 | found = GNUNET_NO; | ||
1284 | for (crm = ch->head_sent; | 1325 | for (crm = ch->head_sent; |
1285 | NULL != crm; | 1326 | NULL != crm; |
1286 | crm = crm->next) | 1327 | crm = crmn) |
1328 | { | ||
1329 | crmn = crm->next; | ||
1287 | if (ack->mid.mid == crm->data_message->mid.mid) | 1330 | if (ack->mid.mid == crm->data_message->mid.mid) |
1288 | break; | 1331 | { |
1289 | if (NULL == crm) | 1332 | handle_matching_ack (ch, |
1333 | crm); | ||
1334 | continue; | ||
1335 | } | ||
1336 | delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1; | ||
1337 | if (delta >= 64) | ||
1338 | continue; | ||
1339 | if (0 != (mid_mask & (1LLU << delta))) | ||
1340 | handle_matching_ack (ch, | ||
1341 | crm); | ||
1342 | } | ||
1343 | if (GNUNET_NO == found) | ||
1290 | { | 1344 | { |
1291 | /* ACK for message we already dropped, might have been a | 1345 | /* ACK for message we already dropped, might have been a |
1292 | duplicate ACK? Ignore. */ | 1346 | duplicate ACK? Ignore. */ |
@@ -1299,36 +1353,16 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, | |||
1299 | GNUNET_NO); | 1353 | GNUNET_NO); |
1300 | return; | 1354 | return; |
1301 | } | 1355 | } |
1302 | was_head = (crm == ch->head_sent); | 1356 | if (NULL != ch->retry_data_task) |
1303 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, | ||
1304 | ch->tail_sent, | ||
1305 | crm); | ||
1306 | GNUNET_free (crm->data_message); | ||
1307 | GNUNET_free (crm); | ||
1308 | ch->pending_messages--; | ||
1309 | GNUNET_assert (ch->pending_messages < ch->max_pending_messages); | ||
1310 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1311 | "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", | ||
1312 | GCCH_2s (ch), | ||
1313 | (unsigned int) ntohl (ack->mid.mid), | ||
1314 | ch->pending_messages); | ||
1315 | send_ack_to_client (ch, | ||
1316 | (NULL == ch->owner) | ||
1317 | ? GNUNET_NO | ||
1318 | : GNUNET_YES); | ||
1319 | if (was_head) | ||
1320 | { | 1357 | { |
1321 | if (NULL != ch->retry_data_task) | 1358 | GNUNET_SCHEDULER_cancel (ch->retry_data_task); |
1322 | { | 1359 | ch->retry_data_task = NULL; |
1323 | GNUNET_SCHEDULER_cancel (ch->retry_data_task); | ||
1324 | ch->retry_data_task = NULL; | ||
1325 | } | ||
1326 | if (NULL != ch->head_sent) | ||
1327 | ch->retry_data_task | ||
1328 | = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, | ||
1329 | &retry_transmission, | ||
1330 | ch); | ||
1331 | } | 1360 | } |
1361 | if (NULL != ch->head_sent) | ||
1362 | ch->retry_data_task | ||
1363 | = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry, | ||
1364 | &retry_transmission, | ||
1365 | ch); | ||
1332 | } | 1366 | } |
1333 | 1367 | ||
1334 | 1368 | ||
@@ -1591,10 +1625,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch, | |||
1591 | if (NULL == com) | 1625 | if (NULL == com) |
1592 | { | 1626 | { |
1593 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1627 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1594 | "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending on %s)!\n", | 1628 | "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n", |
1595 | GSC_2s (ccc->c), | 1629 | GSC_2s (ccc->c), |
1630 | ntohl (client_ccn.channel_of_client), | ||
1631 | GCCH_2s (ch), | ||
1596 | ntohl (ccc->ccn.channel_of_client), | 1632 | ntohl (ccc->ccn.channel_of_client), |
1597 | GCCH_2s (ch)); | 1633 | ccc); |
1598 | return; /* none pending */ | 1634 | return; /* none pending */ |
1599 | } | 1635 | } |
1600 | if (GNUNET_YES == ch->is_loopback) | 1636 | if (GNUNET_YES == ch->is_loopback) |
@@ -1637,11 +1673,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch, | |||
1637 | return; /* missing next one in-order */ | 1673 | return; /* missing next one in-order */ |
1638 | } | 1674 | } |
1639 | 1675 | ||
1640 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1676 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1641 | "Got LOCAL ACK, passing payload message to %s-%X on %s\n", | 1677 | "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n", |
1642 | GSC_2s (ccc->c), | 1678 | ntohl (com->mid.mid), |
1643 | ntohl (ccc->ccn.channel_of_client), | 1679 | GSC_2s (ccc->c), |
1644 | GCCH_2s (ch)); | 1680 | ntohl (ccc->ccn.channel_of_client), |
1681 | GCCH_2s (ch)); | ||
1645 | 1682 | ||
1646 | /* all good, pass next message to client */ | 1683 | /* all good, pass next message to client */ |
1647 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, | 1684 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, |
@@ -1663,9 +1700,9 @@ GCCH_handle_local_ack (struct CadetChannel *ch, | |||
1663 | urgently waiting for an ACK from us. (As we have an inherent | 1700 | urgently waiting for an ACK from us. (As we have an inherent |
1664 | maximum of 64 bits, and 15 is getting too close for comfort.) | 1701 | maximum of 64 bits, and 15 is getting too close for comfort.) |
1665 | So we should send one now. */ | 1702 | So we should send one now. */ |
1666 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1703 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1667 | "Sender on %s likely blocked on flow-control, sending ACK now.\n", | 1704 | "Sender on %s likely blocked on flow-control, sending ACK now.\n", |
1668 | GCCH_2s (ch)); | 1705 | GCCH_2s (ch)); |
1669 | if (GNUNET_YES == ch->reliable) | 1706 | if (GNUNET_YES == ch->reliable) |
1670 | send_channel_data_ack (ch); | 1707 | send_channel_data_ack (ch); |
1671 | } | 1708 | } |