aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/gnunet-service-tng.c702
1 files changed, 445 insertions, 257 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index f07e1c88d..56cf61c2b 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,15 +24,8 @@
24 * 24 *
25 * TODO: 25 * TODO:
26 * Implement next: 26 * Implement next:
27 * - realize "pull" based logic (#handle_client_send()) for 27 * - FIXME-NEXT: logic to decide which pm to pick for a given queue (sorting!)
28 * `struct PendingMessage` which waits for a queue on any 28 * - FIXME-FC: realize transport-to-transport flow control (needed in case
29 * applicable route to be 'ready', in contrast
30 * to the 'push' based routing we use for control messages.
31 * Basically, when a queue goes idle, it should "search"
32 * via its neighbour for either virtual links or DVH's that
33 * have it as first hop and then find messages in those
34 * virtual links!
35 * - realize transport-to-transport flow control (needed in case
36 * communicators do not offer flow control). Note that we may not 29 * communicators do not offer flow control). Note that we may not
37 * want to simply delay the ACKs as that may cause unnecessary 30 * want to simply delay the ACKs as that may cause unnecessary
38 * re-transmissions. => Introduce proper flow and congestion window(s)! 31 * re-transmissions. => Introduce proper flow and congestion window(s)!
@@ -1375,7 +1368,7 @@ struct DistanceVector
1375 * Do we have a confirmed working queue and are thus visible to 1368 * Do we have a confirmed working queue and are thus visible to
1376 * CORE? If so, this is the virtual link, otherwise NULL. 1369 * CORE? If so, this is the virtual link, otherwise NULL.
1377 */ 1370 */
1378 struct VirtualLink *link; 1371 struct VirtualLink *vl;
1379 1372
1380 /** 1373 /**
1381 * Signature affirming @e ephemeral_key of type 1374 * Signature affirming @e ephemeral_key of type
@@ -1565,6 +1558,12 @@ struct Queue
1565 * Connection status for this queue. 1558 * Connection status for this queue.
1566 */ 1559 */
1567 enum GNUNET_TRANSPORT_ConnectionStatus cs; 1560 enum GNUNET_TRANSPORT_ConnectionStatus cs;
1561
1562 /**
1563 * Set to #GNUNET_YES if this queue is idle waiting for some
1564 * virtual link to give it a pending message.
1565 */
1566 int idle;
1568}; 1567};
1569 1568
1570 1569
@@ -1696,7 +1695,7 @@ struct Neighbour
1696 * Do we have a confirmed working queue and are thus visible to 1695 * Do we have a confirmed working queue and are thus visible to
1697 * CORE? If so, this is the virtual link, otherwise NULL. 1696 * CORE? If so, this is the virtual link, otherwise NULL.
1698 */ 1697 */
1699 struct VirtualLink *link; 1698 struct VirtualLink *vl;
1700 1699
1701 /** 1700 /**
1702 * Latest DVLearn monotonic time seen from this peer. Initialized only 1701 * Latest DVLearn monotonic time seen from this peer. Initialized only
@@ -1766,17 +1765,7 @@ enum PendingMessageType
1766 /** 1765 /**
1767 * Reliability box. 1766 * Reliability box.
1768 */ 1767 */
1769 PMT_RELIABILITY_BOX = 2, 1768 PMT_RELIABILITY_BOX = 2
1770
1771 /**
1772 * Any type of acknowledgement.
1773 */
1774 PMT_ACKNOWLEDGEMENT = 3,
1775
1776 /**
1777 * Control traffic generated by the TRANSPORT service itself.
1778 */
1779 PMT_CONTROL = 4
1780 1769
1781}; 1770};
1782 1771
@@ -2752,6 +2741,41 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
2752 2741
2753 2742
2754/** 2743/**
2744 * Task run to check whether the hops of the @a cls still
2745 * are validated, or if we need to core about disconnection.
2746 *
2747 * @param cls a `struct VirtualLink`
2748 */
2749static void
2750check_link_down (void *cls);
2751
2752
2753/**
2754 * Send message to CORE clients that we lost a connection.
2755 *
2756 * @param pid peer the connection was for
2757 */
2758static void
2759cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2760{
2761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2762 "Informing CORE clients about disconnect from %s\n",
2763 GNUNET_i2s (pid));
2764 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
2765 {
2766 struct GNUNET_MQ_Envelope *env;
2767 struct DisconnectInfoMessage *dim;
2768
2769 if (CT_CORE != tc->type)
2770 continue;
2771 env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
2772 dim->peer = *pid;
2773 GNUNET_MQ_send (tc->mq, env);
2774 }
2775}
2776
2777
2778/**
2755 * Free entry in #dv_routes. First frees all hops to the target, and 2779 * Free entry in #dv_routes. First frees all hops to the target, and
2756 * if there are no entries left, frees @a dv as well. 2780 * if there are no entries left, frees @a dv as well.
2757 * 2781 *
@@ -2766,11 +2790,33 @@ free_dv_route (struct DistanceVector *dv)
2766 free_distance_vector_hop (dvh); 2790 free_distance_vector_hop (dvh);
2767 if (NULL == dv->dv_head) 2791 if (NULL == dv->dv_head)
2768 { 2792 {
2793 struct VirtualLink *vl;
2794
2769 GNUNET_assert ( 2795 GNUNET_assert (
2770 GNUNET_YES == 2796 GNUNET_YES ==
2771 GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); 2797 GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
2798 if (NULL != (vl = dv->vl))
2799 {
2800 GNUNET_assert (dv == vl->dv);
2801 vl->dv = NULL;
2802 if (NULL == vl->n)
2803 {
2804 cores_send_disconnect_info (&dv->target);
2805 free_virtual_link (vl);
2806 }
2807 else
2808 {
2809 GNUNET_SCHEDULER_cancel (vl->visibility_task);
2810 vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
2811 }
2812 dv->vl = NULL;
2813 }
2814
2772 if (NULL != dv->timeout_task) 2815 if (NULL != dv->timeout_task)
2816 {
2773 GNUNET_SCHEDULER_cancel (dv->timeout_task); 2817 GNUNET_SCHEDULER_cancel (dv->timeout_task);
2818 dv->timeout_task = NULL;
2819 }
2774 GNUNET_free (dv); 2820 GNUNET_free (dv);
2775 } 2821 }
2776} 2822}
@@ -2950,6 +2996,7 @@ static void
2950free_neighbour (struct Neighbour *neighbour) 2996free_neighbour (struct Neighbour *neighbour)
2951{ 2997{
2952 struct DistanceVectorHop *dvh; 2998 struct DistanceVectorHop *dvh;
2999 struct VirtualLink *vl;
2953 3000
2954 GNUNET_assert (NULL == neighbour->queue_head); 3001 GNUNET_assert (NULL == neighbour->queue_head);
2955 GNUNET_assert (GNUNET_YES == 3002 GNUNET_assert (GNUNET_YES ==
@@ -2989,6 +3036,22 @@ free_neighbour (struct Neighbour *neighbour)
2989 GNUNET_PEERSTORE_store_cancel (neighbour->sc); 3036 GNUNET_PEERSTORE_store_cancel (neighbour->sc);
2990 neighbour->sc = NULL; 3037 neighbour->sc = NULL;
2991 } 3038 }
3039 if (NULL != (vl = neighbour->vl))
3040 {
3041 GNUNET_assert (neighbour == vl->n);
3042 vl->n = NULL;
3043 if (NULL == vl->dv)
3044 {
3045 cores_send_disconnect_info (&vl->target);
3046 free_virtual_link (vl);
3047 }
3048 else
3049 {
3050 GNUNET_SCHEDULER_cancel (vl->visibility_task);
3051 vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
3052 }
3053 neighbour->vl = NULL;
3054 }
2992 GNUNET_free (neighbour); 3055 GNUNET_free (neighbour);
2993} 3056}
2994 3057
@@ -3034,31 +3097,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
3034 3097
3035 3098
3036/** 3099/**
3037 * Send message to CORE clients that we lost a connection.
3038 *
3039 * @param pid peer the connection was for
3040 */
3041static void
3042cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
3043{
3044 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3045 "Informing CORE clients about disconnect from %s\n",
3046 GNUNET_i2s (pid));
3047 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3048 {
3049 struct GNUNET_MQ_Envelope *env;
3050 struct DisconnectInfoMessage *dim;
3051
3052 if (CT_CORE != tc->type)
3053 continue;
3054 env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
3055 dim->peer = *pid;
3056 GNUNET_MQ_send (tc->mq, env);
3057 }
3058}
3059
3060
3061/**
3062 * We believe we are ready to transmit a message on a queue. Gives the 3100 * We believe we are ready to transmit a message on a queue. Gives the
3063 * message to the communicator for transmission (updating the tracker, 3101 * message to the communicator for transmission (updating the tracker,
3064 * and re-scheduling itself if applicable). 3102 * and re-scheduling itself if applicable).
@@ -3070,19 +3108,15 @@ transmit_on_queue (void *cls);
3070 3108
3071 3109
3072/** 3110/**
3073 * Schedule next run of #transmit_on_queue(). Does NOTHING if 3111 * Called whenever something changed that might effect when we
3074 * we should run immediately or if the message queue is empty. 3112 * try to do the next transmission on @a queue using #transmit_on_queue().
3075 * Test for no task being added AND queue not being empty to
3076 * transmit immediately afterwards! This function must only
3077 * be called if the message queue is non-empty!
3078 * 3113 *
3079 * @param queue the queue to do scheduling for 3114 * @param queue the queue to do scheduling for
3080 * @param inside_job set to #GNUNET_YES if called from 3115 * @param p task priority to use, if @a queue is scheduled
3081 * #transmit_on_queue() itself and NOT setting
3082 * the task means running immediately
3083 */ 3116 */
3084static void 3117static void
3085schedule_transmit_on_queue (struct Queue *queue, int inside_job) 3118schedule_transmit_on_queue (struct Queue *queue,
3119 enum GNUNET_SCHEDULER_Priority p)
3086{ 3120{
3087 if (queue->tc->details.communicator.total_queue_length >= 3121 if (queue->tc->details.communicator.total_queue_length >=
3088 COMMUNICATOR_TOTAL_QUEUE_LIMIT) 3122 COMMUNICATOR_TOTAL_QUEUE_LIMIT)
@@ -3092,6 +3126,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
3092 "# Transmission throttled due to communicator queue limit", 3126 "# Transmission throttled due to communicator queue limit",
3093 1, 3127 1,
3094 GNUNET_NO); 3128 GNUNET_NO);
3129 queue->idle = GNUNET_NO;
3095 return; 3130 return;
3096 } 3131 }
3097 if (queue->queue_length >= QUEUE_LENGTH_LIMIT) 3132 if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
@@ -3100,38 +3135,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
3100 "# Transmission throttled due to queue queue limit", 3135 "# Transmission throttled due to queue queue limit",
3101 1, 3136 1,
3102 GNUNET_NO); 3137 GNUNET_NO);
3138 queue->idle = GNUNET_NO;
3103 return; 3139 return;
3104 } 3140 }
3105#if FIXME - NEXT 3141 /* queue might indeed be ready, schedule it */
3106 struct Neighbour *n = queue->neighbour; 3142 if (NULL != queue->transmit_task)
3107 struct GNUNET_TIME_Relative out_delay; 3143 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3108
3109 if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
3110 {
3111 GNUNET_log (
3112 GNUNET_ERROR_TYPE_DEBUG,
3113 "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n",
3114 pm->logging_uuid,
3115 (unsigned long long) queue->qid,
3116 GNUNET_i2s (&n->pid));
3117 return; /* we should run immediately! */
3118 }
3119 /* queue has changed since we were scheduled, reschedule again */
3120 queue->transmit_task = 3144 queue->transmit_task =
3121 GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); 3145 GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
3122 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) 3146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3123 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 3147 "Considering transmission on queue `%s' to %s\n",
3124 "Next transmission <%llu> on queue `%s' in %s (high delay)\n", 3148 queue->address,
3125 pm->logging_uuid, 3149 GNUNET_i2s (&queue->neighbour->pid));
3126 queue->address,
3127 GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
3128 else
3129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3130 "Next transmission <%llu> on queue `%s' in %s\n",
3131 pm->logging_uuid,
3132 queue->address,
3133 GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
3134#endif
3135} 3150}
3136 3151
3137 3152
@@ -3156,15 +3171,21 @@ check_link_down (void *cls)
3156 pos = pos->next_dv) 3171 pos = pos->next_dv)
3157 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); 3172 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
3158 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) 3173 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3174 {
3175 vl->dv->vl = NULL;
3159 vl->dv = NULL; 3176 vl->dv = NULL;
3177 }
3160 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; 3178 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3161 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) 3179 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3162 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); 3180 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3163 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) 3181 if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3182 {
3183 vl->n->vl = NULL;
3164 vl->n = NULL; 3184 vl->n = NULL;
3185 }
3165 if ((NULL == vl->n) && (NULL == vl->dv)) 3186 if ((NULL == vl->n) && (NULL == vl->dv))
3166 { 3187 {
3167 cores_send_disconnect_info (&dv->target); 3188 cores_send_disconnect_info (&vl->target);
3168 free_virtual_link (vl); 3189 free_virtual_link (vl);
3169 return; 3190 return;
3170 } 3191 }
@@ -3229,7 +3250,7 @@ free_queue (struct Queue *queue)
3229 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < 3250 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
3230 tc->details.communicator.total_queue_length)) 3251 tc->details.communicator.total_queue_length))
3231 { 3252 {
3232 /* Communicator dropped below threshold, resume all queues */ 3253 /* Communicator dropped below threshold, resume all _other_ queues */
3233 GNUNET_STATISTICS_update ( 3254 GNUNET_STATISTICS_update (
3234 GST_stats, 3255 GST_stats,
3235 "# Transmission throttled due to communicator queue limit", 3256 "# Transmission throttled due to communicator queue limit",
@@ -3237,7 +3258,7 @@ free_queue (struct Queue *queue)
3237 GNUNET_NO); 3258 GNUNET_NO);
3238 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; 3259 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3239 s = s->next_client) 3260 s = s->next_client)
3240 schedule_transmit_on_queue (s, GNUNET_NO); 3261 schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
3241 } 3262 }
3242 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); 3263 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3243 GNUNET_free (queue); 3264 GNUNET_free (queue);
@@ -3580,6 +3601,79 @@ pick_random_dv_hops (const struct DistanceVector *dv,
3580 3601
3581 3602
3582/** 3603/**
3604 * There is a message at the head of the pending messages for @a vl
3605 * which may be ready for transmission. Check if a queue is ready to
3606 * take it.
3607 *
3608 * This function must (1) check for flow control to ensure that we can
3609 * right now send to @a vl, (2) check that the pending message in the
3610 * queue is actually eligible, (3) determine if any applicable queue
3611 * (direct neighbour or DVH path) is ready to accept messages, and
3612 * (4) prioritize based on the preferences associated with the
3613 * pending message.
3614 *
3615 * So yeah, easy.
3616 *
3617 * @param vl virtual link where we should check for transmission
3618 */
3619static void
3620check_vl_transmission (struct VirtualLink *vl)
3621{
3622 struct Neighbour *n = vl->n;
3623 struct DistanceVector *dv = vl->dv;
3624 struct GNUNET_TIME_Absolute now;
3625 int elig;
3626
3627 /* FIXME-FC: need to implement virtual link flow control! */
3628
3629 /* Check that we have an eligible pending message!
3630 (cheaper than having #transmit_on_queue() find out!) */
3631 elig = GNUNET_NO;
3632 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
3633 pm = pm->next_vl)
3634 {
3635 if (NULL != pm->qe)
3636 continue; /* not eligible, is in a queue! */
3637 elig = GNUNET_YES;
3638 break;
3639 }
3640 if (GNUNET_NO == elig)
3641 return;
3642
3643 /* Notify queues at direct neighbours that we are interested */
3644 now = GNUNET_TIME_absolute_get ();
3645 if (NULL != n)
3646 {
3647 for (struct Queue *queue = n->queue_head; NULL != queue;
3648 queue = queue->next_neighbour)
3649 if ((GNUNET_YES == queue->idle) &&
3650 (queue->validated_until.abs_value_us > now.abs_value_us))
3651 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
3652 }
3653 /* Notify queues via DV that we are interested */
3654 if (NULL != dv)
3655 {
3656 /* Do DV with lower scheduler priority, which effectively means that
3657 IF a neighbour exists and is available, we prefer it. */
3658 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3659 pos = pos->next_dv)
3660 {
3661 struct Neighbour *nh = pos->next_hop;
3662
3663 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
3664 continue; /* skip this one: path not validated */
3665 for (struct Queue *queue = nh->queue_head; NULL != queue;
3666 queue = queue->next_neighbour)
3667 if ((GNUNET_YES == queue->idle) &&
3668 (queue->validated_until.abs_value_us > now.abs_value_us))
3669 schedule_transmit_on_queue (queue,
3670 GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
3671 }
3672 }
3673}
3674
3675
3676/**
3583 * Client asked for transmission to a peer. Process the request. 3677 * Client asked for transmission to a peer. Process the request.
3584 * 3678 *
3585 * @param cls the client 3679 * @param cls the client
@@ -3594,7 +3688,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
3594 uint32_t bytes_msg; 3688 uint32_t bytes_msg;
3595 struct VirtualLink *vl; 3689 struct VirtualLink *vl;
3596 enum GNUNET_MQ_PriorityPreferences pp; 3690 enum GNUNET_MQ_PriorityPreferences pp;
3597 int was_empty;
3598 3691
3599 GNUNET_assert (CT_CORE == tc->type); 3692 GNUNET_assert (CT_CORE == tc->type);
3600 obmm = (const struct GNUNET_MessageHeader *) &obm[1]; 3693 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
@@ -3631,32 +3724,11 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
3631 tc->details.core.pending_msg_head, 3724 tc->details.core.pending_msg_head,
3632 tc->details.core.pending_msg_tail, 3725 tc->details.core.pending_msg_tail,
3633 pm); 3726 pm);
3634 was_empty = (NULL == vl->pending_msg_head);
3635 GNUNET_CONTAINER_MDLL_insert (vl, 3727 GNUNET_CONTAINER_MDLL_insert (vl,
3636 vl->pending_msg_head, 3728 vl->pending_msg_head,
3637 vl->pending_msg_tail, 3729 vl->pending_msg_tail,
3638 pm); 3730 pm);
3639 if (! was_empty) 3731 check_vl_transmission (vl);
3640 return; /* all queues must already be busy */
3641#if 0
3642 // FIXME: check if any DVH or neighbour queue of 'vl'
3643 // is ready for transmission now. If so, encapsulate
3644 // 'pm' accordingly and send!
3645 for (struct Queue *queue = target->queue_head; NULL != queue;
3646 queue = queue->next_neighbour)
3647 {
3648 /* try transmission on any queue that is idle */
3649 if (NULL == queue->transmit_task)
3650 {
3651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3652 "Queue %llu to %s is idle, triggering transmission\n",
3653 (unsigned long long) queue->qid,
3654 GNUNET_i2s (&queue->neighbour->pid));
3655 queue->transmit_task =
3656 GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
3657 }
3658 }
3659#endif
3660} 3732}
3661 3733
3662 3734
@@ -3861,7 +3933,7 @@ update_ephemeral (struct DistanceVector *dv)
3861 3933
3862 3934
3863/** 3935/**
3864 * Send the control message @a payload on @a queue. 3936 * Send the message @a payload on @a queue.
3865 * 3937 *
3866 * @param queue the queue to use for transmission 3938 * @param queue the queue to use for transmission
3867 * @param pm pending message to update once transmission is done, may be NULL! 3939 * @param pm pending message to update once transmission is done, may be NULL!
@@ -3879,6 +3951,7 @@ queue_send_msg (struct Queue *queue,
3879 struct GNUNET_TRANSPORT_SendMessageTo *smt; 3951 struct GNUNET_TRANSPORT_SendMessageTo *smt;
3880 struct GNUNET_MQ_Envelope *env; 3952 struct GNUNET_MQ_Envelope *env;
3881 3953
3954 queue->idle = GNUNET_NO;
3882 GNUNET_log ( 3955 GNUNET_log (
3883 GNUNET_ERROR_TYPE_DEBUG, 3956 GNUNET_ERROR_TYPE_DEBUG,
3884 "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n", 3957 "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
@@ -3910,6 +3983,11 @@ queue_send_msg (struct Queue *queue,
3910 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); 3983 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3911 queue->queue_length++; 3984 queue->queue_length++;
3912 queue->tc->details.communicator.total_queue_length++; 3985 queue->tc->details.communicator.total_queue_length++;
3986 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
3987 queue->tc->details.communicator.total_queue_length)
3988 queue->idle = GNUNET_NO;
3989 if (QUEUE_LENGTH_LIMIT == queue->queue_length)
3990 queue->idle = GNUNET_NO;
3913 GNUNET_MQ_send (queue->tc->mq, env); 3991 GNUNET_MQ_send (queue->tc->mq, env);
3914 } 3992 }
3915} 3993}
@@ -5210,6 +5288,50 @@ update_dvh_performance (struct DistanceVectorHop *dvh,
5210 5288
5211 5289
5212/** 5290/**
5291 * We have completed transmission of @a pm, remove it from
5292 * the transmission queues (and if it is a fragment, continue
5293 * up the tree as necessary).
5294 *
5295 * @param pm pending message that was transmitted
5296 */
5297static void
5298completed_pending_message (struct PendingMessage *pm)
5299{
5300 struct PendingMessage *pos;
5301
5302 switch (pm->pmt)
5303 {
5304 case PMT_CORE:
5305 case PMT_RELIABILITY_BOX:
5306 /* Full message sent, we are done */
5307 client_send_response (pm);
5308 return;
5309 case PMT_FRAGMENT_BOX:
5310 /* Fragment sent over reliabile channel */
5311 free_fragment_tree (pm);
5312 pos = pm->frag_parent;
5313 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
5314 GNUNET_free (pm);
5315 /* check if subtree is done */
5316 while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
5317 (pos != pm))
5318 {
5319 pm = pos;
5320 pos = pm->frag_parent;
5321 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
5322 GNUNET_free (pm);
5323 }
5324
5325 /* Was this the last applicable fragmment? */
5326 if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
5327 (pos->frag_off == pos->bytes_msg))
5328 client_send_response (pos);
5329 return;
5330 }
5331}
5332
5333
5334/**
5213 * The @a pa was acknowledged, process the acknowledgement. 5335 * The @a pa was acknowledged, process the acknowledgement.
5214 * 5336 *
5215 * @param pa the pending acknowledgement that was satisfied 5337 * @param pa the pending acknowledgement that was satisfied
@@ -5220,7 +5342,6 @@ static void
5220handle_acknowledged (struct PendingAcknowledgement *pa, 5342handle_acknowledged (struct PendingAcknowledgement *pa,
5221 struct GNUNET_TIME_Relative ack_delay) 5343 struct GNUNET_TIME_Relative ack_delay)
5222{ 5344{
5223 struct PendingMessage *pm = pa->pm;
5224 struct GNUNET_TIME_Relative delay; 5345 struct GNUNET_TIME_Relative delay;
5225 5346
5226 delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time); 5347 delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
@@ -5232,25 +5353,8 @@ handle_acknowledged (struct PendingAcknowledgement *pa,
5232 update_queue_performance (pa->queue, delay, pa->message_size); 5353 update_queue_performance (pa->queue, delay, pa->message_size);
5233 if (NULL != pa->dvh) 5354 if (NULL != pa->dvh)
5234 update_dvh_performance (pa->dvh, delay, pa->message_size); 5355 update_dvh_performance (pa->dvh, delay, pa->message_size);
5235 if (NULL != pm) 5356 if (NULL != pa->pm)
5236 { 5357 completed_pending_message (pa->pm);
5237 if (NULL != pm->frag_parent)
5238 {
5239 pm = pm->frag_parent;
5240 free_fragment_tree (pa->pm);
5241 }
5242 while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
5243 {
5244 struct PendingMessage *parent = pm->frag_parent;
5245
5246 free_fragment_tree (pm);
5247 pm = parent;
5248 }
5249 if (NULL != pm->head_frag)
5250 pm = NULL; /* we are done, otherwise free 'pm' below */
5251 }
5252 if (NULL != pm)
5253 free_pending_message (pm);
5254 free_pending_acknowledgement (pa); 5358 free_pending_acknowledgement (pa);
5255} 5359}
5256 5360
@@ -5494,6 +5598,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
5494 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); 5598 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
5495 vl->target = dv->target; 5599 vl->target = dv->target;
5496 vl->dv = dv; 5600 vl->dv = dv;
5601 dv->vl = vl;
5497 vl->core_recv_window = RECV_WINDOW_SIZE; 5602 vl->core_recv_window = RECV_WINDOW_SIZE;
5498 vl->visibility_task = 5603 vl->visibility_task =
5499 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); 5604 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
@@ -7185,6 +7290,7 @@ handle_validation_response (
7185 vl = GNUNET_new (struct VirtualLink); 7290 vl = GNUNET_new (struct VirtualLink);
7186 vl->target = n->pid; 7291 vl->target = n->pid;
7187 vl->n = n; 7292 vl->n = n;
7293 n->vl = vl;
7188 vl->core_recv_window = RECV_WINDOW_SIZE; 7294 vl->core_recv_window = RECV_WINDOW_SIZE;
7189 vl->visibility_task = 7295 vl->visibility_task =
7190 GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); 7296 GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
@@ -7604,10 +7710,134 @@ update_pm_next_attempt (struct PendingMessage *pm,
7604 7710
7605 7711
7606/** 7712/**
7607 * We believe we are ready to transmit a message on a queue. 7713 * Context for #select_best_pending_from_link().
7608 * Gives the message to the 7714 */
7609 * communicator for transmission (updating the tracker, and re-scheduling 7715struct PendingMessageScoreContext
7610 * itself if applicable). 7716{
7717 /**
7718 * Set to the best message that was found, NULL for none.
7719 */
7720 struct PendingMessage *best;
7721
7722 /**
7723 * DVH that @e best should take, or NULL for direct transmission.
7724 */
7725 struct DistanceVectorHop *dvh;
7726
7727 /**
7728 * What is the estimated total overhead for this message?
7729 */
7730 size_t real_overhead;
7731
7732 /**
7733 * Number of pending messages we seriously considered this time.
7734 */
7735 unsigned int consideration_counter;
7736
7737 /**
7738 * Did we have to fragment?
7739 */
7740 int frag;
7741
7742 /**
7743 * Did we have to reliability box?
7744 */
7745 int relb;
7746};
7747
7748
7749/**
7750 * Select the best pending message from @a vl for transmission
7751 * via @a queue.
7752 *
7753 * @param sc[in,out] best message so far (NULL for none), plus scoring data
7754 * @param queue the queue that will be used for transmission
7755 * @param vl the virtual link providing the messages
7756 * @param dvh path we are currently considering, or NULL for none
7757 * @param overhead number of bytes of overhead to be expected
7758 * from DV encapsulation (0 for without DV)
7759 */
7760static void
7761select_best_pending_from_link (struct PendingMessageScoreContext *sc,
7762 struct Queue *queue,
7763 struct VirtualLink *vl,
7764 struct DistanceVectorHop *dvh,
7765 size_t overhead)
7766{
7767 /* FIXME-NEXT: right now we ignore all the 'fancy' sorting
7768 we do on the pending message list, resulting in a
7769 linear time algorithm (PLUS linear time list management).
7770 So we should probably either avoid keeping a sorted list,
7771 or find a way to make the sorting useful here! */
7772 for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos;
7773 pos = pos->next_vl)
7774 {
7775 size_t real_overhead = overhead;
7776 int frag;
7777 int relb;
7778
7779 if (NULL != pos->qe)
7780 continue; /* not eligible */
7781 sc->consideration_counter++;
7782 /* determine if we have to reliability-box, if so add reliability box
7783 overhead */
7784 relb = GNUNET_NO;
7785 if ((GNUNET_NO == frag) &&
7786 (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) &&
7787 (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc))
7788 {
7789 relb = GNUNET_YES;
7790 real_overhead += sizeof (struct TransportReliabilityBoxMessage);
7791 }
7792 /* determine if we have to fragment, if so add fragmentation
7793 overhead! */
7794 frag = GNUNET_NO;
7795 if ( ( (0 != queue->mtu) &&
7796 (pos->bytes_msg + real_overhead > queue->mtu) ) ||
7797 (pos->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
7798 (NULL != pos->head_frag /* fragments already exist, should
7799 respect that even if MTU is 0 for
7800 this queue */) )
7801 {
7802 frag = GNUNET_YES;
7803 relb = GNUNET_NO; /* if we fragment, we never also reliability box */
7804 if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
7805 {
7806 /* FIXME-OPTIMIZE: we could use an optimized, shorter fragmentation
7807 header without the ACK UUID when using a *reliable* channel! */
7808 }
7809 real_overhead = overhead + sizeof (struct TransportFragmentBoxMessage);
7810 }
7811
7812 /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
7813 message would beat it! */
7814 if (NULL != sc->best)
7815 {
7816 /* FIXME-NEXT: CHECK if pos fits queue BETTER than pm, if not:
7817 continue; */
7818 /* NOTE: use 'overhead' to estimate need for fragmentation,
7819 prefer it if MTU is sufficient and close! */
7820 }
7821 sc->best = pos;
7822 sc->dvh = dvh;
7823 sc->frag = frag;
7824 sc->relb = relb;
7825 }
7826}
7827
7828
7829/**
7830 * We believe we are ready to transmit a `struct PendingMessage` on a
7831 * queue, the big question is which one! We need to see if there is
7832 * one pending that is allowed by flow control and congestion control
7833 * and (ideally) matches our queue's performance profile.
7834 *
7835 * If such a message is found, we give the message to the communicator
7836 * for transmission (updating the tracker, and re-scheduling ourselves
7837 * if applicable).
7838 *
7839 * If no such message is found, the queue's `idle` field must be set
7840 * to #GNUNET_YES.
7611 * 7841 *
7612 * @param cls the `struct Queue` to process transmissions for 7842 * @param cls the `struct Queue` to process transmissions for
7613 */ 7843 */
@@ -7615,128 +7845,99 @@ static void
7615transmit_on_queue (void *cls) 7845transmit_on_queue (void *cls)
7616{ 7846{
7617 struct Queue *queue = cls; 7847 struct Queue *queue = cls;
7618
7619 queue->transmit_task = NULL;
7620#if FIXME - NEXT
7621 struct Neighbour *n = queue->neighbour; 7848 struct Neighbour *n = queue->neighbour;
7849 struct PendingMessageScoreContext sc;
7622 struct PendingMessage *pm; 7850 struct PendingMessage *pm;
7623 struct PendingMessage *s;
7624 uint32_t overhead;
7625 7851
7626 if (NULL == (pm = n->pending_msg_head)) 7852 queue->transmit_task = NULL;
7853 if (NULL == n->vl)
7627 { 7854 {
7628 /* no message pending, nothing to do here! */
7629 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 7855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7630 "No messages waiting on queue %s to %s, going to sleep\n", 7856 "Virtual link `%s' is down, cannot have PM for queue `%s'\n",
7631 queue->address, 7857 GNUNET_i2s (&n->pid),
7632 GNUNET_i2s (&n->pid)); 7858 queue->address);
7859 queue->idle = GNUNET_YES;
7633 return; 7860 return;
7634 } 7861 }
7635 if (NULL != pm->qe) 7862 memset (&sc, 0, sizeof (sc));
7863 select_best_pending_from_link (&sc, queue, n->vl, NULL, 0);
7864 if (NULL == sc.best)
7865 {
7866 /* Also look at DVH that have the n as first hop! */
7867 for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh;
7868 dvh = dvh->next_neighbour)
7869 {
7870 select_best_pending_from_link (&sc,
7871 queue,
7872 dvh->dv->vl,
7873 dvh,
7874 sizeof (struct GNUNET_PeerIdentity) *
7875 (1 + dvh->distance) +
7876 sizeof (struct TransportDVBoxMessage) +
7877 sizeof (struct TransportDVBoxPayloadP));
7878 }
7879 }
7880 if (NULL == sc.best)
7636 { 7881 {
7637 /* message still pending with communciator! 7882 /* no message pending, nothing to do here! */
7638 LOGGING-FIXME: Use stats? Should this not be rare? */
7639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 7883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7640 "Waiting on communicator for queue %s to %s, going to sleep\n", 7884 "No pending messages, queue `%s' to %s now idle\n",
7641 queue->address, 7885 queue->address,
7642 GNUNET_i2s (&n->pid)); 7886 GNUNET_i2s (&n->pid));
7887 queue->idle = GNUNET_YES;
7643 return; 7888 return;
7644 } 7889 }
7645 schedule_transmit_on_queue (queue, GNUNET_YES); 7890
7646 if (NULL != queue->transmit_task) 7891 /* Given selection in `sc`, do transmission */
7892 pm = sc.best;
7893 if (GNUNET_YES == sc.frag)
7647 { 7894 {
7648 GNUNET_log ( 7895 pm = fragment_message (queue, sc.dvh, sc.best);
7649 GNUNET_ERROR_TYPE_DEBUG, 7896 if (NULL == pm)
7650 "Scheduled transmission on queue %s to %s for later, going to sleep\n", 7897 {
7651 queue->address, 7898 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7652 GNUNET_i2s (&n->pid)); 7899 "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
7653 return; /* do it later */ 7900 queue->address,
7654 } 7901 GNUNET_i2s (&n->pid),
7655 overhead = 0; 7902 pm->logging_uuid);
7656 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) 7903 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
7657 overhead += sizeof (struct TransportReliabilityBoxMessage); 7904 }
7658 s = pm;
7659 if ( ( (0 != queue->mtu) &&
7660 (pm->bytes_msg + overhead > queue->mtu) ) ||
7661 (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
7662 (NULL != pm->head_frag /* fragments already exist, should
7663 respect that even if MTU is 0 for
7664 this queue */) )
7665 s = fragment_message (queue, pm->dvh, s);
7666 if (NULL == s)
7667 {
7668 /* Fragmentation failed, try next message... */
7669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7670 "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
7671 queue->address,
7672 GNUNET_i2s (&n->pid),
7673 pm->logging_uuid);
7674 schedule_transmit_on_queue (queue, GNUNET_NO);
7675 return;
7676 } 7905 }
7677 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) 7906 else if (GNUNET_YES == sc.relb)
7678 // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
7679 s = reliability_box_message (queue, pm->dvh, s);
7680 if (NULL == s)
7681 { 7907 {
7682 /* Reliability boxing failed, try next message... */ 7908 pm = reliability_box_message (queue, sc.dvh, sc.best);
7683 GNUNET_log ( 7909 if (NULL == pm)
7684 GNUNET_ERROR_TYPE_DEBUG, 7910 {
7685 "Reliability boxing failed queue %s to %s for <%llu>, trying again\n", 7911 /* Reliability boxing failed, try next message... */
7686 queue->address, 7912 GNUNET_log (
7687 GNUNET_i2s (&n->pid), 7913 GNUNET_ERROR_TYPE_DEBUG,
7688 pm->logging_uuid); 7914 "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
7689 schedule_transmit_on_queue (queue, GNUNET_NO); 7915 queue->address,
7690 return; 7916 GNUNET_i2s (&n->pid),
7917 pm->logging_uuid);
7918 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
7919 return;
7920 }
7691 } 7921 }
7922 else
7923 pm = sc.best; /* no boxing required */
7692 7924
7693 /* Pass 's' for transission to the communicator */ 7925 /* Pass 'pm' for transission to the communicator */
7694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 7926 GNUNET_log (
7695 "Passing message <%llu> to queue %s for peer %s\n", 7927 GNUNET_ERROR_TYPE_DEBUG,
7696 s->logging_uuid, 7928 "Passing message <%llu> to queue %s for peer %s (considered %u others)\n",
7697 queue->address, 7929 pm->logging_uuid,
7698 GNUNET_i2s (&n->pid)); 7930 queue->address,
7699 queue_send_msg (queue, s, &s[1], s->bytes_msg); 7931 GNUNET_i2s (&n->pid),
7700 // FIXME: do something similar to the logic below 7932 sc.consideration_counter);
7701 // in defragmentation / reliability ACK handling! 7933 queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
7702 7934
7703 /* Check if this transmission somehow conclusively finished handing 'pm' 7935 /* Check if this transmission somehow conclusively finished handing 'pm'
7704 even without any explicit ACKs */ 7936 even without any explicit ACKs */
7705 if ((PMT_CORE == s->pmt) && 7937 if ((PMT_CORE == pm->pmt) ||
7706 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) 7938 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
7707 { 7939 {
7708 /* Full message sent, and over reliabile channel */ 7940 completed_pending_message (pm);
7709 client_send_response (pm);
7710 }
7711 else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
7712 queue->tc->details.communicator.cc) &&
7713 (PMT_FRAGMENT_BOX == s->pmt))
7714 {
7715 struct PendingMessage *pos;
7716
7717 /* Fragment sent over reliabile channel */
7718 free_fragment_tree (s);
7719 pos = s->frag_parent;
7720 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
7721 GNUNET_free (s);
7722 /* check if subtree is done */
7723 while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
7724 (pos != pm))
7725 {
7726 s = pos;
7727 pos = s->frag_parent;
7728 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
7729 GNUNET_free (s);
7730 }
7731
7732 /* Was this the last applicable fragmment? */
7733 if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
7734 client_send_response (pm);
7735 }
7736 else if (PMT_CORE != pm->pmt)
7737 {
7738 /* This was an acknowledgement of some type, always free */
7739 free_pending_message (pm);
7740 } 7941 }
7741 else 7942 else
7742 { 7943 {
@@ -7748,15 +7949,13 @@ transmit_on_queue (void *cls)
7748 retransmitting. Note that in the future this heuristic should 7949 retransmitting. Note that in the future this heuristic should
7749 likely be improved further (measure RTT stability, consider 7950 likely be improved further (measure RTT stability, consider
7750 message urgency and size when delaying ACKs, etc.) */ 7951 message urgency and size when delaying ACKs, etc.) */
7751 update_pm_next_attempt (s, 7952 update_pm_next_attempt (pm,
7752 GNUNET_TIME_relative_to_absolute ( 7953 GNUNET_TIME_relative_to_absolute (
7753 GNUNET_TIME_relative_multiply (queue->pd.aged_rtt, 7954 GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
7754 4))); 7955 4)));
7755 } 7956 }
7756
7757 /* finally, re-schedule queue transmission task itself */ 7957 /* finally, re-schedule queue transmission task itself */
7758 schedule_transmit_on_queue (queue, GNUNET_NO); 7958 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
7759#endif
7760} 7959}
7761 7960
7762 7961
@@ -7871,7 +8070,7 @@ handle_send_message_ack (void *cls,
7871 for (struct Queue *queue = tc->details.communicator.queue_head; 8070 for (struct Queue *queue = tc->details.communicator.queue_head;
7872 NULL != queue; 8071 NULL != queue;
7873 queue = queue->next_client) 8072 queue = queue->next_client)
7874 schedule_transmit_on_queue (queue, GNUNET_NO); 8073 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
7875 } 8074 }
7876 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) 8075 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
7877 { 8076 {
@@ -7880,7 +8079,7 @@ handle_send_message_ack (void *cls,
7880 "# Transmission throttled due to queue queue limit", 8079 "# Transmission throttled due to queue queue limit",
7881 -1, 8080 -1,
7882 GNUNET_NO); 8081 GNUNET_NO);
7883 schedule_transmit_on_queue (qe->queue, GNUNET_NO); 8082 schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
7884 } 8083 }
7885 8084
7886 if (NULL != (pm = qe->pm)) 8085 if (NULL != (pm = qe->pm))
@@ -7894,21 +8093,7 @@ handle_send_message_ack (void *cls,
7894 transmit on queue for queues of the neighbour */ 8093 transmit on queue for queues of the neighbour */
7895 vl = pm->vl; 8094 vl = pm->vl;
7896 if (vl->pending_msg_head == pm) 8095 if (vl->pending_msg_head == pm)
7897 { 8096 check_vl_transmission (vl);
7898#if FIXME - NEXT
7899 for (struct Queue *queue = n->queue_head; NULL != queue;
7900 queue = queue->next_neighbour)
7901 schedule_transmit_on_queue (queue, GNUNET_NO);
7902#endif
7903 }
7904 if (GNUNET_OK != ntohl (sma->status))
7905 {
7906 GNUNET_log (
7907 GNUNET_ERROR_TYPE_INFO,
7908 "Queue failed in transmission <%llu>, will try retransmission immediately\n",
7909 pm->logging_uuid);
7910 update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
7911 }
7912 } 8097 }
7913 GNUNET_free (qe); 8098 GNUNET_free (qe);
7914} 8099}
@@ -8431,6 +8616,7 @@ handle_add_queue_message (void *cls,
8431 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); 8616 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
8432 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); 8617 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
8433 queue->neighbour = neighbour; 8618 queue->neighbour = neighbour;
8619 queue->idle = GNUNET_YES;
8434 memcpy (&queue[1], addr, addr_len); 8620 memcpy (&queue[1], addr, addr_len);
8435 /* notify monitors about new queue */ 8621 /* notify monitors about new queue */
8436 { 8622 {
@@ -8452,6 +8638,8 @@ handle_add_queue_message (void *cls,
8452 &aqm->receiver, 8638 &aqm->receiver,
8453 &check_validation_request_pending, 8639 &check_validation_request_pending,
8454 queue); 8640 queue);
8641 /* look for traffic for this queue */
8642 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
8455 /* might be our first queue, try launching DV learning */ 8643 /* might be our first queue, try launching DV learning */
8456 if (NULL == dvlearn_task) 8644 if (NULL == dvlearn_task)
8457 dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL); 8645 dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);