From 6e2ca97fb0d3d85c1ef127bc0b9bd3e1e72aa9e8 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 19 May 2019 13:56:22 +0200 Subject: FC work --- src/transport/gnunet-service-tng.c | 867 ++++++++++++++++++++++++------------- 1 file changed, 576 insertions(+), 291 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index e00c7d573..f009a491b 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -25,11 +25,27 @@ * TODO: * Implement next: * - FIXME-FC: realize transport-to-transport flow control (needed in case - * communicators do not offer flow control). Note that we may not - * want to simply delay the ACKs as that may cause unnecessary - * re-transmissions. => Introduce proper flow and congestion window(s)! + * communicators do not offer flow control). + * We do transmit FC window sizes now. Left: + * for SENDING) + * - Increment "outbound_fc_window_size_used_kb" on transmission + * - Throttle sending if "outbound_fc_window_size_used_kb" reaches limit + * - Send *new* challenge when we get close to the limit (including + * at the beginning when the limit is zero!) + * - Retransmit challenge if it goes unanswered! + * + * for RECEIVING) + * - increment incoming_fc_window_size_ram_kb when receiving + * incoming packets! + * - OR drop if incoming_fc_window_size_ram goes + * (significantly?) beyond available_fc_window_size_kb + * - decrement incoming_fc_window_size_ram_kb when CORE is done + * with incoming packets! + * - increment incoming_fc_window_size_used_kb when CORE is done + * with incoming packets! + * * - review retransmission logic, right now there is no smartness there! - * => congestion control, flow control, etc [PERFORMANCE-BASICS] + * => congestion control, etc [PERFORMANCE-BASICS] * * Optimizations: * - When forwarding DV learn messages, if a peer is reached that @@ -59,6 +75,14 @@ * AcknowledgementUUIDP altogether, as they won't be acked! [BANDWIDTH] * (-> have 2nd type of acknowledgment message; low priority, as we * do not have an MTU-limited *reliable* communicator) + * - Adapt available_fc_window_size_kb, using larger values for high-bandwidth + * and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls] + * - Set last_window_consum_limit_kb promise properly based on + * latency and bandwidth of the respective connection [GOODPUT / utilization / stalls] + * - re-sending challenge response without a challenge when we have + * significantly increased the FC window (upon CORE being done with messages) + * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] + * Also can re-use signature in this case [CPU]. * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -113,6 +137,22 @@ */ #define GOODPUT_AGING_SLOTS 4 +/** + * How big is the flow control window size by default; + * limits per-neighbour RAM utilization (in kilobytes). + */ +#define DEFAULT_WINDOW_SIZE_KB 128 + +/** + * For how many incoming connections do we try to create a + * virtual link for (at the same time!). This does NOT + * limit the number of incoming connections, just the number + * for which we are actively trying to find working addresses + * in the absence (!) of our own applications wanting the + * link to go up. + */ +#define MAX_INCOMING_REQUEST 16 + /** * Maximum number of peers we select for forwarding DVInit * messages at the same time (excluding initiator). @@ -818,64 +858,8 @@ struct TransportValidationChallengeMessage struct ChallengeNonceP challenge; /** - * Timestamp of the sender, to be copied into the reply - * to allow sender to calculate RTT. - */ - struct GNUNET_TIME_AbsoluteNBO sender_time; -}; - - -/** - * Message send to another peer to answer to a validation challenge - * and at the same time issue a challenge in the other direction. - */ -struct TransportValidationChallengeResponseMessage -{ - - /** - * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE - */ - struct GNUNET_MessageHeader header; - - /** - * Flow control window size in kilobytes (1024 b), in NBO. - * The receiver can now send this many kilobytes as per - * the @e received_challenge "account". - */ - uint32_t fc_window_size_kb GNUNET_PACKED; - - /** - * Challenge returned to the origin by the receiving peer. - */ - struct ChallengeNonceP received_challenge; - - /** - * The peer's signature matching the - * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose. - */ - struct GNUNET_CRYPTO_EddsaSignature signature; - - /** - * Fresh challenge created by the sender to be returned - * by the receiving peer. - */ - struct ChallengeNonceP sender_challenge; - - /** - * How long does the sender believe the address on - * which the challenge was received to remain valid? - */ - struct GNUNET_TIME_RelativeNBO validity_duration; - - /** - * Timestamp of the sender, to be copied into the reply - * to allow sender to calculate RTT. - */ - struct GNUNET_TIME_AbsoluteNBO origin_time; - - /** - * Timestamp of the sender, to be copied into the reply - * to allow sender to calculate RTT. + * Timestamp of the sender, to be copied into the reply to allow + * sender to calculate RTT. Must be monotonically increasing! */ struct GNUNET_TIME_AbsoluteNBO sender_time; }; @@ -1224,7 +1208,84 @@ struct VirtualLink uint64_t message_uuid_ctr; /** - * How many more messages can we send to core before we exhaust + * Sender timestamp of @e n_challenge, used to generate out-of-order + * challenges (as sender's timestamps must be monotonically + * increasing). Note that we do not persist this monotonic time + * as we do not really have to worry about ancient flow control + * window sizes after restarts. + */ + struct GNUNET_TIME_Absolute n_challenge_time; + + /** + * Last challenge we received from @a n, for which we created the + * flow control window given in @e fc_window_size_kb. + */ + struct ChallengeNonceP n_challenge; + + /** + * Last challenge we used with @a n for flow control. If we receive + * window size increases for a different challenge, they are + * out-of-order and must be discarded! + */ + struct ChallengeNonceP my_challenge; + + /** + * Memory allocated for this virtual link. Expresses how much RAM + * we are willing to allocate to this virtual link. OPTIMIZE-ME: + * Can be adapted to dedicate more RAM to links that need it, while + * sticking to some overall RAM limit. For now, set to + * #DEFAULT_WINDOW_SIZE_KB. + */ + uint32_t available_fc_window_size_kb; + + /** + * Memory actually used to buffer packets on this virtual link. + * Expresses how much RAM we are currently using for virtual link. + * Note that once CORE is done with a packet, we decrement the value + * here. + */ + uint32_t incoming_fc_window_size_ram_kb; + + /** + * Last flow control window size we provided to the other peer, in + * kilobytes (1024 b). We are allowing the other peer to send this + * many kilobytes as per its last @e n_challenge "account". + */ + uint32_t incoming_fc_window_size_kb; + + /** + * How many bytes could we still get from the previous flow control + * window, in kilobytes (1024 b). We need to consider this value + * when calculating what we allow for the current window due to + * the possibility of out-of-order challenges. + */ + uint32_t last_fc_window_size_remaining_kb; + + /** + * How much of the window did the other peer successfully use (and + * we already passed it on to CORE)? Must be below @e + * incoming_fc_window_size_kb. We should effectively signal the + * other peer that the window is this much bigger at the next + * opportunity / challenge. + */ + uint32_t incoming_fc_window_size_used_kb; + + /** + * Our current flow control window size in kilobytes (1024 b). We + * are allowed to transmit this many kilobytes to @a n as per + * our @e my_challenge "account". + */ + uint32_t outbound_fc_window_size_kb; + + /** + * How much of our current flow control window size have we + * used (in kilobytes (1024 b)). Must be below + * @e outbound_fc_window_size_kb. + */ + uint32_t outbound_fc_window_size_used_kb; + + /** + * How many more messages can we send to CORE before we exhaust * the receive window of CORE for this peer? If this hits zero, * we must tell communicators to stop providing us more messages * for this peer. In fact, the window can go negative as we @@ -1786,6 +1847,35 @@ struct Neighbour }; +/** + * Another peer attempted to talk to us, we should try to establish + * a connection in the other direction. + */ +struct IncomingRequest +{ + + /** + * Kept in a DLL. + */ + struct IncomingRequest *next; + + /** + * Kept in a DLL. + */ + struct IncomingRequest *prev; + + /** + * Handle for watching the peerstore for HELLOs for this peer. + */ + struct GNUNET_PEERSTORE_WatchContext *wc; + + /** + * Which peer is this about? + */ + struct GNUNET_PeerIdentity pid; +}; + + /** * A peer that an application (client) would like us to talk to directly. */ @@ -2356,6 +2446,13 @@ struct ValidationState */ struct GNUNET_PEERSTORE_StoreContext *sc; + /** + * Self-imposed limit on the previous flow control window. (May be zero, + * if we never used data from the previous window or are establishing the + * connection for the first time). + */ + uint32_t last_window_consum_limit_kb; + /** * We are technically ready to send the challenge, but we are waiting for * the respective queue to become available for transmission. @@ -2544,6 +2641,23 @@ static struct PendingAcknowledgement *pa_head; */ static struct PendingAcknowledgement *pa_tail; +/** + * List of incomming connections where we are trying + * to get a connection back established. Length + * kept in #ir_total. + */ +static struct IncomingRequest *ir_head; + +/** + * Tail of DLL starting at #ir_head. + */ +static struct IncomingRequest *ir_tail; + +/** + * Length of the DLL starting at #ir_head. + */ +static unsigned int ir_total; + /** * Generator of `logging_uuid` in `struct PendingMessage`. */ @@ -2586,6 +2700,22 @@ get_age () } +/** + * Release @a ir data structure. + * + * @param ir data structure to release + */ +static void +free_incoming_request (struct IncomingRequest *ir) +{ + GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir); + GNUNET_assert (ir_total > 0); + ir_total--; + GNUNET_PEERSTORE_watch_cancel (ir->wc); + GNUNET_free (ir); +} + + /** * Release @a pa data structure. * @@ -5688,6 +5818,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) vl->dv = dv; dv->vl = vl; vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size_kb = DEFAULT_WINDOW_SIZE_KB; vl->visibility_task = GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); GNUNET_break (GNUNET_YES == @@ -7052,87 +7183,98 @@ check_incoming_msg (void *cls, /** - * Communicator gave us a transport address validation challenge. Process the - * request. + * We received a @a challenge from another peer, check if we can + * increase the flow control window to that peer. * - * @param cls a `struct CommunicatorMessageContext` (must call - * #finish_cmc_handling() when done) - * @param tvc the message that was received + * @param vl virtual link + * @param challenge the challenge we received + * @param sender_time when did the peer send the message? + * @param last_window_consum_limit_kb maximum number of kb the sender + * promises to use of the previous window (if any) */ static void -handle_validation_challenge ( - void *cls, - const struct TransportValidationChallengeMessage *tvc) -{ - struct CommunicatorMessageContext *cmc = cls; - struct TransportValidationResponseMessage *tvr; - - if (cmc->total_hops > 0) - { - /* DV routing is not allowed for validation challenges! */ - GNUNET_break_op (0); - finish_cmc_handling (cmc); +update_fc_window (struct VirtualLink *vl, + const struct ChallengeNonceP *challenge, + struct GNUNET_TIME_Absolute sender_time, + uint32_t last_window_consum_limit_kb) +{ + if (0 == GNUNET_memcmp (challenge, &vl->n_challenge)) + { + uint32_t avail; + + /* Challenge identical to last one, update + @a last_window_consum_limit_kb (to minimum) */ + vl->last_fc_window_size_remaining_kb = + GNUNET_MIN (last_window_consum_limit_kb, + vl->last_fc_window_size_remaining_kb); + /* window could have shrunk! */ + if (vl->available_fc_window_size_kb > vl->last_fc_window_size_remaining_kb) + avail = + vl->available_fc_window_size_kb - vl->last_fc_window_size_remaining_kb; + else + avail = 0; + /* guard against integer overflow */ + if (vl->incoming_fc_window_size_used_kb + avail >= + vl->incoming_fc_window_size_used_kb) + vl->incoming_fc_window_size_kb = + vl->incoming_fc_window_size_used_kb + avail; + else + vl->incoming_fc_window_size_kb = UINT32_MAX; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Updated window to %u/%u kb (%u used) for virtual link to %s!\n", + vl->incoming_fc_window_size_kb, + vl->available_fc_window_size_kb, + vl->incoming_fc_window_size_used_kb, + GNUNET_i2s (&vl->target)); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received address validation challenge %s\n", - GNUNET_sh2s (&tvc->challenge.value)); - tvr = GNUNET_new (struct TransportValidationResponseMessage); - tvr->header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE); - tvr->header.size = htons (sizeof (*tvr)); - tvr->challenge = tvc->challenge; - tvr->origin_time = tvc->sender_time; - tvr->validity_duration = cmc->im.expected_address_validity; + if (vl->n_challenge_time.abs_value_us >= sender_time.abs_value_us) { - /* create signature */ - struct TransportValidationPS tvp = - {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE), - .purpose.size = htonl (sizeof (tvp)), - .validity_duration = tvr->validity_duration, - .challenge = tvc->challenge}; - - GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key, - &tvp.purpose, - &tvr->signature)); + GNUNET_STATISTICS_update (GST_stats, + "# Challenges ignored: sender time not increasing", + 1, + GNUNET_NO); + return; } - route_message (&cmc->im.sender, - &tvr->header, - RMO_ANYTHING_GOES | RMO_REDUNDANT); - finish_cmc_handling (cmc); -} - - -/** - * Communicator gave us a transport address validation challenge. Process the - * request. - * - * @param cls a `struct CommunicatorMessageContext` (must call - * #finish_cmc_handling() when done) - * @param tvrc the message that was received - */ -static void -handle_validation_challenge_response ( - void *cls, - const struct TransportValidationChallengeResponseMessage *tvrc) -{ - struct CommunicatorMessageContext *cmc = cls; - struct TransportValidationResponseMessage *tvr; - - - finish_cmc_handling (cmc); + /* new challenge! */ + if (vl->incoming_fc_window_size_used_kb > last_window_consum_limit_kb) + { + /* lying peer: it already used more than it promised it would ever use! */ + GNUNET_break_op (0); + last_window_consum_limit_kb = vl->incoming_fc_window_size_used_kb; + } + /* What remains is at most the difference between what we already processed + and what the sender promises to limit itself to. */ + vl->last_fc_window_size_remaining_kb = + last_window_consum_limit_kb - vl->incoming_fc_window_size_used_kb; + vl->n_challenge = *challenge; + vl->n_challenge_time = sender_time; + vl->incoming_fc_window_size_used_kb = 0; + /* window could have shrunk! */ + if (vl->available_fc_window_size_kb > vl->last_fc_window_size_remaining_kb) + vl->incoming_fc_window_size_kb = + vl->available_fc_window_size_kb - vl->last_fc_window_size_remaining_kb; + else + vl->incoming_fc_window_size_kb = 0; + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "New window at %u/%u kb (%u left on previous) for virtual link to %s!\n", + vl->incoming_fc_window_size_kb, + vl->available_fc_window_size_kb, + vl->last_fc_window_size_remaining_kb, + GNUNET_i2s (&vl->target)); } /** - * Closure for #check_known_challenge. + * Closure for #check_known_address. */ -struct CheckKnownChallengeContext +struct CheckKnownAddressContext { /** - * Set to the challenge we are looking for. + * Set to the address we are looking for. */ - const struct ChallengeNonceP *challenge; + const char *address; /** * Set to a matching validation state, if one was found. @@ -7143,51 +7285,29 @@ struct CheckKnownChallengeContext /** * Test if the validation state in @a value matches the - * challenge from @a cls. + * address from @a cls. * - * @param cls a `struct CheckKnownChallengeContext` + * @param cls a `struct CheckKnownAddressContext` * @param pid unused (must match though) * @param value a `struct ValidationState` * @return #GNUNET_OK if not matching, #GNUNET_NO if match found */ static int -check_known_challenge (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) +check_known_address (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) { - struct CheckKnownChallengeContext *ckac = cls; + struct CheckKnownAddressContext *ckac = cls; struct ValidationState *vs = value; (void) pid; - if (0 != GNUNET_memcmp (&vs->challenge, ckac->challenge)) + if (0 != strcmp (vs->address, ckac->address)) return GNUNET_OK; ckac->vs = vs; return GNUNET_NO; } -/** - * Function called when peerstore is done storing a - * validated address. - * - * @param cls a `struct ValidationState` - * @param success #GNUNET_YES on success - */ -static void -peerstore_store_validation_cb (void *cls, int success) -{ - struct ValidationState *vs = cls; - - vs->sc = NULL; - if (GNUNET_YES == success) - return; - GNUNET_STATISTICS_update (GST_stats, - "# Peerstore failed to store foreign address", - 1, - GNUNET_NO); -} - - /** * Task run periodically to validate some address based on #validation_heap. * @@ -7234,25 +7354,279 @@ update_next_challenge_time (struct ValidationState *vs, /** - * Find the queue matching @a pid and @a address. + * Start address validation. * - * @param pid peer the queue must go to - * @param address address the queue must use - * @return NULL if no such queue exists + * @param pid peer the @a address is for + * @param address an address to reach @a pid (presumably) */ -static struct Queue * -find_queue (const struct GNUNET_PeerIdentity *pid, const char *address) +static void +start_address_validation (const struct GNUNET_PeerIdentity *pid, + const char *address) { - struct Neighbour *n; + struct GNUNET_TIME_Absolute now; + struct ValidationState *vs; + struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL}; - n = lookup_neighbour (pid); - if (NULL == n) - return NULL; - for (struct Queue *pos = n->queue_head; NULL != pos; - pos = pos->next_neighbour) + (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, + pid, + &check_known_address, + &ckac); + if (NULL != (vs = ckac.vs)) { - if (0 == strcmp (pos->address, address)) - return pos; + /* if 'vs' is not currently valid, we need to speed up retrying the + * validation */ + if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us) + { + /* reduce backoff as we got a fresh advertisement */ + vs->challenge_backoff = + GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ, + GNUNET_TIME_relative_divide (vs->challenge_backoff, + 2)); + update_next_challenge_time (vs, + GNUNET_TIME_relative_to_absolute ( + vs->challenge_backoff)); + } + return; + } + now = GNUNET_TIME_absolute_get (); + vs = GNUNET_new (struct ValidationState); + vs->pid = *pid; + vs->valid_until = + GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME); + vs->first_challenge_use = now; + vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL; + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &vs->challenge, + sizeof (vs->challenge)); + vs->address = GNUNET_strdup (address); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting address validation `%s' of peer %s using challenge %s\n", + address, + GNUNET_i2s (pid), + GNUNET_sh2s (&vs->challenge.value)); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + validation_map, + &vs->pid, + vs, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + update_next_challenge_time (vs, now); +} + + +/** + * Function called by PEERSTORE for each matching record. + * + * @param cls closure, a `struct IncomingRequest` + * @param record peerstore record information + * @param emsg error message, or NULL if no errors + */ +static void +handle_hello_for_incoming (void *cls, + const struct GNUNET_PEERSTORE_Record *record, + const char *emsg) +{ + struct IncomingRequest *ir = cls; + const char *val; + + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got failure from PEERSTORE: %s\n", + emsg); + return; + } + val = record->value; + if ((0 == record->value_size) || ('\0' != val[record->value_size - 1])) + { + GNUNET_break (0); + return; + } + start_address_validation (&ir->pid, (const char *) record->value); +} + + +/** + * Communicator gave us a transport address validation challenge. Process the + * request. + * + * @param cls a `struct CommunicatorMessageContext` (must call + * #finish_cmc_handling() when done) + * @param tvc the message that was received + */ +static void +handle_validation_challenge ( + void *cls, + const struct TransportValidationChallengeMessage *tvc) +{ + struct CommunicatorMessageContext *cmc = cls; + struct TransportValidationResponseMessage tvr; + struct VirtualLink *vl; + struct GNUNET_TIME_RelativeNBO validity_duration; + struct IncomingRequest *ir; + struct Neighbour *n; + + /* We use a validity_duration of 0 for DV-routed messages, + as we can neither control the validity and need to allow + the receiver to tell DV paths from direct connections */ + if (cmc->total_hops > 0) + validity_duration = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); + else + validity_duration = cmc->im.expected_address_validity; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received address validation challenge %s\n", + GNUNET_sh2s (&tvc->challenge.value)); + /* If we have a virtual link, we use this mechanism to signal the + size of the flow control window, and to allow the sender + to ask for increases. If for us the virtual link is still down, + we will always give a window size of zero. */ + vl = lookup_virtual_link (&cmc->im.sender); + if (NULL != vl) + update_fc_window (vl, + &tvc->challenge, + GNUNET_TIME_absolute_ntoh (tvc->sender_time), + ntohl (tvc->last_window_consum_limit_kb)); + tvr.header.type = + htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE); + tvr.header.size = htons (sizeof (tvr)); + tvr.fc_window_size_kb = + htonl ((NULL == vl) ? 0 : vl->incoming_fc_window_size_kb); + tvr.challenge = tvc->challenge; + tvr.origin_time = tvc->sender_time; + tvr.validity_duration = validity_duration; + { + /* create signature */ + struct TransportValidationPS tvp = + {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE), + .purpose.size = htonl (sizeof (tvp)), + .validity_duration = validity_duration, + .challenge = tvc->challenge}; + + GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key, + &tvp.purpose, + &tvr.signature)); + } + route_message (&cmc->im.sender, + &tvr.header, + RMO_ANYTHING_GOES | RMO_REDUNDANT); + finish_cmc_handling (cmc); + if (NULL != vl) + return; + + /* For us, the link is still down, but we need bi-directional + connections (for flow-control and for this to be useful for + CORE), so we must try to bring the link up! */ + + /* (1) Check existing queues, if any, we may be lucky! */ + n = lookup_neighbour (&cmc->im.sender); + if (NULL != n) + for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) + start_address_validation (&cmc->im.sender, q->address); + /* (2) Also try to see if we have addresses in PEERSTORE for this peer + we could use */ + for (ir = ir_head; NULL != ir; ir = ir->next) + if (0 == GNUNET_memcmp (&ir->pid, &cmc->im.sender)) + return; /* we are already trying */ + ir = GNUNET_new (struct IncomingRequest); + ir->pid = cmc->im.sender; + GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir); + ir->wc = GNUNET_PEERSTORE_watch (peerstore, + "transport", + &ir->pid, + GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY, + &handle_hello_for_incoming, + ir); + ir_total++; + /* Bound attempts we do in parallel here, might otherwise get excessive */ + while (ir_total > MAX_INCOMING_REQUEST) + free_incoming_request (ir_head); +} + + +/** + * Closure for #check_known_challenge. + */ +struct CheckKnownChallengeContext +{ + /** + * Set to the challenge we are looking for. + */ + const struct ChallengeNonceP *challenge; + + /** + * Set to a matching validation state, if one was found. + */ + struct ValidationState *vs; +}; + + +/** + * Test if the validation state in @a value matches the + * challenge from @a cls. + * + * @param cls a `struct CheckKnownChallengeContext` + * @param pid unused (must match though) + * @param value a `struct ValidationState` + * @return #GNUNET_OK if not matching, #GNUNET_NO if match found + */ +static int +check_known_challenge (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct CheckKnownChallengeContext *ckac = cls; + struct ValidationState *vs = value; + + (void) pid; + if (0 != GNUNET_memcmp (&vs->challenge, ckac->challenge)) + return GNUNET_OK; + ckac->vs = vs; + return GNUNET_NO; +} + + +/** + * Function called when peerstore is done storing a + * validated address. + * + * @param cls a `struct ValidationState` + * @param success #GNUNET_YES on success + */ +static void +peerstore_store_validation_cb (void *cls, int success) +{ + struct ValidationState *vs = cls; + + vs->sc = NULL; + if (GNUNET_YES == success) + return; + GNUNET_STATISTICS_update (GST_stats, + "# Peerstore failed to store foreign address", + 1, + GNUNET_NO); +} + + +/** + * Find the queue matching @a pid and @a address. + * + * @param pid peer the queue must go to + * @param address address the queue must use + * @return NULL if no such queue exists + */ +static struct Queue * +find_queue (const struct GNUNET_PeerIdentity *pid, const char *address) +{ + struct Neighbour *n; + + n = lookup_neighbour (pid); + if (NULL == n) + return NULL; + for (struct Queue *pos = n->queue_head; NULL != pos; + pos = pos->next_neighbour) + { + if (0 == strcmp (pos->address, address)) + return pos; } return NULL; } @@ -7387,10 +7761,23 @@ handle_validation_response ( if (NULL != vl) { /* Link was already up, remember n is also now available and we are done */ - vl->n = n; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Virtual link to %s could now also direct neighbour!\n", - GNUNET_i2s (&vs->pid)); + if (NULL == vl->n) + { + vl->n = n; + n->vl = vl; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Virtual link to %s could now also direct neighbour!\n", + GNUNET_i2s (&vs->pid)); + } + else + { + GNUNET_assert (n == vl->n); + } + if (0 == GNUNET_memcmp (&vl->my_challenge, &tvr->challenge)) + { + /* Update window size if the challenge matches */ + vl->outbound_fc_window_size_kb = ntohl (tvr->fc_window_size_kb); + } return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -7401,6 +7788,9 @@ handle_validation_response ( vl->n = n; n->vl = vl; vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size_kb = DEFAULT_WINDOW_SIZE_KB; + vl->outbound_fc_window_size_kb = ntohl (tvr->fc_window_size_kb); + vl->my_challenge = tvr->challenge; vl->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); GNUNET_break (GNUNET_YES == @@ -7478,11 +7868,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE, struct TransportValidationChallengeMessage, &cmc), - GNUNET_MQ_hd_fixed_size ( - validation_challenge_response, - GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE, - struct TransportValidationChallengeResponseMessage, - &cmc), GNUNET_MQ_hd_fixed_size ( validation_response, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE, @@ -8398,11 +8783,11 @@ validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs) { struct TransportValidationChallengeMessage tvc; - vs->last_challenge_use = GNUNET_TIME_absolute_get (); + vs->last_challenge_use = GNUNET_TIME_absolute_get_monotonic (GST_cfg); tvc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE); tvc.header.size = htons (sizeof (tvc)); - tvc.last_window_consum_limit_kb = htonl (0); // FIXME! + tvc.last_window_consum_limit_kb = htonl (vs->last_window_consum_limit_kb); tvc.challenge = vs->challenge; tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use); GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -8892,120 +9277,17 @@ handle_suggest_cancel (void *cls, const struct ExpressPreferenceMessage *msg) } -/** - * Closure for #check_known_address. - */ -struct CheckKnownAddressContext -{ - /** - * Set to the address we are looking for. - */ - const char *address; - - /** - * Set to a matching validation state, if one was found. - */ - struct ValidationState *vs; -}; - - -/** - * Test if the validation state in @a value matches the - * address from @a cls. - * - * @param cls a `struct CheckKnownAddressContext` - * @param pid unused (must match though) - * @param value a `struct ValidationState` - * @return #GNUNET_OK if not matching, #GNUNET_NO if match found - */ -static int -check_known_address (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) -{ - struct CheckKnownAddressContext *ckac = cls; - struct ValidationState *vs = value; - - (void) pid; - if (0 != strcmp (vs->address, ckac->address)) - return GNUNET_OK; - ckac->vs = vs; - return GNUNET_NO; -} - - -/** - * Start address validation. - * - * @param pid peer the @a address is for - * @param address an address to reach @a pid (presumably) - */ -static void -start_address_validation (const struct GNUNET_PeerIdentity *pid, - const char *address) -{ - struct GNUNET_TIME_Absolute now; - struct ValidationState *vs; - struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL}; - - (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, - pid, - &check_known_address, - &ckac); - if (NULL != (vs = ckac.vs)) - { - /* if 'vs' is not currently valid, we need to speed up retrying the - * validation */ - if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us) - { - /* reduce backoff as we got a fresh advertisement */ - vs->challenge_backoff = - GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ, - GNUNET_TIME_relative_divide (vs->challenge_backoff, - 2)); - update_next_challenge_time (vs, - GNUNET_TIME_relative_to_absolute ( - vs->challenge_backoff)); - } - return; - } - now = GNUNET_TIME_absolute_get (); - vs = GNUNET_new (struct ValidationState); - vs->pid = *pid; - vs->valid_until = - GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME); - vs->first_challenge_use = now; - vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL; - GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &vs->challenge, - sizeof (vs->challenge)); - vs->address = GNUNET_strdup (address); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Starting address validation `%s' of peer %s using challenge %s\n", - address, - GNUNET_i2s (pid), - GNUNET_sh2s (&vs->challenge.value)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_put ( - validation_map, - &vs->pid, - vs, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - update_next_challenge_time (vs, now); -} - - /** * Function called by PEERSTORE for each matching record. * - * @param cls closure + * @param cls closure, a `struct PeerRequest` * @param record peerstore record information * @param emsg error message, or NULL if no errors */ static void -handle_hello (void *cls, - const struct GNUNET_PEERSTORE_Record *record, - const char *emsg) +handle_hello_for_client (void *cls, + const struct GNUNET_PEERSTORE_Record *record, + const char *emsg) { struct PeerRequest *pr = cls; const char *val; @@ -9077,7 +9359,7 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg) "transport", &pr->pid, GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY, - &handle_hello, + &handle_hello_for_client, pr); GNUNET_SERVICE_client_continue (tc->client); } @@ -9286,6 +9568,9 @@ do_shutdown (void *cls) NULL); GNUNET_CONTAINER_multipeermap_destroy (validation_map); validation_map = NULL; + while (NULL != ir_head) + free_incoming_request (ir_head); + GNUNET_assert (0 == ir_total); while (NULL != (lle = lle_head)) { GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle); -- cgit v1.2.3