aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c148
1 files changed, 21 insertions, 127 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3381f886f..40e820e2f 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -1447,7 +1447,7 @@ struct Queue
1447 1447
1448 /** 1448 /**
1449 * Task scheduled for the time when this queue can (likely) transmit the 1449 * Task scheduled for the time when this queue can (likely) transmit the
1450 * next message. Still needs to check with the @e tracker_out to be sure. 1450 * next message.
1451 */ 1451 */
1452 struct GNUNET_SCHEDULER_Task *transmit_task; 1452 struct GNUNET_SCHEDULER_Task *transmit_task;
1453 1453
@@ -1511,16 +1511,6 @@ struct Queue
1511 * Connection status for this queue. 1511 * Connection status for this queue.
1512 */ 1512 */
1513 enum GNUNET_TRANSPORT_ConnectionStatus cs; 1513 enum GNUNET_TRANSPORT_ConnectionStatus cs;
1514
1515 /**
1516 * How much outbound bandwidth do we have available for this queue?
1517 */
1518 struct GNUNET_BANDWIDTH_Tracker tracker_out;
1519
1520 /**
1521 * How much inbound bandwidth do we have available for this queue?
1522 */
1523 struct GNUNET_BANDWIDTH_Tracker tracker_in;
1524}; 1514};
1525 1515
1526 1516
@@ -2956,6 +2946,9 @@ static void
2956cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, 2946cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2957 struct GNUNET_BANDWIDTH_Value32NBO quota_out) 2947 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2958{ 2948{
2949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2950 "Informing CORE clients about connection to %s\n",
2951 GNUNET_i2s (pid));
2959 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) 2952 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
2960 { 2953 {
2961 if (CT_CORE != tc->type) 2954 if (CT_CORE != tc->type)
@@ -2973,6 +2966,9 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2973static void 2966static void
2974cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) 2967cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2975{ 2968{
2969 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2970 "Informing CORE clients about disconnect from %s\n",
2971 GNUNET_i2s (pid));
2976 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) 2972 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
2977 { 2973 {
2978 struct GNUNET_MQ_Envelope *env; 2974 struct GNUNET_MQ_Envelope *env;
@@ -2988,10 +2984,9 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2988 2984
2989 2985
2990/** 2986/**
2991 * We believe we are ready to transmit a message on a queue. Double-checks 2987 * We believe we are ready to transmit a message on a queue. Gives the
2992 * with the queue's "tracker_out" and then gives the message to the 2988 * message to the communicator for transmission (updating the tracker,
2993 * communicator for transmission (updating the tracker, and re-scheduling 2989 * and re-scheduling itself if applicable).
2994 * itself if applicable).
2995 * 2990 *
2996 * @param cls the `struct Queue` to process transmissions for 2991 * @param cls the `struct Queue` to process transmissions for
2997 */ 2992 */
@@ -3017,7 +3012,6 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
3017 struct Neighbour *n = queue->neighbour; 3012 struct Neighbour *n = queue->neighbour;
3018 struct PendingMessage *pm = n->pending_msg_head; 3013 struct PendingMessage *pm = n->pending_msg_head;
3019 struct GNUNET_TIME_Relative out_delay; 3014 struct GNUNET_TIME_Relative out_delay;
3020 unsigned int wsize;
3021 3015
3022 GNUNET_assert (NULL != pm); 3016 GNUNET_assert (NULL != pm);
3023 if (queue->tc->details.communicator.total_queue_length >= 3017 if (queue->tc->details.communicator.total_queue_length >=
@@ -3039,14 +3033,16 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
3039 return; 3033 return;
3040 } 3034 }
3041 3035
3042 wsize = (0 == queue->mtu) ? pm->bytes_msg /* FIXME: add overheads? */ 3036 out_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
3043 : queue->mtu;
3044 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, wsize);
3045 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (
3046 pm->next_attempt),
3047 out_delay);
3048 if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) 3037 if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
3038 {
3039 GNUNET_log (
3040 GNUNET_ERROR_TYPE_DEBUG,
3041 "Schedule transmission on queue %llu of %s decides to run immediately\n",
3042 (unsigned long long) queue->qid,
3043 GNUNET_i2s (&n->pid));
3049 return; /* we should run immediately! */ 3044 return; /* we should run immediately! */
3045 }
3050 /* queue has changed since we were scheduled, reschedule again */ 3046 /* queue has changed since we were scheduled, reschedule again */
3051 queue->transmit_task = 3047 queue->transmit_task =
3052 GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); 3048 GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
@@ -3142,8 +3138,6 @@ free_queue (struct Queue *queue)
3142 schedule_transmit_on_queue (s, GNUNET_NO); 3138 schedule_transmit_on_queue (s, GNUNET_NO);
3143 } 3139 }
3144 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); 3140 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3145 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
3146 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
3147 GNUNET_free (queue); 3141 GNUNET_free (queue);
3148 3142
3149 update_neighbour_core_visibility (neighbour); 3143 update_neighbour_core_visibility (neighbour);
@@ -5918,6 +5912,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
5918 hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT); 5912 hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
5919 hop->path_valid_until = path_valid_until; 5913 hop->path_valid_until = path_valid_until;
5920 hop->distance = path_len - 2; 5914 hop->distance = path_len - 2;
5915 hop->pd.aged_rtt = network_latency;
5921 GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop); 5916 GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
5922 GNUNET_CONTAINER_MDLL_insert (neighbour, 5917 GNUNET_CONTAINER_MDLL_insert (neighbour,
5923 next_hop->dv_head, 5918 next_hop->dv_head,
@@ -7185,25 +7180,6 @@ check_add_queue_message (void *cls,
7185 7180
7186 7181
7187/** 7182/**
7188 * Bandwidth tracker informs us that the delay until we should receive
7189 * more has changed.
7190 *
7191 * @param cls a `struct Queue` for which the delay changed
7192 */
7193static void
7194tracker_update_in_cb (void *cls)
7195{
7196 struct Queue *queue = cls;
7197 struct GNUNET_TIME_Relative in_delay;
7198 unsigned int rsize;
7199
7200 rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
7201 in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, rsize);
7202 // FIXME: how exactly do we do inbound flow control?
7203}
7204
7205
7206/**
7207 * If necessary, generates the UUID for a @a pm 7183 * If necessary, generates the UUID for a @a pm
7208 * 7184 *
7209 * @param pm pending message to generate UUID for. 7185 * @param pm pending message to generate UUID for.
@@ -7474,8 +7450,8 @@ update_pm_next_attempt (struct PendingMessage *pm,
7474 7450
7475 7451
7476/** 7452/**
7477 * We believe we are ready to transmit a message on a queue. Double-checks 7453 * We believe we are ready to transmit a message on a queue.
7478 * with the queue's "tracker_out" and then gives the message to the 7454 * Gives the message to the
7479 * communicator for transmission (updating the tracker, and re-scheduling 7455 * communicator for transmission (updating the tracker, and re-scheduling
7480 * itself if applicable). 7456 * itself if applicable).
7481 * 7457 *
@@ -7600,73 +7576,6 @@ transmit_on_queue (void *cls)
7600 7576
7601 7577
7602/** 7578/**
7603 * Bandwidth tracker informs us that the delay until we
7604 * can transmit again changed.
7605 *
7606 * @param cls a `struct Queue` for which the delay changed
7607 */
7608static void
7609tracker_update_out_cb (void *cls)
7610{
7611 struct Queue *queue = cls;
7612 struct Neighbour *n = queue->neighbour;
7613
7614 if (NULL == n->pending_msg_head)
7615 {
7616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7617 "Bandwidth allocation updated for empty transmission queue `%s'\n",
7618 queue->address);
7619 return; /* no message pending, nothing to do here! */
7620 }
7621 GNUNET_SCHEDULER_cancel (queue->transmit_task);
7622 queue->transmit_task = NULL;
7623 schedule_transmit_on_queue (queue, GNUNET_NO);
7624}
7625
7626
7627/**
7628 * Bandwidth tracker informs us that excessive outbound bandwidth was
7629 * allocated which is not being used.
7630 *
7631 * @param cls a `struct Queue` for which the excess was noted
7632 */
7633static void
7634tracker_excess_out_cb (void *cls)
7635{
7636 (void) cls;
7637
7638 /* FIXME: trigger excess bandwidth report to core? Right now,
7639 this is done internally within transport_api2_core already,
7640 but we probably want to change the logic and trigger it
7641 from here via a message instead! */
7642 /* TODO: maybe inform someone at this point? */
7643 GNUNET_STATISTICS_update (GST_stats,
7644 "# Excess outbound bandwidth reported",
7645 1,
7646 GNUNET_NO);
7647}
7648
7649
7650/**
7651 * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
7652 * which is not being used.
7653 *
7654 * @param cls a `struct Queue` for which the excess was noted
7655 */
7656static void
7657tracker_excess_in_cb (void *cls)
7658{
7659 (void) cls;
7660
7661 /* TODO: maybe inform somone at this point? */
7662 GNUNET_STATISTICS_update (GST_stats,
7663 "# Excess inbound bandwidth reported",
7664 1,
7665 GNUNET_NO);
7666}
7667
7668
7669/**
7670 * Queue to a peer went down. Process the request. 7579 * Queue to a peer went down. Process the request.
7671 * 7580 *
7672 * @param cls the client 7581 * @param cls the client
@@ -8220,7 +8129,6 @@ neighbour_dv_monotime_cb (void *cls,
8220{ 8129{
8221 struct Neighbour *n = cls; 8130 struct Neighbour *n = cls;
8222 struct GNUNET_TIME_AbsoluteNBO *mtbe; 8131 struct GNUNET_TIME_AbsoluteNBO *mtbe;
8223 struct GNUNET_TIME_Absolute mt;
8224 8132
8225 (void) emsg; 8133 (void) emsg;
8226 if (NULL == record) 8134 if (NULL == record)
@@ -8299,20 +8207,6 @@ handle_add_queue_message (void *cls,
8299 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); 8207 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
8300 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); 8208 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
8301 queue->neighbour = neighbour; 8209 queue->neighbour = neighbour;
8302 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
8303 &tracker_update_in_cb,
8304 queue,
8305 GNUNET_BANDWIDTH_ZERO,
8306 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
8307 &tracker_excess_in_cb,
8308 queue);
8309 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
8310 &tracker_update_out_cb,
8311 queue,
8312 GNUNET_BANDWIDTH_ZERO,
8313 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
8314 &tracker_excess_out_cb,
8315 queue);
8316 memcpy (&queue[1], addr, addr_len); 8210 memcpy (&queue[1], addr, addr_len);
8317 /* notify monitors about new queue */ 8211 /* notify monitors about new queue */
8318 { 8212 {