diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-05-11 22:13:47 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-05-11 22:13:47 +0200 |
commit | 84b3c87161116786074b16f54f2d22e526421db0 (patch) | |
tree | e29ac20eaf0b84c357a6ed99134b05f54804f187 /src/transport/gnunet-service-tng.c | |
parent | dc4902a536daa8e46dbbd79a9fa98498b5e848b3 (diff) | |
download | gnunet-84b3c87161116786074b16f54f2d22e526421db0.tar.gz gnunet-84b3c87161116786074b16f54f2d22e526421db0.zip |
clean up transmission logic to have queues 'pull' for pending messages while control traffic is 'pushed' into queues
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 702 |
1 files changed, 445 insertions, 257 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index f07e1c88d..56cf61c2b 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -24,15 +24,8 @@ | |||
24 | * | 24 | * |
25 | * TODO: | 25 | * TODO: |
26 | * Implement next: | 26 | * Implement next: |
27 | * - realize "pull" based logic (#handle_client_send()) for | 27 | * - FIXME-NEXT: logic to decide which pm to pick for a given queue (sorting!) |
28 | * `struct PendingMessage` which waits for a queue on any | 28 | * - FIXME-FC: realize transport-to-transport flow control (needed in case |
29 | * applicable route to be 'ready', in contrast | ||
30 | * to the 'push' based routing we use for control messages. | ||
31 | * Basically, when a queue goes idle, it should "search" | ||
32 | * via its neighbour for either virtual links or DVH's that | ||
33 | * have it as first hop and then find messages in those | ||
34 | * virtual links! | ||
35 | * - realize transport-to-transport flow control (needed in case | ||
36 | * communicators do not offer flow control). Note that we may not | 29 | * communicators do not offer flow control). Note that we may not |
37 | * want to simply delay the ACKs as that may cause unnecessary | 30 | * want to simply delay the ACKs as that may cause unnecessary |
38 | * re-transmissions. => Introduce proper flow and congestion window(s)! | 31 | * re-transmissions. => Introduce proper flow and congestion window(s)! |
@@ -1375,7 +1368,7 @@ struct DistanceVector | |||
1375 | * Do we have a confirmed working queue and are thus visible to | 1368 | * Do we have a confirmed working queue and are thus visible to |
1376 | * CORE? If so, this is the virtual link, otherwise NULL. | 1369 | * CORE? If so, this is the virtual link, otherwise NULL. |
1377 | */ | 1370 | */ |
1378 | struct VirtualLink *link; | 1371 | struct VirtualLink *vl; |
1379 | 1372 | ||
1380 | /** | 1373 | /** |
1381 | * Signature affirming @e ephemeral_key of type | 1374 | * Signature affirming @e ephemeral_key of type |
@@ -1565,6 +1558,12 @@ struct Queue | |||
1565 | * Connection status for this queue. | 1558 | * Connection status for this queue. |
1566 | */ | 1559 | */ |
1567 | enum GNUNET_TRANSPORT_ConnectionStatus cs; | 1560 | enum GNUNET_TRANSPORT_ConnectionStatus cs; |
1561 | |||
1562 | /** | ||
1563 | * Set to #GNUNET_YES if this queue is idle waiting for some | ||
1564 | * virtual link to give it a pending message. | ||
1565 | */ | ||
1566 | int idle; | ||
1568 | }; | 1567 | }; |
1569 | 1568 | ||
1570 | 1569 | ||
@@ -1696,7 +1695,7 @@ struct Neighbour | |||
1696 | * Do we have a confirmed working queue and are thus visible to | 1695 | * Do we have a confirmed working queue and are thus visible to |
1697 | * CORE? If so, this is the virtual link, otherwise NULL. | 1696 | * CORE? If so, this is the virtual link, otherwise NULL. |
1698 | */ | 1697 | */ |
1699 | struct VirtualLink *link; | 1698 | struct VirtualLink *vl; |
1700 | 1699 | ||
1701 | /** | 1700 | /** |
1702 | * Latest DVLearn monotonic time seen from this peer. Initialized only | 1701 | * Latest DVLearn monotonic time seen from this peer. Initialized only |
@@ -1766,17 +1765,7 @@ enum PendingMessageType | |||
1766 | /** | 1765 | /** |
1767 | * Reliability box. | 1766 | * Reliability box. |
1768 | */ | 1767 | */ |
1769 | PMT_RELIABILITY_BOX = 2, | 1768 | PMT_RELIABILITY_BOX = 2 |
1770 | |||
1771 | /** | ||
1772 | * Any type of acknowledgement. | ||
1773 | */ | ||
1774 | PMT_ACKNOWLEDGEMENT = 3, | ||
1775 | |||
1776 | /** | ||
1777 | * Control traffic generated by the TRANSPORT service itself. | ||
1778 | */ | ||
1779 | PMT_CONTROL = 4 | ||
1780 | 1769 | ||
1781 | }; | 1770 | }; |
1782 | 1771 | ||
@@ -2752,6 +2741,41 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh) | |||
2752 | 2741 | ||
2753 | 2742 | ||
2754 | /** | 2743 | /** |
2744 | * Task run to check whether the hops of the @a cls still | ||
2745 | * are validated, or if we need to core about disconnection. | ||
2746 | * | ||
2747 | * @param cls a `struct VirtualLink` | ||
2748 | */ | ||
2749 | static void | ||
2750 | check_link_down (void *cls); | ||
2751 | |||
2752 | |||
2753 | /** | ||
2754 | * Send message to CORE clients that we lost a connection. | ||
2755 | * | ||
2756 | * @param pid peer the connection was for | ||
2757 | */ | ||
2758 | static void | ||
2759 | cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) | ||
2760 | { | ||
2761 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2762 | "Informing CORE clients about disconnect from %s\n", | ||
2763 | GNUNET_i2s (pid)); | ||
2764 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | ||
2765 | { | ||
2766 | struct GNUNET_MQ_Envelope *env; | ||
2767 | struct DisconnectInfoMessage *dim; | ||
2768 | |||
2769 | if (CT_CORE != tc->type) | ||
2770 | continue; | ||
2771 | env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); | ||
2772 | dim->peer = *pid; | ||
2773 | GNUNET_MQ_send (tc->mq, env); | ||
2774 | } | ||
2775 | } | ||
2776 | |||
2777 | |||
2778 | /** | ||
2755 | * Free entry in #dv_routes. First frees all hops to the target, and | 2779 | * Free entry in #dv_routes. First frees all hops to the target, and |
2756 | * if there are no entries left, frees @a dv as well. | 2780 | * if there are no entries left, frees @a dv as well. |
2757 | * | 2781 | * |
@@ -2766,11 +2790,33 @@ free_dv_route (struct DistanceVector *dv) | |||
2766 | free_distance_vector_hop (dvh); | 2790 | free_distance_vector_hop (dvh); |
2767 | if (NULL == dv->dv_head) | 2791 | if (NULL == dv->dv_head) |
2768 | { | 2792 | { |
2793 | struct VirtualLink *vl; | ||
2794 | |||
2769 | GNUNET_assert ( | 2795 | GNUNET_assert ( |
2770 | GNUNET_YES == | 2796 | GNUNET_YES == |
2771 | GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); | 2797 | GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); |
2798 | if (NULL != (vl = dv->vl)) | ||
2799 | { | ||
2800 | GNUNET_assert (dv == vl->dv); | ||
2801 | vl->dv = NULL; | ||
2802 | if (NULL == vl->n) | ||
2803 | { | ||
2804 | cores_send_disconnect_info (&dv->target); | ||
2805 | free_virtual_link (vl); | ||
2806 | } | ||
2807 | else | ||
2808 | { | ||
2809 | GNUNET_SCHEDULER_cancel (vl->visibility_task); | ||
2810 | vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl); | ||
2811 | } | ||
2812 | dv->vl = NULL; | ||
2813 | } | ||
2814 | |||
2772 | if (NULL != dv->timeout_task) | 2815 | if (NULL != dv->timeout_task) |
2816 | { | ||
2773 | GNUNET_SCHEDULER_cancel (dv->timeout_task); | 2817 | GNUNET_SCHEDULER_cancel (dv->timeout_task); |
2818 | dv->timeout_task = NULL; | ||
2819 | } | ||
2774 | GNUNET_free (dv); | 2820 | GNUNET_free (dv); |
2775 | } | 2821 | } |
2776 | } | 2822 | } |
@@ -2950,6 +2996,7 @@ static void | |||
2950 | free_neighbour (struct Neighbour *neighbour) | 2996 | free_neighbour (struct Neighbour *neighbour) |
2951 | { | 2997 | { |
2952 | struct DistanceVectorHop *dvh; | 2998 | struct DistanceVectorHop *dvh; |
2999 | struct VirtualLink *vl; | ||
2953 | 3000 | ||
2954 | GNUNET_assert (NULL == neighbour->queue_head); | 3001 | GNUNET_assert (NULL == neighbour->queue_head); |
2955 | GNUNET_assert (GNUNET_YES == | 3002 | GNUNET_assert (GNUNET_YES == |
@@ -2989,6 +3036,22 @@ free_neighbour (struct Neighbour *neighbour) | |||
2989 | GNUNET_PEERSTORE_store_cancel (neighbour->sc); | 3036 | GNUNET_PEERSTORE_store_cancel (neighbour->sc); |
2990 | neighbour->sc = NULL; | 3037 | neighbour->sc = NULL; |
2991 | } | 3038 | } |
3039 | if (NULL != (vl = neighbour->vl)) | ||
3040 | { | ||
3041 | GNUNET_assert (neighbour == vl->n); | ||
3042 | vl->n = NULL; | ||
3043 | if (NULL == vl->dv) | ||
3044 | { | ||
3045 | cores_send_disconnect_info (&vl->target); | ||
3046 | free_virtual_link (vl); | ||
3047 | } | ||
3048 | else | ||
3049 | { | ||
3050 | GNUNET_SCHEDULER_cancel (vl->visibility_task); | ||
3051 | vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl); | ||
3052 | } | ||
3053 | neighbour->vl = NULL; | ||
3054 | } | ||
2992 | GNUNET_free (neighbour); | 3055 | GNUNET_free (neighbour); |
2993 | } | 3056 | } |
2994 | 3057 | ||
@@ -3034,31 +3097,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid) | |||
3034 | 3097 | ||
3035 | 3098 | ||
3036 | /** | 3099 | /** |
3037 | * Send message to CORE clients that we lost a connection. | ||
3038 | * | ||
3039 | * @param pid peer the connection was for | ||
3040 | */ | ||
3041 | static void | ||
3042 | cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) | ||
3043 | { | ||
3044 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3045 | "Informing CORE clients about disconnect from %s\n", | ||
3046 | GNUNET_i2s (pid)); | ||
3047 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | ||
3048 | { | ||
3049 | struct GNUNET_MQ_Envelope *env; | ||
3050 | struct DisconnectInfoMessage *dim; | ||
3051 | |||
3052 | if (CT_CORE != tc->type) | ||
3053 | continue; | ||
3054 | env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); | ||
3055 | dim->peer = *pid; | ||
3056 | GNUNET_MQ_send (tc->mq, env); | ||
3057 | } | ||
3058 | } | ||
3059 | |||
3060 | |||
3061 | /** | ||
3062 | * We believe we are ready to transmit a message on a queue. Gives the | 3100 | * We believe we are ready to transmit a message on a queue. Gives the |
3063 | * message to the communicator for transmission (updating the tracker, | 3101 | * message to the communicator for transmission (updating the tracker, |
3064 | * and re-scheduling itself if applicable). | 3102 | * and re-scheduling itself if applicable). |
@@ -3070,19 +3108,15 @@ transmit_on_queue (void *cls); | |||
3070 | 3108 | ||
3071 | 3109 | ||
3072 | /** | 3110 | /** |
3073 | * Schedule next run of #transmit_on_queue(). Does NOTHING if | 3111 | * Called whenever something changed that might effect when we |
3074 | * we should run immediately or if the message queue is empty. | 3112 | * try to do the next transmission on @a queue using #transmit_on_queue(). |
3075 | * Test for no task being added AND queue not being empty to | ||
3076 | * transmit immediately afterwards! This function must only | ||
3077 | * be called if the message queue is non-empty! | ||
3078 | * | 3113 | * |
3079 | * @param queue the queue to do scheduling for | 3114 | * @param queue the queue to do scheduling for |
3080 | * @param inside_job set to #GNUNET_YES if called from | 3115 | * @param p task priority to use, if @a queue is scheduled |
3081 | * #transmit_on_queue() itself and NOT setting | ||
3082 | * the task means running immediately | ||
3083 | */ | 3116 | */ |
3084 | static void | 3117 | static void |
3085 | schedule_transmit_on_queue (struct Queue *queue, int inside_job) | 3118 | schedule_transmit_on_queue (struct Queue *queue, |
3119 | enum GNUNET_SCHEDULER_Priority p) | ||
3086 | { | 3120 | { |
3087 | if (queue->tc->details.communicator.total_queue_length >= | 3121 | if (queue->tc->details.communicator.total_queue_length >= |
3088 | COMMUNICATOR_TOTAL_QUEUE_LIMIT) | 3122 | COMMUNICATOR_TOTAL_QUEUE_LIMIT) |
@@ -3092,6 +3126,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) | |||
3092 | "# Transmission throttled due to communicator queue limit", | 3126 | "# Transmission throttled due to communicator queue limit", |
3093 | 1, | 3127 | 1, |
3094 | GNUNET_NO); | 3128 | GNUNET_NO); |
3129 | queue->idle = GNUNET_NO; | ||
3095 | return; | 3130 | return; |
3096 | } | 3131 | } |
3097 | if (queue->queue_length >= QUEUE_LENGTH_LIMIT) | 3132 | if (queue->queue_length >= QUEUE_LENGTH_LIMIT) |
@@ -3100,38 +3135,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) | |||
3100 | "# Transmission throttled due to queue queue limit", | 3135 | "# Transmission throttled due to queue queue limit", |
3101 | 1, | 3136 | 1, |
3102 | GNUNET_NO); | 3137 | GNUNET_NO); |
3138 | queue->idle = GNUNET_NO; | ||
3103 | return; | 3139 | return; |
3104 | } | 3140 | } |
3105 | #if FIXME - NEXT | 3141 | /* queue might indeed be ready, schedule it */ |
3106 | struct Neighbour *n = queue->neighbour; | 3142 | if (NULL != queue->transmit_task) |
3107 | struct GNUNET_TIME_Relative out_delay; | 3143 | GNUNET_SCHEDULER_cancel (queue->transmit_task); |
3108 | |||
3109 | if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) | ||
3110 | { | ||
3111 | GNUNET_log ( | ||
3112 | GNUNET_ERROR_TYPE_DEBUG, | ||
3113 | "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n", | ||
3114 | pm->logging_uuid, | ||
3115 | (unsigned long long) queue->qid, | ||
3116 | GNUNET_i2s (&n->pid)); | ||
3117 | return; /* we should run immediately! */ | ||
3118 | } | ||
3119 | /* queue has changed since we were scheduled, reschedule again */ | ||
3120 | queue->transmit_task = | 3144 | queue->transmit_task = |
3121 | GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); | 3145 | GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue); |
3122 | if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) | 3146 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3123 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 3147 | "Considering transmission on queue `%s' to %s\n", |
3124 | "Next transmission <%llu> on queue `%s' in %s (high delay)\n", | 3148 | queue->address, |
3125 | pm->logging_uuid, | 3149 | GNUNET_i2s (&queue->neighbour->pid)); |
3126 | queue->address, | ||
3127 | GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES)); | ||
3128 | else | ||
3129 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3130 | "Next transmission <%llu> on queue `%s' in %s\n", | ||
3131 | pm->logging_uuid, | ||
3132 | queue->address, | ||
3133 | GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES)); | ||
3134 | #endif | ||
3135 | } | 3150 | } |
3136 | 3151 | ||
3137 | 3152 | ||
@@ -3156,15 +3171,21 @@ check_link_down (void *cls) | |||
3156 | pos = pos->next_dv) | 3171 | pos = pos->next_dv) |
3157 | dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); | 3172 | dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); |
3158 | if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) | 3173 | if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) |
3174 | { | ||
3175 | vl->dv->vl = NULL; | ||
3159 | vl->dv = NULL; | 3176 | vl->dv = NULL; |
3177 | } | ||
3160 | q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; | 3178 | q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; |
3161 | for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) | 3179 | for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) |
3162 | q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); | 3180 | q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); |
3163 | if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) | 3181 | if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us) |
3182 | { | ||
3183 | vl->n->vl = NULL; | ||
3164 | vl->n = NULL; | 3184 | vl->n = NULL; |
3185 | } | ||
3165 | if ((NULL == vl->n) && (NULL == vl->dv)) | 3186 | if ((NULL == vl->n) && (NULL == vl->dv)) |
3166 | { | 3187 | { |
3167 | cores_send_disconnect_info (&dv->target); | 3188 | cores_send_disconnect_info (&vl->target); |
3168 | free_virtual_link (vl); | 3189 | free_virtual_link (vl); |
3169 | return; | 3190 | return; |
3170 | } | 3191 | } |
@@ -3229,7 +3250,7 @@ free_queue (struct Queue *queue) | |||
3229 | if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < | 3250 | if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < |
3230 | tc->details.communicator.total_queue_length)) | 3251 | tc->details.communicator.total_queue_length)) |
3231 | { | 3252 | { |
3232 | /* Communicator dropped below threshold, resume all queues */ | 3253 | /* Communicator dropped below threshold, resume all _other_ queues */ |
3233 | GNUNET_STATISTICS_update ( | 3254 | GNUNET_STATISTICS_update ( |
3234 | GST_stats, | 3255 | GST_stats, |
3235 | "# Transmission throttled due to communicator queue limit", | 3256 | "# Transmission throttled due to communicator queue limit", |
@@ -3237,7 +3258,7 @@ free_queue (struct Queue *queue) | |||
3237 | GNUNET_NO); | 3258 | GNUNET_NO); |
3238 | for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; | 3259 | for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; |
3239 | s = s->next_client) | 3260 | s = s->next_client) |
3240 | schedule_transmit_on_queue (s, GNUNET_NO); | 3261 | schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
3241 | } | 3262 | } |
3242 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); | 3263 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); |
3243 | GNUNET_free (queue); | 3264 | GNUNET_free (queue); |
@@ -3580,6 +3601,79 @@ pick_random_dv_hops (const struct DistanceVector *dv, | |||
3580 | 3601 | ||
3581 | 3602 | ||
3582 | /** | 3603 | /** |
3604 | * There is a message at the head of the pending messages for @a vl | ||
3605 | * which may be ready for transmission. Check if a queue is ready to | ||
3606 | * take it. | ||
3607 | * | ||
3608 | * This function must (1) check for flow control to ensure that we can | ||
3609 | * right now send to @a vl, (2) check that the pending message in the | ||
3610 | * queue is actually eligible, (3) determine if any applicable queue | ||
3611 | * (direct neighbour or DVH path) is ready to accept messages, and | ||
3612 | * (4) prioritize based on the preferences associated with the | ||
3613 | * pending message. | ||
3614 | * | ||
3615 | * So yeah, easy. | ||
3616 | * | ||
3617 | * @param vl virtual link where we should check for transmission | ||
3618 | */ | ||
3619 | static void | ||
3620 | check_vl_transmission (struct VirtualLink *vl) | ||
3621 | { | ||
3622 | struct Neighbour *n = vl->n; | ||
3623 | struct DistanceVector *dv = vl->dv; | ||
3624 | struct GNUNET_TIME_Absolute now; | ||
3625 | int elig; | ||
3626 | |||
3627 | /* FIXME-FC: need to implement virtual link flow control! */ | ||
3628 | |||
3629 | /* Check that we have an eligible pending message! | ||
3630 | (cheaper than having #transmit_on_queue() find out!) */ | ||
3631 | elig = GNUNET_NO; | ||
3632 | for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; | ||
3633 | pm = pm->next_vl) | ||
3634 | { | ||
3635 | if (NULL != pm->qe) | ||
3636 | continue; /* not eligible, is in a queue! */ | ||
3637 | elig = GNUNET_YES; | ||
3638 | break; | ||
3639 | } | ||
3640 | if (GNUNET_NO == elig) | ||
3641 | return; | ||
3642 | |||
3643 | /* Notify queues at direct neighbours that we are interested */ | ||
3644 | now = GNUNET_TIME_absolute_get (); | ||
3645 | if (NULL != n) | ||
3646 | { | ||
3647 | for (struct Queue *queue = n->queue_head; NULL != queue; | ||
3648 | queue = queue->next_neighbour) | ||
3649 | if ((GNUNET_YES == queue->idle) && | ||
3650 | (queue->validated_until.abs_value_us > now.abs_value_us)) | ||
3651 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | ||
3652 | } | ||
3653 | /* Notify queues via DV that we are interested */ | ||
3654 | if (NULL != dv) | ||
3655 | { | ||
3656 | /* Do DV with lower scheduler priority, which effectively means that | ||
3657 | IF a neighbour exists and is available, we prefer it. */ | ||
3658 | for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; | ||
3659 | pos = pos->next_dv) | ||
3660 | { | ||
3661 | struct Neighbour *nh = pos->next_hop; | ||
3662 | |||
3663 | if (pos->path_valid_until.abs_value_us <= now.abs_value_us) | ||
3664 | continue; /* skip this one: path not validated */ | ||
3665 | for (struct Queue *queue = nh->queue_head; NULL != queue; | ||
3666 | queue = queue->next_neighbour) | ||
3667 | if ((GNUNET_YES == queue->idle) && | ||
3668 | (queue->validated_until.abs_value_us > now.abs_value_us)) | ||
3669 | schedule_transmit_on_queue (queue, | ||
3670 | GNUNET_SCHEDULER_PRIORITY_BACKGROUND); | ||
3671 | } | ||
3672 | } | ||
3673 | } | ||
3674 | |||
3675 | |||
3676 | /** | ||
3583 | * Client asked for transmission to a peer. Process the request. | 3677 | * Client asked for transmission to a peer. Process the request. |
3584 | * | 3678 | * |
3585 | * @param cls the client | 3679 | * @param cls the client |
@@ -3594,7 +3688,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) | |||
3594 | uint32_t bytes_msg; | 3688 | uint32_t bytes_msg; |
3595 | struct VirtualLink *vl; | 3689 | struct VirtualLink *vl; |
3596 | enum GNUNET_MQ_PriorityPreferences pp; | 3690 | enum GNUNET_MQ_PriorityPreferences pp; |
3597 | int was_empty; | ||
3598 | 3691 | ||
3599 | GNUNET_assert (CT_CORE == tc->type); | 3692 | GNUNET_assert (CT_CORE == tc->type); |
3600 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; | 3693 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; |
@@ -3631,32 +3724,11 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) | |||
3631 | tc->details.core.pending_msg_head, | 3724 | tc->details.core.pending_msg_head, |
3632 | tc->details.core.pending_msg_tail, | 3725 | tc->details.core.pending_msg_tail, |
3633 | pm); | 3726 | pm); |
3634 | was_empty = (NULL == vl->pending_msg_head); | ||
3635 | GNUNET_CONTAINER_MDLL_insert (vl, | 3727 | GNUNET_CONTAINER_MDLL_insert (vl, |
3636 | vl->pending_msg_head, | 3728 | vl->pending_msg_head, |
3637 | vl->pending_msg_tail, | 3729 | vl->pending_msg_tail, |
3638 | pm); | 3730 | pm); |
3639 | if (! was_empty) | 3731 | check_vl_transmission (vl); |
3640 | return; /* all queues must already be busy */ | ||
3641 | #if 0 | ||
3642 | // FIXME: check if any DVH or neighbour queue of 'vl' | ||
3643 | // is ready for transmission now. If so, encapsulate | ||
3644 | // 'pm' accordingly and send! | ||
3645 | for (struct Queue *queue = target->queue_head; NULL != queue; | ||
3646 | queue = queue->next_neighbour) | ||
3647 | { | ||
3648 | /* try transmission on any queue that is idle */ | ||
3649 | if (NULL == queue->transmit_task) | ||
3650 | { | ||
3651 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3652 | "Queue %llu to %s is idle, triggering transmission\n", | ||
3653 | (unsigned long long) queue->qid, | ||
3654 | GNUNET_i2s (&queue->neighbour->pid)); | ||
3655 | queue->transmit_task = | ||
3656 | GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue); | ||
3657 | } | ||
3658 | } | ||
3659 | #endif | ||
3660 | } | 3732 | } |
3661 | 3733 | ||
3662 | 3734 | ||
@@ -3861,7 +3933,7 @@ update_ephemeral (struct DistanceVector *dv) | |||
3861 | 3933 | ||
3862 | 3934 | ||
3863 | /** | 3935 | /** |
3864 | * Send the control message @a payload on @a queue. | 3936 | * Send the message @a payload on @a queue. |
3865 | * | 3937 | * |
3866 | * @param queue the queue to use for transmission | 3938 | * @param queue the queue to use for transmission |
3867 | * @param pm pending message to update once transmission is done, may be NULL! | 3939 | * @param pm pending message to update once transmission is done, may be NULL! |
@@ -3879,6 +3951,7 @@ queue_send_msg (struct Queue *queue, | |||
3879 | struct GNUNET_TRANSPORT_SendMessageTo *smt; | 3951 | struct GNUNET_TRANSPORT_SendMessageTo *smt; |
3880 | struct GNUNET_MQ_Envelope *env; | 3952 | struct GNUNET_MQ_Envelope *env; |
3881 | 3953 | ||
3954 | queue->idle = GNUNET_NO; | ||
3882 | GNUNET_log ( | 3955 | GNUNET_log ( |
3883 | GNUNET_ERROR_TYPE_DEBUG, | 3956 | GNUNET_ERROR_TYPE_DEBUG, |
3884 | "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n", | 3957 | "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n", |
@@ -3910,6 +3983,11 @@ queue_send_msg (struct Queue *queue, | |||
3910 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); | 3983 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); |
3911 | queue->queue_length++; | 3984 | queue->queue_length++; |
3912 | queue->tc->details.communicator.total_queue_length++; | 3985 | queue->tc->details.communicator.total_queue_length++; |
3986 | if (COMMUNICATOR_TOTAL_QUEUE_LIMIT == | ||
3987 | queue->tc->details.communicator.total_queue_length) | ||
3988 | queue->idle = GNUNET_NO; | ||
3989 | if (QUEUE_LENGTH_LIMIT == queue->queue_length) | ||
3990 | queue->idle = GNUNET_NO; | ||
3913 | GNUNET_MQ_send (queue->tc->mq, env); | 3991 | GNUNET_MQ_send (queue->tc->mq, env); |
3914 | } | 3992 | } |
3915 | } | 3993 | } |
@@ -5210,6 +5288,50 @@ update_dvh_performance (struct DistanceVectorHop *dvh, | |||
5210 | 5288 | ||
5211 | 5289 | ||
5212 | /** | 5290 | /** |
5291 | * We have completed transmission of @a pm, remove it from | ||
5292 | * the transmission queues (and if it is a fragment, continue | ||
5293 | * up the tree as necessary). | ||
5294 | * | ||
5295 | * @param pm pending message that was transmitted | ||
5296 | */ | ||
5297 | static void | ||
5298 | completed_pending_message (struct PendingMessage *pm) | ||
5299 | { | ||
5300 | struct PendingMessage *pos; | ||
5301 | |||
5302 | switch (pm->pmt) | ||
5303 | { | ||
5304 | case PMT_CORE: | ||
5305 | case PMT_RELIABILITY_BOX: | ||
5306 | /* Full message sent, we are done */ | ||
5307 | client_send_response (pm); | ||
5308 | return; | ||
5309 | case PMT_FRAGMENT_BOX: | ||
5310 | /* Fragment sent over reliabile channel */ | ||
5311 | free_fragment_tree (pm); | ||
5312 | pos = pm->frag_parent; | ||
5313 | GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); | ||
5314 | GNUNET_free (pm); | ||
5315 | /* check if subtree is done */ | ||
5316 | while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) && | ||
5317 | (pos != pm)) | ||
5318 | { | ||
5319 | pm = pos; | ||
5320 | pos = pm->frag_parent; | ||
5321 | GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); | ||
5322 | GNUNET_free (pm); | ||
5323 | } | ||
5324 | |||
5325 | /* Was this the last applicable fragmment? */ | ||
5326 | if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) && | ||
5327 | (pos->frag_off == pos->bytes_msg)) | ||
5328 | client_send_response (pos); | ||
5329 | return; | ||
5330 | } | ||
5331 | } | ||
5332 | |||
5333 | |||
5334 | /** | ||
5213 | * The @a pa was acknowledged, process the acknowledgement. | 5335 | * The @a pa was acknowledged, process the acknowledgement. |
5214 | * | 5336 | * |
5215 | * @param pa the pending acknowledgement that was satisfied | 5337 | * @param pa the pending acknowledgement that was satisfied |
@@ -5220,7 +5342,6 @@ static void | |||
5220 | handle_acknowledged (struct PendingAcknowledgement *pa, | 5342 | handle_acknowledged (struct PendingAcknowledgement *pa, |
5221 | struct GNUNET_TIME_Relative ack_delay) | 5343 | struct GNUNET_TIME_Relative ack_delay) |
5222 | { | 5344 | { |
5223 | struct PendingMessage *pm = pa->pm; | ||
5224 | struct GNUNET_TIME_Relative delay; | 5345 | struct GNUNET_TIME_Relative delay; |
5225 | 5346 | ||
5226 | delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time); | 5347 | delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time); |
@@ -5232,25 +5353,8 @@ handle_acknowledged (struct PendingAcknowledgement *pa, | |||
5232 | update_queue_performance (pa->queue, delay, pa->message_size); | 5353 | update_queue_performance (pa->queue, delay, pa->message_size); |
5233 | if (NULL != pa->dvh) | 5354 | if (NULL != pa->dvh) |
5234 | update_dvh_performance (pa->dvh, delay, pa->message_size); | 5355 | update_dvh_performance (pa->dvh, delay, pa->message_size); |
5235 | if (NULL != pm) | 5356 | if (NULL != pa->pm) |
5236 | { | 5357 | completed_pending_message (pa->pm); |
5237 | if (NULL != pm->frag_parent) | ||
5238 | { | ||
5239 | pm = pm->frag_parent; | ||
5240 | free_fragment_tree (pa->pm); | ||
5241 | } | ||
5242 | while ((NULL != pm->frag_parent) && (NULL == pm->head_frag)) | ||
5243 | { | ||
5244 | struct PendingMessage *parent = pm->frag_parent; | ||
5245 | |||
5246 | free_fragment_tree (pm); | ||
5247 | pm = parent; | ||
5248 | } | ||
5249 | if (NULL != pm->head_frag) | ||
5250 | pm = NULL; /* we are done, otherwise free 'pm' below */ | ||
5251 | } | ||
5252 | if (NULL != pm) | ||
5253 | free_pending_message (pm); | ||
5254 | free_pending_acknowledgement (pa); | 5358 | free_pending_acknowledgement (pa); |
5255 | } | 5359 | } |
5256 | 5360 | ||
@@ -5494,6 +5598,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) | |||
5494 | GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); | 5598 | GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); |
5495 | vl->target = dv->target; | 5599 | vl->target = dv->target; |
5496 | vl->dv = dv; | 5600 | vl->dv = dv; |
5601 | dv->vl = vl; | ||
5497 | vl->core_recv_window = RECV_WINDOW_SIZE; | 5602 | vl->core_recv_window = RECV_WINDOW_SIZE; |
5498 | vl->visibility_task = | 5603 | vl->visibility_task = |
5499 | GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); | 5604 | GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); |
@@ -7185,6 +7290,7 @@ handle_validation_response ( | |||
7185 | vl = GNUNET_new (struct VirtualLink); | 7290 | vl = GNUNET_new (struct VirtualLink); |
7186 | vl->target = n->pid; | 7291 | vl->target = n->pid; |
7187 | vl->n = n; | 7292 | vl->n = n; |
7293 | n->vl = vl; | ||
7188 | vl->core_recv_window = RECV_WINDOW_SIZE; | 7294 | vl->core_recv_window = RECV_WINDOW_SIZE; |
7189 | vl->visibility_task = | 7295 | vl->visibility_task = |
7190 | GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); | 7296 | GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); |
@@ -7604,10 +7710,134 @@ update_pm_next_attempt (struct PendingMessage *pm, | |||
7604 | 7710 | ||
7605 | 7711 | ||
7606 | /** | 7712 | /** |
7607 | * We believe we are ready to transmit a message on a queue. | 7713 | * Context for #select_best_pending_from_link(). |
7608 | * Gives the message to the | 7714 | */ |
7609 | * communicator for transmission (updating the tracker, and re-scheduling | 7715 | struct PendingMessageScoreContext |
7610 | * itself if applicable). | 7716 | { |
7717 | /** | ||
7718 | * Set to the best message that was found, NULL for none. | ||
7719 | */ | ||
7720 | struct PendingMessage *best; | ||
7721 | |||
7722 | /** | ||
7723 | * DVH that @e best should take, or NULL for direct transmission. | ||
7724 | */ | ||
7725 | struct DistanceVectorHop *dvh; | ||
7726 | |||
7727 | /** | ||
7728 | * What is the estimated total overhead for this message? | ||
7729 | */ | ||
7730 | size_t real_overhead; | ||
7731 | |||
7732 | /** | ||
7733 | * Number of pending messages we seriously considered this time. | ||
7734 | */ | ||
7735 | unsigned int consideration_counter; | ||
7736 | |||
7737 | /** | ||
7738 | * Did we have to fragment? | ||
7739 | */ | ||
7740 | int frag; | ||
7741 | |||
7742 | /** | ||
7743 | * Did we have to reliability box? | ||
7744 | */ | ||
7745 | int relb; | ||
7746 | }; | ||
7747 | |||
7748 | |||
7749 | /** | ||
7750 | * Select the best pending message from @a vl for transmission | ||
7751 | * via @a queue. | ||
7752 | * | ||
7753 | * @param sc[in,out] best message so far (NULL for none), plus scoring data | ||
7754 | * @param queue the queue that will be used for transmission | ||
7755 | * @param vl the virtual link providing the messages | ||
7756 | * @param dvh path we are currently considering, or NULL for none | ||
7757 | * @param overhead number of bytes of overhead to be expected | ||
7758 | * from DV encapsulation (0 for without DV) | ||
7759 | */ | ||
7760 | static void | ||
7761 | select_best_pending_from_link (struct PendingMessageScoreContext *sc, | ||
7762 | struct Queue *queue, | ||
7763 | struct VirtualLink *vl, | ||
7764 | struct DistanceVectorHop *dvh, | ||
7765 | size_t overhead) | ||
7766 | { | ||
7767 | /* FIXME-NEXT: right now we ignore all the 'fancy' sorting | ||
7768 | we do on the pending message list, resulting in a | ||
7769 | linear time algorithm (PLUS linear time list management). | ||
7770 | So we should probably either avoid keeping a sorted list, | ||
7771 | or find a way to make the sorting useful here! */ | ||
7772 | for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos; | ||
7773 | pos = pos->next_vl) | ||
7774 | { | ||
7775 | size_t real_overhead = overhead; | ||
7776 | int frag; | ||
7777 | int relb; | ||
7778 | |||
7779 | if (NULL != pos->qe) | ||
7780 | continue; /* not eligible */ | ||
7781 | sc->consideration_counter++; | ||
7782 | /* determine if we have to reliability-box, if so add reliability box | ||
7783 | overhead */ | ||
7784 | relb = GNUNET_NO; | ||
7785 | if ((GNUNET_NO == frag) && | ||
7786 | (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) && | ||
7787 | (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)) | ||
7788 | { | ||
7789 | relb = GNUNET_YES; | ||
7790 | real_overhead += sizeof (struct TransportReliabilityBoxMessage); | ||
7791 | } | ||
7792 | /* determine if we have to fragment, if so add fragmentation | ||
7793 | overhead! */ | ||
7794 | frag = GNUNET_NO; | ||
7795 | if ( ( (0 != queue->mtu) && | ||
7796 | (pos->bytes_msg + real_overhead > queue->mtu) ) || | ||
7797 | (pos->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) || | ||
7798 | (NULL != pos->head_frag /* fragments already exist, should | ||
7799 | respect that even if MTU is 0 for | ||
7800 | this queue */) ) | ||
7801 | { | ||
7802 | frag = GNUNET_YES; | ||
7803 | relb = GNUNET_NO; /* if we fragment, we never also reliability box */ | ||
7804 | if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) | ||
7805 | { | ||
7806 | /* FIXME-OPTIMIZE: we could use an optimized, shorter fragmentation | ||
7807 | header without the ACK UUID when using a *reliable* channel! */ | ||
7808 | } | ||
7809 | real_overhead = overhead + sizeof (struct TransportFragmentBoxMessage); | ||
7810 | } | ||
7811 | |||
7812 | /* Finally, compare to existing 'best' in sc to see if this 'pos' pending | ||
7813 | message would beat it! */ | ||
7814 | if (NULL != sc->best) | ||
7815 | { | ||
7816 | /* FIXME-NEXT: CHECK if pos fits queue BETTER than pm, if not: | ||
7817 | continue; */ | ||
7818 | /* NOTE: use 'overhead' to estimate need for fragmentation, | ||
7819 | prefer it if MTU is sufficient and close! */ | ||
7820 | } | ||
7821 | sc->best = pos; | ||
7822 | sc->dvh = dvh; | ||
7823 | sc->frag = frag; | ||
7824 | sc->relb = relb; | ||
7825 | } | ||
7826 | } | ||
7827 | |||
7828 | |||
7829 | /** | ||
7830 | * We believe we are ready to transmit a `struct PendingMessage` on a | ||
7831 | * queue, the big question is which one! We need to see if there is | ||
7832 | * one pending that is allowed by flow control and congestion control | ||
7833 | * and (ideally) matches our queue's performance profile. | ||
7834 | * | ||
7835 | * If such a message is found, we give the message to the communicator | ||
7836 | * for transmission (updating the tracker, and re-scheduling ourselves | ||
7837 | * if applicable). | ||
7838 | * | ||
7839 | * If no such message is found, the queue's `idle` field must be set | ||
7840 | * to #GNUNET_YES. | ||
7611 | * | 7841 | * |
7612 | * @param cls the `struct Queue` to process transmissions for | 7842 | * @param cls the `struct Queue` to process transmissions for |
7613 | */ | 7843 | */ |
@@ -7615,128 +7845,99 @@ static void | |||
7615 | transmit_on_queue (void *cls) | 7845 | transmit_on_queue (void *cls) |
7616 | { | 7846 | { |
7617 | struct Queue *queue = cls; | 7847 | struct Queue *queue = cls; |
7618 | |||
7619 | queue->transmit_task = NULL; | ||
7620 | #if FIXME - NEXT | ||
7621 | struct Neighbour *n = queue->neighbour; | 7848 | struct Neighbour *n = queue->neighbour; |
7849 | struct PendingMessageScoreContext sc; | ||
7622 | struct PendingMessage *pm; | 7850 | struct PendingMessage *pm; |
7623 | struct PendingMessage *s; | ||
7624 | uint32_t overhead; | ||
7625 | 7851 | ||
7626 | if (NULL == (pm = n->pending_msg_head)) | 7852 | queue->transmit_task = NULL; |
7853 | if (NULL == n->vl) | ||
7627 | { | 7854 | { |
7628 | /* no message pending, nothing to do here! */ | ||
7629 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 7855 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
7630 | "No messages waiting on queue %s to %s, going to sleep\n", | 7856 | "Virtual link `%s' is down, cannot have PM for queue `%s'\n", |
7631 | queue->address, | 7857 | GNUNET_i2s (&n->pid), |
7632 | GNUNET_i2s (&n->pid)); | 7858 | queue->address); |
7859 | queue->idle = GNUNET_YES; | ||
7633 | return; | 7860 | return; |
7634 | } | 7861 | } |
7635 | if (NULL != pm->qe) | 7862 | memset (&sc, 0, sizeof (sc)); |
7863 | select_best_pending_from_link (&sc, queue, n->vl, NULL, 0); | ||
7864 | if (NULL == sc.best) | ||
7865 | { | ||
7866 | /* Also look at DVH that have the n as first hop! */ | ||
7867 | for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh; | ||
7868 | dvh = dvh->next_neighbour) | ||
7869 | { | ||
7870 | select_best_pending_from_link (&sc, | ||
7871 | queue, | ||
7872 | dvh->dv->vl, | ||
7873 | dvh, | ||
7874 | sizeof (struct GNUNET_PeerIdentity) * | ||
7875 | (1 + dvh->distance) + | ||
7876 | sizeof (struct TransportDVBoxMessage) + | ||
7877 | sizeof (struct TransportDVBoxPayloadP)); | ||
7878 | } | ||
7879 | } | ||
7880 | if (NULL == sc.best) | ||
7636 | { | 7881 | { |
7637 | /* message still pending with communciator! | 7882 | /* no message pending, nothing to do here! */ |
7638 | LOGGING-FIXME: Use stats? Should this not be rare? */ | ||
7639 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 7883 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
7640 | "Waiting on communicator for queue %s to %s, going to sleep\n", | 7884 | "No pending messages, queue `%s' to %s now idle\n", |
7641 | queue->address, | 7885 | queue->address, |
7642 | GNUNET_i2s (&n->pid)); | 7886 | GNUNET_i2s (&n->pid)); |
7887 | queue->idle = GNUNET_YES; | ||
7643 | return; | 7888 | return; |
7644 | } | 7889 | } |
7645 | schedule_transmit_on_queue (queue, GNUNET_YES); | 7890 | |
7646 | if (NULL != queue->transmit_task) | 7891 | /* Given selection in `sc`, do transmission */ |
7892 | pm = sc.best; | ||
7893 | if (GNUNET_YES == sc.frag) | ||
7647 | { | 7894 | { |
7648 | GNUNET_log ( | 7895 | pm = fragment_message (queue, sc.dvh, sc.best); |
7649 | GNUNET_ERROR_TYPE_DEBUG, | 7896 | if (NULL == pm) |
7650 | "Scheduled transmission on queue %s to %s for later, going to sleep\n", | 7897 | { |
7651 | queue->address, | 7898 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
7652 | GNUNET_i2s (&n->pid)); | 7899 | "Fragmentation failed queue %s to %s for <%llu>, trying again\n", |
7653 | return; /* do it later */ | 7900 | queue->address, |
7654 | } | 7901 | GNUNET_i2s (&n->pid), |
7655 | overhead = 0; | 7902 | pm->logging_uuid); |
7656 | if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) | 7903 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
7657 | overhead += sizeof (struct TransportReliabilityBoxMessage); | 7904 | } |
7658 | s = pm; | ||
7659 | if ( ( (0 != queue->mtu) && | ||
7660 | (pm->bytes_msg + overhead > queue->mtu) ) || | ||
7661 | (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) || | ||
7662 | (NULL != pm->head_frag /* fragments already exist, should | ||
7663 | respect that even if MTU is 0 for | ||
7664 | this queue */) ) | ||
7665 | s = fragment_message (queue, pm->dvh, s); | ||
7666 | if (NULL == s) | ||
7667 | { | ||
7668 | /* Fragmentation failed, try next message... */ | ||
7669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
7670 | "Fragmentation failed queue %s to %s for <%llu>, trying again\n", | ||
7671 | queue->address, | ||
7672 | GNUNET_i2s (&n->pid), | ||
7673 | pm->logging_uuid); | ||
7674 | schedule_transmit_on_queue (queue, GNUNET_NO); | ||
7675 | return; | ||
7676 | } | 7905 | } |
7677 | if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) | 7906 | else if (GNUNET_YES == sc.relb) |
7678 | // FIXME-OPTIMIZE: and if reliability was requested for 's' by core! | ||
7679 | s = reliability_box_message (queue, pm->dvh, s); | ||
7680 | if (NULL == s) | ||
7681 | { | 7907 | { |
7682 | /* Reliability boxing failed, try next message... */ | 7908 | pm = reliability_box_message (queue, sc.dvh, sc.best); |
7683 | GNUNET_log ( | 7909 | if (NULL == pm) |
7684 | GNUNET_ERROR_TYPE_DEBUG, | 7910 | { |
7685 | "Reliability boxing failed queue %s to %s for <%llu>, trying again\n", | 7911 | /* Reliability boxing failed, try next message... */ |
7686 | queue->address, | 7912 | GNUNET_log ( |
7687 | GNUNET_i2s (&n->pid), | 7913 | GNUNET_ERROR_TYPE_DEBUG, |
7688 | pm->logging_uuid); | 7914 | "Reliability boxing failed queue %s to %s for <%llu>, trying again\n", |
7689 | schedule_transmit_on_queue (queue, GNUNET_NO); | 7915 | queue->address, |
7690 | return; | 7916 | GNUNET_i2s (&n->pid), |
7917 | pm->logging_uuid); | ||
7918 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | ||
7919 | return; | ||
7920 | } | ||
7691 | } | 7921 | } |
7922 | else | ||
7923 | pm = sc.best; /* no boxing required */ | ||
7692 | 7924 | ||
7693 | /* Pass 's' for transission to the communicator */ | 7925 | /* Pass 'pm' for transission to the communicator */ |
7694 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 7926 | GNUNET_log ( |
7695 | "Passing message <%llu> to queue %s for peer %s\n", | 7927 | GNUNET_ERROR_TYPE_DEBUG, |
7696 | s->logging_uuid, | 7928 | "Passing message <%llu> to queue %s for peer %s (considered %u others)\n", |
7697 | queue->address, | 7929 | pm->logging_uuid, |
7698 | GNUNET_i2s (&n->pid)); | 7930 | queue->address, |
7699 | queue_send_msg (queue, s, &s[1], s->bytes_msg); | 7931 | GNUNET_i2s (&n->pid), |
7700 | // FIXME: do something similar to the logic below | 7932 | sc.consideration_counter); |
7701 | // in defragmentation / reliability ACK handling! | 7933 | queue_send_msg (queue, pm, &pm[1], pm->bytes_msg); |
7702 | 7934 | ||
7703 | /* Check if this transmission somehow conclusively finished handing 'pm' | 7935 | /* Check if this transmission somehow conclusively finished handing 'pm' |
7704 | even without any explicit ACKs */ | 7936 | even without any explicit ACKs */ |
7705 | if ((PMT_CORE == s->pmt) && | 7937 | if ((PMT_CORE == pm->pmt) || |
7706 | (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) | 7938 | (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) |
7707 | { | 7939 | { |
7708 | /* Full message sent, and over reliabile channel */ | 7940 | completed_pending_message (pm); |
7709 | client_send_response (pm); | ||
7710 | } | ||
7711 | else if ((GNUNET_TRANSPORT_CC_RELIABLE == | ||
7712 | queue->tc->details.communicator.cc) && | ||
7713 | (PMT_FRAGMENT_BOX == s->pmt)) | ||
7714 | { | ||
7715 | struct PendingMessage *pos; | ||
7716 | |||
7717 | /* Fragment sent over reliabile channel */ | ||
7718 | free_fragment_tree (s); | ||
7719 | pos = s->frag_parent; | ||
7720 | GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s); | ||
7721 | GNUNET_free (s); | ||
7722 | /* check if subtree is done */ | ||
7723 | while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) && | ||
7724 | (pos != pm)) | ||
7725 | { | ||
7726 | s = pos; | ||
7727 | pos = s->frag_parent; | ||
7728 | GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s); | ||
7729 | GNUNET_free (s); | ||
7730 | } | ||
7731 | |||
7732 | /* Was this the last applicable fragmment? */ | ||
7733 | if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg)) | ||
7734 | client_send_response (pm); | ||
7735 | } | ||
7736 | else if (PMT_CORE != pm->pmt) | ||
7737 | { | ||
7738 | /* This was an acknowledgement of some type, always free */ | ||
7739 | free_pending_message (pm); | ||
7740 | } | 7941 | } |
7741 | else | 7942 | else |
7742 | { | 7943 | { |
@@ -7748,15 +7949,13 @@ transmit_on_queue (void *cls) | |||
7748 | retransmitting. Note that in the future this heuristic should | 7949 | retransmitting. Note that in the future this heuristic should |
7749 | likely be improved further (measure RTT stability, consider | 7950 | likely be improved further (measure RTT stability, consider |
7750 | message urgency and size when delaying ACKs, etc.) */ | 7951 | message urgency and size when delaying ACKs, etc.) */ |
7751 | update_pm_next_attempt (s, | 7952 | update_pm_next_attempt (pm, |
7752 | GNUNET_TIME_relative_to_absolute ( | 7953 | GNUNET_TIME_relative_to_absolute ( |
7753 | GNUNET_TIME_relative_multiply (queue->pd.aged_rtt, | 7954 | GNUNET_TIME_relative_multiply (queue->pd.aged_rtt, |
7754 | 4))); | 7955 | 4))); |
7755 | } | 7956 | } |
7756 | |||
7757 | /* finally, re-schedule queue transmission task itself */ | 7957 | /* finally, re-schedule queue transmission task itself */ |
7758 | schedule_transmit_on_queue (queue, GNUNET_NO); | 7958 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
7759 | #endif | ||
7760 | } | 7959 | } |
7761 | 7960 | ||
7762 | 7961 | ||
@@ -7871,7 +8070,7 @@ handle_send_message_ack (void *cls, | |||
7871 | for (struct Queue *queue = tc->details.communicator.queue_head; | 8070 | for (struct Queue *queue = tc->details.communicator.queue_head; |
7872 | NULL != queue; | 8071 | NULL != queue; |
7873 | queue = queue->next_client) | 8072 | queue = queue->next_client) |
7874 | schedule_transmit_on_queue (queue, GNUNET_NO); | 8073 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
7875 | } | 8074 | } |
7876 | else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) | 8075 | else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) |
7877 | { | 8076 | { |
@@ -7880,7 +8079,7 @@ handle_send_message_ack (void *cls, | |||
7880 | "# Transmission throttled due to queue queue limit", | 8079 | "# Transmission throttled due to queue queue limit", |
7881 | -1, | 8080 | -1, |
7882 | GNUNET_NO); | 8081 | GNUNET_NO); |
7883 | schedule_transmit_on_queue (qe->queue, GNUNET_NO); | 8082 | schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
7884 | } | 8083 | } |
7885 | 8084 | ||
7886 | if (NULL != (pm = qe->pm)) | 8085 | if (NULL != (pm = qe->pm)) |
@@ -7894,21 +8093,7 @@ handle_send_message_ack (void *cls, | |||
7894 | transmit on queue for queues of the neighbour */ | 8093 | transmit on queue for queues of the neighbour */ |
7895 | vl = pm->vl; | 8094 | vl = pm->vl; |
7896 | if (vl->pending_msg_head == pm) | 8095 | if (vl->pending_msg_head == pm) |
7897 | { | 8096 | check_vl_transmission (vl); |
7898 | #if FIXME - NEXT | ||
7899 | for (struct Queue *queue = n->queue_head; NULL != queue; | ||
7900 | queue = queue->next_neighbour) | ||
7901 | schedule_transmit_on_queue (queue, GNUNET_NO); | ||
7902 | #endif | ||
7903 | } | ||
7904 | if (GNUNET_OK != ntohl (sma->status)) | ||
7905 | { | ||
7906 | GNUNET_log ( | ||
7907 | GNUNET_ERROR_TYPE_INFO, | ||
7908 | "Queue failed in transmission <%llu>, will try retransmission immediately\n", | ||
7909 | pm->logging_uuid); | ||
7910 | update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS); | ||
7911 | } | ||
7912 | } | 8097 | } |
7913 | GNUNET_free (qe); | 8098 | GNUNET_free (qe); |
7914 | } | 8099 | } |
@@ -8431,6 +8616,7 @@ handle_add_queue_message (void *cls, | |||
8431 | queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); | 8616 | queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); |
8432 | queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); | 8617 | queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); |
8433 | queue->neighbour = neighbour; | 8618 | queue->neighbour = neighbour; |
8619 | queue->idle = GNUNET_YES; | ||
8434 | memcpy (&queue[1], addr, addr_len); | 8620 | memcpy (&queue[1], addr, addr_len); |
8435 | /* notify monitors about new queue */ | 8621 | /* notify monitors about new queue */ |
8436 | { | 8622 | { |
@@ -8452,6 +8638,8 @@ handle_add_queue_message (void *cls, | |||
8452 | &aqm->receiver, | 8638 | &aqm->receiver, |
8453 | &check_validation_request_pending, | 8639 | &check_validation_request_pending, |
8454 | queue); | 8640 | queue); |
8641 | /* look for traffic for this queue */ | ||
8642 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | ||
8455 | /* might be our first queue, try launching DV learning */ | 8643 | /* might be our first queue, try launching DV learning */ |
8456 | if (NULL == dvlearn_task) | 8644 | if (NULL == dvlearn_task) |
8457 | dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL); | 8645 | dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL); |