aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2011-10-21 15:55:33 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2011-10-21 15:55:33 +0000
commitdb01713beb18dfa50deddac41f74004d13be8295 (patch)
tree6fa783625fe57ab5ca4c251eb72a57c5402ac616 /src
parent3c8dd575bcc3f22b58b13f4cb096c57f076bf2e4 (diff)
downloadgnunet-db01713beb18dfa50deddac41f74004d13be8295.tar.gz
gnunet-db01713beb18dfa50deddac41f74004d13be8295.zip
transmitting flow control information between peers
Diffstat (limited to 'src')
-rw-r--r--src/transport/plugin_transport_udp.c181
1 files changed, 137 insertions, 44 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index e0ef50acf..9812416de 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -94,6 +94,28 @@ struct UDPMessage
94 94
95 95
96/** 96/**
97 * UDP ACK Message-Packet header (after defragmentation).
98 */
99struct UDP_ACK_Message
100{
101 /**
102 * Message header.
103 */
104 struct GNUNET_MessageHeader header;
105
106 /**
107 * Desired delay for flow control
108 */
109 uint32_t delay;
110
111 /**
112 * What is the identity of the sender
113 */
114 struct GNUNET_PeerIdentity sender;
115};
116
117
118/**
97 * Network format for IPv4 addresses. 119 * Network format for IPv4 addresses.
98 */ 120 */
99struct IPv4UdpAddress 121struct IPv4UdpAddress
@@ -174,6 +196,16 @@ struct Session
174 struct GNUNET_TIME_Absolute valid_until; 196 struct GNUNET_TIME_Absolute valid_until;
175 197
176 GNUNET_SCHEDULER_TaskIdentifier invalidation_task; 198 GNUNET_SCHEDULER_TaskIdentifier invalidation_task;
199
200 /*
201 * Desired delay for next sending we send to other peer
202 */
203 struct GNUNET_TIME_Relative flow_delay_for_other_peer;
204
205 /*
206 * Desired delay for next sending we received from other peer
207 */
208 struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
177}; 209};
178 210
179 211
@@ -210,6 +242,8 @@ struct ReceiveContext
210 */ 242 */
211 size_t addr_len; 243 size_t addr_len;
212 244
245 struct GNUNET_PeerIdentity id;
246
213}; 247};
214 248
215 249
@@ -363,6 +397,43 @@ find_inbound_session (struct Plugin *plugin,
363 return psc.result; 397 return psc.result;
364} 398}
365 399
400int inbound_session_by_addr_iterator (void *cls,
401 const GNUNET_HashCode * key,
402 void *value)
403{
404 struct PeerSessionIteratorContext *psc = cls;
405 struct Session *s = value;
406 if (s->addrlen == psc->addrlen)
407 {
408 if (0 == memcmp (&s[1], psc->addr, s->addrlen))
409 psc->result = s;
410 }
411 if (psc->result != NULL)
412 return GNUNET_NO;
413 else
414 return GNUNET_YES;
415};
416
417/**
418 * Lookup the session for the given peer just by address.
419 *
420 * @param plugin the plugin
421 * @param addr address
422 * @param addrlen address length
423 * @return NULL if we have no session
424 */
425struct Session *
426find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen)
427{
428 struct PeerSessionIteratorContext psc;
429 psc.result = NULL;
430 psc.addrlen = addrlen;
431 psc.addr = addr;
432
433 GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc);
434 return psc.result;
435}
436
366 437
367/** 438/**
368 * Destroy a session, plugin is being unloaded. 439 * Destroy a session, plugin is being unloaded.
@@ -633,6 +704,7 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
633 if ((force_address == GNUNET_SYSERR) && (session == NULL)) 704 if ((force_address == GNUNET_SYSERR) && (session == NULL))
634 return GNUNET_SYSERR; 705 return GNUNET_SYSERR;
635 706
707 s = NULL;
636 /* safety check: comparing address to address stored in session */ 708 /* safety check: comparing address to address stored in session */
637 if ((session != NULL) && (addr != NULL) && (addrlen != 0)) 709 if ((session != NULL) && (addr != NULL) && (addrlen != 0))
638 { 710 {
@@ -699,6 +771,22 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
699 udp->sender = *plugin->env->my_identity; 771 udp->sender = *plugin->env->my_identity;
700 memcpy (&udp[1], msgbuf, msgbuf_size); 772 memcpy (&udp[1], msgbuf, msgbuf_size);
701 773
774 if (s != NULL)
775 {
776 struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get();
777 if (s->flow_delay_from_other_peer.abs_value > now.abs_value)
778 {
779 struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference(now, s->flow_delay_from_other_peer);
780 LOG (GNUNET_ERROR_TYPE_DEBUG,
781 "We try to send to early! Should in %llu!\n", delta.rel_value);
782 }
783 else
784 LOG (GNUNET_ERROR_TYPE_DEBUG,
785 "We can send!\n");
786 }
787 else
788 LOG (GNUNET_ERROR_TYPE_DEBUG,
789 "SENDING without session!\n");
702 if (mlen <= UDP_MTU) 790 if (mlen <= UDP_MTU)
703 { 791 {
704 mlen = udp_send (plugin, peer_session->sock_addr, &udp->header); 792 mlen = udp_send (plugin, peer_session->sock_addr, &udp->header);
@@ -763,6 +851,7 @@ process_inbound_tokenized_messages (void *cls, void *client,
763 struct Plugin *plugin = cls; 851 struct Plugin *plugin = cls;
764 struct SourceInformation *si = client; 852 struct SourceInformation *si = client;
765 struct GNUNET_ATS_Information distance; 853 struct GNUNET_ATS_Information distance;
854 struct GNUNET_TIME_Relative delay;
766 855
767 /* setup ATS */ 856 /* setup ATS */
768 distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); 857 distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
@@ -770,8 +859,9 @@ process_inbound_tokenized_messages (void *cls, void *client,
770 859
771 LOG (GNUNET_ERROR_TYPE_DEBUG, 860 LOG (GNUNET_ERROR_TYPE_DEBUG,
772 "Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target)); 861 "Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target));
773 plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session, 862 delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
774 si->arg, si->args); 863 si->arg, si->args);
864 si->session->flow_delay_for_other_peer = delay;
775} 865}
776 866
777static void 867static void
@@ -938,24 +1028,38 @@ static void
938ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) 1028ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
939{ 1029{
940 struct ReceiveContext *rc = cls; 1030 struct ReceiveContext *rc = cls;
941 size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size); 1031
1032 size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
942 char buf[msize]; 1033 char buf[msize];
943 struct UDPMessage *udp; 1034 struct UDP_ACK_Message *udp_ack;
1035 uint32_t delay = 0;
1036
1037 struct Session *s;
1038 s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
1039 if (s != NULL)
1040 {
1041 if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1042 delay = s->flow_delay_for_other_peer.rel_value;
1043 else
1044 delay = UINT32_MAX;
1045 }
1046
944 1047
945#if DEBUG_UDP 1048#if DEBUG_UDP
946 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n", 1049 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n",
947 GNUNET_a2s (rc->src_addr, 1050 GNUNET_a2s (rc->src_addr,
948 (rc->src_addr->sa_family == 1051 (rc->src_addr->sa_family ==
949 AF_INET) ? sizeof (struct sockaddr_in) : 1052 AF_INET) ? sizeof (struct sockaddr_in) :
950 sizeof (struct sockaddr_in6))); 1053 sizeof (struct sockaddr_in6)),
1054 delay);
951#endif 1055#endif
952 udp = (struct UDPMessage *) buf; 1056 udp_ack = (struct UDP_ACK_Message *) buf;
953 udp->header.size = htons ((uint16_t) msize); 1057 udp_ack->header.size = htons ((uint16_t) msize);
954 udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); 1058 udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
955 udp->reserved = htonl (0); 1059 udp_ack->delay = htonl (delay);
956 udp->sender = *rc->plugin->env->my_identity; 1060 udp_ack->sender = *rc->plugin->env->my_identity;
957 memcpy (&udp[1], msg, ntohs (msg->size)); 1061 memcpy (&udp_ack[1], msg, ntohs (msg->size));
958 (void) udp_send (rc->plugin, rc->src_addr, &udp->header); 1062 (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
959} 1063}
960 1064
961 1065
@@ -978,6 +1082,8 @@ struct FindReceiveContext
978 * Number of bytes in 'addr'. 1082 * Number of bytes in 'addr'.
979 */ 1083 */
980 socklen_t addr_len; 1084 socklen_t addr_len;
1085
1086 struct Session * session;
981}; 1087};
982 1088
983 1089
@@ -1024,10 +1130,12 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1024 const struct GNUNET_MessageHeader *msg; 1130 const struct GNUNET_MessageHeader *msg;
1025 const struct GNUNET_MessageHeader *ack; 1131 const struct GNUNET_MessageHeader *ack;
1026 struct Session *peer_session; 1132 struct Session *peer_session;
1027 const struct UDPMessage *udp; 1133 const struct UDP_ACK_Message *udp_ack;
1028 struct ReceiveContext *rc; 1134 struct ReceiveContext *rc;
1029 struct GNUNET_TIME_Absolute now; 1135 struct GNUNET_TIME_Absolute now;
1030 struct FindReceiveContext frc; 1136 struct FindReceiveContext frc;
1137 struct Session * s = NULL;
1138 struct GNUNET_TIME_Relative flow_delay;
1031 1139
1032 fromlen = sizeof (addr); 1140 fromlen = sizeof (addr);
1033 memset (&addr, 0, sizeof (addr)); 1141 memset (&addr, 0, sizeof (addr));
@@ -1062,20 +1170,26 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1062 (const struct sockaddr *) addr, fromlen); 1170 (const struct sockaddr *) addr, fromlen);
1063 return; 1171 return;
1064 case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: 1172 case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1173
1065 if (ntohs (msg->size) < 1174 if (ntohs (msg->size) <
1066 sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader)) 1175 sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1067 { 1176 {
1068 GNUNET_break_op (0); 1177 GNUNET_break_op (0);
1069 return; 1178 return;
1070 } 1179 }
1071 udp = (const struct UDPMessage *) msg; 1180 udp_ack = (const struct UDP_ACK_Message *) msg;
1072 if (ntohl (udp->reserved) != 0) 1181 s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen);
1182 if (s != NULL)
1073 { 1183 {
1074 GNUNET_break_op (0); 1184 flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay);
1075 return; 1185
1186 LOG (GNUNET_ERROR_TYPE_DEBUG,
1187 "We received a sending delay of %llu\n", flow_delay.rel_value);
1188
1189 s->flow_delay_from_other_peer = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), flow_delay);
1076 } 1190 }
1077 ack = (const struct GNUNET_MessageHeader *) &udp[1]; 1191 ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1078 if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage)) 1192 if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1079 { 1193 {
1080 GNUNET_break_op (0); 1194 GNUNET_break_op (0);
1081 return; 1195 return;
@@ -1087,7 +1201,7 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1087 GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); 1201 GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1088#endif 1202#endif
1089 1203
1090 peer_session = find_session (plugin, &udp->sender); 1204 peer_session = find_session (plugin, &udp_ack->sender);
1091 if (NULL == peer_session) 1205 if (NULL == peer_session)
1092 { 1206 {
1093#if DEBUG_UDP 1207#if DEBUG_UDP
@@ -1100,13 +1214,13 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1100 return; 1214 return;
1101 GNUNET_assert (GNUNET_OK == 1215 GNUNET_assert (GNUNET_OK ==
1102 GNUNET_CONTAINER_multihashmap_remove (plugin->sessions, 1216 GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
1103 &udp-> 1217 &udp_ack->
1104 sender.hashPubKey, 1218 sender.hashPubKey,
1105 peer_session)); 1219 peer_session));
1106 plugin->last_expected_delay = 1220 plugin->last_expected_delay =
1107 GNUNET_FRAGMENT_context_destroy (peer_session->frag); 1221 GNUNET_FRAGMENT_context_destroy (peer_session->frag);
1108 if (peer_session->cont != NULL) 1222 if (peer_session->cont != NULL)
1109 peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK); 1223 peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK);
1110 GNUNET_free (peer_session); 1224 GNUNET_free (peer_session);
1111 return; 1225 return;
1112 case GNUNET_MESSAGE_TYPE_FRAGMENT: 1226 case GNUNET_MESSAGE_TYPE_FRAGMENT:
@@ -1717,27 +1831,6 @@ libgnunet_plugin_transport_udp_init (void *cls)
1717 return api; 1831 return api;
1718} 1832}
1719 1833
1720/*
1721
1722static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1723{
1724 struct Session * s = cls;
1725 struct Plugin * plugin = s->plugin;
1726
1727 s->invalidation_task = GNUNET_SCHEDULER_NO_TASK;
1728
1729 GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s);
1730
1731
1732 plugin->env->session_end (plugin->env->cls, &s->target, s);
1733 LOG (GNUNET_ERROR_TYPE_ERROR,
1734 "Session %X is now invalid\n", s);
1735 destroy_session(s, &s->target.hashPubKey, s);
1736}
1737*/
1738
1739
1740
1741/** 1834/**
1742 * Shutdown the plugin. 1835 * Shutdown the plugin.
1743 * 1836 *