aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-21 12:13:33 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-21 12:13:33 +0200
commitd97224045fe41e824406f771e24c46fb89514942 (patch)
tree03addadb497a11428043946e1812f714879df06f /src/transport/gnunet-service-tng.c
parent94f367a644545eb7d4ea51902f0c7ed9e2d45193 (diff)
downloadgnunet-d97224045fe41e824406f771e24c46fb89514942.tar.gz
gnunet-d97224045fe41e824406f771e24c46fb89514942.zip
handle communicator status, address a few FIXMEs
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c197
1 files changed, 139 insertions, 58 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 62e9c0d8e..697c43f0d 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -1097,6 +1097,10 @@ struct DistanceVector
1097 */ 1097 */
1098struct Queue; 1098struct Queue;
1099 1099
1100/**
1101 * Message awaiting transmission. See detailed comments below.
1102 */
1103struct PendingMessage;
1100 1104
1101/** 1105/**
1102 * Entry identifying transmission in one of our `struct 1106 * Entry identifying transmission in one of our `struct
@@ -1126,6 +1130,11 @@ struct QueueEntry
1126 struct Queue *queue; 1130 struct Queue *queue;
1127 1131
1128 /** 1132 /**
1133 * Pending message this entry is for, or NULL for none.
1134 */
1135 struct PendingMessage *pm;
1136
1137 /**
1129 * Message ID used for this message with the queue used for transmission. 1138 * Message ID used for this message with the queue used for transmission.
1130 */ 1139 */
1131 uint64_t mid; 1140 uint64_t mid;
@@ -1585,6 +1594,16 @@ struct PendingMessage
1585 struct Neighbour *target; 1594 struct Neighbour *target;
1586 1595
1587 /** 1596 /**
1597 * Set to non-NULL value if this message is currently being given to a
1598 * communicator and we are awaiting that communicator's acknowledgement.
1599 * Note that we must not retransmit a pending message while we're still
1600 * in the process of giving it to a communicator. If a pending message
1601 * is free'd while this entry is non-NULL, the @e qe reference to us
1602 * should simply be set to NULL.
1603 */
1604 struct QueueEntry *qe;
1605
1606 /**
1588 * Client that issued the transmission request, if @e pmt is #PMT_CORE. 1607 * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1589 */ 1608 */
1590 struct TransportClient *client; 1609 struct TransportClient *client;
@@ -2472,9 +2491,12 @@ transmit_on_queue (void *cls);
2472 * be called if the message queue is non-empty! 2491 * be called if the message queue is non-empty!
2473 * 2492 *
2474 * @param queue the queue to do scheduling for 2493 * @param queue the queue to do scheduling for
2494 * @param inside_job set to #GNUNET_YES if called from
2495 * #transmit_on_queue() itself and NOT setting
2496 * the task means running immediately
2475 */ 2497 */
2476static void 2498static void
2477schedule_transmit_on_queue (struct Queue *queue) 2499schedule_transmit_on_queue (struct Queue *queue, int inside_job)
2478{ 2500{
2479 struct Neighbour *n = queue->neighbour; 2501 struct Neighbour *n = queue->neighbour;
2480 struct PendingMessage *pm = n->pending_msg_head; 2502 struct PendingMessage *pm = n->pending_msg_head;
@@ -2507,7 +2529,7 @@ schedule_transmit_on_queue (struct Queue *queue)
2507 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining ( 2529 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (
2508 pm->next_attempt), 2530 pm->next_attempt),
2509 out_delay); 2531 out_delay);
2510 if (0 == out_delay.rel_value_us) 2532 if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
2511 return; /* we should run immediately! */ 2533 return; /* we should run immediately! */
2512 /* queue has changed since we were scheduled, reschedule again */ 2534 /* queue has changed since we were scheduled, reschedule again */
2513 queue->transmit_task = 2535 queue->transmit_task =
@@ -2575,6 +2597,11 @@ free_queue (struct Queue *queue)
2575 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); 2597 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
2576 queue->queue_length--; 2598 queue->queue_length--;
2577 tc->details.communicator.total_queue_length--; 2599 tc->details.communicator.total_queue_length--;
2600 if (NULL != qe->pm)
2601 {
2602 GNUNET_assert (qe == qe->pm->qe);
2603 qe->pm->qe = NULL;
2604 }
2578 GNUNET_free (qe); 2605 GNUNET_free (qe);
2579 } 2606 }
2580 GNUNET_assert (0 == queue->queue_length); 2607 GNUNET_assert (0 == queue->queue_length);
@@ -2589,7 +2616,7 @@ free_queue (struct Queue *queue)
2589 GNUNET_NO); 2616 GNUNET_NO);
2590 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; 2617 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
2591 s = s->next_client) 2618 s = s->next_client)
2592 schedule_transmit_on_queue (s); 2619 schedule_transmit_on_queue (s, GNUNET_NO);
2593 } 2620 }
2594 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); 2621 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
2595 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); 2622 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
@@ -2859,6 +2886,11 @@ free_pending_message (struct PendingMessage *pm)
2859 target->pending_msg_tail, 2886 target->pending_msg_tail,
2860 pm); 2887 pm);
2861 free_fragment_tree (pm); 2888 free_fragment_tree (pm);
2889 if (NULL != pm->qe)
2890 {
2891 GNUNET_assert (pm == pm->qe->pm);
2892 pm->qe->pm = NULL;
2893 }
2862 GNUNET_free_non_null (pm->bpm); 2894 GNUNET_free_non_null (pm->bpm);
2863 GNUNET_free (pm); 2895 GNUNET_free (pm);
2864} 2896}
@@ -3245,8 +3277,12 @@ queue_send_msg (struct Queue *queue,
3245 qe = GNUNET_new (struct QueueEntry); 3277 qe = GNUNET_new (struct QueueEntry);
3246 qe->mid = queue->mid_gen++; 3278 qe->mid = queue->mid_gen++;
3247 qe->queue = queue; 3279 qe->queue = queue;
3248 // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! 3280 if (NULL != pm)
3249 // (also, note that pm may be NULL!) 3281 {
3282 qe->pm = pm;
3283 GNUNET_assert (NULL == pm->qe);
3284 pm->qe = qe;
3285 }
3250 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); 3286 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
3251 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); 3287 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3252 queue->queue_length++; 3288 queue->queue_length++;
@@ -6038,6 +6074,60 @@ reliability_box_message (struct PendingMessage *pm)
6038 6074
6039 6075
6040/** 6076/**
6077 * Change the value of the `next_attempt` field of @a pm
6078 * to @a next_attempt and re-order @a pm in the transmission
6079 * list as required by the new timestmap.
6080 *
6081 * @param pm a pending message to update
6082 * @param next_attempt timestamp to use
6083 */
6084static void
6085update_pm_next_attempt (struct PendingMessage *pm,
6086 struct GNUNET_TIME_Absolute next_attempt)
6087{
6088 struct Neighbour *neighbour = pm->target;
6089
6090 pm->next_attempt = next_attempt;
6091 if (NULL == pm->frag_parent)
6092 {
6093 struct PendingMessage *pos;
6094
6095 /* re-insert sort in neighbour list */
6096 GNUNET_CONTAINER_MDLL_remove (neighbour,
6097 neighbour->pending_msg_head,
6098 neighbour->pending_msg_tail,
6099 pm);
6100 pos = neighbour->pending_msg_tail;
6101 while ((NULL != pos) &&
6102 (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
6103 pos = pos->prev_neighbour;
6104 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
6105 neighbour->pending_msg_head,
6106 neighbour->pending_msg_tail,
6107 pos,
6108 pm);
6109 }
6110 else
6111 {
6112 /* re-insert sort in fragment list */
6113 struct PendingMessage *fp = pm->frag_parent;
6114 struct PendingMessage *pos;
6115
6116 GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm);
6117 pos = fp->tail_frag;
6118 while ((NULL != pos) &&
6119 (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
6120 pos = pos->prev_frag;
6121 GNUNET_CONTAINER_MDLL_insert_after (frag,
6122 fp->head_frag,
6123 fp->tail_frag,
6124 pos,
6125 pm);
6126 }
6127}
6128
6129
6130/**
6041 * We believe we are ready to transmit a message on a queue. Double-checks 6131 * We believe we are ready to transmit a message on a queue. Double-checks
6042 * with the queue's "tracker_out" and then gives the message to the 6132 * with the queue's "tracker_out" and then gives the message to the
6043 * communicator for transmission (updating the tracker, and re-scheduling 6133 * communicator for transmission (updating the tracker, and re-scheduling
@@ -6060,7 +6150,13 @@ transmit_on_queue (void *cls)
6060 /* no message pending, nothing to do here! */ 6150 /* no message pending, nothing to do here! */
6061 return; 6151 return;
6062 } 6152 }
6063 schedule_transmit_on_queue (queue); 6153 if (NULL != pm->qe)
6154 {
6155 /* message still pending with communciator!
6156 LOGGING-FIXME: Use stats? logging? Should this not be rare? */
6157 return;
6158 }
6159 schedule_transmit_on_queue (queue, GNUNET_YES);
6064 if (NULL != queue->transmit_task) 6160 if (NULL != queue->transmit_task)
6065 return; /* do it later */ 6161 return; /* do it later */
6066 overhead = 0; 6162 overhead = 0;
@@ -6081,7 +6177,7 @@ transmit_on_queue (void *cls)
6081 if (NULL == s) 6177 if (NULL == s)
6082 { 6178 {
6083 /* Fragmentation failed, try next message... */ 6179 /* Fragmentation failed, try next message... */
6084 schedule_transmit_on_queue (queue); 6180 schedule_transmit_on_queue (queue, GNUNET_NO);
6085 return; 6181 return;
6086 } 6182 }
6087 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) 6183 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
@@ -6089,7 +6185,7 @@ transmit_on_queue (void *cls)
6089 if (NULL == s) 6185 if (NULL == s)
6090 { 6186 {
6091 /* Reliability boxing failed, try next message... */ 6187 /* Reliability boxing failed, try next message... */
6092 schedule_transmit_on_queue (queue); 6188 schedule_transmit_on_queue (queue, GNUNET_NO);
6093 return; 6189 return;
6094 } 6190 }
6095 6191
@@ -6141,57 +6237,21 @@ transmit_on_queue (void *cls)
6141 } 6237 }
6142 else 6238 else
6143 { 6239 {
6144 /* message not finished, waiting for acknowledgement */ 6240 /* Message not finished, waiting for acknowledgement.
6145 struct Neighbour *neighbour = pm->target; 6241 Update time by which we might retransmit 's' based on queue
6146 /* Update time by which we might retransmit 's' based on queue
6147 characteristics (i.e. RTT); it takes one RTT for the message to 6242 characteristics (i.e. RTT); it takes one RTT for the message to
6148 arrive and the ACK to come back in the best case; but the other 6243 arrive and the ACK to come back in the best case; but the other
6149 side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before 6244 side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
6150 retransmitting. Note that in the future this heuristic should 6245 retransmitting. Note that in the future this heuristic should
6151 likely be improved further (measure RTT stability, consider 6246 likely be improved further (measure RTT stability, consider
6152 message urgency and size when delaying ACKs, etc.) */ 6247 message urgency and size when delaying ACKs, etc.) */
6153 s->next_attempt = GNUNET_TIME_relative_to_absolute ( 6248 update_pm_next_attempt (s,
6154 GNUNET_TIME_relative_multiply (queue->rtt, 4)); 6249 GNUNET_TIME_relative_to_absolute (
6155 if (s == pm) 6250 GNUNET_TIME_relative_multiply (queue->rtt, 4)));
6156 {
6157 struct PendingMessage *pos;
6158
6159 /* re-insert sort in neighbour list */
6160 GNUNET_CONTAINER_MDLL_remove (neighbour,
6161 neighbour->pending_msg_head,
6162 neighbour->pending_msg_tail,
6163 pm);
6164 pos = neighbour->pending_msg_tail;
6165 while ((NULL != pos) &&
6166 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
6167 pos = pos->prev_neighbour;
6168 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
6169 neighbour->pending_msg_head,
6170 neighbour->pending_msg_tail,
6171 pos,
6172 pm);
6173 }
6174 else
6175 {
6176 /* re-insert sort in fragment list */
6177 struct PendingMessage *fp = s->frag_parent;
6178 struct PendingMessage *pos;
6179
6180 GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s);
6181 pos = fp->tail_frag;
6182 while ((NULL != pos) &&
6183 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
6184 pos = pos->prev_frag;
6185 GNUNET_CONTAINER_MDLL_insert_after (frag,
6186 fp->head_frag,
6187 fp->tail_frag,
6188 pos,
6189 s);
6190 }
6191 } 6251 }
6192 6252
6193 /* finally, re-schedule queue transmission task itself */ 6253 /* finally, re-schedule queue transmission task itself */
6194 schedule_transmit_on_queue (queue); 6254 schedule_transmit_on_queue (queue, GNUNET_NO);
6195} 6255}
6196 6256
6197 6257
@@ -6216,7 +6276,7 @@ tracker_update_out_cb (void *cls)
6216 } 6276 }
6217 GNUNET_SCHEDULER_cancel (queue->transmit_task); 6277 GNUNET_SCHEDULER_cancel (queue->transmit_task);
6218 queue->transmit_task = NULL; 6278 queue->transmit_task = NULL;
6219 schedule_transmit_on_queue (queue); 6279 schedule_transmit_on_queue (queue, GNUNET_NO);
6220} 6280}
6221 6281
6222 6282
@@ -6309,6 +6369,7 @@ handle_send_message_ack (void *cls,
6309{ 6369{
6310 struct TransportClient *tc = cls; 6370 struct TransportClient *tc = cls;
6311 struct QueueEntry *qe; 6371 struct QueueEntry *qe;
6372 struct PendingMessage *pm;
6312 6373
6313 if (CT_COMMUNICATOR != tc->type) 6374 if (CT_COMMUNICATOR != tc->type)
6314 { 6375 {
@@ -6352,7 +6413,8 @@ handle_send_message_ack (void *cls,
6352 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == 6413 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
6353 tc->details.communicator.total_queue_length) 6414 tc->details.communicator.total_queue_length)
6354 { 6415 {
6355 /* Communicator dropped below threshold, resume all queues */ 6416 /* Communicator dropped below threshold, resume all queues
6417 incident with this client! */
6356 GNUNET_STATISTICS_update ( 6418 GNUNET_STATISTICS_update (
6357 GST_stats, 6419 GST_stats,
6358 "# Transmission throttled due to communicator queue limit", 6420 "# Transmission throttled due to communicator queue limit",
@@ -6361,7 +6423,7 @@ handle_send_message_ack (void *cls,
6361 for (struct Queue *queue = tc->details.communicator.queue_head; 6423 for (struct Queue *queue = tc->details.communicator.queue_head;
6362 NULL != queue; 6424 NULL != queue;
6363 queue = queue->next_client) 6425 queue = queue->next_client)
6364 schedule_transmit_on_queue (queue); 6426 schedule_transmit_on_queue (queue, GNUNET_NO);
6365 } 6427 }
6366 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) 6428 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
6367 { 6429 {
@@ -6370,14 +6432,33 @@ handle_send_message_ack (void *cls,
6370 "# Transmission throttled due to queue queue limit", 6432 "# Transmission throttled due to queue queue limit",
6371 -1, 6433 -1,
6372 GNUNET_NO); 6434 GNUNET_NO);
6373 schedule_transmit_on_queue (qe->queue); 6435 schedule_transmit_on_queue (qe->queue, GNUNET_NO);
6374 } 6436 }
6375 6437
6376 /* TODO: we also should react on the status! */ 6438 if (NULL != (pm = qe->pm))
6377 // FIXME: this probably requires queue->pm = s assignment! 6439 {
6378 // FIXME: react to communicator status about transmission request. We got: 6440 struct Neighbour *n;
6379 sma->status; // OK success, SYSERR failure
6380 6441
6442 GNUNET_assert (qe == pm->qe);
6443 pm->qe = NULL;
6444 /* If waiting for this communicator may have blocked transmission
6445 of pm on other queues for this neighbour, force schedule
6446 transmit on queue for queues of the neighbour */
6447 n = pm->target;
6448 if (n->pending_msg_head == pm)
6449 {
6450 for (struct Queue *queue = n->queue_head; NULL != queue;
6451 queue = queue->next_neighbour)
6452 schedule_transmit_on_queue (queue, GNUNET_NO);
6453 }
6454 if (GNUNET_OK != ntohl (sma->status))
6455 {
6456 GNUNET_log (
6457 GNUNET_ERROR_TYPE_INFO,
6458 "Queue failed in transmission, will try retransmission immediately\n");
6459 update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
6460 }
6461 }
6381 GNUNET_free (qe); 6462 GNUNET_free (qe);
6382} 6463}
6383 6464