From cb4610c2c23191e0977738402d87e7b6bd0808c9 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 5 Mar 2010 20:31:58 +0000 Subject: trying to fix bandwidth calculations --- src/core/core.h | 10 +- src/core/core_api_peer_get_info.c | 21 +- src/core/gnunet-service-core.c | 360 +++++++++------------ src/fs/gnunet-service-fs.c | 6 +- src/include/Makefile.am | 1 + src/include/gnunet_constants.h | 12 +- src/include/gnunet_core_service.h | 12 +- src/include/gnunet_transport_service.h | 9 +- src/include/gnunet_util_lib.h | 1 + src/topology/gnunet-daemon-topology.c | 31 +- .../test_gnunet_service_topology_data.conf | 1 + src/transport/gnunet-service-transport.c | 214 +++++------- src/transport/plugin_transport_tcp.c | 2 +- src/transport/test_transport_api.c | 2 - src/transport/transport.h | 21 +- src/transport/transport_api.c | 155 ++++----- 16 files changed, 356 insertions(+), 502 deletions(-) diff --git a/src/core/core.h b/src/core/core.h index d74ee06f6..45d138140 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -211,7 +211,7 @@ struct RequestInfoMessage * peer to at most the specified amount (naturally, the * amount is also limited by the receiving peer). */ - uint32_t limit_outbound_bpm GNUNET_PACKED; + struct GNUNET_BANDWIDTH_Value32NBO limit_outbound; /** * Number of bytes of inbound traffic to reserve, can @@ -253,16 +253,16 @@ struct ConfigurationInfoMessage int32_t reserved_amount GNUNET_PACKED; /** - * Available bandwidth in (in bytes per minute) for this peer. + * Available bandwidth in for this peer. * 0 if we have been disconnected. */ - uint32_t bpm_in GNUNET_PACKED; + struct GNUNET_BANDWIDTH_Value32NBO bw_in; /** - * Available bandwidth out (in bytes per minute) for this peer, + * Available bandwidth out for this peer, * 0 if we have been disconnected. */ - uint32_t bpm_out GNUNET_PACKED; + struct GNUNET_BANDWIDTH_Value32NBO bw_out; /** * Current traffic preference for the peer. diff --git a/src/core/core_api_peer_get_info.c b/src/core/core_api_peer_get_info.c index cd963375f..c28ae2feb 100644 --- a/src/core/core_api_peer_get_info.c +++ b/src/core/core_api_peer_get_info.c @@ -61,12 +61,13 @@ receive_info (void *cls, { struct GNUNET_CORE_InformationRequestContext *irc = cls; const struct ConfigurationInfoMessage *cim; + static struct GNUNET_BANDWIDTH_Value32NBO zbw; /* zero bandwidth */ if (msg == NULL) { if (irc->info != NULL) irc->info (irc->info_cls, - NULL, 0, 0, 0, 0); + NULL, zbw, zbw, 0, 0); GNUNET_CLIENT_disconnect (irc->client); GNUNET_free (irc); return; @@ -77,7 +78,7 @@ receive_info (void *cls, GNUNET_break (0); if (irc->info != NULL) irc->info (irc->info_cls, - NULL, 0, 0, 0, 0); + NULL, zbw, zbw, 0, 0); GNUNET_CLIENT_disconnect (irc->client); GNUNET_free (irc); return; @@ -86,8 +87,8 @@ receive_info (void *cls, if (irc->info != NULL) irc->info (irc->info_cls, &cim->peer, - ntohl (cim->bpm_in), - ntohl (cim->bpm_out), + cim->bw_in, + cim->bw_out, ntohl (cim->reserved_amount), GNUNET_ntohll (cim->preference)); GNUNET_CLIENT_disconnect (irc->client); @@ -103,11 +104,11 @@ receive_info (void *cls, * @param peer identifies the peer * @param timeout after how long should we give up (and call "info" with NULL * for "peer" to signal an error)? - * @param bpm_out set to the current bandwidth limit (sending) for this peer, - * caller should set "bpm_out" to "-1" to avoid changing - * the current value; otherwise "bpm_out" will be lowered to + * @param bw_out set to the current bandwidth limit (sending) for this peer, + * caller should set "bw_out" to "-1" to avoid changing + * the current value; otherwise "bw_out" will be lowered to * the specified value; passing a pointer to "0" can be used to force - * us to disconnect from the peer; "bpm_out" might not increase + * us to disconnect from the peer; "bw_out" might not increase * as specified since the upper bound is generally * determined by the other peer! * @param amount reserve N bytes for receiving, negative @@ -125,7 +126,7 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_SCHEDULER_Handle *sched, const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_PeerIdentity *peer, struct GNUNET_TIME_Relative timeout, - uint32_t bpm_out, + struct GNUNET_BANDWIDTH_Value32NBO bw_out, int32_t amount, uint64_t preference, GNUNET_CORE_PeerConfigurationInfoCallback info, @@ -146,7 +147,7 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_SCHEDULER_Handle *sched, rim.header.size = htons (sizeof (struct RequestInfoMessage)); rim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO); rim.reserved = htonl (0); - rim.limit_outbound_bpm = htonl (bpm_out); + rim.limit_outbound = bw_out; rim.reserve_inbound = htonl (amount); rim.preference_change = GNUNET_htonll(preference); rim.peer = *peer; diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c index 0001fccbf..85be80101 100644 --- a/src/core/gnunet-service-core.c +++ b/src/core/gnunet-service-core.c @@ -48,9 +48,9 @@ /** * Receive and send buffer windows grow over time. For * how long can 'unused' bandwidth accumulate before we - * need to cap it? (specified in ms). + * need to cap it? (specified in seconds). */ -#define MAX_WINDOW_TIME (5 * 60 * 1000) +#define MAX_WINDOW_TIME_S (5 * 60) /** * How many messages do we queue up at most for optional @@ -60,17 +60,11 @@ #define MAX_NOTIFY_QUEUE 16 /** - * Minimum of bytes per minute (out) to assign to any connected peer. - * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no + * Minimum bandwidth (out) to assign to any connected peer. + * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no * sense. */ -#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT - -/** - * What is the smallest change (in number of bytes per minute) - * that we consider significant enough to bother triggering? - */ -#define MIN_BPM_CHANGE 32 +#define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT /** * After how much time past the "official" expiration time do @@ -84,27 +78,27 @@ /** * What is the maximum delay for a SET_KEY message? */ -#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS +#define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) /** * What how long do we wait for SET_KEY confirmation initially? */ -#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3) +#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3) /** * What is the maximum delay for a PING message? */ -#define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS +#define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2) /** * What is the maximum delay for a PONG message? */ -#define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS +#define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2) /** * How often do we recalculate bandwidth quotas? */ -#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS +#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) /** * What is the priority for a SET_KEY message? @@ -206,11 +200,10 @@ struct EncryptedMessage uint32_t sequence_number GNUNET_PACKED; /** - * Desired bandwidth (how much we should send to this - * peer / how much is the sender willing to receive), - * in bytes per minute. + * Desired bandwidth (how much we should send to this peer / how + * much is the sender willing to receive)? */ - uint32_t inbound_bpm_limit GNUNET_PACKED; + struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit; /** * Timestamp. Used to prevent reply of ancient messages @@ -271,10 +264,9 @@ struct PongMessage /** * Desired bandwidth (how much we should send to this - * peer / how much is the sender willing to receive), - * in bytes per minute. + * peer / how much is the sender willing to receive). */ - uint32_t inbound_bpm_limit GNUNET_PACKED; + struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit; /** * Intended target of the PING, used primarily to check @@ -510,32 +502,14 @@ struct Neighbour struct GNUNET_TIME_Relative set_key_retry_frequency; /** - * Time of our last update to the "available_send_window". - */ - struct GNUNET_TIME_Absolute last_asw_update; - - /** - * Time of our last update to the "available_recv_window". - */ - struct GNUNET_TIME_Absolute last_arw_update; - - /** - * Number of bytes that we are eligible to transmit to this - * peer at this point. Incremented every minute by max_out_bpm, - * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes, - * bandwidth-hogs are sampled at a frequency of about 78s!); - * may get negative if we have VERY high priority content. + * Tracking bandwidth for sending to this peer. */ - long long available_send_window; + struct GNUNET_BANDWIDTH_Tracker available_send_window; /** - * How much downstream capacity of this peer has been reserved for - * our traffic? (Our clients can request that a certain amount of - * bandwidth is available for replies to them; this value is used to - * make sure that this reserved amount of bandwidth is actually - * available). + * Tracking bandwidth for receiving from this peer. */ - long long available_recv_window; + struct GNUNET_BANDWIDTH_Tracker available_recv_window; /** * How valueable were the messages of this peer recently? @@ -562,26 +536,26 @@ struct Neighbour /** * Available bandwidth in for this peer (current target). */ - uint32_t bpm_in; + struct GNUNET_BANDWIDTH_Value32NBO bw_in; /** * Available bandwidth out for this peer (current target). */ - uint32_t bpm_out; + struct GNUNET_BANDWIDTH_Value32NBO bw_out; /** - * Internal bandwidth limit set for this peer (initially - * typically set to "-1"). "bpm_out" is MAX of - * "bpm_out_internal_limit" and "bpm_out_external_limit". + * Internal bandwidth limit set for this peer (initially typically + * set to "-1"). Actual "bw_out" is MIN of + * "bpm_out_internal_limit" and "bw_out_external_limit". */ - uint32_t bpm_out_internal_limit; + struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit; /** * External bandwidth limit set for this peer by the - * peer that we are communicating with. "bpm_out" is MAX of - * "bpm_out_internal_limit" and "bpm_out_external_limit". + * peer that we are communicating with. "bw_out" is MIN of + * "bw_out_internal_limit" and "bw_out_external_limit". */ - uint32_t bpm_out_external_limit; + struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit; /** * What was our PING challenge number (for this peer)? @@ -602,6 +576,7 @@ struct Neighbour * Are we currently connected to this neighbour? */ int is_connected; + }; @@ -703,14 +678,15 @@ static unsigned long long preference_sum; static unsigned int neighbour_count; /** - * How much inbound bandwidth are we supposed to be using? + * How much inbound bandwidth are we supposed to be using per second? + * FIXME: this value is not used! */ -static unsigned long long bandwidth_target_in; +static unsigned long long bandwidth_target_in_bps; /** - * How much outbound bandwidth are we supposed to be using? + * How much outbound bandwidth are we supposed to be using per second? */ -static unsigned long long bandwidth_target_out; +static unsigned long long bandwidth_target_out_bps; @@ -742,50 +718,6 @@ update_preference_sum (unsigned long long inc) } -/** - * Recalculate the number of bytes we expect to - * receive or transmit in a given window. - * - * @param force force an update now (even if not much time has passed) - * @param window pointer to the byte counter (updated) - * @param ts pointer to the timestamp (updated) - * @param bpm number of bytes per minute that should - * be added to the window. - */ -static void -update_window (int force, - long long *window, - struct GNUNET_TIME_Absolute *ts, unsigned int bpm) -{ - struct GNUNET_TIME_Relative since; - unsigned long long increment; - - since = GNUNET_TIME_absolute_get_duration (*ts); - increment = (bpm * since.value) / 60 / 1000; -#if DEBUG_CORE_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Updating window with %u bpm after %llu ms by %llu\n", - bpm, - (unsigned long long) since.value, - increment); -#endif - if ( (force == GNUNET_NO) && - (since.value < 60 * 1000) && - (increment < 32 * 1024) ) - { -#if DEBUG_CORE_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not updating window, change too small.\n"); -#endif - return; /* not even a minute has passed */ - } - *ts = GNUNET_TIME_absolute_get (); - *window += increment; - if (*window > MAX_WINDOW_TIME * bpm) - *window = MAX_WINDOW_TIME * bpm; -} - - /** * Find the entry for the given neighbour. * @@ -1004,8 +936,8 @@ handle_client_request_info (void *cls, const struct RequestInfoMessage *rcm; struct Neighbour *n; struct ConfigurationInfoMessage cim; - int want_reserv; - int got_reserv; + int32_t want_reserv; + int32_t got_reserv; unsigned long long old_preference; struct GNUNET_SERVER_TransmitContext *tc; @@ -1019,32 +951,27 @@ handle_client_request_info (void *cls, if (n != NULL) { want_reserv = ntohl (rcm->reserve_inbound); - if (n->bpm_out_internal_limit != ntohl (rcm->limit_outbound_bpm)) - update_window (GNUNET_YES, - &n->available_send_window, - &n->last_asw_update, - n->bpm_out); - n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm); - n->bpm_out = GNUNET_MIN (n->bpm_out_internal_limit, - n->bpm_out_external_limit); + n->bw_out_internal_limit = rcm->limit_outbound; + n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, + n->bw_out_external_limit); + GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window, + n->bw_out); if (want_reserv < 0) { got_reserv = want_reserv; - n->available_recv_window -= want_reserv; } else if (want_reserv > 0) { - update_window (GNUNET_NO, - &n->available_recv_window, - &n->last_arw_update, n->bpm_in); - if (n->available_recv_window < want_reserv) - got_reserv = 0; /* all or nothing */ - else + if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window, + want_reserv).value == 0) got_reserv = want_reserv; - n->available_recv_window -= got_reserv; + else + got_reserv = 0; /* all or nothing */ } else got_reserv = 0; + GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, + got_reserv); old_preference = n->current_preference; n->current_preference += GNUNET_ntohll(rcm->preference_change); if (old_preference > n->current_preference) @@ -1056,13 +983,13 @@ handle_client_request_info (void *cls, #if DEBUG_CORE_QUOTA GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n", - want_reserv, + (int) want_reserv, GNUNET_i2s (&rcm->peer), - got_reserv); + (int) got_reserv); #endif cim.reserved_amount = htonl (got_reserv); - cim.bpm_in = htonl (n->bpm_in); - cim.bpm_out = htonl (n->bpm_out); + cim.bw_in = n->bw_in; + cim.bw_out = n->bw_out; cim.preference = n->current_preference; } cim.header.size = htons (sizeof (struct ConfigurationInfoMessage)); @@ -1247,7 +1174,8 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf) GNUNET_assert (size >= m->size); memcpy (cbuf, &m[1], m->size); ret = m->size; - n->available_send_window -= m->size; + GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, + m->size); #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Copied message of type %u and size %u into transport buffer for `%4s'\n", @@ -1444,9 +1372,9 @@ select_messages (struct Neighbour *n, unsigned int min_prio; struct GNUNET_TIME_Absolute t; struct GNUNET_TIME_Absolute now; - uint64_t delta; + struct GNUNET_TIME_Relative delta; uint64_t avail; - unsigned long long slack; /* how long could we wait before missing deadlines? */ + struct GNUNET_TIME_Relative slack; /* how long could we wait before missing deadlines? */ size_t off; int discard_low_prio; unsigned int queue_size; @@ -1472,18 +1400,13 @@ select_messages (struct Neighbour *n, min_prio = -1; discard_low_prio = GNUNET_NO; /* calculate number of bytes available for transmission at time "t" */ - update_window (GNUNET_NO, - &n->available_send_window, - &n->last_asw_update, - n->bpm_out); - avail = n->available_send_window; - t = n->last_asw_update; + avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window); + t = now; /* how many bytes have we (hypothetically) scheduled so far */ off = 0; /* maximum time we can wait before transmitting anything and still make all of our deadlines */ - slack = -1; - + slack = GNUNET_TIME_UNIT_FOREVER_REL; pos = n->messages; /* note that we use "*2" here because we want to look a bit further into the future; much more makes no @@ -1499,14 +1422,17 @@ select_messages (struct Neighbour *n, } if (discard_low_prio == GNUNET_NO) { - delta = pos->deadline.value; - if (delta < t.value) - delta = 0; - else - delta = t.value - delta; - avail += delta * n->bpm_out / 1000 / 60; + delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline); + if (delta.value > 0) + { + // FIXME: HUH? Check! + t = pos->deadline; + avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, + delta); + } if (avail < pos->size) { + // FIXME: HUH? Check! discard_low_prio = GNUNET_YES; /* we could not schedule this one! */ } else @@ -1515,23 +1441,25 @@ select_messages (struct Neighbour *n, /* update slack, considering both its absolute deadline and relative deadlines caused by other messages with their respective load */ - slack = GNUNET_MIN (slack, avail / n->bpm_out); + slack = GNUNET_TIME_relative_min (slack, + GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out, + avail)); if ( (pos->deadline.value < now.value) || (GNUNET_YES == pos->got_slack) ) { - slack = 0; + slack = GNUNET_TIME_UNIT_ZERO; } else { slack = - GNUNET_MIN (slack, pos->deadline.value - now.value); + GNUNET_TIME_relative_min (slack, + GNUNET_TIME_absolute_get_difference (now, pos->deadline)); pos->got_slack = GNUNET_YES; } } } - off += pos->size; - t.value = GNUNET_MAX (pos->deadline.value, t.value); + t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check! if (pos->priority <= min_prio) { /* update min for discard */ @@ -1550,7 +1478,7 @@ select_messages (struct Neighbour *n, } /* guard against sending "tiny" messages with large headers without urgent deadlines */ - if ( (slack > 1000) && + if ( (slack.value > 1000) && (size > 4 * off) && (queue_size < MAX_PEER_QUEUE_SIZE / 2) ) { @@ -1786,6 +1714,9 @@ set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Neighbour *n = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Retrying key transmission to `%4s'\n", + GNUNET_i2s (&n->peer)); n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK; n->set_key_retry_frequency = GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2); @@ -1901,13 +1832,13 @@ process_plaintext_neighbour_queue (struct Neighbour *n) } #if DEBUG_CORE_QUOTA GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending %llu as new limit to peer `%4s'\n", - (unsigned long long) n->bpm_in, + "Sending %u b/s as new limit to peer `%4s'\n", + (unsigned int) ntohl (n->bw_in.value__), GNUNET_i2s (&n->peer)); #endif ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1)); ph->sequence_number = htonl (++n->last_sequence_number_sent); - ph->inbound_bpm_limit = htonl (n->bpm_in); + ph->inbound_bw_limit = n->bw_in; ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); /* setup encryption message header */ @@ -1998,10 +1929,10 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid) n->encrypt_key_created = now; n->last_activity = now; n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY; - n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT; - n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT; - n->bpm_out_internal_limit = (uint32_t) - 1; - n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT; + n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; + n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; + n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1); + n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t) - 1); schedule_quota_update (n); @@ -2148,7 +2079,15 @@ static size_t notify_transport_connect_done (void *cls, size_t size, void *buf) { struct Neighbour *n = cls; + n->th = NULL; + if (buf == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Failed to connect to `%4s': transport failed to connect\n"), + GNUNET_i2s (&n->peer)); + return 0; + } send_key (n); return 0; } @@ -2170,6 +2109,12 @@ handle_client_request_connect (void *cls, struct Neighbour *n; struct GNUNET_TIME_Relative timeout; + if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } GNUNET_SERVER_receive_done (client, GNUNET_OK); n = find_neighbour (&cm->peer); if (n == NULL) @@ -2541,7 +2486,7 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m) me->priority = PONG_PRIORITY; me->size = sizeof (struct PongMessage); tx.reserved = htonl (0); - tx.inbound_bpm_limit = htonl (n->bpm_in); + tx.inbound_bw_limit = n->bw_in; tx.challenge = t.challenge; tx.target = t.target; tp = (struct PongMessage *) &me[1]; @@ -2628,9 +2573,11 @@ handle_pong (struct Neighbour *n, return; case PEER_STATE_KEY_RECEIVED: n->status = PEER_STATE_KEY_CONFIRMED; - n->bpm_out_external_limit = ntohl (t.inbound_bpm_limit); - n->bpm_out = GNUNET_MIN (n->bpm_out_external_limit, - n->bpm_out_internal_limit); + n->bw_out_external_limit = t.inbound_bw_limit; + n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit, + n->bw_out_internal_limit); + GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window, + n->bw_out); #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Confirmed key via `%s' message for peer `%4s'\n", @@ -3087,22 +3034,18 @@ handle_encrypted_message (struct Neighbour *n, } /* process decrypted message(s) */ - if (n->bpm_out_external_limit != ntohl (pt->inbound_bpm_limit)) - { - update_window (GNUNET_YES, - &n->available_send_window, - &n->last_asw_update, - n->bpm_out); #if DEBUG_CORE_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received %llu as new inbound limit for peer `%4s'\n", - (unsigned long long) ntohl (pt->inbound_bpm_limit), - GNUNET_i2s (&n->peer)); -#endif - } - n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit); - n->bpm_out = GNUNET_MIN (n->bpm_out_external_limit, - n->bpm_out_internal_limit); + if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received %u b/s as new inbound limit for peer `%4s'\n", + (unsigned int) ntohl (pt->inbound_bw_limit.value__), + GNUNET_i2s (&n->peer)); +#endif + n->bw_out_external_limit = pt->inbound_bw_limit; + n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit, + n->bw_out_internal_limit); + GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window, + n->bw_out); n->last_activity = GNUNET_TIME_absolute_get (); off = sizeof (struct EncryptedMessage); deliver_messages (n, buf, size, off); @@ -3247,12 +3190,12 @@ neighbour_quota_update (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Neighbour *n = cls; - uint32_t q_in; + struct GNUNET_BANDWIDTH_Value32NBO q_in; double pref_rel; double share; unsigned long long distributable; - uint32_t qin_ms; - uint32_t qout_ms; + uint64_t need_per_peer; + uint64_t need_per_second; n->quota_update_task = GNUNET_SCHEDULER_NO_TASK; /* calculate relative preference among all neighbours; @@ -3267,43 +3210,45 @@ neighbour_quota_update (void *cls, { pref_rel = n->current_preference / preference_sum; } - + need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER, + GNUNET_TIME_UNIT_SECONDS); + need_per_second = need_per_peer * neighbour_count; distributable = 0; - if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER) - distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER; + if (bandwidth_target_out_bps > need_per_second) + distributable = bandwidth_target_out_bps - need_per_second; share = distributable * pref_rel; - q_in = MIN_BPM_PER_PEER + (unsigned long long) share; + if (share + need_per_peer > ( (uint32_t)-1)) + q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1); + else + q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share); /* check if we want to disconnect for good due to inactivity */ if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) && (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) ) { #if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Forcing disconnect of `%4s' due to inactivity (?).\n", - GNUNET_i2s (&n->peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Forcing disconnect of `%4s' due to inactivity (?).\n", + GNUNET_i2s (&n->peer)); #endif - q_in = 0; /* force disconnect */ + q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */ } #if DEBUG_CORE_QUOTA GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Current quota for `%4s' is %llu in (old: %llu) / %llu out (%llu internal)\n", + "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n", GNUNET_i2s (&n->peer), - (unsigned long long) q_in, - (unsigned long long) n->bpm_in, - (unsigned long long) n->bpm_out, - (unsigned long long) n->bpm_out_internal_limit); -#endif - if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) || - (n->bpm_in - MIN_BPM_CHANGE > q_in) ) - { - n->bpm_in = q_in; - /* need to convert to bytes / ms, rounding up! */ - qin_ms = (q_in == 0) ? 0 : 1 + q_in / 60000; - qout_ms = (n->bpm_out == 0) ? 0 : 1 + n->bpm_out / 60000; + (unsigned int) ntohl (q_in.value__), + bandwidth_target_out_bps, + (unsigned int) ntohl (n->bw_in.value__), + (unsigned int) ntohl (n->bw_out.value__), + (unsigned int) ntohl (n->bw_out_internal_limit.value__)); +#endif + if (n->bw_in.value__ != q_in.value__) + { + n->bw_in = q_in; GNUNET_TRANSPORT_set_quota (transport, &n->peer, - qin_ms, - qout_ms, + n->bw_in, + n->bw_out, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); } @@ -3348,8 +3293,12 @@ handle_transport_notify_connect (void *cls, n->is_connected = GNUNET_YES; n->last_latency = latency; n->last_distance = distance; - n->last_asw_update = now; - n->last_arw_update = now; + GNUNET_BANDWIDTH_tracker_init (&n->available_send_window, + n->bw_out, + MAX_WINDOW_TIME_S); + GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window, + n->bw_in, + MAX_WINDOW_TIME_S); #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received connection from `%4s'.\n", @@ -3445,11 +3394,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *serv, const struct GNUNET_CONFIGURATION_Handle *c) { -#if 0 - unsigned long long qin; - unsigned long long qout; - unsigned long long tneigh; -#endif char *keyfile; sched = s; @@ -3460,12 +3404,12 @@ run (void *cls, GNUNET_CONFIGURATION_get_value_number (c, "CORE", "TOTAL_QUOTA_IN", - &bandwidth_target_in)) || + &bandwidth_target_in_bps)) || (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, "CORE", "TOTAL_QUOTA_OUT", - &bandwidth_target_out)) || + &bandwidth_target_out_bps)) || (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (c, "GNUNETD", diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index ac9290948..8fd47a006 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -1217,8 +1217,8 @@ static void target_reservation_cb (void *cls, const struct GNUNET_PeerIdentity * peer, - unsigned int bpm_in, - unsigned int bpm_out, + struct GNUNET_BANDWIDTH_Value32NBO bpm_in, + struct GNUNET_BANDWIDTH_Value32NBO bpm_out, int amount, uint64_t preference) { @@ -1440,7 +1440,7 @@ forward_request_task (void *cls, pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, &psc.target, GNUNET_CONSTANTS_SERVICE_TIMEOUT, - (uint32_t) -1 /* no limit */, + GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), DBLOCK_SIZE, (uint64_t) cp->inc_preference, &target_reservation_cb, diff --git a/src/include/Makefile.am b/src/include/Makefile.am index 39fb2f0f5..605cbbe1e 100644 --- a/src/include/Makefile.am +++ b/src/include/Makefile.am @@ -13,6 +13,7 @@ nodist_gnunetinclude_HEADERS = \ gnunetinclude_HEADERS = \ gnunet_arm_service.h \ + gnunet_bandwidth_lib.h \ gnunet_bio_lib.h \ gnunet_client_lib.h \ gnunet_common.h \ diff --git a/src/include/gnunet_constants.h b/src/include/gnunet_constants.h index eb8addf5f..035cc60b9 100644 --- a/src/include/gnunet_constants.h +++ b/src/include/gnunet_constants.h @@ -35,13 +35,15 @@ extern "C" #endif #endif +#include "gnunet_bandwidth_lib.h" + /** - * Amount of bytes per minute (in/out) to assume initially (before - * either peer has communicated any particular preference). Should be - * rather low; set so that at least one maximum-size message can be - * send each minute. + * Bandwidth (in/out) to assume initially (before either peer has + * communicated any particular preference). Should be rather low; set + * so that at least one maximum-size message can be send roughly once + * per minute. */ -#define GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT GNUNET_SERVER_MAX_MESSAGE_SIZE +#define GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT GNUNET_BANDWIDTH_value_init (1024) /** * After how long do we consider a connection to a peer dead diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index af0a4eaa9..47b8d6f98 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h @@ -281,8 +281,8 @@ typedef void (*GNUNET_CORE_PeerConfigurationInfoCallback) (void *cls, const struct GNUNET_PeerIdentity * peer, - unsigned int bpm_in, - unsigned int bpm_out, + struct GNUNET_BANDWIDTH_Value32NBO bpm_in, + struct GNUNET_BANDWIDTH_Value32NBO bpm_out, int amount, uint64_t preference); @@ -302,11 +302,11 @@ struct GNUNET_CORE_InformationRequestContext; * @param peer identifies the peer * @param timeout after how long should we give up (and call "info" with NULL * for "peer" to signal an error)? - * @param bpm_out set to the current bandwidth limit (sending) for this peer, + * @param bw_out set to the current bandwidth limit (sending) for this peer, * caller should set "bpm_out" to "-1" to avoid changing - * the current value; otherwise "bpm_out" will be lowered to + * the current value; otherwise "bw_out" will be lowered to * the specified value; passing a pointer to "0" can be used to force - * us to disconnect from the peer; "bpm_out" might not increase + * us to disconnect from the peer; "bw_out" might not increase * as specified since the upper bound is generally * determined by the other peer! * @param amount reserve N bytes for receiving, negative @@ -324,7 +324,7 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_SCHEDULER_Handle *sched, const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_PeerIdentity *peer, struct GNUNET_TIME_Relative timeout, - uint32_t bpm_out, + struct GNUNET_BANDWIDTH_Value32NBO bw_out, int32_t amount, uint64_t preference, GNUNET_CORE_PeerConfigurationInfoCallback info, diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h index 7a3ccbf7b..47d4334bb 100644 --- a/src/include/gnunet_transport_service.h +++ b/src/include/gnunet_transport_service.h @@ -35,6 +35,7 @@ extern "C" #endif #endif +#include "gnunet_bandwidth_lib.h" #include "gnunet_configuration_lib.h" #include "gnunet_crypto_lib.h" #include "gnunet_connection_lib.h" @@ -153,8 +154,8 @@ void GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle); * * @param handle connection to transport service * @param target who's bandwidth quota is being changed - * @param quota_in incoming bandwidth quota in bytes per ms - * @param quota_out outgoing bandwidth quota in bytes per ms + * @param quota_in incoming bandwidth quota + * @param quota_out outgoing bandwidth quota * @param timeout how long to wait until signaling failure if * we can not communicate the quota change * @param cont continuation to call when done, will be called @@ -164,8 +165,8 @@ void GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle); void GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle, const struct GNUNET_PeerIdentity *target, - uint32_t quota_in, - uint32_t quota_out, + struct GNUNET_BANDWIDTH_Value32NBO quota_in, + struct GNUNET_BANDWIDTH_Value32NBO quota_out, struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, void *cont_cls); diff --git a/src/include/gnunet_util_lib.h b/src/include/gnunet_util_lib.h index 1c5e1d7a1..e0a5b0774 100644 --- a/src/include/gnunet_util_lib.h +++ b/src/include/gnunet_util_lib.h @@ -37,6 +37,7 @@ extern "C" #endif #include "gnunet_common.h" +#include "gnunet_bandwidth_lib.h" #include "gnunet_bio_lib.h" #include "gnunet_client_lib.h" #include "gnunet_configuration_lib.h" diff --git a/src/topology/gnunet-daemon-topology.c b/src/topology/gnunet-daemon-topology.c index 6ae4d8210..7f761360d 100644 --- a/src/topology/gnunet-daemon-topology.c +++ b/src/topology/gnunet-daemon-topology.c @@ -832,6 +832,11 @@ consider_for_advertising (const struct GNUNET_HELLO_Message *hello) struct PeerList *pos; uint16_t size; + GNUNET_break (GNUNET_OK == GNUNET_HELLO_get_id (hello, &pid)); + if (0 == memcmp (&pid, + &my_identity, + sizeof (struct GNUNET_PeerIdentity))) + return; /* that's me! */ have_address = GNUNET_NO; GNUNET_HELLO_iterate_addresses (hello, GNUNET_NO, @@ -839,7 +844,6 @@ consider_for_advertising (const struct GNUNET_HELLO_Message *hello) &have_address); if (GNUNET_NO == have_address) return; /* no point in advertising this one... */ - GNUNET_break (GNUNET_OK == GNUNET_HELLO_get_id (hello, &pid)); peer = find_peer (&pid); if (peer == NULL) peer = make_peer (&pid, hello, GNUNET_NO); @@ -1098,13 +1102,24 @@ read_friends_file (const struct GNUNET_CONFIGURATION_Handle *cfg) } else { - entries_found++; - fl = make_peer (&pid, - NULL, - GNUNET_YES); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Found friend `%s' in configuration\n"), - GNUNET_i2s (&fl->id)); + if (0 != memcmp (&pid, + &my_identity, + sizeof (struct GNUNET_PeerIdentity))) + { + entries_found++; + fl = make_peer (&pid, + NULL, + GNUNET_YES); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Found friend `%s' in configuration\n"), + GNUNET_i2s (&fl->id)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Found myself `%s' in friend list (useless, ignored)\n"), + GNUNET_i2s (&fl->id)); + } } pos = pos + sizeof (struct GNUNET_CRYPTO_HashAsciiEncoded); while ((pos < frstat.st_size) && isspace (data[pos])) diff --git a/src/topology/test_gnunet_service_topology_data.conf b/src/topology/test_gnunet_service_topology_data.conf index 588134b8a..a7cf18955 100644 --- a/src/topology/test_gnunet_service_topology_data.conf +++ b/src/topology/test_gnunet_service_topology_data.conf @@ -8,6 +8,7 @@ PORT = 2664 [transport] PORT = 2665 PLUGINS = tcp +DEBUG = YES #PREFIX = xterm -e xterm -T transport -e gdb -x cmd --args #PREFIX = valgrind --tool=memcheck --log-file=logs%p diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index b99955807..3566c229c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -79,7 +79,7 @@ * How often must a peer violate bandwidth quotas before we start * to simply drop its messages? */ -#define QUOTA_VIOLATION_DROP_THRESHOLD 100 +#define QUOTA_VIOLATION_DROP_THRESHOLD 10 /** * How long until a HELLO verification attempt should time out? @@ -99,7 +99,7 @@ /** * How often will we re-validate for latency information */ -#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) +#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) /** * Priority to use for PONG messages. @@ -456,9 +456,9 @@ struct NeighbourList struct GNUNET_TIME_Absolute peer_timeout; /** - * At what time did we reset last_received last? + * Tracker for inbound bandwidth. */ - struct GNUNET_TIME_Absolute last_quota_update; + struct GNUNET_BANDWIDTH_Tracker in_tracker; /** * The latency we have seen for this particular address for @@ -473,22 +473,6 @@ struct NeighbourList */ struct GNUNET_TIME_Relative latency; - /** - * DV distance to this peer (1 if no DV is used). - */ - uint32_t distance; - - /** - * How many bytes have we received since the "last_quota_update" - * timestamp? - */ - uint64_t last_received; - - /** - * Global quota for inbound traffic for the neighbour in bytes/ms. - */ - uint32_t quota_in; - /** * How often has the other peer (recently) violated the * inbound traffic limit? Incremented by 10 per violation, @@ -497,6 +481,11 @@ struct NeighbourList */ unsigned int quota_violation_count; + /** + * DV distance to this peer (1 if no DV is used). + */ + uint32_t distance; + /** * Have we seen an PONG from this neighbour in the past (and * not had a disconnect since)? @@ -898,56 +887,6 @@ find_transport (const char *short_name) } -/** - * Update the quota values for the given neighbour now. - * - * @param n neighbour to update - * @param force GNUNET_YES to force recalculation now - */ -static void -update_quota (struct NeighbourList *n, - int force) -{ - struct GNUNET_TIME_Absolute now; - unsigned long long delta; - uint64_t allowed; - uint64_t remaining; - - now = GNUNET_TIME_absolute_get (); - delta = now.value - n->last_quota_update.value; - allowed = n->quota_in * delta; - if ( (delta < MIN_QUOTA_REFRESH_TIME) && - (!force) && - (allowed < 32 * 1024) ) - return; /* too early, not enough data */ - if (n->last_received < allowed) - { - remaining = allowed - n->last_received; - if (n->quota_in > 0) - remaining /= n->quota_in; - else - remaining = 0; - if (remaining > MAX_BANDWIDTH_CARRY) - remaining = MAX_BANDWIDTH_CARRY; - n->last_received = 0; - n->last_quota_update = now; - n->last_quota_update.value -= remaining; - if (n->quota_violation_count > 0) - n->quota_violation_count--; - } - else - { - n->last_received -= allowed; - n->last_quota_update = now; - if (n->last_received > allowed) - { - /* much more than the allowed rate! */ - n->quota_violation_count += 10; - } - } -} - - /** * Function called to notify a client about the socket being ready to * queue more data. "buf" will be NULL and "size" zero if the socket @@ -1274,6 +1213,8 @@ try_transmission_to_peer (struct NeighbourList *neighbour) GNUNET_i2s (&mq->neighbour_id), timeout.value); #endif + /* FIXME: might want to trigger peerinfo lookup here + (unless that's already pending...) */ return; } GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, @@ -1788,12 +1729,14 @@ add_validated_address (void *cls, max); } -static void send_periodic_ping(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + +static void send_periodic_ping(void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + /** - * Iterator over hash map entries. Checks if the given - * validation entry is for the same challenge as what - * is given in the PONG. + * Iterator over hash map entries. Checks if the given validation + * entry is for the same challenge as what is given in the PONG. * * @param cls the 'struct TransportPongMessage*' * @param key peer identity @@ -1843,19 +1786,28 @@ check_pending_validation (void *cls, n = find_neighbour (&target); if (n != NULL) { - fal = add_peer_address (n, ve->transport_name, + fal = add_peer_address (n, + ve->transport_name, ve->addr, ve->addrlen); fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); fal->validated = GNUNET_YES; fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time); - periodic_validation_context = GNUNET_malloc(sizeof(struct PeriodicValidationContext)); periodic_validation_context->foreign_address = fal; periodic_validation_context->transport = strdup(ve->transport_name); - memcpy(&periodic_validation_context->publicKey, &ve->publicKey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); - - fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched, TRANSPORT_DEFAULT_REVALIDATION, &send_periodic_ping, periodic_validation_context); + memcpy(&periodic_validation_context->publicKey, + &ve->publicKey, + sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); + /* FIXME: this causes all of the revalidation PINGs for the same HELLO + to be transmitted in bulk, which is not nice; also, + triggering these HERE means that revalidations do NOT happen AT ALL + for HELLOs a previous instance of this process validated (since + there is no "initial" validation PING => no revalidation => BUG! */ + fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched, + TRANSPORT_DEFAULT_REVALIDATION, + &send_periodic_ping, + periodic_validation_context); if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value) n->latency = fal->latency; else @@ -1985,11 +1937,12 @@ setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) n->next = neighbours; neighbours = n; n->id = *peer; - n->last_quota_update = GNUNET_TIME_absolute_get (); n->peer_timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); + GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + MAX_BANDWIDTH_CARRY_S); tp = plugins; while (tp != NULL) { @@ -2098,6 +2051,7 @@ timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext * GNUNET_free (va); } + /** * Check if the given address is already being validated; if not, * append the given address to the list of entries that are being be @@ -2229,7 +2183,9 @@ rerun_validation (void *cls, * that gets discarded on the other side instead of initiating * a flood. */ -static void send_periodic_ping(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +static void +send_periodic_ping (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeriodicValidationContext *periodic_validation_context = cls; @@ -2239,8 +2195,11 @@ static void send_periodic_ping(void *cls, const struct GNUNET_SCHEDULER_TaskCont GNUNET_free(periodic_validation_context); return; /* We have been shutdown, don't do anything! */ } - - rerun_validation(&periodic_validation_context->publicKey, periodic_validation_context->transport, periodic_validation_context->foreign_address->expires, periodic_validation_context->foreign_address->addr, periodic_validation_context->foreign_address->addrlen); + rerun_validation(&periodic_validation_context->publicKey, + periodic_validation_context->transport, + periodic_validation_context->foreign_address->expires, + periodic_validation_context->foreign_address->addr, + periodic_validation_context->foreign_address->addrlen); GNUNET_free(periodic_validation_context->transport); GNUNET_free(periodic_validation_context); } @@ -2367,8 +2326,6 @@ run_validation (void *cls, } - - /** * Add the given address to the list of foreign addresses * available for the given peer (check for duplicates). @@ -2388,7 +2345,9 @@ add_to_foreign_address_list (void *cls, { struct NeighbourList *n = cls; struct ForeignAddressList *fal; + int try; + try = GNUNET_NO; fal = find_peer_address (n, tname, addr, addrlen); if (fal == NULL) { @@ -2401,12 +2360,15 @@ add_to_foreign_address_list (void *cls, expiration.value); #endif fal = add_peer_address (n, tname, addr, addrlen); + try = GNUNET_YES; } if (fal == NULL) return GNUNET_OK; fal->expires = GNUNET_TIME_absolute_max (expiration, fal->expires); - fal->validated = GNUNET_YES; + fal->validated = GNUNET_YES; + if (try == GNUNET_YES) + try_transmission_to_peer (n); return GNUNET_OK; } @@ -2524,14 +2486,13 @@ process_hello (struct TransportPlugin *plugin, GNUNET_CRYPTO_hash (&publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &target.hashPubKey); -#if DEBUG_TRANSPORT +#if DEBUG_TRANSPORT > 1 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' message for `%4s' of size %u\n", "HELLO", GNUNET_i2s (&target), GNUNET_HELLO_size(hello)); #endif - chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize); chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1]; memcpy (&chvc[1], hello, hsize); @@ -2739,47 +2700,6 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, } -/** - * Calculate how long we should delay reading from the TCP socket to - * ensure that we stay within our bandwidth limits (push back). - * - * @param n for which neighbour should this be calculated - * @return how long to delay receiving more data - */ -static struct GNUNET_TIME_Relative -calculate_throttle_delay (struct NeighbourList *n) -{ - struct GNUNET_TIME_Relative ret; - struct GNUNET_TIME_Absolute now; - uint64_t del; - uint64_t avail; - uint64_t excess; - - now = GNUNET_TIME_absolute_get (); - del = now.value - n->last_quota_update.value; - if (del > MAX_BANDWIDTH_CARRY) - { - update_quota (n, GNUNET_YES); - del = now.value - n->last_quota_update.value; - GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); - } - if (n->quota_in == 0) - n->quota_in = 1; /* avoid divison by zero */ - avail = del * n->quota_in; - if (avail > n->last_received) - return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ - excess = n->last_received - avail; - ret.value = excess / n->quota_in; - if (ret.value > 0) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", - (unsigned long long) excess, - (unsigned long long) n->quota_in, - (unsigned long long) ret.value); - return ret; -} - - /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2809,11 +2729,11 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, struct ForeignAddressList *peer_address; uint16_t msize; struct NeighbourList *n; + struct GNUNET_TIME_Relative ret; n = find_neighbour (peer); if (n == NULL) n = setup_new_neighbour (peer); - update_quota (n, GNUNET_NO); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; @@ -2838,7 +2758,7 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); } /* update traffic received amount ... */ - msize = ntohs (message->size); + msize = ntohs (message->size); n->distance = distance; n->peer_timeout = GNUNET_TIME_relative_to_absolute @@ -2855,7 +2775,8 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, _ - ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + ("Dropping incoming message due to repeated bandwidth quota (%u b/s) violations (total of %u).\n"), + n->in_tracker.available_bytes_per_s__, n->quota_violation_count); return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ } @@ -2876,6 +2797,11 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, "Received message of type %u from `%4s', sending to all clients.\n", ntohs (message->type), GNUNET_i2s (peer)); #endif + if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, + msize)) + n->quota_violation_count++; + else + n->quota_violation_count = 0; /* back within limits */ /* transmit message to all clients */ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); im->header.size = htons (sizeof (struct InboundMessage) + msize); @@ -2892,7 +2818,14 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_free (im); } } - return calculate_throttle_delay (n); + ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0); + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %u b/s), waiting %llums before reading more.\n", + (unsigned long long) n->in_tracker.consumption_since_last_update__, + (unsigned int) n->in_tracker.available_bytes_per_s__, + (unsigned long long) ret.value); + return ret; } @@ -3058,7 +2991,6 @@ handle_set_quota (void *cls, const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; struct NeighbourList *n; - uint32_t qin; n = find_neighbour (&qsm->peer); if (n == NULL) @@ -3066,16 +2998,16 @@ handle_set_quota (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - qin = ntohl (qsm->quota_in); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", - "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer)); + "SET_QUOTA", + (unsigned int) ntohl (qsm->quota.value__), + (unsigned int) n->in_tracker.available_bytes_per_s__, + GNUNET_i2s (&qsm->peer)); #endif - update_quota (n, GNUNET_YES); - if (n->quota_in < qin) - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = qin; + GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, + qsm->quota); GNUNET_SERVER_receive_done (client, GNUNET_OK); } diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 9b6200b80..3b198841a 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -39,7 +39,7 @@ #include "plugin_transport.h" #include "transport.h" -#define DEBUG_TCP GNUNET_NO +#define DEBUG_TCP GNUNET_YES /** * How long until we give up on transmitting the welcome message? diff --git a/src/transport/test_transport_api.c b/src/transport/test_transport_api.c index 4895e758a..e159b054e 100644 --- a/src/transport/test_transport_api.c +++ b/src/transport/test_transport_api.c @@ -226,7 +226,6 @@ exchange_hello_last (void *cls, struct PeerContext *me = cls; GNUNET_TRANSPORT_get_hello_cancel (p2.th, &exchange_hello_last, me); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Exchanging HELLO with peer (%p)!\n", cls); GNUNET_assert (ok >= 3); @@ -266,7 +265,6 @@ exchange_hello (void *cls, "Received HELLO size %d\n", GNUNET_HELLO_size((const struct GNUNET_HELLO_Message *)message)); GNUNET_TRANSPORT_offer_hello (p2.th, message); - GNUNET_TRANSPORT_get_hello (p2.th, &exchange_hello_last, &p2); } diff --git a/src/transport/transport.h b/src/transport/transport.h index 993efac7c..d66b87b35 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -30,15 +30,15 @@ #include "gnunet_time_lib.h" #include "gnunet_transport_service.h" -#define DEBUG_TRANSPORT GNUNET_NO -#define DEBUG_TRANSPORT_TIMEOUT GNUNET_NO -#define DEBUG_TRANSPORT_DISCONNECT GNUNET_NO +#define DEBUG_TRANSPORT GNUNET_YES +#define DEBUG_TRANSPORT_TIMEOUT GNUNET_YES +#define DEBUG_TRANSPORT_DISCONNECT GNUNET_YES /** * For how long do we allow unused bandwidth - * from the past to carry over into the future? (in ms) + * from the past to carry over into the future? (in seconds) */ -#define MAX_BANDWIDTH_CARRY 5000 +#define MAX_BANDWIDTH_CARRY_S 5 /** * How often do we (at most) do a full quota @@ -102,9 +102,9 @@ struct DisconnectInfoMessage /** - * Message used to set a particular bandwidth quota. Send - * TO the service to set an incoming quota, send FROM the - * service to update an outgoing quota. + * Message used to set a particular bandwidth quota. Send TO the + * service to set an incoming quota, send FROM the service to update + * an outgoing quota. */ struct QuotaSetMessage { @@ -115,10 +115,9 @@ struct QuotaSetMessage struct GNUNET_MessageHeader header; /** - * Quota in bytes per ms, 0 to drop everything; - * in network byte order. + * Quota. */ - uint32_t quota_in GNUNET_PACKED; + struct GNUNET_BANDWIDTH_Value32NBO quota; /** * About which peer are we talking here? diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index 26106ae48..7328dc95d 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -24,6 +24,7 @@ * @author Christian Grothoff */ #include "platform.h" +#include "gnunet_bandwidth_lib.h" #include "gnunet_client_lib.h" #include "gnunet_constants.h" #include "gnunet_container_lib.h" @@ -215,20 +216,9 @@ struct NeighbourList struct GNUNET_PeerIdentity id; /** - * At what time did we reset last_sent last? + * Outbound bandwidh tracker. */ - struct GNUNET_TIME_Absolute last_quota_update; - - /** - * How many bytes have we sent since the "last_quota_update" - * timestamp? - */ - uint64_t last_sent; - - /** - * Quota for outbound traffic to the neighbour in bytes/ms. - */ - uint32_t quota_out; + struct GNUNET_BANDWIDTH_Tracker out_tracker; /** * Set to GNUNET_NO if we are currently allowed to accept a @@ -416,41 +406,6 @@ quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } -/** - * Update the quota values for the given neighbour now. - * - * @param n neighbour to update - */ -static void -update_quota (struct NeighbourList *n) -{ - struct GNUNET_TIME_Relative delta; - uint64_t allowed; - uint64_t remaining; - - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - allowed = delta.value * n->quota_out; - if (n->last_sent < allowed) - { - remaining = allowed - n->last_sent; - if (n->quota_out > 0) - remaining /= n->quota_out; - else - remaining = 0; - if (remaining > MAX_BANDWIDTH_CARRY) - remaining = MAX_BANDWIDTH_CARRY; - n->last_sent = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->last_quota_update.value -= remaining; - } - else - { - n->last_sent -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); - } -} - - /** * Figure out which transmission to a peer can be done right now. * If none can, schedule a task to call 'schedule_transmission' @@ -470,7 +425,6 @@ schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h) struct NeighbourList *next; struct GNUNET_TIME_Relative retry_time; struct GNUNET_TIME_Relative duration; - uint64_t available; if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) { @@ -487,47 +441,38 @@ schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h) if (n->transmit_stage != TS_QUEUED) continue; /* not eligible */ th = &n->transmit_handle; + GNUNET_break (n == th->neighbour); /* check outgoing quota */ - duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (duration.value > MIN_QUOTA_REFRESH_TIME) - { - update_quota (n); - duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - } - available = duration.value * n->quota_out; - if (available < n->last_sent + th->notify_size) + duration = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, + th->notify_size - sizeof (struct OutboundMessage)); + if (th->timeout.value < duration.value) { - /* calculate how much bandwidth we'd still need to - accumulate and based on that how long we'll have - to wait... */ - available = n->last_sent + th->notify_size - available; - duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - available / n->quota_out); - if (duration.value == 0) - duration = GNUNET_TIME_UNIT_MILLISECONDS; - if (th->timeout.value < - GNUNET_TIME_relative_to_absolute (duration).value) - { - /* signal timeout! */ + /* signal timeout! */ #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", - duration.value, GNUNET_i2s (&n->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", + duration.value, + GNUNET_i2s (&n->id)); #endif - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - n->transmit_stage = TS_NEW; - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - continue; - } + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + n->transmit_stage = TS_NEW; + if (NULL != th->notify) + GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); + continue; + } + if (duration.value > 0) + { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n", - GNUNET_i2s (&n->id), duration.value); + "Need more bandwidth (%u b/s allowed, %u b needed), delaying delivery to `%4s' by %llu ms\n", + (unsigned int) n->out_tracker.available_bytes_per_s__, + (unsigned int) th->notify_size - sizeof (struct OutboundMessage), + GNUNET_i2s (&n->id), + duration.value); #endif retry_time = GNUNET_TIME_relative_min (retry_time, duration); @@ -536,9 +481,9 @@ schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Have %u bytes of bandwidth available for transmission to `%4s' right now\n", - th->notify_size, + th->notify_size - sizeof (struct OutboundMessage), GNUNET_i2s (&n->id)); -#endif +#endif if ( (ret == NULL) || (ret->priority < th->priority) ) ret = th; @@ -630,6 +575,13 @@ transport_notify_ready (void *cls, size_t size, void *buf) size - sizeof (struct OutboundMessage), &cbuf[ret + sizeof (struct OutboundMessage)]); GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message of %u bytes with timeout %llums constructed for `%4s'\n", + (unsigned int) mret, + (unsigned long long) GNUNET_TIME_absolute_get_remaining (th->timeout).value, + GNUNET_i2s (&n->id)); +#endif if (mret != 0) { obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); @@ -640,6 +592,7 @@ transport_notify_ready (void *cls, size_t size, void *buf) memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage)); ret += (mret + sizeof (struct OutboundMessage)); size -= (mret + sizeof (struct OutboundMessage)); + GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret); } else { @@ -799,7 +752,7 @@ struct SetQuotaContext struct GNUNET_TIME_Absolute timeout; - uint32_t quota_in; + struct GNUNET_BANDWIDTH_Value32NBO quota_in; }; @@ -836,7 +789,7 @@ send_set_quota (void *cls, size_t size, void *buf) msg = buf; msg->header.size = htons (sizeof (struct QuotaSetMessage)); msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); - msg->quota_in = htonl (sqc->quota_in); + msg->quota = sqc->quota_in; memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity)); if (sqc->cont != NULL) GNUNET_SCHEDULER_add_continuation (sqc->handle->sched, @@ -865,8 +818,8 @@ send_set_quota (void *cls, size_t size, void *buf) void GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle, const struct GNUNET_PeerIdentity *target, - uint32_t quota_in, - uint32_t quota_out, + struct GNUNET_BANDWIDTH_Value32NBO quota_in, + struct GNUNET_BANDWIDTH_Value32NBO quota_out, struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, void *cont_cls) { @@ -876,10 +829,14 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle, n = neighbour_find (handle, target); if (n != NULL) { - update_quota (n); - if (n->quota_out < quota_out) - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_out = quota_out; + if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Quota changed from %u to %u for peer `%s'\n", + (unsigned int) n->out_tracker.available_bytes_per_s__, + (unsigned int) ntohl (quota_out.value__), + GNUNET_i2s (target)); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, + quota_out); } sqc = GNUNET_malloc (sizeof (struct SetQuotaContext)); sqc->handle = handle; @@ -1247,9 +1204,10 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h, #endif n = GNUNET_malloc (sizeof (struct NeighbourList)); n->id = *pid; - n->last_quota_update = GNUNET_TIME_absolute_get (); + GNUNET_BANDWIDTH_tracker_init (&n->out_tracker, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + MAX_BANDWIDTH_CARRY_S); n->next = h->neighbours; - n->quota_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT; n->h = h; h->neighbours = n; return n; @@ -1688,8 +1646,9 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking transport service for transmission of %u bytes to peer `%4s'.\n", - size, GNUNET_i2s (target)); + "Asking transport service for transmission of %u bytes to peer `%4s' within %llu ms.\n", + size, GNUNET_i2s (target), + (unsigned long long) timeout.value); #endif n = neighbour_find (handle, target); if (n == NULL) -- cgit v1.2.3