diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-05-19 13:56:22 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-05-19 13:56:22 +0200 |
commit | 6e2ca97fb0d3d85c1ef127bc0b9bd3e1e72aa9e8 (patch) | |
tree | ab6fccbfa05730f952c519cb3f397afd7af199aa /src/transport/gnunet-service-tng.c | |
parent | be18b8ecb5232ef5bc718cb8d71c4bcc9f03d2f7 (diff) |
FC work
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 809 |
1 files changed, 547 insertions, 262 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index e00c7d573..f009a491b 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -25,11 +25,27 @@ * TODO: * Implement next: * - FIXME-FC: realize transport-to-transport flow control (needed in case - * communicators do not offer flow control). Note that we may not - * want to simply delay the ACKs as that may cause unnecessary - * re-transmissions. => Introduce proper flow and congestion window(s)! + * communicators do not offer flow control). + * We do transmit FC window sizes now. Left: + * for SENDING) + * - Increment "outbound_fc_window_size_used_kb" on transmission + * - Throttle sending if "outbound_fc_window_size_used_kb" reaches limit + * - Send *new* challenge when we get close to the limit (including + * at the beginning when the limit is zero!) + * - Retransmit challenge if it goes unanswered! + * + * for RECEIVING) + * - increment incoming_fc_window_size_ram_kb when receiving + * incoming packets! + * - OR drop if incoming_fc_window_size_ram goes + * (significantly?) beyond available_fc_window_size_kb + * - decrement incoming_fc_window_size_ram_kb when CORE is done + * with incoming packets! + * - increment incoming_fc_window_size_used_kb when CORE is done + * with incoming packets! + * * - review retransmission logic, right now there is no smartness there! - * => congestion control, flow control, etc [PERFORMANCE-BASICS] + * => congestion control, etc [PERFORMANCE-BASICS] * * Optimizations: * - When forwarding DV learn messages, if a peer is reached that @@ -59,6 +75,14 @@ * AcknowledgementUUIDP altogether, as they won't be acked! [BANDWIDTH] * (-> have 2nd type of acknowledgment message; low priority, as we * do not have an MTU-limited *reliable* communicator) + * - Adapt available_fc_window_size_kb, using larger values for high-bandwidth + * and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls] + * - Set last_window_consum_limit_kb promise properly based on + * latency and bandwidth of the respective connection [GOODPUT / utilization / stalls] + * - re-sending challenge response without a challenge when we have + * significantly increased the FC window (upon CORE being done with messages) + * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] + * Also can re-use signature in this case [CPU]. * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -114,6 +138,22 @@ #define GOODPUT_AGING_SLOTS 4 /** + * How big is the flow control window size by default; + * limits per-neighbour RAM utilization (in kilobytes). + */ +#define DEFAULT_WINDOW_SIZE_KB 128 + +/** + * For how many incoming connections do we try to create a + * virtual link for (at the same time!). This does NOT + * limit the number of incoming connections, just the number + * for which we are actively trying to find working addresses + * in the absence (!) of our own applications wanting the + * link to go up. + */ +#define MAX_INCOMING_REQUEST 16 + +/** * Maximum number of peers we select for forwarding DVInit * messages at the same time (excluding initiator). */ @@ -818,64 +858,8 @@ struct TransportValidationChallengeMessage struct ChallengeNonceP challenge; /** - * Timestamp of the sender, to be copied into the reply - * to allow sender to calculate RTT. - */ - struct GNUNET_TIME_AbsoluteNBO sender_time; -}; - - -/** - * Message send to another peer to answer to a validation challenge - * and at the same time issue a challenge in the other direction. - */ -struct TransportValidationChallengeResponseMessage -{ - - /** - * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE - */ - struct GNUNET_MessageHeader header; - - /** - * Flow control window size in kilobytes (1024 b), in NBO. - * The receiver can now send this many kilobytes as per - * the @e received_challenge "account". - */ - uint32_t fc_window_size_kb GNUNET_PACKED; - - /** - * Challenge returned to the origin by the receiving peer. - */ - struct ChallengeNonceP received_challenge; - - /** - * The peer's signature matching the - * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose. - */ - struct GNUNET_CRYPTO_EddsaSignature signature; - - /** - * Fresh challenge created by the sender to be returned - * by the receiving peer. - */ - struct ChallengeNonceP sender_challenge; - - /** - * How long does the sender believe the address on - * which the challenge was received to remain valid? - */ - struct GNUNET_TIME_RelativeNBO validity_duration; - - /** - * Timestamp of the sender, to be copied into the reply - * to allow sender to calculate RTT. - */ - struct GNUNET_TIME_AbsoluteNBO origin_time; - - /** - * Timestamp of the sender, to be copied into the reply - * to allow sender to calculate RTT. + * Timestamp of the sender, to be copied into the reply to allow + * sender to calculate RTT. Must be monotonically increasing! */ struct GNUNET_TIME_AbsoluteNBO sender_time; }; @@ -1224,7 +1208,84 @@ struct VirtualLink uint64_t message_uuid_ctr; /** - * How many more messages can we send to core before we exhaust + * Sender timestamp of @e n_challenge, used to generate out-of-order + * challenges (as sender's timestamps must be monotonically + * increasing). Note that we do not persist this monotonic time + * as we do not really have to worry about ancient flow control + * window sizes after restarts. + */ + struct GNUNET_TIME_Absolute n_challenge_time; + + /** + * Last challenge we received from @a n, for which we created the + * flow control window given in @e fc_window_size_kb. + */ + struct ChallengeNonceP n_challenge; + + /** + * Last challenge we used with @a n for flow control. If we receive + * window size increases for a different challenge, they are + * out-of-order and must be discarded! + */ + struct ChallengeNonceP my_challenge; + + /** + * Memory allocated for this virtual link. Expresses how much RAM + * we are willing to allocate to this virtual link. OPTIMIZE-ME: + * Can be adapted to dedicate more RAM to links that need it, while + * sticking to some overall RAM limit. For now, set to + * #DEFAULT_WINDOW_SIZE_KB. + */ + uint32_t available_fc_window_size_kb; + + /** + * Memory actually used to buffer packets on this virtual link. + * Expresses how much RAM we are currently using for virtual link. + * Note that once CORE is done with a packet, we decrement the value + * here. + */ + uint32_t incoming_fc_window_size_ram_kb; + + /** + * Last flow control window size we provided to the other peer, in + * kilobytes (1024 b). We are allowing the other peer to send this + * many kilobytes as per its last @e n_challenge "account". + */ + uint32_t incoming_fc_window_size_kb; + + /** + * How many bytes could we still get from the previous flow control + * window, in kilobytes (1024 b). We need to consider this value + * when calculating what we allow for the current window due to + * the possibility of out-of-order challenges. + */ + uint32_t last_fc_window_size_remaining_kb; + + /** + * How much of the window did the other peer successfully use (and + * we already passed it on to CORE)? Must be below @e + * incoming_fc_window_size_kb. We should effectively signal the + * other peer that the window is this much bigger at the next + * opportunity / challenge. + */ + uint32_t incoming_fc_window_size_used_kb; + + /** + * Our current flow control window size in kilobytes (1024 b). We + * are allowed to transmit this many kilobytes to @a n as per + * our @e my_challenge "account". + */ + uint32_t outbound_fc_window_size_kb; + + /** + * How much of our current flow control window size have we + * used (in kilobytes (1024 b)). Must be below + * @e outbound_fc_window_size_kb. + */ + uint32_t outbound_fc_window_size_used_kb; + + /** + * How many more messages can we send to CORE before we exhaust * the receive window of CORE for this peer? If this hits zero, * we must tell communicators to stop providing us more messages * for this peer. In fact, the window can go negative as we @@ -1787,6 +1848,35 @@ struct Neighbour /** + * Another peer attempted to talk to us, we should try to establish + * a connection in the other direction. + */ +struct IncomingRequest +{ + + /** + * Kept in a DLL. + */ + struct IncomingRequest *next; + + /** + * Kept in a DLL. + */ + struct IncomingRequest *prev; + + /** + * Handle for watching the peerstore for HELLOs for this peer. + */ + struct GNUNET_PEERSTORE_WatchContext *wc; + + /** + * Which peer is this about? + */ + struct GNUNET_PeerIdentity pid; +}; + + +/** * A peer that an application (client) would like us to talk to directly. */ struct PeerRequest @@ -2357,6 +2447,13 @@ struct ValidationState struct GNUNET_PEERSTORE_StoreContext *sc; /** + * Self-imposed limit on the previous flow control window. (May be zero, + * if we never used data from the previous window or are establishing the + * connection for the first time). + */ + uint32_t last_window_consum_limit_kb; + + /** * We are technically ready to send the challenge, but we are waiting for * the respective queue to become available for transmission. */ @@ -2545,6 +2642,23 @@ static struct PendingAcknowledgement *pa_head; static struct PendingAcknowledgement *pa_tail; /** + * List of incomming connections where we are trying + * to get a connection back established. Length + * kept in #ir_total. + */ +static struct IncomingRequest *ir_head; + +/** + * Tail of DLL starting at #ir_head. + */ +static struct IncomingRequest *ir_tail; + +/** + * Length of the DLL starting at #ir_head. + */ +static unsigned int ir_total; + +/** * Generator of `logging_uuid` in `struct PendingMessage`. */ static unsigned long long logging_uuid_gen; @@ -2587,6 +2701,22 @@ get_age () /** + * Release @a ir data structure. + * + * @param ir data structure to release + */ +static void +free_incoming_request (struct IncomingRequest *ir) +{ + GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir); + GNUNET_assert (ir_total > 0); + ir_total--; + GNUNET_PEERSTORE_watch_cancel (ir->wc); + GNUNET_free (ir); +} + + +/** * Release @a pa data structure. * * @param pa data structure to release @@ -5688,6 +5818,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) vl->dv = dv; dv->vl = vl; vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size_kb = DEFAULT_WINDOW_SIZE_KB; vl->visibility_task = GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); GNUNET_break (GNUNET_YES == @@ -7052,6 +7183,270 @@ check_incoming_msg (void *cls, /** + * We received a @a challenge from another peer, check if we can + * increase the flow control window to that peer. + * + * @param vl virtual link + * @param challenge the challenge we received + * @param sender_time when did the peer send the message? + * @param last_window_consum_limit_kb maximum number of kb the sender + * promises to use of the previous window (if any) + */ +static void +update_fc_window (struct VirtualLink *vl, + const struct ChallengeNonceP *challenge, + struct GNUNET_TIME_Absolute sender_time, + uint32_t last_window_consum_limit_kb) +{ + if (0 == GNUNET_memcmp (challenge, &vl->n_challenge)) + { + uint32_t avail; + + /* Challenge identical to last one, update + @a last_window_consum_limit_kb (to minimum) */ + vl->last_fc_window_size_remaining_kb = + GNUNET_MIN (last_window_consum_limit_kb, + vl->last_fc_window_size_remaining_kb); + /* window could have shrunk! */ + if (vl->available_fc_window_size_kb > vl->last_fc_window_size_remaining_kb) + avail = + vl->available_fc_window_size_kb - vl->last_fc_window_size_remaining_kb; + else + avail = 0; + /* guard against integer overflow */ + if (vl->incoming_fc_window_size_used_kb + avail >= + vl->incoming_fc_window_size_used_kb) + vl->incoming_fc_window_size_kb = + vl->incoming_fc_window_size_used_kb + avail; + else + vl->incoming_fc_window_size_kb = UINT32_MAX; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Updated window to %u/%u kb (%u used) for virtual link to %s!\n", + vl->incoming_fc_window_size_kb, + vl->available_fc_window_size_kb, + vl->incoming_fc_window_size_used_kb, + GNUNET_i2s (&vl->target)); + return; + } + if (vl->n_challenge_time.abs_value_us >= sender_time.abs_value_us) + { + GNUNET_STATISTICS_update (GST_stats, + "# Challenges ignored: sender time not increasing", + 1, + GNUNET_NO); + return; + } + /* new challenge! */ + if (vl->incoming_fc_window_size_used_kb > last_window_consum_limit_kb) + { + /* lying peer: it already used more than it promised it would ever use! */ + GNUNET_break_op (0); + last_window_consum_limit_kb = vl->incoming_fc_window_size_used_kb; + } + /* What remains is at most the difference between what we already processed + and what the sender promises to limit itself to. */ + vl->last_fc_window_size_remaining_kb = + last_window_consum_limit_kb - vl->incoming_fc_window_size_used_kb; + vl->n_challenge = *challenge; + vl->n_challenge_time = sender_time; + vl->incoming_fc_window_size_used_kb = 0; + /* window could have shrunk! */ + if (vl->available_fc_window_size_kb > vl->last_fc_window_size_remaining_kb) + vl->incoming_fc_window_size_kb = + vl->available_fc_window_size_kb - vl->last_fc_window_size_remaining_kb; + else + vl->incoming_fc_window_size_kb = 0; + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "New window at %u/%u kb (%u left on previous) for virtual link to %s!\n", + vl->incoming_fc_window_size_kb, + vl->available_fc_window_size_kb, + vl->last_fc_window_size_remaining_kb, + GNUNET_i2s (&vl->target)); +} + + +/** + * Closure for #check_known_address. + */ +struct CheckKnownAddressContext +{ + /** + * Set to the address we are looking for. + */ + const char *address; + + /** + * Set to a matching validation state, if one was found. + */ + struct ValidationState *vs; +}; + + +/** + * Test if the validation state in @a value matches the + * address from @a cls. + * + * @param cls a `struct CheckKnownAddressContext` + * @param pid unused (must match though) + * @param value a `struct ValidationState` + * @return #GNUNET_OK if not matching, #GNUNET_NO if match found + */ +static int +check_known_address (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct CheckKnownAddressContext *ckac = cls; + struct ValidationState *vs = value; + + (void) pid; + if (0 != strcmp (vs->address, ckac->address)) + return GNUNET_OK; + ckac->vs = vs; + return GNUNET_NO; +} + + +/** + * Task run periodically to validate some address based on #validation_heap. + * + * @param cls NULL + */ +static void +validation_start_cb (void *cls); + + +/** + * Set the time for next_challenge of @a vs to @a new_time. + * Updates the heap and if necessary reschedules the job. + * + * @param vs validation state to update + * @param new_time new time for revalidation + */ +static void +update_next_challenge_time (struct ValidationState *vs, + struct GNUNET_TIME_Absolute new_time) +{ + struct GNUNET_TIME_Relative delta; + + if (new_time.abs_value_us == vs->next_challenge.abs_value_us) + return; /* be lazy */ + vs->next_challenge = new_time; + if (NULL == vs->hn) + vs->hn = + GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us); + else + GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us); + if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) && + (NULL != validation_task)) + return; + if (NULL != validation_task) + GNUNET_SCHEDULER_cancel (validation_task); + /* randomize a bit */ + delta.rel_value_us = + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + MIN_DELAY_ADDRESS_VALIDATION.rel_value_us); + new_time = GNUNET_TIME_absolute_add (new_time, delta); + validation_task = + GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL); +} + + +/** + * Start address validation. + * + * @param pid peer the @a address is for + * @param address an address to reach @a pid (presumably) + */ +static void +start_address_validation (const struct GNUNET_PeerIdentity *pid, + const char *address) +{ + struct GNUNET_TIME_Absolute now; + struct ValidationState *vs; + struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL}; + + (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, + pid, + &check_known_address, + &ckac); + if (NULL != (vs = ckac.vs)) + { + /* if 'vs' is not currently valid, we need to speed up retrying the + * validation */ + if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us) + { + /* reduce backoff as we got a fresh advertisement */ + vs->challenge_backoff = + GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ, + GNUNET_TIME_relative_divide (vs->challenge_backoff, + 2)); + update_next_challenge_time (vs, + GNUNET_TIME_relative_to_absolute ( + vs->challenge_backoff)); + } + return; + } + now = GNUNET_TIME_absolute_get (); + vs = GNUNET_new (struct ValidationState); + vs->pid = *pid; + vs->valid_until = + GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME); + vs->first_challenge_use = now; + vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL; + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &vs->challenge, + sizeof (vs->challenge)); + vs->address = GNUNET_strdup (address); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting address validation `%s' of peer %s using challenge %s\n", + address, + GNUNET_i2s (pid), + GNUNET_sh2s (&vs->challenge.value)); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + validation_map, + &vs->pid, + vs, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + update_next_challenge_time (vs, now); +} + + +/** + * Function called by PEERSTORE for each matching record. + * + * @param cls closure, a `struct IncomingRequest` + * @param record peerstore record information + * @param emsg error message, or NULL if no errors + */ +static void +handle_hello_for_incoming (void *cls, + const struct GNUNET_PEERSTORE_Record *record, + const char *emsg) +{ + struct IncomingRequest *ir = cls; + const char *val; + + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got failure from PEERSTORE: %s\n", + emsg); + return; + } + val = record->value; + if ((0 == record->value_size) || ('\0' != val[record->value_size - 1])) + { + GNUNET_break (0); + return; + } + start_address_validation (&ir->pid, (const char *) record->value); +} + + +/** * Communicator gave us a transport address validation challenge. Process the * request. * @@ -7065,62 +7460,86 @@ handle_validation_challenge ( const struct TransportValidationChallengeMessage *tvc) { struct CommunicatorMessageContext *cmc = cls; - struct TransportValidationResponseMessage *tvr; + struct TransportValidationResponseMessage tvr; + struct VirtualLink *vl; + struct GNUNET_TIME_RelativeNBO validity_duration; + struct IncomingRequest *ir; + struct Neighbour *n; + /* We use a validity_duration of 0 for DV-routed messages, + as we can neither control the validity and need to allow + the receiver to tell DV paths from direct connections */ if (cmc->total_hops > 0) - { - /* DV routing is not allowed for validation challenges! */ - GNUNET_break_op (0); - finish_cmc_handling (cmc); - return; - } + validity_duration = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); + else + validity_duration = cmc->im.expected_address_validity; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received address validation challenge %s\n", GNUNET_sh2s (&tvc->challenge.value)); - tvr = GNUNET_new (struct TransportValidationResponseMessage); - tvr->header.type = + /* If we have a virtual link, we use this mechanism to signal the + size of the flow control window, and to allow the sender + to ask for increases. If for us the virtual link is still down, + we will always give a window size of zero. */ + vl = lookup_virtual_link (&cmc->im.sender); + if (NULL != vl) + update_fc_window (vl, + &tvc->challenge, + GNUNET_TIME_absolute_ntoh (tvc->sender_time), + ntohl (tvc->last_window_consum_limit_kb)); + tvr.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE); - tvr->header.size = htons (sizeof (*tvr)); - tvr->challenge = tvc->challenge; - tvr->origin_time = tvc->sender_time; - tvr->validity_duration = cmc->im.expected_address_validity; + tvr.header.size = htons (sizeof (tvr)); + tvr.fc_window_size_kb = + htonl ((NULL == vl) ? 0 : vl->incoming_fc_window_size_kb); + tvr.challenge = tvc->challenge; + tvr.origin_time = tvc->sender_time; + tvr.validity_duration = validity_duration; { /* create signature */ struct TransportValidationPS tvp = {.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE), .purpose.size = htonl (sizeof (tvp)), - .validity_duration = tvr->validity_duration, + .validity_duration = validity_duration, .challenge = tvc->challenge}; GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key, &tvp.purpose, - &tvr->signature)); + &tvr.signature)); } route_message (&cmc->im.sender, - &tvr->header, + &tvr.header, RMO_ANYTHING_GOES | RMO_REDUNDANT); finish_cmc_handling (cmc); -} - - -/** - * Communicator gave us a transport address validation challenge. Process the - * request. - * - * @param cls a `struct CommunicatorMessageContext` (must call - * #finish_cmc_handling() when done) - * @param tvrc the message that was received - */ -static void -handle_validation_challenge_response ( - void *cls, - const struct TransportValidationChallengeResponseMessage *tvrc) -{ - struct CommunicatorMessageContext *cmc = cls; - struct TransportValidationResponseMessage *tvr; + if (NULL != vl) + return; + /* For us, the link is still down, but we need bi-directional + connections (for flow-control and for this to be useful for + CORE), so we must try to bring the link up! */ - finish_cmc_handling (cmc); + /* (1) Check existing queues, if any, we may be lucky! */ + n = lookup_neighbour (&cmc->im.sender); + if (NULL != n) + for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) + start_address_validation (&cmc->im.sender, q->address); + /* (2) Also try to see if we have addresses in PEERSTORE for this peer + we could use */ + for (ir = ir_head; NULL != ir; ir = ir->next) + if (0 == GNUNET_memcmp (&ir->pid, &cmc->im.sender)) + return; /* we are already trying */ + ir = GNUNET_new (struct IncomingRequest); + ir->pid = cmc->im.sender; + GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir); + ir->wc = GNUNET_PEERSTORE_watch (peerstore, + "transport", + &ir->pid, + GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY, + &handle_hello_for_incoming, + ir); + ir_total++; + /* Bound attempts we do in parallel here, might otherwise get excessive */ + while (ir_total > MAX_INCOMING_REQUEST) + free_incoming_request (ir_head); } @@ -7189,51 +7608,6 @@ peerstore_store_validation_cb (void *cls, int success) /** - * Task run periodically to validate some address based on #validation_heap. - * - * @param cls NULL - */ -static void -validation_start_cb (void *cls); - - -/** - * Set the time for next_challenge of @a vs to @a new_time. - * Updates the heap and if necessary reschedules the job. - * - * @param vs validation state to update - * @param new_time new time for revalidation - */ -static void -update_next_challenge_time (struct ValidationState *vs, - struct GNUNET_TIME_Absolute new_time) -{ - struct GNUNET_TIME_Relative delta; - - if (new_time.abs_value_us == vs->next_challenge.abs_value_us) - return; /* be lazy */ - vs->next_challenge = new_time; - if (NULL == vs->hn) - vs->hn = - GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us); - else - GNUNET_CONTAINER_heap_update_cost (vs->hn, new_time.abs_value_us); - if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) && - (NULL != validation_task)) - return; - if (NULL != validation_task) - GNUNET_SCHEDULER_cancel (validation_task); - /* randomize a bit */ - delta.rel_value_us = - GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, - MIN_DELAY_ADDRESS_VALIDATION.rel_value_us); - new_time = GNUNET_TIME_absolute_add (new_time, delta); - validation_task = - GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL); -} - - -/** * Find the queue matching @a pid and @a address. * * @param pid peer the queue must go to @@ -7387,10 +7761,23 @@ handle_validation_response ( if (NULL != vl) { /* Link was already up, remember n is also now available and we are done */ - vl->n = n; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Virtual link to %s could now also direct neighbour!\n", - GNUNET_i2s (&vs->pid)); + if (NULL == vl->n) + { + vl->n = n; + n->vl = vl; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Virtual link to %s could now also direct neighbour!\n", + GNUNET_i2s (&vs->pid)); + } + else + { + GNUNET_assert (n == vl->n); + } + if (0 == GNUNET_memcmp (&vl->my_challenge, &tvr->challenge)) + { + /* Update window size if the challenge matches */ + vl->outbound_fc_window_size_kb = ntohl (tvr->fc_window_size_kb); + } return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -7401,6 +7788,9 @@ handle_validation_response ( vl->n = n; n->vl = vl; vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size_kb = DEFAULT_WINDOW_SIZE_KB; + vl->outbound_fc_window_size_kb = ntohl (tvr->fc_window_size_kb); + vl->my_challenge = tvr->challenge; vl->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); GNUNET_break (GNUNET_YES == @@ -7479,11 +7869,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, struct TransportValidationChallengeMessage, &cmc), GNUNET_MQ_hd_fixed_size ( - validation_challenge_response, - GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE_RESPONSE, - struct TransportValidationChallengeResponseMessage, - &cmc), - GNUNET_MQ_hd_fixed_size ( validation_response, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE, struct TransportValidationResponseMessage, @@ -8398,11 +8783,11 @@ validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs) { struct TransportValidationChallengeMessage tvc; - vs->last_challenge_use = GNUNET_TIME_absolute_get (); + vs->last_challenge_use = GNUNET_TIME_absolute_get_monotonic (GST_cfg); tvc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE); tvc.header.size = htons (sizeof (tvc)); - tvc.last_window_consum_limit_kb = htonl (0); // FIXME! + tvc.last_window_consum_limit_kb = htonl (vs->last_window_consum_limit_kb); tvc.challenge = vs->challenge; tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use); GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -8893,119 +9278,16 @@ handle_suggest_cancel (void *cls, const struct ExpressPreferenceMessage *msg) /** - * Closure for #check_known_address. - */ -struct CheckKnownAddressContext -{ - /** - * Set to the address we are looking for. - */ - const char *address; - - /** - * Set to a matching validation state, if one was found. - */ - struct ValidationState *vs; -}; - - -/** - * Test if the validation state in @a value matches the - * address from @a cls. - * - * @param cls a `struct CheckKnownAddressContext` - * @param pid unused (must match though) - * @param value a `struct ValidationState` - * @return #GNUNET_OK if not matching, #GNUNET_NO if match found - */ -static int -check_known_address (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) -{ - struct CheckKnownAddressContext *ckac = cls; - struct ValidationState *vs = value; - - (void) pid; - if (0 != strcmp (vs->address, ckac->address)) - return GNUNET_OK; - ckac->vs = vs; - return GNUNET_NO; -} - - -/** - * Start address validation. - * - * @param pid peer the @a address is for - * @param address an address to reach @a pid (presumably) - */ -static void -start_address_validation (const struct GNUNET_PeerIdentity *pid, - const char *address) -{ - struct GNUNET_TIME_Absolute now; - struct ValidationState *vs; - struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL}; - - (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, - pid, - &check_known_address, - &ckac); - if (NULL != (vs = ckac.vs)) - { - /* if 'vs' is not currently valid, we need to speed up retrying the - * validation */ - if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us) - { - /* reduce backoff as we got a fresh advertisement */ - vs->challenge_backoff = - GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ, - GNUNET_TIME_relative_divide (vs->challenge_backoff, - 2)); - update_next_challenge_time (vs, - GNUNET_TIME_relative_to_absolute ( - vs->challenge_backoff)); - } - return; - } - now = GNUNET_TIME_absolute_get (); - vs = GNUNET_new (struct ValidationState); - vs->pid = *pid; - vs->valid_until = - GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME); - vs->first_challenge_use = now; - vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL; - GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &vs->challenge, - sizeof (vs->challenge)); - vs->address = GNUNET_strdup (address); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Starting address validation `%s' of peer %s using challenge %s\n", - address, - GNUNET_i2s (pid), - GNUNET_sh2s (&vs->challenge.value)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_put ( - validation_map, - &vs->pid, - vs, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - update_next_challenge_time (vs, now); -} - - -/** * Function called by PEERSTORE for each matching record. * - * @param cls closure + * @param cls closure, a `struct PeerRequest` * @param record peerstore record information * @param emsg error message, or NULL if no errors */ static void -handle_hello (void *cls, - const struct GNUNET_PEERSTORE_Record *record, - const char *emsg) +handle_hello_for_client (void *cls, + const struct GNUNET_PEERSTORE_Record *record, + const char *emsg) { struct PeerRequest *pr = cls; const char *val; @@ -9077,7 +9359,7 @@ handle_suggest (void *cls, const struct ExpressPreferenceMessage *msg) "transport", &pr->pid, GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY, - &handle_hello, + &handle_hello_for_client, pr); GNUNET_SERVICE_client_continue (tc->client); } @@ -9286,6 +9568,9 @@ do_shutdown (void *cls) NULL); GNUNET_CONTAINER_multipeermap_destroy (validation_map); validation_map = NULL; + while (NULL != ir_head) + free_incoming_request (ir_head); + GNUNET_assert (0 == ir_total); while (NULL != (lle = lle_head)) { GNUNET_CONTAINER_DLL_remove (lle_head, lle_tail, lle); |