diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-06-02 21:58:37 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-06-02 21:58:37 +0200 |
commit | 3d9f1acfb62cd53fc7d1441eb33e1341c9ff3790 (patch) | |
tree | 18ee7d45da8e4235440ddd85926cb7a798263b70 | |
parent | 0373e8a441feb05359b12be0e1803423783b55b2 (diff) | |
download | gnunet-3d9f1acfb62cd53fc7d1441eb33e1341c9ff3790.tar.gz gnunet-3d9f1acfb62cd53fc7d1441eb33e1341c9ff3790.zip |
generate and handle TRANSPORT_FLOW_CONTROL messages (TNG)
-rw-r--r-- | src/include/gnunet_protocols.h | 5 | ||||
-rw-r--r-- | src/transport/gnunet-service-tng.c | 410 |
2 files changed, 272 insertions, 143 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index a00ddacca..d93e12bfb 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -3149,6 +3149,11 @@ extern "C" { | |||
3149 | */ | 3149 | */ |
3150 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING 1220 | 3150 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING 1220 |
3151 | 3151 | ||
3152 | /** | ||
3153 | * Transport signalling incoming backchannel message to a communicator. | ||
3154 | */ | ||
3155 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221 | ||
3156 | |||
3152 | 3157 | ||
3153 | /** | 3158 | /** |
3154 | * Message sent to indicate to the transport that a monitor | 3159 | * Message sent to indicate to the transport that a monitor |
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 471ded644..7cc9f193c 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -28,10 +28,7 @@ | |||
28 | * communicators do not offer flow control). | 28 | * communicators do not offer flow control). |
29 | * We do transmit FC window sizes now. Left: | 29 | * We do transmit FC window sizes now. Left: |
30 | * for SENDING) | 30 | * for SENDING) |
31 | * - Throttle sending if "outbound_fc_window_size_used" reaches limit | 31 | * - need to call consider_sending_fc() periodically if it goes unanswered! |
32 | * - Send *new* challenge when we get close to the limit (including | ||
33 | * at the beginning when the limit is zero!) | ||
34 | * - Retransmit challenge if it goes unanswered! | ||
35 | * | 32 | * |
36 | * for DV) | 33 | * for DV) |
37 | * - send challenges via DV (when DVH is confirmed *and* we care about | 34 | * - send challenges via DV (when DVH is confirmed *and* we care about |
@@ -78,10 +75,6 @@ | |||
78 | * and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls] | 75 | * and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls] |
79 | * - Set last_window_consum_limit promise properly based on | 76 | * - Set last_window_consum_limit promise properly based on |
80 | * latency and bandwidth of the respective connection [GOODPUT / utilization / stalls] | 77 | * latency and bandwidth of the respective connection [GOODPUT / utilization / stalls] |
81 | * - re-sending challenge response without a challenge when we have | ||
82 | * significantly increased the FC window (upon CORE being done with messages) | ||
83 | * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] | ||
84 | * Also can re-use signature in this case [CPU]. Marked with "TODO-M1" | ||
85 | * | 78 | * |
86 | * Design realizations / discussion: | 79 | * Design realizations / discussion: |
87 | * - communicators do flow control by calling MQ "notify sent" | 80 | * - communicators do flow control by calling MQ "notify sent" |
@@ -1326,6 +1319,13 @@ struct VirtualLink | |||
1326 | struct GNUNET_TIME_Absolute n_challenge_time; | 1319 | struct GNUNET_TIME_Absolute n_challenge_time; |
1327 | 1320 | ||
1328 | /** | 1321 | /** |
1322 | * When did we last send a | ||
1323 | * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message? | ||
1324 | * Used to determine whether it is time to re-transmit the message. | ||
1325 | */ | ||
1326 | struct GNUNET_TIME_Absolute last_fc_transmission; | ||
1327 | |||
1328 | /** | ||
1329 | * Sender timestamp of the last | 1329 | * Sender timestamp of the last |
1330 | * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have | 1330 | * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have |
1331 | * received. Note that we do not persist this monotonic time as we | 1331 | * received. Note that we do not persist this monotonic time as we |
@@ -1388,8 +1388,7 @@ struct VirtualLink | |||
1388 | 1388 | ||
1389 | /** | 1389 | /** |
1390 | * Our current flow control window size in bytes. We | 1390 | * Our current flow control window size in bytes. We |
1391 | * are allowed to transmit this many bytes to @a n as per | 1391 | * are allowed to transmit this many bytes to @a n. |
1392 | * our @e my_challenge "account". | ||
1393 | */ | 1392 | */ |
1394 | uint64_t outbound_fc_window_size; | 1393 | uint64_t outbound_fc_window_size; |
1395 | 1394 | ||
@@ -3967,138 +3966,6 @@ pick_random_dv_hops (const struct DistanceVector *dv, | |||
3967 | 3966 | ||
3968 | 3967 | ||
3969 | /** | 3968 | /** |
3970 | * There is a message at the head of the pending messages for @a vl | ||
3971 | * which may be ready for transmission. Check if a queue is ready to | ||
3972 | * take it. | ||
3973 | * | ||
3974 | * This function must (1) check for flow control to ensure that we can | ||
3975 | * right now send to @a vl, (2) check that the pending message in the | ||
3976 | * queue is actually eligible, (3) determine if any applicable queue | ||
3977 | * (direct neighbour or DVH path) is ready to accept messages, and | ||
3978 | * (4) prioritize based on the preferences associated with the | ||
3979 | * pending message. | ||
3980 | * | ||
3981 | * So yeah, easy. | ||
3982 | * | ||
3983 | * @param vl virtual link where we should check for transmission | ||
3984 | */ | ||
3985 | static void | ||
3986 | check_vl_transmission (struct VirtualLink *vl) | ||
3987 | { | ||
3988 | struct Neighbour *n = vl->n; | ||
3989 | struct DistanceVector *dv = vl->dv; | ||
3990 | struct GNUNET_TIME_Absolute now; | ||
3991 | int elig; | ||
3992 | |||
3993 | /* FIXME-FC: need to implement virtual link flow control! */ | ||
3994 | |||
3995 | /* Check that we have an eligible pending message! | ||
3996 | (cheaper than having #transmit_on_queue() find out!) */ | ||
3997 | elig = GNUNET_NO; | ||
3998 | for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; | ||
3999 | pm = pm->next_vl) | ||
4000 | { | ||
4001 | if (NULL != pm->qe) | ||
4002 | continue; /* not eligible, is in a queue! */ | ||
4003 | elig = GNUNET_YES; | ||
4004 | break; | ||
4005 | } | ||
4006 | if (GNUNET_NO == elig) | ||
4007 | return; | ||
4008 | |||
4009 | /* Notify queues at direct neighbours that we are interested */ | ||
4010 | now = GNUNET_TIME_absolute_get (); | ||
4011 | if (NULL != n) | ||
4012 | { | ||
4013 | for (struct Queue *queue = n->queue_head; NULL != queue; | ||
4014 | queue = queue->next_neighbour) | ||
4015 | if ((GNUNET_YES == queue->idle) && | ||
4016 | (queue->validated_until.abs_value_us > now.abs_value_us)) | ||
4017 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | ||
4018 | } | ||
4019 | /* Notify queues via DV that we are interested */ | ||
4020 | if (NULL != dv) | ||
4021 | { | ||
4022 | /* Do DV with lower scheduler priority, which effectively means that | ||
4023 | IF a neighbour exists and is available, we prefer it. */ | ||
4024 | for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; | ||
4025 | pos = pos->next_dv) | ||
4026 | { | ||
4027 | struct Neighbour *nh = pos->next_hop; | ||
4028 | |||
4029 | if (pos->path_valid_until.abs_value_us <= now.abs_value_us) | ||
4030 | continue; /* skip this one: path not validated */ | ||
4031 | for (struct Queue *queue = nh->queue_head; NULL != queue; | ||
4032 | queue = queue->next_neighbour) | ||
4033 | if ((GNUNET_YES == queue->idle) && | ||
4034 | (queue->validated_until.abs_value_us > now.abs_value_us)) | ||
4035 | schedule_transmit_on_queue (queue, | ||
4036 | GNUNET_SCHEDULER_PRIORITY_BACKGROUND); | ||
4037 | } | ||
4038 | } | ||
4039 | } | ||
4040 | |||
4041 | |||
4042 | /** | ||
4043 | * Client asked for transmission to a peer. Process the request. | ||
4044 | * | ||
4045 | * @param cls the client | ||
4046 | * @param obm the send message that was sent | ||
4047 | */ | ||
4048 | static void | ||
4049 | handle_client_send (void *cls, const struct OutboundMessage *obm) | ||
4050 | { | ||
4051 | struct TransportClient *tc = cls; | ||
4052 | struct PendingMessage *pm; | ||
4053 | const struct GNUNET_MessageHeader *obmm; | ||
4054 | uint32_t bytes_msg; | ||
4055 | struct VirtualLink *vl; | ||
4056 | enum GNUNET_MQ_PriorityPreferences pp; | ||
4057 | |||
4058 | GNUNET_assert (CT_CORE == tc->type); | ||
4059 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; | ||
4060 | bytes_msg = ntohs (obmm->size); | ||
4061 | pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority); | ||
4062 | vl = lookup_virtual_link (&obm->peer); | ||
4063 | if (NULL == vl) | ||
4064 | { | ||
4065 | /* Failure: don't have this peer as a neighbour (anymore). | ||
4066 | Might have gone down asynchronously, so this is NOT | ||
4067 | a protocol violation by CORE. Still count the event, | ||
4068 | as this should be rare. */ | ||
4069 | GNUNET_SERVICE_client_continue (tc->client); | ||
4070 | GNUNET_STATISTICS_update (GST_stats, | ||
4071 | "# messages dropped (neighbour unknown)", | ||
4072 | 1, | ||
4073 | GNUNET_NO); | ||
4074 | return; | ||
4075 | } | ||
4076 | |||
4077 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); | ||
4078 | pm->logging_uuid = logging_uuid_gen++; | ||
4079 | pm->prefs = pp; | ||
4080 | pm->client = tc; | ||
4081 | pm->vl = vl; | ||
4082 | pm->bytes_msg = bytes_msg; | ||
4083 | memcpy (&pm[1], obmm, bytes_msg); | ||
4084 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4085 | "Sending %u bytes as <%llu> to %s\n", | ||
4086 | bytes_msg, | ||
4087 | pm->logging_uuid, | ||
4088 | GNUNET_i2s (&obm->peer)); | ||
4089 | GNUNET_CONTAINER_MDLL_insert (client, | ||
4090 | tc->details.core.pending_msg_head, | ||
4091 | tc->details.core.pending_msg_tail, | ||
4092 | pm); | ||
4093 | GNUNET_CONTAINER_MDLL_insert (vl, | ||
4094 | vl->pending_msg_head, | ||
4095 | vl->pending_msg_tail, | ||
4096 | pm); | ||
4097 | check_vl_transmission (vl); | ||
4098 | } | ||
4099 | |||
4100 | |||
4101 | /** | ||
4102 | * Communicator started. Test message is well-formed. | 3969 | * Communicator started. Test message is well-formed. |
4103 | * | 3970 | * |
4104 | * @param cls the client | 3971 | * @param cls the client |
@@ -4854,6 +4721,187 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, | |||
4854 | 4721 | ||
4855 | 4722 | ||
4856 | /** | 4723 | /** |
4724 | * Something changed on the virtual link with respect to flow | ||
4725 | * control. Consider retransmitting the FC window size. | ||
4726 | * | ||
4727 | * @param vl virtual link to work with | ||
4728 | */ | ||
4729 | static void | ||
4730 | consider_sending_fc (struct VirtualLink *vl) | ||
4731 | { | ||
4732 | struct GNUNET_TIME_Absolute monotime; | ||
4733 | struct TransportFlowControlMessage fc; | ||
4734 | struct GNUNET_TIME_Relative duration; | ||
4735 | |||
4736 | duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission); | ||
4737 | /* FIXME: decide sane criteria on when to do this, instead of doing | ||
4738 | it always! */ | ||
4739 | /* For example, we should probably ONLY do this if a bit more than | ||
4740 | an RTT has passed, or if the window changed "significantly" since | ||
4741 | then. */ | ||
4742 | (void) duration; | ||
4743 | |||
4744 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4745 | "Sending FC seq %u to %s with new window %llu\n", | ||
4746 | (unsigned int) vl->fc_seq_gen, | ||
4747 | GNUNET_i2s (&vl->target), | ||
4748 | (unsigned long long) vl->incoming_fc_window_size); | ||
4749 | monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg); | ||
4750 | vl->last_fc_transmission = monotime; | ||
4751 | fc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL); | ||
4752 | fc.header.size = htons (sizeof (fc)); | ||
4753 | fc.seq = htonl (vl->fc_seq_gen++); | ||
4754 | fc.inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size); | ||
4755 | fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used); | ||
4756 | fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size); | ||
4757 | fc.sender_time = GNUNET_TIME_absolute_hton (monotime); | ||
4758 | route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE); | ||
4759 | } | ||
4760 | |||
4761 | |||
4762 | /** | ||
4763 | * There is a message at the head of the pending messages for @a vl | ||
4764 | * which may be ready for transmission. Check if a queue is ready to | ||
4765 | * take it. | ||
4766 | * | ||
4767 | * This function must (1) check for flow control to ensure that we can | ||
4768 | * right now send to @a vl, (2) check that the pending message in the | ||
4769 | * queue is actually eligible, (3) determine if any applicable queue | ||
4770 | * (direct neighbour or DVH path) is ready to accept messages, and | ||
4771 | * (4) prioritize based on the preferences associated with the | ||
4772 | * pending message. | ||
4773 | * | ||
4774 | * So yeah, easy. | ||
4775 | * | ||
4776 | * @param vl virtual link where we should check for transmission | ||
4777 | */ | ||
4778 | static void | ||
4779 | check_vl_transmission (struct VirtualLink *vl) | ||
4780 | { | ||
4781 | struct Neighbour *n = vl->n; | ||
4782 | struct DistanceVector *dv = vl->dv; | ||
4783 | struct GNUNET_TIME_Absolute now; | ||
4784 | int elig; | ||
4785 | |||
4786 | /* Check that we have an eligible pending message! | ||
4787 | (cheaper than having #transmit_on_queue() find out!) */ | ||
4788 | elig = GNUNET_NO; | ||
4789 | for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; | ||
4790 | pm = pm->next_vl) | ||
4791 | { | ||
4792 | if (NULL != pm->qe) | ||
4793 | continue; /* not eligible, is in a queue! */ | ||
4794 | if (pm->bytes_msg + vl->outbound_fc_window_size_used > | ||
4795 | vl->outbound_fc_window_size) | ||
4796 | { | ||
4797 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4798 | "Stalled transmision on VL %s due to flow control: %llu < %llu\n", | ||
4799 | GNUNET_i2s (&vl->target), | ||
4800 | (unsigned long long) vl->outbound_fc_window_size, | ||
4801 | (unsigned long long) (pm->bytes_msg + | ||
4802 | vl->outbound_fc_window_size_used)); | ||
4803 | consider_sending_fc (vl); | ||
4804 | return; /* We have a message, but flow control says "nope" */ | ||
4805 | } | ||
4806 | elig = GNUNET_YES; | ||
4807 | break; | ||
4808 | } | ||
4809 | if (GNUNET_NO == elig) | ||
4810 | return; | ||
4811 | |||
4812 | /* Notify queues at direct neighbours that we are interested */ | ||
4813 | now = GNUNET_TIME_absolute_get (); | ||
4814 | if (NULL != n) | ||
4815 | { | ||
4816 | for (struct Queue *queue = n->queue_head; NULL != queue; | ||
4817 | queue = queue->next_neighbour) | ||
4818 | if ((GNUNET_YES == queue->idle) && | ||
4819 | (queue->validated_until.abs_value_us > now.abs_value_us)) | ||
4820 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | ||
4821 | } | ||
4822 | /* Notify queues via DV that we are interested */ | ||
4823 | if (NULL != dv) | ||
4824 | { | ||
4825 | /* Do DV with lower scheduler priority, which effectively means that | ||
4826 | IF a neighbour exists and is available, we prefer it. */ | ||
4827 | for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; | ||
4828 | pos = pos->next_dv) | ||
4829 | { | ||
4830 | struct Neighbour *nh = pos->next_hop; | ||
4831 | |||
4832 | if (pos->path_valid_until.abs_value_us <= now.abs_value_us) | ||
4833 | continue; /* skip this one: path not validated */ | ||
4834 | for (struct Queue *queue = nh->queue_head; NULL != queue; | ||
4835 | queue = queue->next_neighbour) | ||
4836 | if ((GNUNET_YES == queue->idle) && | ||
4837 | (queue->validated_until.abs_value_us > now.abs_value_us)) | ||
4838 | schedule_transmit_on_queue (queue, | ||
4839 | GNUNET_SCHEDULER_PRIORITY_BACKGROUND); | ||
4840 | } | ||
4841 | } | ||
4842 | } | ||
4843 | |||
4844 | |||
4845 | /** | ||
4846 | * Client asked for transmission to a peer. Process the request. | ||
4847 | * | ||
4848 | * @param cls the client | ||
4849 | * @param obm the send message that was sent | ||
4850 | */ | ||
4851 | static void | ||
4852 | handle_client_send (void *cls, const struct OutboundMessage *obm) | ||
4853 | { | ||
4854 | struct TransportClient *tc = cls; | ||
4855 | struct PendingMessage *pm; | ||
4856 | const struct GNUNET_MessageHeader *obmm; | ||
4857 | uint32_t bytes_msg; | ||
4858 | struct VirtualLink *vl; | ||
4859 | enum GNUNET_MQ_PriorityPreferences pp; | ||
4860 | |||
4861 | GNUNET_assert (CT_CORE == tc->type); | ||
4862 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; | ||
4863 | bytes_msg = ntohs (obmm->size); | ||
4864 | pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority); | ||
4865 | vl = lookup_virtual_link (&obm->peer); | ||
4866 | if (NULL == vl) | ||
4867 | { | ||
4868 | /* Failure: don't have this peer as a neighbour (anymore). | ||
4869 | Might have gone down asynchronously, so this is NOT | ||
4870 | a protocol violation by CORE. Still count the event, | ||
4871 | as this should be rare. */ | ||
4872 | GNUNET_SERVICE_client_continue (tc->client); | ||
4873 | GNUNET_STATISTICS_update (GST_stats, | ||
4874 | "# messages dropped (neighbour unknown)", | ||
4875 | 1, | ||
4876 | GNUNET_NO); | ||
4877 | return; | ||
4878 | } | ||
4879 | |||
4880 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); | ||
4881 | pm->logging_uuid = logging_uuid_gen++; | ||
4882 | pm->prefs = pp; | ||
4883 | pm->client = tc; | ||
4884 | pm->vl = vl; | ||
4885 | pm->bytes_msg = bytes_msg; | ||
4886 | memcpy (&pm[1], obmm, bytes_msg); | ||
4887 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4888 | "Sending %u bytes as <%llu> to %s\n", | ||
4889 | bytes_msg, | ||
4890 | pm->logging_uuid, | ||
4891 | GNUNET_i2s (&obm->peer)); | ||
4892 | GNUNET_CONTAINER_MDLL_insert (client, | ||
4893 | tc->details.core.pending_msg_head, | ||
4894 | tc->details.core.pending_msg_tail, | ||
4895 | pm); | ||
4896 | GNUNET_CONTAINER_MDLL_insert (vl, | ||
4897 | vl->pending_msg_head, | ||
4898 | vl->pending_msg_tail, | ||
4899 | pm); | ||
4900 | check_vl_transmission (vl); | ||
4901 | } | ||
4902 | |||
4903 | |||
4904 | /** | ||
4857 | * Communicator requests backchannel transmission. Process the request. | 4905 | * Communicator requests backchannel transmission. Process the request. |
4858 | * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *` | 4906 | * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *` |
4859 | * (which for now has exactly the same format, only a different message type) | 4907 | * (which for now has exactly the same format, only a different message type) |
@@ -5113,7 +5161,7 @@ core_env_sent_cb (void *cls) | |||
5113 | GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size); | 5161 | GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size); |
5114 | vl->incoming_fc_window_size_ram -= ctx->size; | 5162 | vl->incoming_fc_window_size_ram -= ctx->size; |
5115 | vl->incoming_fc_window_size_used += ctx->isize; | 5163 | vl->incoming_fc_window_size_used += ctx->isize; |
5116 | /* TODO-M1 */ | 5164 | consider_sending_fc (vl); |
5117 | GNUNET_free (ctx); | 5165 | GNUNET_free (ctx); |
5118 | } | 5166 | } |
5119 | 5167 | ||
@@ -6046,6 +6094,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) | |||
6046 | &vl->target, | 6094 | &vl->target, |
6047 | vl, | 6095 | vl, |
6048 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 6096 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
6097 | consider_sending_fc (vl); | ||
6049 | /* We lacked a confirmed connection to the target | 6098 | /* We lacked a confirmed connection to the target |
6050 | before, so tell CORE about it (finally!) */ | 6099 | before, so tell CORE about it (finally!) */ |
6051 | cores_send_connect_info (&dv->target); | 6100 | cores_send_connect_info (&dv->target); |
@@ -8031,6 +8080,7 @@ handle_validation_response ( | |||
8031 | &vl->target, | 8080 | &vl->target, |
8032 | vl, | 8081 | vl, |
8033 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 8082 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
8083 | consider_sending_fc (vl); | ||
8034 | /* We lacked a confirmed connection to the target | 8084 | /* We lacked a confirmed connection to the target |
8035 | before, so tell CORE about it (finally!) */ | 8085 | before, so tell CORE about it (finally!) */ |
8036 | cores_send_connect_info (&n->pid); | 8086 | cores_send_connect_info (&n->pid); |
@@ -8060,6 +8110,76 @@ handle_incoming_msg (void *cls, | |||
8060 | 8110 | ||
8061 | 8111 | ||
8062 | /** | 8112 | /** |
8113 | * Communicator gave us a transport address validation response. Process the | ||
8114 | * request. | ||
8115 | * | ||
8116 | * @param cls a `struct CommunicatorMessageContext` (must call | ||
8117 | * #finish_cmc_handling() when done) | ||
8118 | * @param fc the message that was received | ||
8119 | */ | ||
8120 | static void | ||
8121 | handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) | ||
8122 | { | ||
8123 | struct CommunicatorMessageContext *cmc = cls; | ||
8124 | struct VirtualLink *vl; | ||
8125 | uint32_t seq; | ||
8126 | struct GNUNET_TIME_Absolute st; | ||
8127 | uint64_t os; | ||
8128 | uint64_t wnd; | ||
8129 | |||
8130 | vl = lookup_virtual_link (&cmc->im.sender); | ||
8131 | if (NULL == vl) | ||
8132 | { | ||
8133 | GNUNET_STATISTICS_update (GST_stats, | ||
8134 | "# FC dropped: virtual link unknown", | ||
8135 | 1, | ||
8136 | GNUNET_NO); | ||
8137 | finish_cmc_handling (cmc); | ||
8138 | return; | ||
8139 | } | ||
8140 | st = GNUNET_TIME_absolute_ntoh (fc->sender_time); | ||
8141 | if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us) | ||
8142 | { | ||
8143 | /* out of order, drop */ | ||
8144 | GNUNET_STATISTICS_update (GST_stats, | ||
8145 | "# FC dropped: message out of order", | ||
8146 | 1, | ||
8147 | GNUNET_NO); | ||
8148 | finish_cmc_handling (cmc); | ||
8149 | return; | ||
8150 | } | ||
8151 | seq = ntohl (fc->seq); | ||
8152 | if (seq < vl->last_fc_seq) | ||
8153 | { | ||
8154 | /* Wrap-around/reset of other peer; start all counters from zero */ | ||
8155 | vl->outbound_fc_window_size_used = 0; | ||
8156 | } | ||
8157 | vl->last_fc_seq = seq; | ||
8158 | vl->last_fc_timestamp = st; | ||
8159 | vl->outbound_fc_window_size = GNUNET_ntohll (fc->inbound_window_size); | ||
8160 | os = GNUNET_ntohll (fc->outbound_sent); | ||
8161 | vl->incoming_fc_window_size_loss = | ||
8162 | (int64_t) (os - vl->incoming_fc_window_size_used); | ||
8163 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
8164 | "Received FC from %s, seq %u, new window %llu (loss at %lld)\n", | ||
8165 | GNUNET_i2s (&vl->target), | ||
8166 | (unsigned int) seq, | ||
8167 | (unsigned long long) vl->outbound_fc_window_size, | ||
8168 | (long long) vl->incoming_fc_window_size_loss); | ||
8169 | wnd = GNUNET_ntohll (fc->outbound_window_size); | ||
8170 | if (wnd < vl->incoming_fc_window_size) | ||
8171 | { | ||
8172 | /* Consider re-sending our FC message, as clearly the | ||
8173 | other peer's idea of the window is not up-to-date */ | ||
8174 | consider_sending_fc (vl); | ||
8175 | } | ||
8176 | /* FC window likely increased, check transmission possibilities! */ | ||
8177 | check_vl_transmission (vl); | ||
8178 | finish_cmc_handling (cmc); | ||
8179 | } | ||
8180 | |||
8181 | |||
8182 | /** | ||
8063 | * Given an inbound message @a msg from a communicator @a cmc, | 8183 | * Given an inbound message @a msg from a communicator @a cmc, |
8064 | * demultiplex it based on the type calling the right handler. | 8184 | * demultiplex it based on the type calling the right handler. |
8065 | * | 8185 | * |
@@ -8100,6 +8220,10 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, | |||
8100 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE, | 8220 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE, |
8101 | struct TransportValidationChallengeMessage, | 8221 | struct TransportValidationChallengeMessage, |
8102 | &cmc), | 8222 | &cmc), |
8223 | GNUNET_MQ_hd_fixed_size (flow_control, | ||
8224 | GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL, | ||
8225 | struct TransportFlowControlMessage, | ||
8226 | &cmc), | ||
8103 | GNUNET_MQ_hd_fixed_size ( | 8227 | GNUNET_MQ_hd_fixed_size ( |
8104 | validation_response, | 8228 | validation_response, |
8105 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE, | 8229 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE, |