diff options
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 148 |
1 files changed, 21 insertions, 127 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 3381f886f..40e820e2f 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -1447,7 +1447,7 @@ struct Queue | |||
1447 | 1447 | ||
1448 | /** | 1448 | /** |
1449 | * Task scheduled for the time when this queue can (likely) transmit the | 1449 | * Task scheduled for the time when this queue can (likely) transmit the |
1450 | * next message. Still needs to check with the @e tracker_out to be sure. | 1450 | * next message. |
1451 | */ | 1451 | */ |
1452 | struct GNUNET_SCHEDULER_Task *transmit_task; | 1452 | struct GNUNET_SCHEDULER_Task *transmit_task; |
1453 | 1453 | ||
@@ -1511,16 +1511,6 @@ struct Queue | |||
1511 | * Connection status for this queue. | 1511 | * Connection status for this queue. |
1512 | */ | 1512 | */ |
1513 | enum GNUNET_TRANSPORT_ConnectionStatus cs; | 1513 | enum GNUNET_TRANSPORT_ConnectionStatus cs; |
1514 | |||
1515 | /** | ||
1516 | * How much outbound bandwidth do we have available for this queue? | ||
1517 | */ | ||
1518 | struct GNUNET_BANDWIDTH_Tracker tracker_out; | ||
1519 | |||
1520 | /** | ||
1521 | * How much inbound bandwidth do we have available for this queue? | ||
1522 | */ | ||
1523 | struct GNUNET_BANDWIDTH_Tracker tracker_in; | ||
1524 | }; | 1514 | }; |
1525 | 1515 | ||
1526 | 1516 | ||
@@ -2956,6 +2946,9 @@ static void | |||
2956 | cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, | 2946 | cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, |
2957 | struct GNUNET_BANDWIDTH_Value32NBO quota_out) | 2947 | struct GNUNET_BANDWIDTH_Value32NBO quota_out) |
2958 | { | 2948 | { |
2949 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2950 | "Informing CORE clients about connection to %s\n", | ||
2951 | GNUNET_i2s (pid)); | ||
2959 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | 2952 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) |
2960 | { | 2953 | { |
2961 | if (CT_CORE != tc->type) | 2954 | if (CT_CORE != tc->type) |
@@ -2973,6 +2966,9 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, | |||
2973 | static void | 2966 | static void |
2974 | cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) | 2967 | cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) |
2975 | { | 2968 | { |
2969 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2970 | "Informing CORE clients about disconnect from %s\n", | ||
2971 | GNUNET_i2s (pid)); | ||
2976 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | 2972 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) |
2977 | { | 2973 | { |
2978 | struct GNUNET_MQ_Envelope *env; | 2974 | struct GNUNET_MQ_Envelope *env; |
@@ -2988,10 +2984,9 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) | |||
2988 | 2984 | ||
2989 | 2985 | ||
2990 | /** | 2986 | /** |
2991 | * We believe we are ready to transmit a message on a queue. Double-checks | 2987 | * We believe we are ready to transmit a message on a queue. Gives the |
2992 | * with the queue's "tracker_out" and then gives the message to the | 2988 | * message to the communicator for transmission (updating the tracker, |
2993 | * communicator for transmission (updating the tracker, and re-scheduling | 2989 | * and re-scheduling itself if applicable). |
2994 | * itself if applicable). | ||
2995 | * | 2990 | * |
2996 | * @param cls the `struct Queue` to process transmissions for | 2991 | * @param cls the `struct Queue` to process transmissions for |
2997 | */ | 2992 | */ |
@@ -3017,7 +3012,6 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) | |||
3017 | struct Neighbour *n = queue->neighbour; | 3012 | struct Neighbour *n = queue->neighbour; |
3018 | struct PendingMessage *pm = n->pending_msg_head; | 3013 | struct PendingMessage *pm = n->pending_msg_head; |
3019 | struct GNUNET_TIME_Relative out_delay; | 3014 | struct GNUNET_TIME_Relative out_delay; |
3020 | unsigned int wsize; | ||
3021 | 3015 | ||
3022 | GNUNET_assert (NULL != pm); | 3016 | GNUNET_assert (NULL != pm); |
3023 | if (queue->tc->details.communicator.total_queue_length >= | 3017 | if (queue->tc->details.communicator.total_queue_length >= |
@@ -3039,14 +3033,16 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) | |||
3039 | return; | 3033 | return; |
3040 | } | 3034 | } |
3041 | 3035 | ||
3042 | wsize = (0 == queue->mtu) ? pm->bytes_msg /* FIXME: add overheads? */ | 3036 | out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt); |
3043 | : queue->mtu; | ||
3044 | out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, wsize); | ||
3045 | out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining ( | ||
3046 | pm->next_attempt), | ||
3047 | out_delay); | ||
3048 | if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) | 3037 | if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) |
3038 | { | ||
3039 | GNUNET_log ( | ||
3040 | GNUNET_ERROR_TYPE_DEBUG, | ||
3041 | "Schedule transmission on queue %llu of %s decides to run immediately\n", | ||
3042 | (unsigned long long) queue->qid, | ||
3043 | GNUNET_i2s (&n->pid)); | ||
3049 | return; /* we should run immediately! */ | 3044 | return; /* we should run immediately! */ |
3045 | } | ||
3050 | /* queue has changed since we were scheduled, reschedule again */ | 3046 | /* queue has changed since we were scheduled, reschedule again */ |
3051 | queue->transmit_task = | 3047 | queue->transmit_task = |
3052 | GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); | 3048 | GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); |
@@ -3142,8 +3138,6 @@ free_queue (struct Queue *queue) | |||
3142 | schedule_transmit_on_queue (s, GNUNET_NO); | 3138 | schedule_transmit_on_queue (s, GNUNET_NO); |
3143 | } | 3139 | } |
3144 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); | 3140 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); |
3145 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); | ||
3146 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); | ||
3147 | GNUNET_free (queue); | 3141 | GNUNET_free (queue); |
3148 | 3142 | ||
3149 | update_neighbour_core_visibility (neighbour); | 3143 | update_neighbour_core_visibility (neighbour); |
@@ -5918,6 +5912,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, | |||
5918 | hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT); | 5912 | hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT); |
5919 | hop->path_valid_until = path_valid_until; | 5913 | hop->path_valid_until = path_valid_until; |
5920 | hop->distance = path_len - 2; | 5914 | hop->distance = path_len - 2; |
5915 | hop->pd.aged_rtt = network_latency; | ||
5921 | GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop); | 5916 | GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop); |
5922 | GNUNET_CONTAINER_MDLL_insert (neighbour, | 5917 | GNUNET_CONTAINER_MDLL_insert (neighbour, |
5923 | next_hop->dv_head, | 5918 | next_hop->dv_head, |
@@ -7185,25 +7180,6 @@ check_add_queue_message (void *cls, | |||
7185 | 7180 | ||
7186 | 7181 | ||
7187 | /** | 7182 | /** |
7188 | * Bandwidth tracker informs us that the delay until we should receive | ||
7189 | * more has changed. | ||
7190 | * | ||
7191 | * @param cls a `struct Queue` for which the delay changed | ||
7192 | */ | ||
7193 | static void | ||
7194 | tracker_update_in_cb (void *cls) | ||
7195 | { | ||
7196 | struct Queue *queue = cls; | ||
7197 | struct GNUNET_TIME_Relative in_delay; | ||
7198 | unsigned int rsize; | ||
7199 | |||
7200 | rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu; | ||
7201 | in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, rsize); | ||
7202 | // FIXME: how exactly do we do inbound flow control? | ||
7203 | } | ||
7204 | |||
7205 | |||
7206 | /** | ||
7207 | * If necessary, generates the UUID for a @a pm | 7183 | * If necessary, generates the UUID for a @a pm |
7208 | * | 7184 | * |
7209 | * @param pm pending message to generate UUID for. | 7185 | * @param pm pending message to generate UUID for. |
@@ -7474,8 +7450,8 @@ update_pm_next_attempt (struct PendingMessage *pm, | |||
7474 | 7450 | ||
7475 | 7451 | ||
7476 | /** | 7452 | /** |
7477 | * We believe we are ready to transmit a message on a queue. Double-checks | 7453 | * We believe we are ready to transmit a message on a queue. |
7478 | * with the queue's "tracker_out" and then gives the message to the | 7454 | * Gives the message to the |
7479 | * communicator for transmission (updating the tracker, and re-scheduling | 7455 | * communicator for transmission (updating the tracker, and re-scheduling |
7480 | * itself if applicable). | 7456 | * itself if applicable). |
7481 | * | 7457 | * |
@@ -7600,73 +7576,6 @@ transmit_on_queue (void *cls) | |||
7600 | 7576 | ||
7601 | 7577 | ||
7602 | /** | 7578 | /** |
7603 | * Bandwidth tracker informs us that the delay until we | ||
7604 | * can transmit again changed. | ||
7605 | * | ||
7606 | * @param cls a `struct Queue` for which the delay changed | ||
7607 | */ | ||
7608 | static void | ||
7609 | tracker_update_out_cb (void *cls) | ||
7610 | { | ||
7611 | struct Queue *queue = cls; | ||
7612 | struct Neighbour *n = queue->neighbour; | ||
7613 | |||
7614 | if (NULL == n->pending_msg_head) | ||
7615 | { | ||
7616 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
7617 | "Bandwidth allocation updated for empty transmission queue `%s'\n", | ||
7618 | queue->address); | ||
7619 | return; /* no message pending, nothing to do here! */ | ||
7620 | } | ||
7621 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | ||
7622 | queue->transmit_task = NULL; | ||
7623 | schedule_transmit_on_queue (queue, GNUNET_NO); | ||
7624 | } | ||
7625 | |||
7626 | |||
7627 | /** | ||
7628 | * Bandwidth tracker informs us that excessive outbound bandwidth was | ||
7629 | * allocated which is not being used. | ||
7630 | * | ||
7631 | * @param cls a `struct Queue` for which the excess was noted | ||
7632 | */ | ||
7633 | static void | ||
7634 | tracker_excess_out_cb (void *cls) | ||
7635 | { | ||
7636 | (void) cls; | ||
7637 | |||
7638 | /* FIXME: trigger excess bandwidth report to core? Right now, | ||
7639 | this is done internally within transport_api2_core already, | ||
7640 | but we probably want to change the logic and trigger it | ||
7641 | from here via a message instead! */ | ||
7642 | /* TODO: maybe inform someone at this point? */ | ||
7643 | GNUNET_STATISTICS_update (GST_stats, | ||
7644 | "# Excess outbound bandwidth reported", | ||
7645 | 1, | ||
7646 | GNUNET_NO); | ||
7647 | } | ||
7648 | |||
7649 | |||
7650 | /** | ||
7651 | * Bandwidth tracker informs us that excessive inbound bandwidth was allocated | ||
7652 | * which is not being used. | ||
7653 | * | ||
7654 | * @param cls a `struct Queue` for which the excess was noted | ||
7655 | */ | ||
7656 | static void | ||
7657 | tracker_excess_in_cb (void *cls) | ||
7658 | { | ||
7659 | (void) cls; | ||
7660 | |||
7661 | /* TODO: maybe inform somone at this point? */ | ||
7662 | GNUNET_STATISTICS_update (GST_stats, | ||
7663 | "# Excess inbound bandwidth reported", | ||
7664 | 1, | ||
7665 | GNUNET_NO); | ||
7666 | } | ||
7667 | |||
7668 | |||
7669 | /** | ||
7670 | * Queue to a peer went down. Process the request. | 7579 | * Queue to a peer went down. Process the request. |
7671 | * | 7580 | * |
7672 | * @param cls the client | 7581 | * @param cls the client |
@@ -8220,7 +8129,6 @@ neighbour_dv_monotime_cb (void *cls, | |||
8220 | { | 8129 | { |
8221 | struct Neighbour *n = cls; | 8130 | struct Neighbour *n = cls; |
8222 | struct GNUNET_TIME_AbsoluteNBO *mtbe; | 8131 | struct GNUNET_TIME_AbsoluteNBO *mtbe; |
8223 | struct GNUNET_TIME_Absolute mt; | ||
8224 | 8132 | ||
8225 | (void) emsg; | 8133 | (void) emsg; |
8226 | if (NULL == record) | 8134 | if (NULL == record) |
@@ -8299,20 +8207,6 @@ handle_add_queue_message (void *cls, | |||
8299 | queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); | 8207 | queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); |
8300 | queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); | 8208 | queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); |
8301 | queue->neighbour = neighbour; | 8209 | queue->neighbour = neighbour; |
8302 | GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in, | ||
8303 | &tracker_update_in_cb, | ||
8304 | queue, | ||
8305 | GNUNET_BANDWIDTH_ZERO, | ||
8306 | GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, | ||
8307 | &tracker_excess_in_cb, | ||
8308 | queue); | ||
8309 | GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out, | ||
8310 | &tracker_update_out_cb, | ||
8311 | queue, | ||
8312 | GNUNET_BANDWIDTH_ZERO, | ||
8313 | GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, | ||
8314 | &tracker_excess_out_cb, | ||
8315 | queue); | ||
8316 | memcpy (&queue[1], addr, addr_len); | 8210 | memcpy (&queue[1], addr, addr_len); |
8317 | /* notify monitors about new queue */ | 8211 | /* notify monitors about new queue */ |
8318 | { | 8212 | { |