diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-04-21 12:13:33 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-04-21 12:13:33 +0200 |
commit | d97224045fe41e824406f771e24c46fb89514942 (patch) | |
tree | 03addadb497a11428043946e1812f714879df06f /src/transport/gnunet-service-tng.c | |
parent | 94f367a644545eb7d4ea51902f0c7ed9e2d45193 (diff) | |
download | gnunet-d97224045fe41e824406f771e24c46fb89514942.tar.gz gnunet-d97224045fe41e824406f771e24c46fb89514942.zip |
handle communicator status, address a few FIXMEs
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 197 |
1 files changed, 139 insertions, 58 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 62e9c0d8e..697c43f0d 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -1097,6 +1097,10 @@ struct DistanceVector | |||
1097 | */ | 1097 | */ |
1098 | struct Queue; | 1098 | struct Queue; |
1099 | 1099 | ||
1100 | /** | ||
1101 | * Message awaiting transmission. See detailed comments below. | ||
1102 | */ | ||
1103 | struct PendingMessage; | ||
1100 | 1104 | ||
1101 | /** | 1105 | /** |
1102 | * Entry identifying transmission in one of our `struct | 1106 | * Entry identifying transmission in one of our `struct |
@@ -1126,6 +1130,11 @@ struct QueueEntry | |||
1126 | struct Queue *queue; | 1130 | struct Queue *queue; |
1127 | 1131 | ||
1128 | /** | 1132 | /** |
1133 | * Pending message this entry is for, or NULL for none. | ||
1134 | */ | ||
1135 | struct PendingMessage *pm; | ||
1136 | |||
1137 | /** | ||
1129 | * Message ID used for this message with the queue used for transmission. | 1138 | * Message ID used for this message with the queue used for transmission. |
1130 | */ | 1139 | */ |
1131 | uint64_t mid; | 1140 | uint64_t mid; |
@@ -1585,6 +1594,16 @@ struct PendingMessage | |||
1585 | struct Neighbour *target; | 1594 | struct Neighbour *target; |
1586 | 1595 | ||
1587 | /** | 1596 | /** |
1597 | * Set to non-NULL value if this message is currently being given to a | ||
1598 | * communicator and we are awaiting that communicator's acknowledgement. | ||
1599 | * Note that we must not retransmit a pending message while we're still | ||
1600 | * in the process of giving it to a communicator. If a pending message | ||
1601 | * is free'd while this entry is non-NULL, the @e qe reference to us | ||
1602 | * should simply be set to NULL. | ||
1603 | */ | ||
1604 | struct QueueEntry *qe; | ||
1605 | |||
1606 | /** | ||
1588 | * Client that issued the transmission request, if @e pmt is #PMT_CORE. | 1607 | * Client that issued the transmission request, if @e pmt is #PMT_CORE. |
1589 | */ | 1608 | */ |
1590 | struct TransportClient *client; | 1609 | struct TransportClient *client; |
@@ -2472,9 +2491,12 @@ transmit_on_queue (void *cls); | |||
2472 | * be called if the message queue is non-empty! | 2491 | * be called if the message queue is non-empty! |
2473 | * | 2492 | * |
2474 | * @param queue the queue to do scheduling for | 2493 | * @param queue the queue to do scheduling for |
2494 | * @param inside_job set to #GNUNET_YES if called from | ||
2495 | * #transmit_on_queue() itself and NOT setting | ||
2496 | * the task means running immediately | ||
2475 | */ | 2497 | */ |
2476 | static void | 2498 | static void |
2477 | schedule_transmit_on_queue (struct Queue *queue) | 2499 | schedule_transmit_on_queue (struct Queue *queue, int inside_job) |
2478 | { | 2500 | { |
2479 | struct Neighbour *n = queue->neighbour; | 2501 | struct Neighbour *n = queue->neighbour; |
2480 | struct PendingMessage *pm = n->pending_msg_head; | 2502 | struct PendingMessage *pm = n->pending_msg_head; |
@@ -2507,7 +2529,7 @@ schedule_transmit_on_queue (struct Queue *queue) | |||
2507 | out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining ( | 2529 | out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining ( |
2508 | pm->next_attempt), | 2530 | pm->next_attempt), |
2509 | out_delay); | 2531 | out_delay); |
2510 | if (0 == out_delay.rel_value_us) | 2532 | if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) |
2511 | return; /* we should run immediately! */ | 2533 | return; /* we should run immediately! */ |
2512 | /* queue has changed since we were scheduled, reschedule again */ | 2534 | /* queue has changed since we were scheduled, reschedule again */ |
2513 | queue->transmit_task = | 2535 | queue->transmit_task = |
@@ -2575,6 +2597,11 @@ free_queue (struct Queue *queue) | |||
2575 | GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); | 2597 | GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); |
2576 | queue->queue_length--; | 2598 | queue->queue_length--; |
2577 | tc->details.communicator.total_queue_length--; | 2599 | tc->details.communicator.total_queue_length--; |
2600 | if (NULL != qe->pm) | ||
2601 | { | ||
2602 | GNUNET_assert (qe == qe->pm->qe); | ||
2603 | qe->pm->qe = NULL; | ||
2604 | } | ||
2578 | GNUNET_free (qe); | 2605 | GNUNET_free (qe); |
2579 | } | 2606 | } |
2580 | GNUNET_assert (0 == queue->queue_length); | 2607 | GNUNET_assert (0 == queue->queue_length); |
@@ -2589,7 +2616,7 @@ free_queue (struct Queue *queue) | |||
2589 | GNUNET_NO); | 2616 | GNUNET_NO); |
2590 | for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; | 2617 | for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; |
2591 | s = s->next_client) | 2618 | s = s->next_client) |
2592 | schedule_transmit_on_queue (s); | 2619 | schedule_transmit_on_queue (s, GNUNET_NO); |
2593 | } | 2620 | } |
2594 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); | 2621 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); |
2595 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); | 2622 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); |
@@ -2859,6 +2886,11 @@ free_pending_message (struct PendingMessage *pm) | |||
2859 | target->pending_msg_tail, | 2886 | target->pending_msg_tail, |
2860 | pm); | 2887 | pm); |
2861 | free_fragment_tree (pm); | 2888 | free_fragment_tree (pm); |
2889 | if (NULL != pm->qe) | ||
2890 | { | ||
2891 | GNUNET_assert (pm == pm->qe->pm); | ||
2892 | pm->qe->pm = NULL; | ||
2893 | } | ||
2862 | GNUNET_free_non_null (pm->bpm); | 2894 | GNUNET_free_non_null (pm->bpm); |
2863 | GNUNET_free (pm); | 2895 | GNUNET_free (pm); |
2864 | } | 2896 | } |
@@ -3245,8 +3277,12 @@ queue_send_msg (struct Queue *queue, | |||
3245 | qe = GNUNET_new (struct QueueEntry); | 3277 | qe = GNUNET_new (struct QueueEntry); |
3246 | qe->mid = queue->mid_gen++; | 3278 | qe->mid = queue->mid_gen++; |
3247 | qe->queue = queue; | 3279 | qe->queue = queue; |
3248 | // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! | 3280 | if (NULL != pm) |
3249 | // (also, note that pm may be NULL!) | 3281 | { |
3282 | qe->pm = pm; | ||
3283 | GNUNET_assert (NULL == pm->qe); | ||
3284 | pm->qe = qe; | ||
3285 | } | ||
3250 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); | 3286 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); |
3251 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); | 3287 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); |
3252 | queue->queue_length++; | 3288 | queue->queue_length++; |
@@ -6038,6 +6074,60 @@ reliability_box_message (struct PendingMessage *pm) | |||
6038 | 6074 | ||
6039 | 6075 | ||
6040 | /** | 6076 | /** |
6077 | * Change the value of the `next_attempt` field of @a pm | ||
6078 | * to @a next_attempt and re-order @a pm in the transmission | ||
6079 | * list as required by the new timestmap. | ||
6080 | * | ||
6081 | * @param pm a pending message to update | ||
6082 | * @param next_attempt timestamp to use | ||
6083 | */ | ||
6084 | static void | ||
6085 | update_pm_next_attempt (struct PendingMessage *pm, | ||
6086 | struct GNUNET_TIME_Absolute next_attempt) | ||
6087 | { | ||
6088 | struct Neighbour *neighbour = pm->target; | ||
6089 | |||
6090 | pm->next_attempt = next_attempt; | ||
6091 | if (NULL == pm->frag_parent) | ||
6092 | { | ||
6093 | struct PendingMessage *pos; | ||
6094 | |||
6095 | /* re-insert sort in neighbour list */ | ||
6096 | GNUNET_CONTAINER_MDLL_remove (neighbour, | ||
6097 | neighbour->pending_msg_head, | ||
6098 | neighbour->pending_msg_tail, | ||
6099 | pm); | ||
6100 | pos = neighbour->pending_msg_tail; | ||
6101 | while ((NULL != pos) && | ||
6102 | (next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) | ||
6103 | pos = pos->prev_neighbour; | ||
6104 | GNUNET_CONTAINER_MDLL_insert_after (neighbour, | ||
6105 | neighbour->pending_msg_head, | ||
6106 | neighbour->pending_msg_tail, | ||
6107 | pos, | ||
6108 | pm); | ||
6109 | } | ||
6110 | else | ||
6111 | { | ||
6112 | /* re-insert sort in fragment list */ | ||
6113 | struct PendingMessage *fp = pm->frag_parent; | ||
6114 | struct PendingMessage *pos; | ||
6115 | |||
6116 | GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm); | ||
6117 | pos = fp->tail_frag; | ||
6118 | while ((NULL != pos) && | ||
6119 | (next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) | ||
6120 | pos = pos->prev_frag; | ||
6121 | GNUNET_CONTAINER_MDLL_insert_after (frag, | ||
6122 | fp->head_frag, | ||
6123 | fp->tail_frag, | ||
6124 | pos, | ||
6125 | pm); | ||
6126 | } | ||
6127 | } | ||
6128 | |||
6129 | |||
6130 | /** | ||
6041 | * We believe we are ready to transmit a message on a queue. Double-checks | 6131 | * We believe we are ready to transmit a message on a queue. Double-checks |
6042 | * with the queue's "tracker_out" and then gives the message to the | 6132 | * with the queue's "tracker_out" and then gives the message to the |
6043 | * communicator for transmission (updating the tracker, and re-scheduling | 6133 | * communicator for transmission (updating the tracker, and re-scheduling |
@@ -6060,7 +6150,13 @@ transmit_on_queue (void *cls) | |||
6060 | /* no message pending, nothing to do here! */ | 6150 | /* no message pending, nothing to do here! */ |
6061 | return; | 6151 | return; |
6062 | } | 6152 | } |
6063 | schedule_transmit_on_queue (queue); | 6153 | if (NULL != pm->qe) |
6154 | { | ||
6155 | /* message still pending with communciator! | ||
6156 | LOGGING-FIXME: Use stats? logging? Should this not be rare? */ | ||
6157 | return; | ||
6158 | } | ||
6159 | schedule_transmit_on_queue (queue, GNUNET_YES); | ||
6064 | if (NULL != queue->transmit_task) | 6160 | if (NULL != queue->transmit_task) |
6065 | return; /* do it later */ | 6161 | return; /* do it later */ |
6066 | overhead = 0; | 6162 | overhead = 0; |
@@ -6081,7 +6177,7 @@ transmit_on_queue (void *cls) | |||
6081 | if (NULL == s) | 6177 | if (NULL == s) |
6082 | { | 6178 | { |
6083 | /* Fragmentation failed, try next message... */ | 6179 | /* Fragmentation failed, try next message... */ |
6084 | schedule_transmit_on_queue (queue); | 6180 | schedule_transmit_on_queue (queue, GNUNET_NO); |
6085 | return; | 6181 | return; |
6086 | } | 6182 | } |
6087 | if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) | 6183 | if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) |
@@ -6089,7 +6185,7 @@ transmit_on_queue (void *cls) | |||
6089 | if (NULL == s) | 6185 | if (NULL == s) |
6090 | { | 6186 | { |
6091 | /* Reliability boxing failed, try next message... */ | 6187 | /* Reliability boxing failed, try next message... */ |
6092 | schedule_transmit_on_queue (queue); | 6188 | schedule_transmit_on_queue (queue, GNUNET_NO); |
6093 | return; | 6189 | return; |
6094 | } | 6190 | } |
6095 | 6191 | ||
@@ -6141,57 +6237,21 @@ transmit_on_queue (void *cls) | |||
6141 | } | 6237 | } |
6142 | else | 6238 | else |
6143 | { | 6239 | { |
6144 | /* message not finished, waiting for acknowledgement */ | 6240 | /* Message not finished, waiting for acknowledgement. |
6145 | struct Neighbour *neighbour = pm->target; | 6241 | Update time by which we might retransmit 's' based on queue |
6146 | /* Update time by which we might retransmit 's' based on queue | ||
6147 | characteristics (i.e. RTT); it takes one RTT for the message to | 6242 | characteristics (i.e. RTT); it takes one RTT for the message to |
6148 | arrive and the ACK to come back in the best case; but the other | 6243 | arrive and the ACK to come back in the best case; but the other |
6149 | side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before | 6244 | side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before |
6150 | retransmitting. Note that in the future this heuristic should | 6245 | retransmitting. Note that in the future this heuristic should |
6151 | likely be improved further (measure RTT stability, consider | 6246 | likely be improved further (measure RTT stability, consider |
6152 | message urgency and size when delaying ACKs, etc.) */ | 6247 | message urgency and size when delaying ACKs, etc.) */ |
6153 | s->next_attempt = GNUNET_TIME_relative_to_absolute ( | 6248 | update_pm_next_attempt (s, |
6154 | GNUNET_TIME_relative_multiply (queue->rtt, 4)); | 6249 | GNUNET_TIME_relative_to_absolute ( |
6155 | if (s == pm) | 6250 | GNUNET_TIME_relative_multiply (queue->rtt, 4))); |
6156 | { | ||
6157 | struct PendingMessage *pos; | ||
6158 | |||
6159 | /* re-insert sort in neighbour list */ | ||
6160 | GNUNET_CONTAINER_MDLL_remove (neighbour, | ||
6161 | neighbour->pending_msg_head, | ||
6162 | neighbour->pending_msg_tail, | ||
6163 | pm); | ||
6164 | pos = neighbour->pending_msg_tail; | ||
6165 | while ((NULL != pos) && | ||
6166 | (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) | ||
6167 | pos = pos->prev_neighbour; | ||
6168 | GNUNET_CONTAINER_MDLL_insert_after (neighbour, | ||
6169 | neighbour->pending_msg_head, | ||
6170 | neighbour->pending_msg_tail, | ||
6171 | pos, | ||
6172 | pm); | ||
6173 | } | ||
6174 | else | ||
6175 | { | ||
6176 | /* re-insert sort in fragment list */ | ||
6177 | struct PendingMessage *fp = s->frag_parent; | ||
6178 | struct PendingMessage *pos; | ||
6179 | |||
6180 | GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s); | ||
6181 | pos = fp->tail_frag; | ||
6182 | while ((NULL != pos) && | ||
6183 | (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) | ||
6184 | pos = pos->prev_frag; | ||
6185 | GNUNET_CONTAINER_MDLL_insert_after (frag, | ||
6186 | fp->head_frag, | ||
6187 | fp->tail_frag, | ||
6188 | pos, | ||
6189 | s); | ||
6190 | } | ||
6191 | } | 6251 | } |
6192 | 6252 | ||
6193 | /* finally, re-schedule queue transmission task itself */ | 6253 | /* finally, re-schedule queue transmission task itself */ |
6194 | schedule_transmit_on_queue (queue); | 6254 | schedule_transmit_on_queue (queue, GNUNET_NO); |
6195 | } | 6255 | } |
6196 | 6256 | ||
6197 | 6257 | ||
@@ -6216,7 +6276,7 @@ tracker_update_out_cb (void *cls) | |||
6216 | } | 6276 | } |
6217 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | 6277 | GNUNET_SCHEDULER_cancel (queue->transmit_task); |
6218 | queue->transmit_task = NULL; | 6278 | queue->transmit_task = NULL; |
6219 | schedule_transmit_on_queue (queue); | 6279 | schedule_transmit_on_queue (queue, GNUNET_NO); |
6220 | } | 6280 | } |
6221 | 6281 | ||
6222 | 6282 | ||
@@ -6309,6 +6369,7 @@ handle_send_message_ack (void *cls, | |||
6309 | { | 6369 | { |
6310 | struct TransportClient *tc = cls; | 6370 | struct TransportClient *tc = cls; |
6311 | struct QueueEntry *qe; | 6371 | struct QueueEntry *qe; |
6372 | struct PendingMessage *pm; | ||
6312 | 6373 | ||
6313 | if (CT_COMMUNICATOR != tc->type) | 6374 | if (CT_COMMUNICATOR != tc->type) |
6314 | { | 6375 | { |
@@ -6352,7 +6413,8 @@ handle_send_message_ack (void *cls, | |||
6352 | if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == | 6413 | if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == |
6353 | tc->details.communicator.total_queue_length) | 6414 | tc->details.communicator.total_queue_length) |
6354 | { | 6415 | { |
6355 | /* Communicator dropped below threshold, resume all queues */ | 6416 | /* Communicator dropped below threshold, resume all queues |
6417 | incident with this client! */ | ||
6356 | GNUNET_STATISTICS_update ( | 6418 | GNUNET_STATISTICS_update ( |
6357 | GST_stats, | 6419 | GST_stats, |
6358 | "# Transmission throttled due to communicator queue limit", | 6420 | "# Transmission throttled due to communicator queue limit", |
@@ -6361,7 +6423,7 @@ handle_send_message_ack (void *cls, | |||
6361 | for (struct Queue *queue = tc->details.communicator.queue_head; | 6423 | for (struct Queue *queue = tc->details.communicator.queue_head; |
6362 | NULL != queue; | 6424 | NULL != queue; |
6363 | queue = queue->next_client) | 6425 | queue = queue->next_client) |
6364 | schedule_transmit_on_queue (queue); | 6426 | schedule_transmit_on_queue (queue, GNUNET_NO); |
6365 | } | 6427 | } |
6366 | else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) | 6428 | else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) |
6367 | { | 6429 | { |
@@ -6370,14 +6432,33 @@ handle_send_message_ack (void *cls, | |||
6370 | "# Transmission throttled due to queue queue limit", | 6432 | "# Transmission throttled due to queue queue limit", |
6371 | -1, | 6433 | -1, |
6372 | GNUNET_NO); | 6434 | GNUNET_NO); |
6373 | schedule_transmit_on_queue (qe->queue); | 6435 | schedule_transmit_on_queue (qe->queue, GNUNET_NO); |
6374 | } | 6436 | } |
6375 | 6437 | ||
6376 | /* TODO: we also should react on the status! */ | 6438 | if (NULL != (pm = qe->pm)) |
6377 | // FIXME: this probably requires queue->pm = s assignment! | 6439 | { |
6378 | // FIXME: react to communicator status about transmission request. We got: | 6440 | struct Neighbour *n; |
6379 | sma->status; // OK success, SYSERR failure | ||
6380 | 6441 | ||
6442 | GNUNET_assert (qe == pm->qe); | ||
6443 | pm->qe = NULL; | ||
6444 | /* If waiting for this communicator may have blocked transmission | ||
6445 | of pm on other queues for this neighbour, force schedule | ||
6446 | transmit on queue for queues of the neighbour */ | ||
6447 | n = pm->target; | ||
6448 | if (n->pending_msg_head == pm) | ||
6449 | { | ||
6450 | for (struct Queue *queue = n->queue_head; NULL != queue; | ||
6451 | queue = queue->next_neighbour) | ||
6452 | schedule_transmit_on_queue (queue, GNUNET_NO); | ||
6453 | } | ||
6454 | if (GNUNET_OK != ntohl (sma->status)) | ||
6455 | { | ||
6456 | GNUNET_log ( | ||
6457 | GNUNET_ERROR_TYPE_INFO, | ||
6458 | "Queue failed in transmission, will try retransmission immediately\n"); | ||
6459 | update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS); | ||
6460 | } | ||
6461 | } | ||
6381 | GNUNET_free (qe); | 6462 | GNUNET_free (qe); |
6382 | } | 6463 | } |
6383 | 6464 | ||