aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet-new_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r--src/cadet/gnunet-service-cadet-new_channel.c478
1 files changed, 325 insertions, 153 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
index c3d5ef194..e55a9a77d 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -25,13 +25,19 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 * 26 *
27 * TODO: 27 * TODO:
28 * - FIXME: send ACKs back to loopback clients! 28 * - Optimize ACKs by using 'mid_futures' properly!
29 * 29 * - calculate current RTT if possible, use that for initial retransmissions
30 * (NOTE: needs us to learn which connection the tunnel uses for the message!)
30 * - introduce shutdown so we can have half-closed channels, modify 31 * - introduce shutdown so we can have half-closed channels, modify
31 * destroy to include MID to have FIN-ACK equivalents, etc. 32 * destroy to include MID to have FIN-ACK equivalents, etc.
32 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! 33 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
34 * (and figure out how/where to use this!)
33 * - check that '0xFFULL' really is sufficient for flow control! 35 * - check that '0xFFULL' really is sufficient for flow control!
36 * (this is right now a big HACK!)
34 * - revisit handling of 'unreliable' traffic! 37 * - revisit handling of 'unreliable' traffic!
38 * (has not seen enough review)
39 * - revisit handling of 'unbuffered' traffic!
40 * (has not seen enough review)
35 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'. 41 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36 * - figure out flow control without ACKs (unreliable traffic!) 42 * - figure out flow control without ACKs (unreliable traffic!)
37 */ 43 */
@@ -59,6 +65,23 @@
59 */ 65 */
60#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) 66#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61 67
68/**
69 * How long do we wait at least before retransmitting ever?
70 */
71#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
72
73/**
74 * Maximum message ID into the future we accept for out-of-order messages.
75 * If the message is more than this into the future, we drop it. This is
76 * important both to detect values that are actually in the past, as well
77 * as to limit adversarially triggerable memory consumption.
78 *
79 * Note that right now we have "max_pending_messages = 4" hard-coded in
80 * the logic below, so a value of 4 would suffice here. But we plan to
81 * allow larger windows in the future...
82 */
83#define MAX_OUT_OF_ORDER_DISTANCE 1024
84
62 85
63/** 86/**
64 * All the states a connection can be in. 87 * All the states a connection can be in.
@@ -272,6 +295,8 @@ struct CadetChannel
272 295
273 /** 296 /**
274 * Bitfield of already-received messages past @e mid_recv. 297 * Bitfield of already-received messages past @e mid_recv.
298 *
299 * FIXME: not yet properly used (bits here are never set!)
275 */ 300 */
276 uint64_t mid_futures; 301 uint64_t mid_futures;
277 302
@@ -526,6 +551,7 @@ send_channel_open (void *cls)
526 &msgcc.header, 551 &msgcc.header,
527 &channel_open_sent_cb, 552 &channel_open_sent_cb,
528 ch); 553 ch);
554 GNUNET_assert (NULL == ch->retry_control_task);
529} 555}
530 556
531 557
@@ -753,6 +779,11 @@ send_channel_data_ack (struct CadetChannel *ch)
753 msg.futures = GNUNET_htonll (ch->mid_futures); 779 msg.futures = GNUNET_htonll (ch->mid_futures);
754 if (NULL != ch->last_control_qe) 780 if (NULL != ch->last_control_qe)
755 GCT_send_cancel (ch->last_control_qe); 781 GCT_send_cancel (ch->last_control_qe);
782 LOG (GNUNET_ERROR_TYPE_DEBUG,
783 "Sending DATA_ACK %u:%llX via %s\n",
784 (unsigned int) ntohl (msg.mid.mid),
785 (unsigned long long) ch->mid_futures,
786 GCCH_2s (ch));
756 ch->last_control_qe = GCT_send (ch->t, 787 ch->last_control_qe = GCT_send (ch->t,
757 &msg.header, 788 &msg.header,
758 &send_ack_cb, 789 &send_ack_cb,
@@ -837,9 +868,16 @@ send_ack_to_client (struct CadetChannel *ch,
837 struct GNUNET_CADET_LocalAck *ack; 868 struct GNUNET_CADET_LocalAck *ack;
838 struct CadetChannelClient *ccc; 869 struct CadetChannelClient *ccc;
839 870
871 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
872 if (NULL == ccc)
873 {
874 /* This can happen if we are just getting ACKs after
875 our local client already disconnected. */
876 GNUNET_assert (GNUNET_YES == ch->destroy);
877 return;
878 }
840 env = GNUNET_MQ_msg (ack, 879 env = GNUNET_MQ_msg (ack,
841 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); 880 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
842 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843 ack->ccn = ccc->ccn; 881 ack->ccn = ccc->ccn;
844 LOG (GNUNET_ERROR_TYPE_DEBUG, 882 LOG (GNUNET_ERROR_TYPE_DEBUG,
845 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n", 883 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
@@ -1060,23 +1098,11 @@ is_before (void *cls,
1060 if (delta > (uint32_t) INT_MAX) 1098 if (delta > (uint32_t) INT_MAX)
1061 { 1099 {
1062 /* in overflow range, we can safely assume we wrapped around */ 1100 /* in overflow range, we can safely assume we wrapped around */
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "%u > %u => %p > %p\n",
1065 (unsigned int) v1,
1066 (unsigned int) v2,
1067 m1,
1068 m2);
1069 return GNUNET_NO; 1101 return GNUNET_NO;
1070 } 1102 }
1071 else 1103 else
1072 { 1104 {
1073 /* result is small, thus v2 > v1, thus e1 < e2 */ 1105 /* result is small, thus v2 > v1, thus e1 < e2 */
1074 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075 "%u < %u => %p < %p\n",
1076 (unsigned int) v1,
1077 (unsigned int) v2,
1078 m1,
1079 m2);
1080 return GNUNET_YES; 1106 return GNUNET_YES;
1081 } 1107 }
1082} 1108}
@@ -1097,6 +1123,11 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1097 struct GNUNET_CADET_LocalData *ld; 1123 struct GNUNET_CADET_LocalData *ld;
1098 struct CadetChannelClient *ccc; 1124 struct CadetChannelClient *ccc;
1099 size_t payload_size; 1125 size_t payload_size;
1126 struct CadetOutOfOrderMessage *com;
1127 int duplicate;
1128 uint32_t mid_min;
1129 uint32_t mid_max;
1130 uint32_t mid_msg;
1100 1131
1101 GNUNET_assert (GNUNET_NO == ch->is_loopback); 1132 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1102 if ( (GNUNET_YES == ch->destroy) && 1133 if ( (GNUNET_YES == ch->destroy) &&
@@ -1127,8 +1158,9 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1127 (msg->mid.mid == ch->mid_recv.mid) ) ) 1158 (msg->mid.mid == ch->mid_recv.mid) ) )
1128 { 1159 {
1129 LOG (GNUNET_ERROR_TYPE_DEBUG, 1160 LOG (GNUNET_ERROR_TYPE_DEBUG,
1130 "Giving %u bytes of payload from %s to client %s\n", 1161 "Giving %u bytes of payload with MID %u from %s to client %s\n",
1131 (unsigned int) payload_size, 1162 (unsigned int) payload_size,
1163 ntohl (msg->mid.mid),
1132 GCCH_2s (ch), 1164 GCCH_2s (ch),
1133 GSC_2s (ccc->c)); 1165 GSC_2s (ccc->c));
1134 ccc->client_ready = GNUNET_NO; 1166 ccc->client_ready = GNUNET_NO;
@@ -1136,52 +1168,148 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1136 env); 1168 env);
1137 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); 1169 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1138 ch->mid_futures >>= 1; 1170 ch->mid_futures >>= 1;
1171 if (GNUNET_YES == ch->reliable)
1172 send_channel_data_ack (ch);
1173 return;
1139 } 1174 }
1140 else 1175
1176 /* check if message ought to be dropped because it is anicent/too distant/duplicate */
1177 mid_min = ntohl (ch->mid_recv.mid);
1178 mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1179 mid_msg = ntohl (msg->mid.mid);
1180 if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1181 ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1141 { 1182 {
1142 struct CadetOutOfOrderMessage *com;
1143 int duplicate;
1144
1145 /* FIXME-SECURITY: if the element is WAY too far ahead,
1146 drop it (can't buffer too much!) */
1147
1148 com = GNUNET_new (struct CadetOutOfOrderMessage);
1149 com->mid = msg->mid;
1150 com->env = env;
1151 duplicate = GNUNET_NO;
1152 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1153 is_before,
1154 &duplicate,
1155 ccc->head_recv,
1156 ccc->tail_recv,
1157 com);
1158 if (GNUNET_YES == duplicate)
1159 {
1160 LOG (GNUNET_ERROR_TYPE_DEBUG,
1161 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1162 (unsigned int) payload_size,
1163 GCCH_2s (ch),
1164 ntohl (msg->mid.mid));
1165 GNUNET_STATISTICS_update (stats,
1166 "# duplicate DATA",
1167 1,
1168 GNUNET_NO);
1169 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1170 ccc->tail_recv,
1171 com);
1172 GNUNET_free (com);
1173 return;
1174 }
1175 LOG (GNUNET_ERROR_TYPE_DEBUG, 1183 LOG (GNUNET_ERROR_TYPE_DEBUG,
1176 "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n", 1184 "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1177 (GNUNET_YES == ccc->client_ready)
1178 ? "out-of-order"
1179 : "client-not-ready",
1180 (unsigned int) payload_size, 1185 (unsigned int) payload_size,
1181 GCCH_2s (ch), 1186 GCCH_2s (ch),
1182 ntohl (msg->mid.mid), 1187 ntohl (msg->mid.mid));
1183 ntohl (ch->mid_recv.mid)); 1188 GNUNET_STATISTICS_update (stats,
1189 "# duplicate DATA (ancient or future)",
1190 1,
1191 GNUNET_NO);
1192 GNUNET_MQ_discard (env);
1193 return;
1194 }
1195
1196 /* Insert message into sorted out-of-order queue */
1197 com = GNUNET_new (struct CadetOutOfOrderMessage);
1198 com->mid = msg->mid;
1199 com->env = env;
1200 duplicate = GNUNET_NO;
1201 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1202 is_before,
1203 &duplicate,
1204 ccc->head_recv,
1205 ccc->tail_recv,
1206 com);
1207 if (GNUNET_YES == duplicate)
1208 {
1209 /* Duplicate within the queue, drop also */
1210 LOG (GNUNET_ERROR_TYPE_DEBUG,
1211 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1212 (unsigned int) payload_size,
1213 GCCH_2s (ch),
1214 ntohl (msg->mid.mid));
1215 GNUNET_STATISTICS_update (stats,
1216 "# duplicate DATA",
1217 1,
1218 GNUNET_NO);
1219 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1220 ccc->tail_recv,
1221 com);
1222 GNUNET_MQ_discard (com->env);
1223 GNUNET_free (com);
1224 if (GNUNET_YES == ch->reliable)
1225 send_channel_data_ack (ch);
1226 return;
1184 } 1227 }
1228 LOG (GNUNET_ERROR_TYPE_DEBUG,
1229 "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1230 (GNUNET_YES == ccc->client_ready)
1231 ? "out-of-order"
1232 : "client-not-ready",
1233 (unsigned int) payload_size,
1234 GCCH_2s (ch),
1235 ntohl (ccc->ccn.channel_of_client),
1236 ccc,
1237 ntohl (msg->mid.mid),
1238 ntohl (ch->mid_recv.mid));
1239 send_channel_data_ack (ch);
1240}
1241
1242
1243/**
1244 * Function called once the tunnel has sent one of our messages.
1245 * If the message is unreliable, simply frees the `crm`. If the
1246 * message was reliable, calculate retransmission time and
1247 * wait for ACK (or retransmit).
1248 *
1249 * @param cls the `struct CadetReliableMessage` that was sent
1250 */
1251static void
1252data_sent_cb (void *cls);
1253
1254
1255/**
1256 * We need to retry a transmission, the last one took too long to
1257 * be acknowledged.
1258 *
1259 * @param cls the `struct CadetChannel` where we need to retransmit
1260 */
1261static void
1262retry_transmission (void *cls)
1263{
1264 struct CadetChannel *ch = cls;
1265 struct CadetReliableMessage *crm = ch->head_sent;
1266
1267 ch->retry_data_task = NULL;
1268 GNUNET_assert (NULL == crm->qe);
1269 LOG (GNUNET_ERROR_TYPE_DEBUG,
1270 "Retrying transmission on %s of message %u\n",
1271 GCCH_2s (ch),
1272 (unsigned int) ntohl (crm->data_message->mid.mid));
1273 crm->qe = GCT_send (ch->t,
1274 &crm->data_message->header,
1275 &data_sent_cb,
1276 crm);
1277 GNUNET_assert (NULL == ch->retry_data_task);
1278}
1279
1280
1281/**
1282 * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1283 * the queue and tell our client that it can send more.
1284 *
1285 * @param ch the channel that got the PLAINTEXT_DATA_ACK
1286 * @param crm the message that got acknowledged
1287 */
1288static void
1289handle_matching_ack (struct CadetChannel *ch,
1290 struct CadetReliableMessage *crm)
1291{
1292 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1293 ch->tail_sent,
1294 crm);
1295 ch->pending_messages--;
1296 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1297 LOG (GNUNET_ERROR_TYPE_DEBUG,
1298 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1299 GCCH_2s (ch),
1300 (unsigned int) ntohl (crm->data_message->mid.mid),
1301 ch->pending_messages);
1302 if (NULL != crm->qe)
1303 {
1304 GCT_send_cancel (crm->qe);
1305 crm->qe = NULL;
1306 }
1307 GNUNET_free (crm->data_message);
1308 GNUNET_free (crm);
1309 send_ack_to_client (ch,
1310 (NULL == ch->owner)
1311 ? GNUNET_NO
1312 : GNUNET_YES);
1185} 1313}
1186 1314
1187 1315
@@ -1197,6 +1325,11 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1197 const struct GNUNET_CADET_ChannelDataAckMessage *ack) 1325 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1198{ 1326{
1199 struct CadetReliableMessage *crm; 1327 struct CadetReliableMessage *crm;
1328 struct CadetReliableMessage *crmn;
1329 int found;
1330 uint32_t mid_base;
1331 uint64_t mid_mask;
1332 unsigned int delta;
1200 1333
1201 GNUNET_break (GNUNET_NO == ch->is_loopback); 1334 GNUNET_break (GNUNET_NO == ch->is_loopback);
1202 if (GNUNET_NO == ch->reliable) 1335 if (GNUNET_NO == ch->reliable)
@@ -1205,12 +1338,32 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1205 GNUNET_break_op (0); 1338 GNUNET_break_op (0);
1206 return; 1339 return;
1207 } 1340 }
1341 mid_base = ntohl (ack->mid.mid);
1342 mid_mask = GNUNET_htonll (ack->futures);
1343 found = GNUNET_NO;
1208 for (crm = ch->head_sent; 1344 for (crm = ch->head_sent;
1209 NULL != crm; 1345 NULL != crm;
1210 crm = crm->next) 1346 crm = crmn)
1347 {
1348 crmn = crm->next;
1211 if (ack->mid.mid == crm->data_message->mid.mid) 1349 if (ack->mid.mid == crm->data_message->mid.mid)
1212 break; 1350 {
1213 if (NULL == crm) 1351 handle_matching_ack (ch,
1352 crm);
1353 found = GNUNET_YES;
1354 continue;
1355 }
1356 delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
1357 if (delta >= 64)
1358 continue;
1359 if (0 != (mid_mask & (1LLU << delta)))
1360 {
1361 handle_matching_ack (ch,
1362 crm);
1363 found = GNUNET_YES;
1364 }
1365 }
1366 if (GNUNET_NO == found)
1214 { 1367 {
1215 /* ACK for message we already dropped, might have been a 1368 /* ACK for message we already dropped, might have been a
1216 duplicate ACK? Ignore. */ 1369 duplicate ACK? Ignore. */
@@ -1223,26 +1376,16 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1223 GNUNET_NO); 1376 GNUNET_NO);
1224 return; 1377 return;
1225 } 1378 }
1226 GNUNET_CONTAINER_DLL_remove (ch->head_sent, 1379 if (NULL != ch->retry_data_task)
1227 ch->tail_sent, 1380 {
1228 crm); 1381 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1229 GNUNET_free (crm->data_message); 1382 ch->retry_data_task = NULL;
1230 GNUNET_free (crm); 1383 }
1231 ch->pending_messages--; 1384 if (NULL != ch->head_sent)
1232 send_ack_to_client (ch, 1385 ch->retry_data_task
1233 (NULL == ch->owner) 1386 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1234 ? GNUNET_NO 1387 &retry_transmission,
1235 : GNUNET_YES); 1388 ch);
1236 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1237 LOG (GNUNET_ERROR_TYPE_DEBUG,
1238 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1239 GCCH_2s (ch),
1240 (unsigned int) ntohl (ack->mid.mid),
1241 ch->pending_messages);
1242 send_ack_to_client (ch,
1243 (NULL == ch->owner)
1244 ? GNUNET_NO
1245 : GNUNET_YES);
1246} 1389}
1247 1390
1248 1391
@@ -1285,35 +1428,22 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch)
1285 1428
1286 1429
1287/** 1430/**
1288 * Function called once the tunnel has sent one of our messages. 1431 * Test if element @a e1 comes before element @a e2.
1289 * If the message is unreliable, simply frees the `crm`. If the
1290 * message was reliable, calculate retransmission time and
1291 * wait for ACK (or retransmit).
1292 *
1293 * @param cls the `struct CadetReliableMessage` that was sent
1294 */
1295static void
1296data_sent_cb (void *cls);
1297
1298
1299/**
1300 * We need to retry a transmission, the last one took too long to
1301 * be acknowledged.
1302 * 1432 *
1303 * @param cls the `struct CadetChannel` where we need to retransmit 1433 * @param cls closure, to a flag where we indicate duplicate packets
1434 * @param crm1 an element of to sort
1435 * @param crm2 another element to sort
1436 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1304 */ 1437 */
1305static void 1438static int
1306retry_transmission (void *cls) 1439cmp_crm_by_next_retry (void *cls,
1440 struct CadetReliableMessage *crm1,
1441 struct CadetReliableMessage *crm2)
1307{ 1442{
1308 struct CadetChannel *ch = cls; 1443 if (crm1->next_retry.abs_value_us <
1309 struct CadetReliableMessage *crm = ch->head_sent; 1444 crm2->next_retry.abs_value_us)
1310 1445 return GNUNET_YES;
1311 ch->retry_data_task = NULL; 1446 return GNUNET_NO;
1312 GNUNET_assert (NULL == crm->qe);
1313 crm->qe = GCT_send (ch->t,
1314 &crm->data_message->header,
1315 &data_sent_cb,
1316 crm);
1317} 1447}
1318 1448
1319 1449
@@ -1330,9 +1460,9 @@ data_sent_cb (void *cls)
1330{ 1460{
1331 struct CadetReliableMessage *crm = cls; 1461 struct CadetReliableMessage *crm = cls;
1332 struct CadetChannel *ch = crm->ch; 1462 struct CadetChannel *ch = crm->ch;
1333 struct CadetReliableMessage *off;
1334 1463
1335 GNUNET_assert (GNUNET_NO == ch->is_loopback); 1464 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1465 GNUNET_assert (NULL != crm->qe);
1336 crm->qe = NULL; 1466 crm->qe = NULL;
1337 GNUNET_CONTAINER_DLL_remove (ch->head_sent, 1467 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1338 ch->tail_sent, 1468 ch->tail_sent,
@@ -1350,42 +1480,33 @@ data_sent_cb (void *cls)
1350 } 1480 }
1351 if (0 == crm->retry_delay.rel_value_us) 1481 if (0 == crm->retry_delay.rel_value_us)
1352 crm->retry_delay = ch->expected_delay; 1482 crm->retry_delay = ch->expected_delay;
1483 else
1484 crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1485 crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1486 MIN_RTT_DELAY);
1353 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay); 1487 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1354 1488
1355 /* find position for re-insertion into the DLL */ 1489 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1356 if ( (NULL == ch->head_sent) || 1490 cmp_crm_by_next_retry,
1357 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) ) 1491 NULL,
1492 ch->head_sent,
1493 ch->tail_sent,
1494 crm);
1495 LOG (GNUNET_ERROR_TYPE_DEBUG,
1496 "Message %u sent, next transmission on %s in %s\n",
1497 (unsigned int) ntohl (crm->data_message->mid.mid),
1498 GCCH_2s (ch),
1499 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1500 GNUNET_YES));
1501 if (crm == ch->head_sent)
1358 { 1502 {
1359 /* insert at HEAD, also (re)schedule retry task! */ 1503 /* We are the new head, need to reschedule retry task */
1360 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1361 ch->tail_sent,
1362 crm);
1363 if (NULL != ch->retry_data_task) 1504 if (NULL != ch->retry_data_task)
1364 GNUNET_SCHEDULER_cancel (ch->retry_data_task); 1505 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1365 GNUNET_assert (NULL == crm->qe);
1366 ch->retry_data_task 1506 ch->retry_data_task
1367 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay, 1507 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1368 &retry_transmission, 1508 &retry_transmission,
1369 ch); 1509 ch);
1370 return;
1371 }
1372 for (off = ch->head_sent; NULL != off; off = off->next)
1373 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1374 break;
1375 if (NULL == off)
1376 {
1377 /* insert at tail */
1378 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1379 ch->tail_sent,
1380 crm);
1381 }
1382 else
1383 {
1384 /* insert before off */
1385 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1386 ch->tail_sent,
1387 off->prev,
1388 crm);
1389 } 1510 }
1390} 1511}
1391 1512
@@ -1446,11 +1567,23 @@ GCCH_handle_local_data (struct CadetChannel *ch,
1446 GNUNET_memcpy (&ld[1], 1567 GNUNET_memcpy (&ld[1],
1447 buf, 1568 buf,
1448 buf_len); 1569 buf_len);
1449 /* FIXME: this does not provide for flow control! */ 1570 if (GNUNET_YES == receiver->client_ready)
1450 GSC_send_to_client (receiver->c, 1571 {
1451 env); 1572 GSC_send_to_client (receiver->c,
1452 send_ack_to_client (ch, 1573 env);
1453 to_owner); 1574 send_ack_to_client (ch,
1575 to_owner);
1576 }
1577 else
1578 {
1579 struct CadetOutOfOrderMessage *oom;
1580
1581 oom = GNUNET_new (struct CadetOutOfOrderMessage);
1582 oom->env = env;
1583 GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1584 receiver->tail_recv,
1585 oom);
1586 }
1454 return GNUNET_OK; 1587 return GNUNET_OK;
1455 } 1588 }
1456 1589
@@ -1471,13 +1604,20 @@ GCCH_handle_local_data (struct CadetChannel *ch,
1471 ch->tail_sent, 1604 ch->tail_sent,
1472 crm); 1605 crm);
1473 LOG (GNUNET_ERROR_TYPE_DEBUG, 1606 LOG (GNUNET_ERROR_TYPE_DEBUG,
1474 "Sending %u bytes from local client to %s\n", 1607 "Sending %u bytes from local client to %s with MID %u\n",
1475 buf_len, 1608 buf_len,
1476 GCCH_2s (ch)); 1609 GCCH_2s (ch),
1610 ntohl (crm->data_message->mid.mid));
1611 if (NULL != ch->retry_data_task)
1612 {
1613 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1614 ch->retry_data_task = NULL;
1615 }
1477 crm->qe = GCT_send (ch->t, 1616 crm->qe = GCT_send (ch->t,
1478 &crm->data_message->header, 1617 &crm->data_message->header,
1479 &data_sent_cb, 1618 &data_sent_cb,
1480 crm); 1619 crm);
1620 GNUNET_assert (NULL == ch->retry_data_task);
1481 return GNUNET_OK; 1621 return GNUNET_OK;
1482} 1622}
1483 1623
@@ -1509,11 +1649,42 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
1509 if (NULL == com) 1649 if (NULL == com)
1510 { 1650 {
1511 LOG (GNUNET_ERROR_TYPE_DEBUG, 1651 LOG (GNUNET_ERROR_TYPE_DEBUG,
1512 "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending)!\n", 1652 "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1513 GSC_2s (ccc->c), 1653 GSC_2s (ccc->c),
1514 ntohl (ccc->ccn.channel_of_client)); 1654 ntohl (client_ccn.channel_of_client),
1655 GCCH_2s (ch),
1656 ntohl (ccc->ccn.channel_of_client),
1657 ccc);
1515 return; /* none pending */ 1658 return; /* none pending */
1516 } 1659 }
1660 if (GNUNET_YES == ch->is_loopback)
1661 {
1662 int to_owner;
1663
1664 /* Messages are always in-order, just send */
1665 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1666 ccc->tail_recv,
1667 com);
1668 GSC_send_to_client (ccc->c,
1669 com->env);
1670 /* Notify sender that we can receive more */
1671 if (ccc->ccn.channel_of_client ==
1672 ch->owner->ccn.channel_of_client)
1673 {
1674 to_owner = GNUNET_NO;
1675 }
1676 else
1677 {
1678 GNUNET_assert (ccc->ccn.channel_of_client ==
1679 ch->dest->ccn.channel_of_client);
1680 to_owner = GNUNET_YES;
1681 }
1682 send_ack_to_client (ch,
1683 to_owner);
1684 GNUNET_free (com);
1685 return;
1686 }
1687
1517 if ( (com->mid.mid != ch->mid_recv.mid) && 1688 if ( (com->mid.mid != ch->mid_recv.mid) &&
1518 (GNUNET_NO == ch->out_of_order) ) 1689 (GNUNET_NO == ch->out_of_order) )
1519 { 1690 {
@@ -1526,11 +1697,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
1526 return; /* missing next one in-order */ 1697 return; /* missing next one in-order */
1527 } 1698 }
1528 1699
1529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1700 LOG (GNUNET_ERROR_TYPE_DEBUG,
1530 "Got LOCAL ACK, passing payload message to %s-%X on %s\n", 1701 "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
1531 GSC_2s (ccc->c), 1702 ntohl (com->mid.mid),
1532 ntohl (ccc->ccn.channel_of_client), 1703 GSC_2s (ccc->c),
1533 GCCH_2s (ch)); 1704 ntohl (ccc->ccn.channel_of_client),
1705 GCCH_2s (ch));
1534 1706
1535 /* all good, pass next message to client */ 1707 /* all good, pass next message to client */
1536 GNUNET_CONTAINER_DLL_remove (ccc->head_recv, 1708 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
@@ -1552,9 +1724,9 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
1552 urgently waiting for an ACK from us. (As we have an inherent 1724 urgently waiting for an ACK from us. (As we have an inherent
1553 maximum of 64 bits, and 15 is getting too close for comfort.) 1725 maximum of 64 bits, and 15 is getting too close for comfort.)
1554 So we should send one now. */ 1726 So we should send one now. */
1555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1727 LOG (GNUNET_ERROR_TYPE_DEBUG,
1556 "Sender on %s likely blocked on flow-control, sending ACK now.\n", 1728 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1557 GCCH_2s (ch)); 1729 GCCH_2s (ch));
1558 if (GNUNET_YES == ch->reliable) 1730 if (GNUNET_YES == ch->reliable)
1559 send_channel_data_ack (ch); 1731 send_channel_data_ack (ch);
1560 } 1732 }