From db01713beb18dfa50deddac41f74004d13be8295 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 21 Oct 2011 15:55:33 +0000 Subject: transmitting flow control information between peers --- src/transport/plugin_transport_udp.c | 181 ++++++++++++++++++++++++++--------- 1 file changed, 137 insertions(+), 44 deletions(-) (limited to 'src/transport') diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index e0ef50acf..9812416de 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -93,6 +93,28 @@ struct UDPMessage }; +/** + * UDP ACK Message-Packet header (after defragmentation). + */ +struct UDP_ACK_Message +{ + /** + * Message header. + */ + struct GNUNET_MessageHeader header; + + /** + * Desired delay for flow control + */ + uint32_t delay; + + /** + * What is the identity of the sender + */ + struct GNUNET_PeerIdentity sender; +}; + + /** * Network format for IPv4 addresses. */ @@ -174,6 +196,16 @@ struct Session struct GNUNET_TIME_Absolute valid_until; GNUNET_SCHEDULER_TaskIdentifier invalidation_task; + + /* + * Desired delay for next sending we send to other peer + */ + struct GNUNET_TIME_Relative flow_delay_for_other_peer; + + /* + * Desired delay for next sending we received from other peer + */ + struct GNUNET_TIME_Absolute flow_delay_from_other_peer; }; @@ -210,6 +242,8 @@ struct ReceiveContext */ size_t addr_len; + struct GNUNET_PeerIdentity id; + }; @@ -363,6 +397,43 @@ find_inbound_session (struct Plugin *plugin, return psc.result; } +int inbound_session_by_addr_iterator (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct PeerSessionIteratorContext *psc = cls; + struct Session *s = value; + if (s->addrlen == psc->addrlen) + { + if (0 == memcmp (&s[1], psc->addr, s->addrlen)) + psc->result = s; + } + if (psc->result != NULL) + return GNUNET_NO; + else + return GNUNET_YES; +}; + +/** + * Lookup the session for the given peer just by address. + * + * @param plugin the plugin + * @param addr address + * @param addrlen address length + * @return NULL if we have no session + */ +struct Session * +find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen) +{ + struct PeerSessionIteratorContext psc; + psc.result = NULL; + psc.addrlen = addrlen; + psc.addr = addr; + + GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc); + return psc.result; +} + /** * Destroy a session, plugin is being unloaded. @@ -633,6 +704,7 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, if ((force_address == GNUNET_SYSERR) && (session == NULL)) return GNUNET_SYSERR; + s = NULL; /* safety check: comparing address to address stored in session */ if ((session != NULL) && (addr != NULL) && (addrlen != 0)) { @@ -699,6 +771,22 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, udp->sender = *plugin->env->my_identity; memcpy (&udp[1], msgbuf, msgbuf_size); + if (s != NULL) + { + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get(); + if (s->flow_delay_from_other_peer.abs_value > now.abs_value) + { + struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference(now, s->flow_delay_from_other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "We try to send to early! Should in %llu!\n", delta.rel_value); + } + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "We can send!\n"); + } + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "SENDING without session!\n"); if (mlen <= UDP_MTU) { mlen = udp_send (plugin, peer_session->sock_addr, &udp->header); @@ -763,6 +851,7 @@ process_inbound_tokenized_messages (void *cls, void *client, struct Plugin *plugin = cls; struct SourceInformation *si = client; struct GNUNET_ATS_Information distance; + struct GNUNET_TIME_Relative delay; /* setup ATS */ distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); @@ -770,8 +859,9 @@ process_inbound_tokenized_messages (void *cls, void *client, LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving Session %X %s to transport\n", si->session, GNUNET_i2s(&si->session->target)); - plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session, + delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session, si->arg, si->args); + si->session->flow_delay_for_other_peer = delay; } static void @@ -938,24 +1028,38 @@ static void ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) { struct ReceiveContext *rc = cls; - size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size); + + size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); char buf[msize]; - struct UDPMessage *udp; + struct UDP_ACK_Message *udp_ack; + uint32_t delay = 0; + + struct Session *s; + s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len); + if (s != NULL) + { + if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX) + delay = s->flow_delay_for_other_peer.rel_value; + else + delay = UINT32_MAX; + } + #if DEBUG_UDP - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n", + LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n", GNUNET_a2s (rc->src_addr, (rc->src_addr->sa_family == AF_INET) ? sizeof (struct sockaddr_in) : - sizeof (struct sockaddr_in6))); + sizeof (struct sockaddr_in6)), + delay); #endif - udp = (struct UDPMessage *) buf; - udp->header.size = htons ((uint16_t) msize); - udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); - udp->reserved = htonl (0); - udp->sender = *rc->plugin->env->my_identity; - memcpy (&udp[1], msg, ntohs (msg->size)); - (void) udp_send (rc->plugin, rc->src_addr, &udp->header); + udp_ack = (struct UDP_ACK_Message *) buf; + udp_ack->header.size = htons ((uint16_t) msize); + udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); + udp_ack->delay = htonl (delay); + udp_ack->sender = *rc->plugin->env->my_identity; + memcpy (&udp_ack[1], msg, ntohs (msg->size)); + (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header); } @@ -978,6 +1082,8 @@ struct FindReceiveContext * Number of bytes in 'addr'. */ socklen_t addr_len; + + struct Session * session; }; @@ -1024,10 +1130,12 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) const struct GNUNET_MessageHeader *msg; const struct GNUNET_MessageHeader *ack; struct Session *peer_session; - const struct UDPMessage *udp; + const struct UDP_ACK_Message *udp_ack; struct ReceiveContext *rc; struct GNUNET_TIME_Absolute now; struct FindReceiveContext frc; + struct Session * s = NULL; + struct GNUNET_TIME_Relative flow_delay; fromlen = sizeof (addr); memset (&addr, 0, sizeof (addr)); @@ -1062,20 +1170,26 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) (const struct sockaddr *) addr, fromlen); return; case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: + if (ntohs (msg->size) < - sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader)) + sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); return; } - udp = (const struct UDPMessage *) msg; - if (ntohl (udp->reserved) != 0) + udp_ack = (const struct UDP_ACK_Message *) msg; + s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen); + if (s != NULL) { - GNUNET_break_op (0); - return; + flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "We received a sending delay of %llu\n", flow_delay.rel_value); + + s->flow_delay_from_other_peer = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), flow_delay); } - ack = (const struct GNUNET_MessageHeader *) &udp[1]; - if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage)) + ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; + if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message)) { GNUNET_break_op (0); return; @@ -1087,7 +1201,7 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); #endif - peer_session = find_session (plugin, &udp->sender); + peer_session = find_session (plugin, &udp_ack->sender); if (NULL == peer_session) { #if DEBUG_UDP @@ -1100,13 +1214,13 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) return; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_remove (plugin->sessions, - &udp-> + &udp_ack-> sender.hashPubKey, peer_session)); plugin->last_expected_delay = GNUNET_FRAGMENT_context_destroy (peer_session->frag); if (peer_session->cont != NULL) - peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK); + peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK); GNUNET_free (peer_session); return; case GNUNET_MESSAGE_TYPE_FRAGMENT: @@ -1717,27 +1831,6 @@ libgnunet_plugin_transport_udp_init (void *cls) return api; } -/* - -static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Session * s = cls; - struct Plugin * plugin = s->plugin; - - s->invalidation_task = GNUNET_SCHEDULER_NO_TASK; - - GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s); - - - plugin->env->session_end (plugin->env->cls, &s->target, s); - LOG (GNUNET_ERROR_TYPE_ERROR, - "Session %X is now invalid\n", s); - destroy_session(s, &s->target.hashPubKey, s); -} -*/ - - - /** * Shutdown the plugin. * -- cgit v1.2.3