diff options
author | Matthias Wachs <wachs@net.in.tum.de> | 2011-10-21 15:55:33 +0000 |
---|---|---|
committer | Matthias Wachs <wachs@net.in.tum.de> | 2011-10-21 15:55:33 +0000 |
commit | db01713beb18dfa50deddac41f74004d13be8295 (patch) | |
tree | 6fa783625fe57ab5ca4c251eb72a57c5402ac616 /src/transport | |
parent | 3c8dd575bcc3f22b58b13f4cb096c57f076bf2e4 (diff) | |
download | gnunet-db01713beb18dfa50deddac41f74004d13be8295.tar.gz gnunet-db01713beb18dfa50deddac41f74004d13be8295.zip |
transmitting flow control information between peers
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/plugin_transport_udp.c | 181 |
1 files changed, 137 insertions, 44 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index e0ef50acf..9812416de 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c | |||
@@ -94,6 +94,28 @@ struct UDPMessage | |||
94 | 94 | ||
95 | 95 | ||
96 | /** | 96 | /** |
97 | * UDP ACK Message-Packet header (after defragmentation). | ||
98 | */ | ||
99 | struct UDP_ACK_Message | ||
100 | { | ||
101 | /** | ||
102 | * Message header. | ||
103 | */ | ||
104 | struct GNUNET_MessageHeader header; | ||
105 | |||
106 | /** | ||
107 | * Desired delay for flow control | ||
108 | */ | ||
109 | uint32_t delay; | ||
110 | |||
111 | /** | ||
112 | * What is the identity of the sender | ||
113 | */ | ||
114 | struct GNUNET_PeerIdentity sender; | ||
115 | }; | ||
116 | |||
117 | |||
118 | /** | ||
97 | * Network format for IPv4 addresses. | 119 | * Network format for IPv4 addresses. |
98 | */ | 120 | */ |
99 | struct IPv4UdpAddress | 121 | struct IPv4UdpAddress |
@@ -174,6 +196,16 @@ struct Session | |||
174 | struct GNUNET_TIME_Absolute valid_until; | 196 | struct GNUNET_TIME_Absolute valid_until; |
175 | 197 | ||
176 | GNUNET_SCHEDULER_TaskIdentifier invalidation_task; | 198 | GNUNET_SCHEDULER_TaskIdentifier invalidation_task; |
199 | |||
200 | /* | ||
201 | * Desired delay for next sending we send to other peer | ||
202 | */ | ||
203 | struct GNUNET_TIME_Relative flow_delay_for_other_peer; | ||
204 | |||
205 | /* | ||
206 | * Desired delay for next sending we received from other peer | ||
207 | */ | ||
208 | struct GNUNET_TIME_Absolute flow_delay_from_other_peer; | ||
177 | }; | 209 | }; |
178 | 210 | ||
179 | 211 | ||
@@ -210,6 +242,8 @@ struct ReceiveContext | |||
210 | */ | 242 | */ |
211 | size_t addr_len; | 243 | size_t addr_len; |
212 | 244 | ||
245 | struct GNUNET_PeerIdentity id; | ||
246 | |||
213 | }; | 247 | }; |
214 | 248 | ||
215 | 249 | ||
@@ -363,6 +397,43 @@ find_inbound_session (struct Plugin *plugin, | |||
363 | return psc.result; | 397 | return psc.result; |
364 | } | 398 | } |
365 | 399 | ||
400 | int inbound_session_by_addr_iterator (void *cls, | ||
401 | const GNUNET_HashCode * key, | ||
402 | void *value) | ||
403 | { | ||
404 | struct PeerSessionIteratorContext *psc = cls; | ||
405 | struct Session *s = value; | ||
406 | if (s->addrlen == psc->addrlen) | ||
407 | { | ||
408 | if (0 == memcmp (&s[1], psc->addr, s->addrlen)) | ||
409 | psc->result = s; | ||
410 | } | ||
411 | if (psc->result != NULL) | ||
412 | return GNUNET_NO; | ||
413 | else | ||
414 | return GNUNET_YES; | ||
415 | }; | ||
416 | |||
417 | /** | ||
418 | * Lookup the session for the given peer just by address. | ||
419 | * | ||
420 | * @param plugin the plugin | ||
421 | * @param addr address | ||
422 | * @param addrlen address length | ||
423 | * @return NULL if we have no session | ||
424 | */ | ||
425 | struct Session * | ||
426 | find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen) | ||
427 | { | ||
428 | struct PeerSessionIteratorContext psc; | ||
429 | psc.result = NULL; | ||
430 | psc.addrlen = addrlen; | ||
431 | psc.addr = addr; | ||
432 | |||
433 | GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc); | ||
434 | return psc.result; | ||
435 | } | ||
436 | |||
366 | 437 | ||
367 | /** | 438 | /** |
368 | * Destroy a session, plugin is being unloaded. | 439 | * Destroy a session, plugin is being unloaded. |
@@ -633,6 +704,7 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, | |||
633 | if ((force_address == GNUNET_SYSERR) && (session == NULL)) | 704 | if ((force_address == GNUNET_SYSERR) && (session == NULL)) |
634 | return GNUNET_SYSERR; | 705 | return GNUNET_SYSERR; |
635 | 706 | ||
707 | s = NULL; | ||
636 | /* safety check: comparing address to address stored in session */ | 708 | /* safety check: comparing address to address stored in session */ |
637 | if ((session != NULL) && (addr != NULL) && (addrlen != 0)) | 709 | if ((session != NULL) && (addr != NULL) && (addrlen != 0)) |
638 | { | 710 | { |
@@ -699,6 +771,22 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, | |||
699 | udp->sender = *plugin->env->my_identity; | 771 | udp->sender = *plugin->env->my_identity; |
700 | memcpy (&udp[1], msgbuf, msgbuf_size); | 772 | memcpy (&udp[1], msgbuf, msgbuf_size); |
701 | 773 | ||
774 | if (s != NULL) | ||
775 | { | ||
776 | struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get(); | ||
777 | if (s->flow_delay_from_other_peer.abs_value > now.abs_value) | ||
778 | { | ||
779 | struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference(now, s->flow_delay_from_other_peer); | ||
780 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
781 | "We try to send to early! Should in %llu!\n", delta.rel_value); | ||
782 | } | ||
783 | else | ||
784 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
785 | "We can send!\n"); | ||
786 | } | ||
787 | else | ||
788 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
789 | "SENDING without session!\n"); | ||
702 | if (mlen <= UDP_MTU) | 790 | if (mlen <= UDP_MTU) |
703 | { | 791 | { |
704 | mlen = udp_send (plugin, peer_session->sock_addr, &udp->header); | 792 | mlen = udp_send (plugin, peer_session->sock_addr, &udp->header); |
@@ -763,6 +851,7 @@ process_inbound_tokenized_messages (void *cls, void *client, | |||
763 | struct Plugin *plugin = cls; | 851 | struct Plugin *plugin = cls; |
764 | struct SourceInformation *si = client; | 852 | struct SourceInformation *si = client; |
765 | struct GNUNET_ATS_Information distance; | 853 | struct GNUNET_ATS_Information distance; |
854 | struct GNUNET_TIME_Relative delay; | ||
766 | 855 | ||
767 | /* setup ATS */ | 856 | /* setup ATS */ |
768 | distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); | 857 | distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); |
@@ -770,8 +859,9 @@ process_inbound_tokenized_messages (void *cls, void *client, | |||
770 | 859 | ||
771 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 860 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
772 | "Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target)); | 861 | "Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target)); |
773 | plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session, | 862 | delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session, |
774 | si->arg, si->args); | 863 | si->arg, si->args); |
864 | si->session->flow_delay_for_other_peer = delay; | ||
775 | } | 865 | } |
776 | 866 | ||
777 | static void | 867 | static void |
@@ -938,24 +1028,38 @@ static void | |||
938 | ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) | 1028 | ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) |
939 | { | 1029 | { |
940 | struct ReceiveContext *rc = cls; | 1030 | struct ReceiveContext *rc = cls; |
941 | size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size); | 1031 | |
1032 | size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); | ||
942 | char buf[msize]; | 1033 | char buf[msize]; |
943 | struct UDPMessage *udp; | 1034 | struct UDP_ACK_Message *udp_ack; |
1035 | uint32_t delay = 0; | ||
1036 | |||
1037 | struct Session *s; | ||
1038 | s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len); | ||
1039 | if (s != NULL) | ||
1040 | { | ||
1041 | if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX) | ||
1042 | delay = s->flow_delay_for_other_peer.rel_value; | ||
1043 | else | ||
1044 | delay = UINT32_MAX; | ||
1045 | } | ||
1046 | |||
944 | 1047 | ||
945 | #if DEBUG_UDP | 1048 | #if DEBUG_UDP |
946 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n", | 1049 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n", |
947 | GNUNET_a2s (rc->src_addr, | 1050 | GNUNET_a2s (rc->src_addr, |
948 | (rc->src_addr->sa_family == | 1051 | (rc->src_addr->sa_family == |
949 | AF_INET) ? sizeof (struct sockaddr_in) : | 1052 | AF_INET) ? sizeof (struct sockaddr_in) : |
950 | sizeof (struct sockaddr_in6))); | 1053 | sizeof (struct sockaddr_in6)), |
1054 | delay); | ||
951 | #endif | 1055 | #endif |
952 | udp = (struct UDPMessage *) buf; | 1056 | udp_ack = (struct UDP_ACK_Message *) buf; |
953 | udp->header.size = htons ((uint16_t) msize); | 1057 | udp_ack->header.size = htons ((uint16_t) msize); |
954 | udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); | 1058 | udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); |
955 | udp->reserved = htonl (0); | 1059 | udp_ack->delay = htonl (delay); |
956 | udp->sender = *rc->plugin->env->my_identity; | 1060 | udp_ack->sender = *rc->plugin->env->my_identity; |
957 | memcpy (&udp[1], msg, ntohs (msg->size)); | 1061 | memcpy (&udp_ack[1], msg, ntohs (msg->size)); |
958 | (void) udp_send (rc->plugin, rc->src_addr, &udp->header); | 1062 | (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header); |
959 | } | 1063 | } |
960 | 1064 | ||
961 | 1065 | ||
@@ -978,6 +1082,8 @@ struct FindReceiveContext | |||
978 | * Number of bytes in 'addr'. | 1082 | * Number of bytes in 'addr'. |
979 | */ | 1083 | */ |
980 | socklen_t addr_len; | 1084 | socklen_t addr_len; |
1085 | |||
1086 | struct Session * session; | ||
981 | }; | 1087 | }; |
982 | 1088 | ||
983 | 1089 | ||
@@ -1024,10 +1130,12 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) | |||
1024 | const struct GNUNET_MessageHeader *msg; | 1130 | const struct GNUNET_MessageHeader *msg; |
1025 | const struct GNUNET_MessageHeader *ack; | 1131 | const struct GNUNET_MessageHeader *ack; |
1026 | struct Session *peer_session; | 1132 | struct Session *peer_session; |
1027 | const struct UDPMessage *udp; | 1133 | const struct UDP_ACK_Message *udp_ack; |
1028 | struct ReceiveContext *rc; | 1134 | struct ReceiveContext *rc; |
1029 | struct GNUNET_TIME_Absolute now; | 1135 | struct GNUNET_TIME_Absolute now; |
1030 | struct FindReceiveContext frc; | 1136 | struct FindReceiveContext frc; |
1137 | struct Session * s = NULL; | ||
1138 | struct GNUNET_TIME_Relative flow_delay; | ||
1031 | 1139 | ||
1032 | fromlen = sizeof (addr); | 1140 | fromlen = sizeof (addr); |
1033 | memset (&addr, 0, sizeof (addr)); | 1141 | memset (&addr, 0, sizeof (addr)); |
@@ -1062,20 +1170,26 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) | |||
1062 | (const struct sockaddr *) addr, fromlen); | 1170 | (const struct sockaddr *) addr, fromlen); |
1063 | return; | 1171 | return; |
1064 | case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: | 1172 | case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: |
1173 | |||
1065 | if (ntohs (msg->size) < | 1174 | if (ntohs (msg->size) < |
1066 | sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader)) | 1175 | sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader)) |
1067 | { | 1176 | { |
1068 | GNUNET_break_op (0); | 1177 | GNUNET_break_op (0); |
1069 | return; | 1178 | return; |
1070 | } | 1179 | } |
1071 | udp = (const struct UDPMessage *) msg; | 1180 | udp_ack = (const struct UDP_ACK_Message *) msg; |
1072 | if (ntohl (udp->reserved) != 0) | 1181 | s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen); |
1182 | if (s != NULL) | ||
1073 | { | 1183 | { |
1074 | GNUNET_break_op (0); | 1184 | flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay); |
1075 | return; | 1185 | |
1186 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1187 | "We received a sending delay of %llu\n", flow_delay.rel_value); | ||
1188 | |||
1189 | s->flow_delay_from_other_peer = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), flow_delay); | ||
1076 | } | 1190 | } |
1077 | ack = (const struct GNUNET_MessageHeader *) &udp[1]; | 1191 | ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; |
1078 | if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage)) | 1192 | if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message)) |
1079 | { | 1193 | { |
1080 | GNUNET_break_op (0); | 1194 | GNUNET_break_op (0); |
1081 | return; | 1195 | return; |
@@ -1087,7 +1201,7 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) | |||
1087 | GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); | 1201 | GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); |
1088 | #endif | 1202 | #endif |
1089 | 1203 | ||
1090 | peer_session = find_session (plugin, &udp->sender); | 1204 | peer_session = find_session (plugin, &udp_ack->sender); |
1091 | if (NULL == peer_session) | 1205 | if (NULL == peer_session) |
1092 | { | 1206 | { |
1093 | #if DEBUG_UDP | 1207 | #if DEBUG_UDP |
@@ -1100,13 +1214,13 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) | |||
1100 | return; | 1214 | return; |
1101 | GNUNET_assert (GNUNET_OK == | 1215 | GNUNET_assert (GNUNET_OK == |
1102 | GNUNET_CONTAINER_multihashmap_remove (plugin->sessions, | 1216 | GNUNET_CONTAINER_multihashmap_remove (plugin->sessions, |
1103 | &udp-> | 1217 | &udp_ack-> |
1104 | sender.hashPubKey, | 1218 | sender.hashPubKey, |
1105 | peer_session)); | 1219 | peer_session)); |
1106 | plugin->last_expected_delay = | 1220 | plugin->last_expected_delay = |
1107 | GNUNET_FRAGMENT_context_destroy (peer_session->frag); | 1221 | GNUNET_FRAGMENT_context_destroy (peer_session->frag); |
1108 | if (peer_session->cont != NULL) | 1222 | if (peer_session->cont != NULL) |
1109 | peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK); | 1223 | peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK); |
1110 | GNUNET_free (peer_session); | 1224 | GNUNET_free (peer_session); |
1111 | return; | 1225 | return; |
1112 | case GNUNET_MESSAGE_TYPE_FRAGMENT: | 1226 | case GNUNET_MESSAGE_TYPE_FRAGMENT: |
@@ -1717,27 +1831,6 @@ libgnunet_plugin_transport_udp_init (void *cls) | |||
1717 | return api; | 1831 | return api; |
1718 | } | 1832 | } |
1719 | 1833 | ||
1720 | /* | ||
1721 | |||
1722 | static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1723 | { | ||
1724 | struct Session * s = cls; | ||
1725 | struct Plugin * plugin = s->plugin; | ||
1726 | |||
1727 | s->invalidation_task = GNUNET_SCHEDULER_NO_TASK; | ||
1728 | |||
1729 | GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s); | ||
1730 | |||
1731 | |||
1732 | plugin->env->session_end (plugin->env->cls, &s->target, s); | ||
1733 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1734 | "Session %X is now invalid\n", s); | ||
1735 | destroy_session(s, &s->target.hashPubKey, s); | ||
1736 | } | ||
1737 | */ | ||
1738 | |||
1739 | |||
1740 | |||
1741 | /** | 1834 | /** |
1742 | * Shutdown the plugin. | 1835 | * Shutdown the plugin. |
1743 | * | 1836 | * |