aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet-new_channel.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-01-25 19:29:45 +0100
committerChristian Grothoff <christian@grothoff.org>2017-01-25 19:29:45 +0100
commit80d6555ec30182b9a8a59778339f5cbe7929ce60 (patch)
treed79b713b80a4fef3edc1f0483c738f61aabdcb88 /src/cadet/gnunet-service-cadet-new_channel.c
parent3071beacd58c57edba1cb8b392afe6873560c676 (diff)
downloadgnunet-80d6555ec30182b9a8a59778339f5cbe7929ce60.tar.gz
gnunet-80d6555ec30182b9a8a59778339f5cbe7929ce60.zip
towards proper DATA_ACK handling
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r--src/cadet/gnunet-service-cadet-new_channel.c263
1 files changed, 150 insertions, 113 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
index dc3d4352c..e561f1992 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -25,6 +25,7 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 * 26 *
27 * TODO: 27 * TODO:
28 * - Optimize ACKs by using 'mid_futures' properly!
28 * - introduce shutdown so we can have half-closed channels, modify 29 * - introduce shutdown so we can have half-closed channels, modify
29 * destroy to include MID to have FIN-ACK equivalents, etc. 30 * destroy to include MID to have FIN-ACK equivalents, etc.
30 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! 31 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
@@ -287,6 +288,8 @@ struct CadetChannel
287 288
288 /** 289 /**
289 * Bitfield of already-received messages past @e mid_recv. 290 * Bitfield of already-received messages past @e mid_recv.
291 *
292 * FIXME: not yet properly used (bits here are never set!)
290 */ 293 */
291 uint64_t mid_futures; 294 uint64_t mid_futures;
292 295
@@ -769,6 +772,11 @@ send_channel_data_ack (struct CadetChannel *ch)
769 msg.futures = GNUNET_htonll (ch->mid_futures); 772 msg.futures = GNUNET_htonll (ch->mid_futures);
770 if (NULL != ch->last_control_qe) 773 if (NULL != ch->last_control_qe)
771 GCT_send_cancel (ch->last_control_qe); 774 GCT_send_cancel (ch->last_control_qe);
775 LOG (GNUNET_ERROR_TYPE_DEBUG,
776 "Sending DATA_ACK %u:%llX via %s\n",
777 (unsigned int) ntohl (msg.mid.mid),
778 (unsigned long long) ch->mid_futures,
779 GCCH_2s (ch));
772 ch->last_control_qe = GCT_send (ch->t, 780 ch->last_control_qe = GCT_send (ch->t,
773 &msg.header, 781 &msg.header,
774 &send_ack_cb, 782 &send_ack_cb,
@@ -1076,23 +1084,11 @@ is_before (void *cls,
1076 if (delta > (uint32_t) INT_MAX) 1084 if (delta > (uint32_t) INT_MAX)
1077 { 1085 {
1078 /* in overflow range, we can safely assume we wrapped around */ 1086 /* in overflow range, we can safely assume we wrapped around */
1079 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080 "%u > %u => %p > %p\n",
1081 (unsigned int) v1,
1082 (unsigned int) v2,
1083 m1,
1084 m2);
1085 return GNUNET_NO; 1087 return GNUNET_NO;
1086 } 1088 }
1087 else 1089 else
1088 { 1090 {
1089 /* result is small, thus v2 > v1, thus e1 < e2 */ 1091 /* result is small, thus v2 > v1, thus e1 < e2 */
1090 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1091 "%u < %u => %p < %p\n",
1092 (unsigned int) v1,
1093 (unsigned int) v2,
1094 m1,
1095 m2);
1096 return GNUNET_YES; 1092 return GNUNET_YES;
1097 } 1093 }
1098} 1094}
@@ -1113,6 +1109,11 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1113 struct GNUNET_CADET_LocalData *ld; 1109 struct GNUNET_CADET_LocalData *ld;
1114 struct CadetChannelClient *ccc; 1110 struct CadetChannelClient *ccc;
1115 size_t payload_size; 1111 size_t payload_size;
1112 struct CadetOutOfOrderMessage *com;
1113 int duplicate;
1114 uint32_t mid_min;
1115 uint32_t mid_max;
1116 uint32_t mid_msg;
1116 1117
1117 GNUNET_assert (GNUNET_NO == ch->is_loopback); 1118 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1118 if ( (GNUNET_YES == ch->destroy) && 1119 if ( (GNUNET_YES == ch->destroy) &&
@@ -1153,72 +1154,75 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1153 env); 1154 env);
1154 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); 1155 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1155 ch->mid_futures >>= 1; 1156 ch->mid_futures >>= 1;
1157 if (GNUNET_YES == ch->reliable)
1158 send_channel_data_ack (ch);
1159 return;
1156 } 1160 }
1157 else 1161
1162 /* check if message ought to be dropped because it is anicent/too distant/duplicate */
1163 mid_min = ntohl (ch->mid_recv.mid);
1164 mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1165 mid_msg = ntohl (msg->mid.mid);
1166 if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1167 ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1158 { 1168 {
1159 struct CadetOutOfOrderMessage *com; 1169 LOG (GNUNET_ERROR_TYPE_DEBUG,
1160 int duplicate; 1170 "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1161 uint32_t mid_min; 1171 (unsigned int) payload_size,
1162 uint32_t mid_max; 1172 GCCH_2s (ch),
1163 uint32_t mid_msg; 1173 ntohl (msg->mid.mid));
1164 1174 GNUNET_STATISTICS_update (stats,
1165 mid_min = ntohl (ch->mid_recv.mid); 1175 "# duplicate DATA (ancient or future)",
1166 mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE; 1176 1,
1167 mid_msg = ntohl (msg->mid.mid); 1177 GNUNET_NO);
1168 if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) || 1178 GNUNET_MQ_discard (env);
1169 ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) ) 1179 return;
1170 { 1180 }
1171 LOG (GNUNET_ERROR_TYPE_DEBUG,
1172 "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1173 (unsigned int) payload_size,
1174 GCCH_2s (ch),
1175 ntohl (msg->mid.mid));
1176 GNUNET_STATISTICS_update (stats,
1177 "# duplicate DATA (ancient or future)",
1178 1,
1179 GNUNET_NO);
1180 GNUNET_MQ_discard (env);
1181 return;
1182 }
1183 1181
1184 com = GNUNET_new (struct CadetOutOfOrderMessage); 1182 /* Insert message into sorted out-of-order queue */
1185 com->mid = msg->mid; 1183 com = GNUNET_new (struct CadetOutOfOrderMessage);
1186 com->env = env; 1184 com->mid = msg->mid;
1187 duplicate = GNUNET_NO; 1185 com->env = env;
1188 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage, 1186 duplicate = GNUNET_NO;
1189 is_before, 1187 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1190 &duplicate, 1188 is_before,
1191 ccc->head_recv, 1189 &duplicate,
1192 ccc->tail_recv, 1190 ccc->head_recv,
1193 com); 1191 ccc->tail_recv,
1194 if (GNUNET_YES == duplicate) 1192 com);
1195 { 1193 if (GNUNET_YES == duplicate)
1196 LOG (GNUNET_ERROR_TYPE_DEBUG, 1194 {
1197 "Duplicate payload of %u bytes on %s (mid %u) dropped\n", 1195 /* Duplicate within the queue, drop also */
1198 (unsigned int) payload_size,
1199 GCCH_2s (ch),
1200 ntohl (msg->mid.mid));
1201 GNUNET_STATISTICS_update (stats,
1202 "# duplicate DATA",
1203 1,
1204 GNUNET_NO);
1205 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1206 ccc->tail_recv,
1207 com);
1208 GNUNET_MQ_discard (com->env);
1209 GNUNET_free (com);
1210 return;
1211 }
1212 LOG (GNUNET_ERROR_TYPE_DEBUG, 1196 LOG (GNUNET_ERROR_TYPE_DEBUG,
1213 "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n", 1197 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1214 (GNUNET_YES == ccc->client_ready)
1215 ? "out-of-order"
1216 : "client-not-ready",
1217 (unsigned int) payload_size, 1198 (unsigned int) payload_size,
1218 GCCH_2s (ch), 1199 GCCH_2s (ch),
1219 ntohl (msg->mid.mid), 1200 ntohl (msg->mid.mid));
1220 ntohl (ch->mid_recv.mid)); 1201 GNUNET_STATISTICS_update (stats,
1202 "# duplicate DATA",
1203 1,
1204 GNUNET_NO);
1205 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1206 ccc->tail_recv,
1207 com);
1208 GNUNET_MQ_discard (com->env);
1209 GNUNET_free (com);
1210 if (GNUNET_YES == ch->reliable)
1211 send_channel_data_ack (ch);
1212 return;
1221 } 1213 }
1214 LOG (GNUNET_ERROR_TYPE_DEBUG,
1215 "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1216 (GNUNET_YES == ccc->client_ready)
1217 ? "out-of-order"
1218 : "client-not-ready",
1219 (unsigned int) payload_size,
1220 GCCH_2s (ch),
1221 ntohl (ccc->ccn.channel_of_client),
1222 ccc,
1223 ntohl (msg->mid.mid),
1224 ntohl (ch->mid_recv.mid));
1225 send_channel_data_ack (ch);
1222} 1226}
1223 1227
1224 1228
@@ -1261,6 +1265,36 @@ retry_transmission (void *cls)
1261 1265
1262 1266
1263/** 1267/**
1268 * We got an ACK for a message in our queue, remove it from
1269 * the queue and tell our client that it can send more.
1270 *
1271 * @param ch the channel that got the ACK
1272 * @param crm the message that got acknowledged
1273 */
1274static void
1275handle_matching_ack (struct CadetChannel *ch,
1276 struct CadetReliableMessage *crm)
1277{
1278 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1279 ch->tail_sent,
1280 crm);
1281 ch->pending_messages--;
1282 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1283 LOG (GNUNET_ERROR_TYPE_DEBUG,
1284 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1285 GCCH_2s (ch),
1286 (unsigned int) ntohl (crm->data_message->mid.mid),
1287 ch->pending_messages);
1288 GNUNET_free (crm->data_message);
1289 GNUNET_free (crm);
1290 send_ack_to_client (ch,
1291 (NULL == ch->owner)
1292 ? GNUNET_NO
1293 : GNUNET_YES);
1294}
1295
1296
1297/**
1264 * We got an acknowledgement for payload data for a channel. 1298 * We got an acknowledgement for payload data for a channel.
1265 * Possibly resume transmissions. 1299 * Possibly resume transmissions.
1266 * 1300 *
@@ -1272,7 +1306,11 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1272 const struct GNUNET_CADET_ChannelDataAckMessage *ack) 1306 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1273{ 1307{
1274 struct CadetReliableMessage *crm; 1308 struct CadetReliableMessage *crm;
1275 int was_head; 1309 struct CadetReliableMessage *crmn;
1310 int found;
1311 uint32_t mid_base;
1312 uint64_t mid_mask;
1313 unsigned int delta;
1276 1314
1277 GNUNET_break (GNUNET_NO == ch->is_loopback); 1315 GNUNET_break (GNUNET_NO == ch->is_loopback);
1278 if (GNUNET_NO == ch->reliable) 1316 if (GNUNET_NO == ch->reliable)
@@ -1281,12 +1319,28 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1281 GNUNET_break_op (0); 1319 GNUNET_break_op (0);
1282 return; 1320 return;
1283 } 1321 }
1322 mid_base = ntohl (ack->mid.mid);
1323 mid_mask = GNUNET_htonll (ack->futures);
1324 found = GNUNET_NO;
1284 for (crm = ch->head_sent; 1325 for (crm = ch->head_sent;
1285 NULL != crm; 1326 NULL != crm;
1286 crm = crm->next) 1327 crm = crmn)
1328 {
1329 crmn = crm->next;
1287 if (ack->mid.mid == crm->data_message->mid.mid) 1330 if (ack->mid.mid == crm->data_message->mid.mid)
1288 break; 1331 {
1289 if (NULL == crm) 1332 handle_matching_ack (ch,
1333 crm);
1334 continue;
1335 }
1336 delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
1337 if (delta >= 64)
1338 continue;
1339 if (0 != (mid_mask & (1LLU << delta)))
1340 handle_matching_ack (ch,
1341 crm);
1342 }
1343 if (GNUNET_NO == found)
1290 { 1344 {
1291 /* ACK for message we already dropped, might have been a 1345 /* ACK for message we already dropped, might have been a
1292 duplicate ACK? Ignore. */ 1346 duplicate ACK? Ignore. */
@@ -1299,36 +1353,16 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1299 GNUNET_NO); 1353 GNUNET_NO);
1300 return; 1354 return;
1301 } 1355 }
1302 was_head = (crm == ch->head_sent); 1356 if (NULL != ch->retry_data_task)
1303 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1304 ch->tail_sent,
1305 crm);
1306 GNUNET_free (crm->data_message);
1307 GNUNET_free (crm);
1308 ch->pending_messages--;
1309 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1310 LOG (GNUNET_ERROR_TYPE_DEBUG,
1311 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1312 GCCH_2s (ch),
1313 (unsigned int) ntohl (ack->mid.mid),
1314 ch->pending_messages);
1315 send_ack_to_client (ch,
1316 (NULL == ch->owner)
1317 ? GNUNET_NO
1318 : GNUNET_YES);
1319 if (was_head)
1320 { 1357 {
1321 if (NULL != ch->retry_data_task) 1358 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1322 { 1359 ch->retry_data_task = NULL;
1323 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1324 ch->retry_data_task = NULL;
1325 }
1326 if (NULL != ch->head_sent)
1327 ch->retry_data_task
1328 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1329 &retry_transmission,
1330 ch);
1331 } 1360 }
1361 if (NULL != ch->head_sent)
1362 ch->retry_data_task
1363 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1364 &retry_transmission,
1365 ch);
1332} 1366}
1333 1367
1334 1368
@@ -1591,10 +1625,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
1591 if (NULL == com) 1625 if (NULL == com)
1592 { 1626 {
1593 LOG (GNUNET_ERROR_TYPE_DEBUG, 1627 LOG (GNUNET_ERROR_TYPE_DEBUG,
1594 "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending on %s)!\n", 1628 "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1595 GSC_2s (ccc->c), 1629 GSC_2s (ccc->c),
1630 ntohl (client_ccn.channel_of_client),
1631 GCCH_2s (ch),
1596 ntohl (ccc->ccn.channel_of_client), 1632 ntohl (ccc->ccn.channel_of_client),
1597 GCCH_2s (ch)); 1633 ccc);
1598 return; /* none pending */ 1634 return; /* none pending */
1599 } 1635 }
1600 if (GNUNET_YES == ch->is_loopback) 1636 if (GNUNET_YES == ch->is_loopback)
@@ -1637,11 +1673,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
1637 return; /* missing next one in-order */ 1673 return; /* missing next one in-order */
1638 } 1674 }
1639 1675
1640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1676 LOG (GNUNET_ERROR_TYPE_DEBUG,
1641 "Got LOCAL ACK, passing payload message to %s-%X on %s\n", 1677 "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
1642 GSC_2s (ccc->c), 1678 ntohl (com->mid.mid),
1643 ntohl (ccc->ccn.channel_of_client), 1679 GSC_2s (ccc->c),
1644 GCCH_2s (ch)); 1680 ntohl (ccc->ccn.channel_of_client),
1681 GCCH_2s (ch));
1645 1682
1646 /* all good, pass next message to client */ 1683 /* all good, pass next message to client */
1647 GNUNET_CONTAINER_DLL_remove (ccc->head_recv, 1684 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
@@ -1663,9 +1700,9 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
1663 urgently waiting for an ACK from us. (As we have an inherent 1700 urgently waiting for an ACK from us. (As we have an inherent
1664 maximum of 64 bits, and 15 is getting too close for comfort.) 1701 maximum of 64 bits, and 15 is getting too close for comfort.)
1665 So we should send one now. */ 1702 So we should send one now. */
1666 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1703 LOG (GNUNET_ERROR_TYPE_DEBUG,
1667 "Sender on %s likely blocked on flow-control, sending ACK now.\n", 1704 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1668 GCCH_2s (ch)); 1705 GCCH_2s (ch));
1669 if (GNUNET_YES == ch->reliable) 1706 if (GNUNET_YES == ch->reliable)
1670 send_channel_data_ack (ch); 1707 send_channel_data_ack (ch);
1671 } 1708 }