From 2e85825b85b733c2ada5bdf9a70ca70b130536e6 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 18 Oct 2015 18:57:48 +0000 Subject: -use UINT32_MAX to mean disconnect, for real --- src/transport/plugin_transport_udp.c | 301 ++++++++++++++++++----------------- 1 file changed, 155 insertions(+), 146 deletions(-) diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 2c95918f1..29ade08f0 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -480,6 +480,8 @@ struct UDP_ACK_Message /** * Desired delay for flow control, in us (in NBO). + * A value of UINT32_MAX indicates that the other + * peer wants us to disconnect. */ uint32_t delay GNUNET_PACKED; @@ -2143,118 +2145,6 @@ udp_plugin_send (void *cls, } -/** - * Handle an ACK message. - * - * @param plugin the UDP plugin - * @param msg the (presumed) UDP ACK message - * @param udp_addr sender address - * @param udp_addr_len number of bytes in @a udp_addr - */ -static void -read_process_ack (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - const union UdpAddress *udp_addr, - socklen_t udp_addr_len) -{ - const struct GNUNET_MessageHeader *ack; - const struct UDP_ACK_Message *udp_ack; - struct GNUNET_HELLO_Address *address; - struct GNUNET_ATS_Session *s; - struct GNUNET_TIME_Relative flow_delay; - - if (ntohs (msg->size) - < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return; - } - udp_ack = (const struct UDP_ACK_Message *) msg; - ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; - if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message)) - { - GNUNET_break_op(0); - return; - } - address = GNUNET_HELLO_address_allocate (&udp_ack->sender, - PLUGIN_NAME, - udp_addr, - udp_addr_len, - GNUNET_HELLO_ADDRESS_INFO_NONE); - s = udp_plugin_lookup_session (plugin, - address); - if (NULL == s) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "UDP session of address %s for ACK not found\n", - udp_address_to_string (plugin, - address->address, - address->address_length)); - GNUNET_HELLO_address_free (address); - return; - } - if (NULL == s->frag_ctx) - { - LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Fragmentation context of address %s for ACK (%s) not found\n", - udp_address_to_string (plugin, - address->address, - address->address_length), - GNUNET_FRAGMENT_print_ack (ack)); - GNUNET_HELLO_address_free (address); - return; - } - GNUNET_HELLO_address_free (address); - - flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay); - if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) - LOG (GNUNET_ERROR_TYPE_WARNING, - "We received a sending delay of %s for %s\n", - GNUNET_STRINGS_relative_time_to_string (flow_delay, - GNUNET_YES), - GNUNET_i2s (&udp_ack->sender)); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, - "We received a sending delay of %s for %s\n", - GNUNET_STRINGS_relative_time_to_string (flow_delay, - GNUNET_YES), - GNUNET_i2s (&udp_ack->sender)); - /* Flow delay is for the reassembled packet, however, our delay - is per packet, so we need to adjust: */ - flow_delay = GNUNET_TIME_relative_divide (flow_delay, - 1 + (s->frag_ctx->payload_size / - UDP_MTU)); - s->flow_delay_from_other_peer = flow_delay; - - - if (GNUNET_OK != - GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, - ack)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", - (unsigned int) ntohs (msg->size), - GNUNET_i2s (&udp_ack->sender), - udp_address_to_string (plugin, - udp_addr, - udp_addr_len)); - /* Expect more ACKs to arrive */ - return; - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Message from %s at %s full ACK'ed\n", - GNUNET_i2s (&udp_ack->sender), - udp_address_to_string (plugin, - udp_addr, - udp_addr_len)); - - /* Remove fragmented message after successful sending */ - fragmented_message_done (s->frag_ctx, - GNUNET_OK); -} - - /* ********************** Receiving ********************** */ @@ -2317,35 +2207,6 @@ find_receive_context (void *cls, } -/** - * Message tokenizer has broken up an incomming message. Pass it on - * to the service. - * - * @param cls the `struct Plugin *` - * @param client the `struct GNUNET_ATS_Session *` - * @param hdr the actual message - * @return #GNUNET_OK (always) - */ -static int -process_inbound_tokenized_messages (void *cls, - void *client, - const struct GNUNET_MessageHeader *hdr) -{ - struct Plugin *plugin = cls; - struct GNUNET_ATS_Session *session = client; - - if (GNUNET_YES == session->in_destroy) - return GNUNET_OK; - reschedule_session_timeout (session); - session->flow_delay_for_other_peer - = plugin->env->receive (plugin->env->cls, - session->address, - session, - hdr); - return GNUNET_OK; -} - - /** * Functions with this signature are called whenever we need to close * a session due to a disconnect or failure to establish a connection. @@ -2463,6 +2324,154 @@ udp_disconnect_session (void *cls, } +/** + * Handle an ACK message. + * + * @param plugin the UDP plugin + * @param msg the (presumed) UDP ACK message + * @param udp_addr sender address + * @param udp_addr_len number of bytes in @a udp_addr + */ +static void +read_process_ack (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + const union UdpAddress *udp_addr, + socklen_t udp_addr_len) +{ + const struct GNUNET_MessageHeader *ack; + const struct UDP_ACK_Message *udp_ack; + struct GNUNET_HELLO_Address *address; + struct GNUNET_ATS_Session *s; + struct GNUNET_TIME_Relative flow_delay; + + if (ntohs (msg->size) + < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return; + } + udp_ack = (const struct UDP_ACK_Message *) msg; + ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; + if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message)) + { + GNUNET_break_op(0); + return; + } + address = GNUNET_HELLO_address_allocate (&udp_ack->sender, + PLUGIN_NAME, + udp_addr, + udp_addr_len, + GNUNET_HELLO_ADDRESS_INFO_NONE); + s = udp_plugin_lookup_session (plugin, + address); + if (NULL == s) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "UDP session of address %s for ACK not found\n", + udp_address_to_string (plugin, + address->address, + address->address_length)); + GNUNET_HELLO_address_free (address); + return; + } + if (NULL == s->frag_ctx) + { + LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Fragmentation context of address %s for ACK (%s) not found\n", + udp_address_to_string (plugin, + address->address, + address->address_length), + GNUNET_FRAGMENT_print_ack (ack)); + GNUNET_HELLO_address_free (address); + return; + } + GNUNET_HELLO_address_free (address); + + if (UINT32_MAX == ntohl (udp_ack->delay)) + { + /* Other peer asked for us to terminate the session */ + udp_disconnect_session (plugin, + s); + return; + } + flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay); + if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) + LOG (GNUNET_ERROR_TYPE_WARNING, + "We received a sending delay of %s for %s\n", + GNUNET_STRINGS_relative_time_to_string (flow_delay, + GNUNET_YES), + GNUNET_i2s (&udp_ack->sender)); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "We received a sending delay of %s for %s\n", + GNUNET_STRINGS_relative_time_to_string (flow_delay, + GNUNET_YES), + GNUNET_i2s (&udp_ack->sender)); + /* Flow delay is for the reassembled packet, however, our delay + is per packet, so we need to adjust: */ + flow_delay = GNUNET_TIME_relative_divide (flow_delay, + 1 + (s->frag_ctx->payload_size / + UDP_MTU)); + s->flow_delay_from_other_peer = flow_delay; + + + if (GNUNET_OK != + GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, + ack)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_i2s (&udp_ack->sender), + udp_address_to_string (plugin, + udp_addr, + udp_addr_len)); + /* Expect more ACKs to arrive */ + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message from %s at %s full ACK'ed\n", + GNUNET_i2s (&udp_ack->sender), + udp_address_to_string (plugin, + udp_addr, + udp_addr_len)); + + /* Remove fragmented message after successful sending */ + fragmented_message_done (s->frag_ctx, + GNUNET_OK); +} + + +/** + * Message tokenizer has broken up an incomming message. Pass it on + * to the service. + * + * @param cls the `struct Plugin *` + * @param client the `struct GNUNET_ATS_Session *` + * @param hdr the actual message + * @return #GNUNET_OK (always) + */ +static int +process_inbound_tokenized_messages (void *cls, + void *client, + const struct GNUNET_MessageHeader *hdr) +{ + struct Plugin *plugin = cls; + struct GNUNET_ATS_Session *session = client; + + if (GNUNET_YES == session->in_destroy) + return GNUNET_OK; + reschedule_session_timeout (session); + session->flow_delay_for_other_peer + = plugin->env->receive (plugin->env->cls, + session->address, + session, + hdr); + return GNUNET_OK; +} + + /** * Destroy a session, plugin is being unloaded. * @@ -2868,10 +2877,13 @@ ack_proc (void *cls, GNUNET_NO); return; } - if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX) + if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == + s->flow_delay_for_other_peer.rel_value_us) + delay = UINT32_MAX; + else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX) delay = s->flow_delay_for_other_peer.rel_value_us; else - delay = UINT32_MAX; + delay = UINT32_MAX - 1; /* largest value we can communicate */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %s\n", udp_address_to_string (plugin, @@ -3078,9 +3090,6 @@ udp_select_read (struct Plugin *plugin, return; } - - - msg = (const struct GNUNET_MessageHeader *) buf; LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP received %u-byte message from `%s' type %u\n", -- cgit v1.2.3