From 70ae2bd54ee0ff610d95a63856f38395920b804d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 10 Dec 2018 23:29:52 +0100 Subject: finish first draft of ats2 simple plugin (untested) --- src/ats/Makefile.am | 3 +- src/ats/gnunet-service-ats-new.c | 69 +++-- src/ats/plugin_ats2_simple.c | 600 ++++++++++++++++++++++++++++++++++--- src/hello/hello-ng.c | 5 +- src/include/gnunet_hello_lib.h | 10 +- src/transport/gnunet-service-tng.c | 2 +- 6 files changed, 612 insertions(+), 77 deletions(-) diff --git a/src/ats/Makefile.am b/src/ats/Makefile.am index 97497c94e..147d371e6 100644 --- a/src/ats/Makefile.am +++ b/src/ats/Makefile.am @@ -81,6 +81,7 @@ libgnunet_plugin_ats_proportional_la_LDFLAGS = \ libgnunet_plugin_ats2_simple_la_SOURCES = \ plugin_ats2_simple.c libgnunet_plugin_ats2_simple_la_LIBADD = \ + $(top_builddir)/src/hello/libgnunethello.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la \ $(LTLIBINTL) @@ -130,7 +131,7 @@ gnunet_service_ats_LDADD = \ $(GN_LIBINTL) gnunet_service_ats_new_SOURCES = \ - gnunet-service-ats-new.c + gnunet-service-ats-new.c gnunet_service_ats_new_LDADD = \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la \ diff --git a/src/ats/gnunet-service-ats-new.c b/src/ats/gnunet-service-ats-new.c index d3b2f1ead..fa8c07a1a 100644 --- a/src/ats/gnunet-service-ats-new.c +++ b/src/ats/gnunet-service-ats-new.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ @@ -78,7 +78,7 @@ struct ClientPreference * Plugin's representation of the preference. */ struct GNUNET_ATS_PreferenceHandle *ph; - + /** * Details about the preference. */ @@ -93,7 +93,7 @@ struct GNUNET_ATS_Session { /** - * Session data exposed to the plugin. + * Session data exposed to the plugin. */ struct GNUNET_ATS_SessionData data; @@ -106,12 +106,12 @@ struct GNUNET_ATS_Session * Session state in the plugin. */ struct GNUNET_ATS_SessionHandle *sh; - + /** * Unique ID for the session when talking with the client. - */ + */ uint32_t session_id; - + }; @@ -146,12 +146,12 @@ struct Client * Head of DLL of preferences expressed by this client. */ struct ClientPreference *cp_head; - + /** * Tail of DLL of preferences expressed by this client. */ struct ClientPreference *cp_tail; - + } application; struct { @@ -160,9 +160,9 @@ struct Client * Map from session IDs to `struct GNUNET_ATS_Session` objects. */ struct GNUNET_CONTAINER_MultiHashMap32 *sessions; - + } transport; - + } details; }; @@ -196,7 +196,7 @@ static struct Client *transport_client; * @param cls closure, NULL * @param pid peer this is about * @param address address the transport should try - */ + */ static void suggest_cb (void *cls, const struct GNUNET_PeerIdentity *pid, @@ -205,7 +205,7 @@ suggest_cb (void *cls, struct GNUNET_MQ_Envelope *env; size_t slen = strlen (address) + 1; struct AddressSuggestionMessage *as; - + if (NULL == transport_client) { // FIXME: stats! @@ -285,7 +285,7 @@ prop_ntoh (const struct PropertiesNBO *properties, /** - * We have received a `struct ExpressPreferenceMessage` from an application client. + * We have received a `struct ExpressPreferenceMessage` from an application client. * * @param cls handle to the client * @param msg the start message @@ -320,7 +320,7 @@ handle_suggest (void *cls, /** - * We have received a `struct ExpressPreferenceMessage` from an application client. + * We have received a `struct ExpressPreferenceMessage` from an application client. * * @param cls handle to the client * @param msg the start message @@ -331,7 +331,7 @@ handle_suggest_cancel (void *cls, { struct Client *c = cls; struct ClientPreference *cp; - + if (CT_NONE == c->type) c->type = CT_APPLICATION; if (CT_APPLICATION != c->type) @@ -398,7 +398,7 @@ handle_start (void *cls, /** - * Check 'session_add' message is well-formed and comes from a + * Check 'session_add' message is well-formed and comes from a * transport client. * * @param cls client that sent the request @@ -433,7 +433,7 @@ handle_session_add (void *cls, { struct Client *c = cls; const char *address = (const char *) &message[1]; - struct GNUNET_ATS_Session *session; + struct GNUNET_ATS_Session *session; int inbound_only = (GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY == ntohs (message->header.type)); @@ -477,7 +477,7 @@ handle_session_update (void *cls, { struct Client *c = cls; struct GNUNET_ATS_Session *session; - + if (CT_TRANSPORT != c->type) { GNUNET_break (0); @@ -527,7 +527,7 @@ handle_session_del (void *cls, GNUNET_break (0); GNUNET_SERVICE_client_drop (c->client); return; - } + } plugin->session_del (plugin->cls, session->sh, &session->data); @@ -637,15 +637,14 @@ client_disconnect_cb (void *cls, /** - * Task run during shutdown. + * Task run at the end during shutdown. * * @param cls unused */ static void -cleanup_task (void *cls) +final_cleanup (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ATS shutdown initiated\n"); + (void) cls; if (NULL != stats) { GNUNET_STATISTICS_destroy (stats, @@ -666,6 +665,22 @@ cleanup_task (void *cls) } +/** + * Task run during shutdown. + * + * @param cls unused + */ +static void +cleanup_task (void *cls) +{ + (void) cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ATS shutdown initiated\n"); + GNUNET_SCHEDULER_add_now (&final_cleanup, + NULL); +} + + /** * Process template requests. * @@ -680,7 +695,7 @@ run (void *cls, { static struct GNUNET_ATS_PluginEnvironment env; char *solver; - + stats = GNUNET_STATISTICS_create ("ats", cfg); if (GNUNET_SYSERR == @@ -711,7 +726,7 @@ run (void *cls, _("Failed to initialize solver `%s'!\n"), plugin_name); GNUNET_SCHEDULER_shutdown (); - return; + return; } } @@ -746,11 +761,11 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY, struct SessionAddMessage, NULL), - GNUNET_MQ_hd_fixed_size (session_update, + GNUNET_MQ_hd_fixed_size (session_update, GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE, struct SessionUpdateMessage, NULL), - GNUNET_MQ_hd_fixed_size (session_del, + GNUNET_MQ_hd_fixed_size (session_del, GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL, struct SessionDelMessage, NULL), diff --git a/src/ats/plugin_ats2_simple.c b/src/ats/plugin_ats2_simple.c index 55234f1bc..6faf8ad7c 100644 --- a/src/ats/plugin_ats2_simple.c +++ b/src/ats/plugin_ats2_simple.c @@ -22,25 +22,47 @@ * @author Christian Grothoff * * TODO: - * - subscribe to PEERSTORE when short on HELLOs (given application preferences!) - * - keep track of HELLOs and when we tried them last => re-suggest - * - sum up preferences per peer, keep totals! => PeerMap pid -> [preferences + sessions + addrs!] - * - sum up preferences overall, keep global sum => starting point for "proportional" - * - store DLL of available sessions per peer + * - needs testing */ #include "platform.h" #include "gnunet_ats_plugin_new.h" +#include "gnunet_hello_lib.h" #include "gnunet_peerstore_service.h" #define LOG(kind,...) GNUNET_log_from (kind, "ats-simple",__VA_ARGS__) +/** + * Base frequency at which we suggest addresses to transport. + * Multiplied by the square of the number of active connections + * (and randomized) to calculate the actual frequency at which + * we will suggest addresses to the transport. Furthermore, each + * address is also bounded by an exponential back-off. + */ +#define SUGGEST_FREQ GNUNET_TIME_UNIT_SECONDS + +/** + * What is the minimum bandwidth we always try to allocate for + * any session that is up? (May still be scaled down lower if + * the number of sessions is so high that the total bandwidth + * is insufficient to allow for this value to be granted.) + */ +#define MIN_BANDWIDTH_PER_SESSION 1024 + + /** * A handle for the proportional solver */ struct SimpleHandle; +/** + * Information about preferences and sessions we track + * per peer. + */ +struct Peer; + + /** * Entry in list of addresses we could try per peer. */ @@ -57,11 +79,27 @@ struct Hello */ struct Hello *prev; + /** + * Peer this hello belongs to. + */ + struct Peer *peer; + /** * The address we could try. */ const char *address; + /** + * Is a session with this address already up? + * If not, set to NULL. + */ + struct GNUNET_ATS_SessionHandle *sh; + + /** + * When does the HELLO expire? + */ + struct GNUNET_TIME_Absolute expiration; + /** * When did we try it last? */ @@ -73,21 +111,13 @@ struct Hello struct GNUNET_TIME_Relative backoff; /** - * Is a session with this address already up? - * If not, set to NULL. + * Type of the network for this HELLO. */ - struct GNUNET_ATS_SessionHandle *sh; + enum GNUNET_NetworkType nt; }; -/** - * Information about preferences and sessions we track - * per peer. - */ -struct Peer; - - /** * Internal representation of a session by the plugin. * (If desired, plugin may just use NULL.) @@ -130,6 +160,12 @@ struct GNUNET_ATS_SessionHandle */ const char *address; + /** + * When did we last update transport about the allocation? + * Used to dampen the frequency of updates. + */ + struct GNUNET_TIME_Absolute last_allocation; + /** * Last BW-in allocation given to the transport service. */ @@ -140,6 +176,16 @@ struct GNUNET_ATS_SessionHandle */ struct GNUNET_BANDWIDTH_Value32NBO bw_out; + /** + * New BW-in allocation given to the transport service. + */ + uint64_t target_in; + + /** + * New BW-out allocation given to the transport service. + */ + uint64_t target_out; + }; @@ -176,26 +222,31 @@ struct Peer struct SimpleHandle *h; /** - * Which peer is this for? + * Watch context where we are currently looking for HELLOs for + * this peer. */ - struct GNUNET_PeerIdentity pid; + struct GNUNET_PEERSTORE_WatchContext *wc; /** - * Array where we sum up the bandwidth requests received indexed - * by preference kind (see `enum GNUNET_MQ_PreferenceKind`) + * Task used to try again to suggest an address for this peer. */ - uint64_t bw_by_pk[GNUNET_MQ_PREFERENCE_COUNT]; + struct GNUNET_SCHEDULER_Task *task; /** - * Watch context where we are currently looking for HELLOs for - * this peer. + * Which peer is this for? */ - struct GNUNET_PEERSTORE_WatchContext *wc; + struct GNUNET_PeerIdentity pid; /** - * Task used to try again to suggest an address for this peer. + * When did we last suggest an address to connect to for this peer? */ - struct GNUNET_SCHEDULER_Task *task; + struct GNUNET_TIME_Absolute last_suggestion; + + /** + * Array where we sum up the bandwidth requests received indexed + * by preference kind (see `enum GNUNET_MQ_PreferenceKind`) + */ + uint64_t bw_by_pk[GNUNET_MQ_PREFERENCE_COUNT]; }; @@ -241,14 +292,21 @@ struct SimpleHandle struct GNUNET_CONTAINER_MultiPeerMap *peers; /** - * Information we track per network type (quotas). + * Handle to the peerstore service. */ - struct Network networks[GNUNET_NT_COUNT]; + struct GNUNET_PEERSTORE_Handle *ps; /** - * Handle to the peerstore service. + * Array where we sum up the bandwidth requests received indexed + * by preference kind (see `enum GNUNET_MQ_PreferenceKind`) (sums + * over all peers). */ - struct GNUNET_PEERSTORE_Handle *ps; + uint64_t bw_by_pk[GNUNET_MQ_PREFERENCE_COUNT]; + + /** + * Information we track per network type (quotas). + */ + struct Network networks[GNUNET_NT_COUNT]; }; @@ -290,6 +348,129 @@ peer_test_dead (struct Peer *p) } +/** + * Contact the transport service and suggest to it to + * try connecting to the address of @a hello. Updates + * backoff and timestamp values in the @a hello. + * + * @param hello[in,out] address suggestion to make + */ +static void +suggest_hello (struct Hello *hello) +{ + struct Peer *p = hello->peer; + struct SimpleHandle *h = p->h; + + p->last_suggestion + = hello->last_attempt + = GNUNET_TIME_absolute_get (); + hello->backoff = GNUNET_TIME_randomized_backoff (hello->backoff, + GNUNET_TIME_absolute_get_remaining (hello->expiration)); + h->env->suggest_cb (h->env->cls, + &p->pid, + hello->address); +} + + +/** + * Consider suggesting a HELLO (without a session) to transport. + * We look at how many active sessions we have for the peer, and + * if there are many, reduce the frequency of trying new addresses. + * Also, for each address we consider when we last tried it, and + * its exponential backoff if the attempt failed. Note that it + * is possible that this function is called when no suggestion + * is to be made. + * + * In this case, we only calculate the time until we make the next + * suggestion. + * + * @param cls a `struct Peer` + */ +static void +suggest_start_cb (void *cls) +{ + struct Peer *p = cls; + struct GNUNET_TIME_Relative delay = GNUNET_TIME_UNIT_ZERO; + struct Hello *hello = NULL; + struct GNUNET_TIME_Absolute hpt = GNUNET_TIME_UNIT_FOREVER_ABS; + struct GNUNET_TIME_Relative xdelay; + struct GNUNET_TIME_Absolute xnext; + unsigned int num_sessions = 0; + uint32_t sq; + + /* count number of active sessions */ + for (struct GNUNET_ATS_SessionHandle *sh = p->sh_head; + NULL != sh; + sh = sh->next) + num_sessions++; + /* calculate square of number of sessions */ + num_sessions++; /* start with 1, even if we have zero sessions */ + if (num_sessions < UINT16_MAX) + sq = num_sessions * (uint32_t) num_sessions; + else + sq = UINT32_MAX; + xdelay = GNUNET_TIME_randomized_backoff (GNUNET_TIME_relative_multiply (SUGGEST_FREQ, + sq), + GNUNET_TIME_UNIT_FOREVER_REL); + xnext = GNUNET_TIME_relative_to_absolute (xdelay); + + p->task = NULL; + while (0 == delay.rel_value_us) + { + struct Hello *next; + struct GNUNET_TIME_Absolute xmax; + + if (NULL != hello) + { + /* We went through the loop already once and found + a HELLO that is due *now*, so make a suggestion! */ + GNUNET_break (NULL == hello->sh); + suggest_hello (hello); + hello = NULL; + hpt = GNUNET_TIME_UNIT_FOREVER_ABS; + } + for (struct Hello *pos = p->h_head; NULL != pos; pos = next) + { + struct GNUNET_TIME_Absolute pt; + + next = pos->next; + if (NULL != pos->sh) + continue; + if (0 == GNUNET_TIME_absolute_get_remaining (pos->expiration).rel_value_us) + { + /* expired, remove! */ + GNUNET_CONTAINER_DLL_remove (p->h_head, + p->h_tail, + pos); + GNUNET_free (pos); + continue; + } + pt = GNUNET_TIME_absolute_add (pos->last_attempt, + pos->backoff); + if ( (NULL == hello) || + (pt.abs_value_us < hpt.abs_value_us) ) + { + hello = pos; + hpt = pt; + } + } + if (NULL == hello) + return; /* no HELLOs that could still be tried */ + + /* hpt is now the *earliest* possible time for any HELLO + but we might not want to go for as early as possible for + this peer. So the actual time is the max of the earliest + HELLO and the 'xnext' */ + xmax = GNUNET_TIME_absolute_max (hpt, + xnext); + delay = GNUNET_TIME_absolute_get_remaining (xmax); + } + p->task = GNUNET_SCHEDULER_add_delayed (delay, + &suggest_start_cb, + p); +} + + /** * Function called by PEERSTORE for each matching record. * @@ -303,15 +484,85 @@ watch_cb (void *cls, const char *emsg) { struct Peer *p = cls; + char *addr; + size_t alen; + enum GNUNET_NetworkType nt; + struct GNUNET_TIME_Absolute expiration; + struct Hello *hello; - // FIXME: process hello! - // check for expiration - // (add to p's doubly-linked list) - - if (NULL == p->task) + if (0 != memcmp (&p->pid, + &record->peer, + sizeof (struct GNUNET_PeerIdentity))) { - // start suggestion task! + GNUNET_break (0); + return; + } + if (0 != strcmp (record->key, + GNUNET_HELLO_PEERSTORE_KEY)) + { + GNUNET_break (0); + return; + } + addr = GNUNET_HELLO_extract_address (record->value, + record->value_size, + &p->pid, + &nt, + &expiration); + if (NULL == addr) + return; /* invalid hello, bad signature, other problem */ + if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us) + { + /* expired, ignore */ + GNUNET_free (addr); + return; + } + /* check if addr is already known */ + for (struct Hello *he = p->h_head; + NULL != he; + he = he->next) + { + if (0 != strcmp (he->address, + addr)) + continue; + if (he->expiration.abs_value_us < expiration.abs_value_us) + { + he->expiration = expiration; + he->nt = nt; + } + GNUNET_free (addr); + return; + } + /* create new HELLO */ + alen = strlen (addr) + 1; + hello = GNUNET_malloc (sizeof (struct Hello) + alen); + hello->address = (const char *) &hello[1]; + hello->expiration = expiration; + hello->nt = nt; + hello->peer = p; + memcpy (&hello[1], + addr, + alen); + GNUNET_free (addr); + GNUNET_CONTAINER_DLL_insert (p->h_head, + p->h_tail, + hello); + /* check if sh for this HELLO already exists */ + for (struct GNUNET_ATS_SessionHandle *sh = p->sh_head; + NULL != sh; + sh = sh->next) + { + if ( (NULL == sh->address) || + (0 != strcmp (sh->address, + addr)) ) + continue; + GNUNET_assert (NULL == sh->hello); + sh->hello = hello; + hello->sh = sh; + break; } + if (NULL == p->task) + p->task = GNUNET_SCHEDULER_add_now (&suggest_start_cb, + p); } @@ -337,7 +588,7 @@ peer_add (struct SimpleHandle *h, p->wc = GNUNET_PEERSTORE_watch (h->ps, "transport", &p->pid, - "HELLO" /* key */, + GNUNET_HELLO_PEERSTORE_KEY, &watch_cb, p); GNUNET_assert (GNUNET_YES == @@ -389,14 +640,260 @@ peer_free (struct Peer *p) } +/** + * Check if the new allocation for @a sh is significantly different + * from the last one, and if so, tell transport. + * + * @param sh session handle to consider updating transport for + */ +static void +consider_notify_transport (struct GNUNET_ATS_SessionHandle *sh) +{ + struct Peer *peer = sh->peer; + struct SimpleHandle *h = peer->h; + enum GNUNET_NetworkType nt = sh->data->prop.nt; + struct GNUNET_TIME_Relative delay; + uint64_t sig_in; + uint64_t sig_out; + int64_t delta_in; + int64_t delta_out; + + delay = GNUNET_TIME_absolute_get_duration (sh->last_allocation); + /* A significant change is more than 10% of the quota, + which is given in bytes/second */ + sig_in + = h->networks[nt].total_quota_in * (delay.rel_value_us / 1000LL) / 1000LL / 10; + sig_out + = h->networks[nt].total_quota_out * (delay.rel_value_us / 1000LL) / 1000LL / 10; + delta_in = ( (int64_t) ntohl (sh->bw_in.value__)) - ((int64_t) sh->target_in); + delta_out = ( (int64_t) ntohl (sh->bw_in.value__)) - ((int64_t) sh->target_in); + /* we want the absolute values */ + if (delta_in < 0) + delta_in = - delta_in; + if (INT64_MIN == delta_in) + delta_in = INT64_MAX; /* Handle corner case: INT_MIN == - INT_MIN */ + if (delta_out < 0) + delta_out = - delta_out; + if (INT64_MIN == delta_out) + delta_out = INT64_MAX; /* Handle corner case: INT_MIN == - INT_MIN */ + if ( (sig_in > delta_in) && + (sig_out > delta_out) ) + return; /* insignificant change */ + /* change is significant, tell transport! */ + if (sh->target_in > UINT32_MAX) + sh->target_in = UINT32_MAX; + sh->bw_in.value__ = htonl ((uint32_t) sh->target_in); + if (sh->target_out > UINT32_MAX) + sh->target_out = UINT32_MAX; + sh->bw_out.value__ = htonl ((uint32_t) sh->target_out); + sh->last_allocation = GNUNET_TIME_absolute_get (); + h->env->allocate_cb (h->env->cls, + sh->session, + &peer->pid, + sh->bw_in, + sh->bw_out); +} + + +/** + * Closure for #update_counters and #update_allocation. + */ +struct Counters +{ + /** + * Plugin's state. + */ + struct SimpleHandle *h; + + /** + * Bandwidth that applications would prefer to allocate in this + * network type. We initially add all requested allocations to the + * respective network type where the given preference is best + * satisfied. Later we may rebalance. + */ + uint64_t bw_out_by_nt[GNUNET_NT_COUNT]; + + /** + * Current bandwidth utilization for this network type. We simply + * add the current goodput up (with some fairness considerations). + */ + uint64_t bw_in_by_nt[GNUNET_NT_COUNT]; + + /** + * By how much do we have to scale (up or down) our expectations + * for outbound bandwidth? + */ + double scale_out[GNUNET_NT_COUNT]; + + /** + * By how much do we have to scale (up or down) our expectations + * for inbound bandwidth? + */ + double scale_in[GNUNET_NT_COUNT]; + +}; + + +/** + * Function used to iterate over all peers and collect + * counter data. + * + * @param cls a `struct Counters *` + * @param pid identity of the peer we process, unused + * @param value a `struct Peer *` + * @return #GNUNET_YES (continue to iterate) + */ +static int +update_counters (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct Counters *c = cls; + struct Peer *peer = value; + struct GNUNET_ATS_SessionHandle *best[GNUNET_MQ_PREFERENCE_COUNT]; + + (void) pid; + memset (best, + 0, + sizeof (best)); + for (struct GNUNET_ATS_SessionHandle *sh = peer->sh_head; + NULL != sh; + sh = sh->next) + { + enum GNUNET_NetworkType nt = sh->data->prop.nt; + + sh->target_out = MIN_BANDWIDTH_PER_SESSION; + c->bw_out_by_nt[nt] += MIN_BANDWIDTH_PER_SESSION; + c->bw_in_by_nt[nt] += GNUNET_MAX (MIN_BANDWIDTH_PER_SESSION, + sh->data->prop.goodput_in); + for (enum GNUNET_MQ_PreferenceKind pk = 0; + pk < GNUNET_MQ_PREFERENCE_COUNT; + pk++) + { + /* General rule: always prefer smaller distance if possible, + otherwise decide by pk: */ + switch (pk) { + case GNUNET_MQ_PREFERENCE_NONE: + break; + case GNUNET_MQ_PREFERENCE_BANDWIDTH: + /* For bandwidth, we compare the sum of transmitted bytes and + confirmed transmitted bytes, so confirmed data counts twice */ + if ( (NULL == best[pk]) || + (sh->data->prop.distance < best[pk]->data->prop.distance) || + (sh->data->prop.utilization_out + sh->data->prop.goodput_out > + best[pk]->data->prop.utilization_out + best[pk]->data->prop.goodput_out) ) + best[pk] = sh; + /* If both are equal (i.e. usually this happens if there is a zero), use + latency as a yardstick */ + if ( (sh->data->prop.utilization_out + sh->data->prop.goodput_out == + best[pk]->data->prop.utilization_out + best[pk]->data->prop.goodput_out) && + (sh->data->prop.distance == best[pk]->data->prop.distance) && + (sh->data->prop.delay.rel_value_us < + best[pk]->data->prop.delay.rel_value_us) ) + best[pk] = sh; + break; + case GNUNET_MQ_PREFERENCE_LATENCY: + if ( (NULL == best[pk]) || + (sh->data->prop.distance < best[pk]->data->prop.distance) || + ( (sh->data->prop.distance == best[pk]->data->prop.distance) && + (sh->data->prop.delay.rel_value_us < + best[pk]->data->prop.delay.rel_value_us) ) ) + best[pk] = sh; + break; + case GNUNET_MQ_PREFERENCE_RELIABILITY: + /* For reliability, we consider the ratio of goodput to utilization + (but use multiplicative formultations to avoid division by zero) */ + if ( (NULL == best[pk]) || + (1ULL * sh->data->prop.goodput_out * best[pk]->data->prop.utilization_out > + 1ULL * sh->data->prop.utilization_out * best[pk]->data->prop.goodput_out) ) + best[pk] = sh; + /* If both are equal (i.e. usually this happens if there is a zero), use + latency as a yardstick */ + if ( (1ULL * sh->data->prop.goodput_out * best[pk]->data->prop.utilization_out == + 1ULL * sh->data->prop.utilization_out * best[pk]->data->prop.goodput_out) && + (sh->data->prop.distance == best[pk]->data->prop.distance) && + (sh->data->prop.delay.rel_value_us < + best[pk]->data->prop.delay.rel_value_us) ) + best[pk] = sh; + break; + } + } + } + /* for first round, assign target bandwidth simply to sum of + requested bandwidth */ + for (enum GNUNET_MQ_PreferenceKind pk = 0; + pk < GNUNET_MQ_PREFERENCE_COUNT; + pk++) + { + enum GNUNET_NetworkType nt = best[pk]->data->prop.nt; + + best[pk]->target_out = GNUNET_MIN (peer->bw_by_pk[pk], + MIN_BANDWIDTH_PER_SESSION); + c->bw_out_by_nt[nt] += (uint64_t) (best[pk]->target_out - MIN_BANDWIDTH_PER_SESSION); + } + return GNUNET_YES; +} + + +/** + * Function used to iterate over all peers and collect + * counter data. + * + * @param cls a `struct Counters *` + * @param pid identity of the peer we process, unused + * @param value a `struct Peer *` + * @return #GNUNET_YES (continue to iterate) + */ +static int +update_allocation (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct Counters *c = cls; + struct Peer *peer = value; + + (void) pid; + for (struct GNUNET_ATS_SessionHandle *sh = peer->sh_head; + NULL != sh; + sh = sh->next) + { + enum GNUNET_NetworkType nt = sh->data->prop.nt; + + sh->target_out = (uint64_t) (c->scale_out[nt] * sh->target_out); + sh->target_in = (uint64_t) (c->scale_in[nt] * sh->target_in); + consider_notify_transport (sh); + } + return GNUNET_YES; +} + + /** * The world changed, recalculate our allocations. */ static void update (struct SimpleHandle *h) { - // recalculate allocations - // notify transport if it makes sense (delta significant) + struct Counters cnt = { + .h = h + }; + + GNUNET_CONTAINER_multipeermap_iterate (h->peers, + &update_counters, + &cnt); + /* calculate how badly the missmatch between requested + allocations and available bandwidth is per network type */ + for (enum GNUNET_NetworkType nt = 0; + nt < GNUNET_NT_COUNT; + nt++) + { + cnt.scale_out[nt] = 1.0 * cnt.bw_out_by_nt[nt] / h->networks[nt].total_quota_out; + cnt.scale_in[nt] = 1.0 * cnt.bw_in_by_nt[nt] / h->networks[nt].total_quota_in; + } + /* recalculate allocations, considering scaling factor, and + update transport if the change is significant */ + GNUNET_CONTAINER_multipeermap_iterate (h->peers, + &update_allocation, + &cnt); } @@ -417,6 +914,7 @@ simple_preference_add (void *cls, GNUNET_assert (pref->pk < GNUNET_MQ_PREFERENCE_COUNT); p->bw_by_pk[pref->pk] += ntohl (pref->bw.value__); + h->bw_by_pk[pref->pk] += ntohl (pref->bw.value__); update (h); return NULL; } @@ -442,6 +940,7 @@ simple_preference_del (void *cls, GNUNET_assert (NULL != p); GNUNET_assert (pref->pk < GNUNET_MQ_PREFERENCE_COUNT); p->bw_by_pk[pref->pk] -= ntohl (pref->bw.value__); + h->bw_by_pk[pref->pk] -= ntohl (pref->bw.value__); if ( (0 == p->bw_by_pk[pref->pk]) && (GNUNET_YES == peer_test_dead (p)) ) peer_free (p); @@ -502,6 +1001,7 @@ simple_session_add (void *cls, if (NULL != hello) { hello->sh = sh; + hello->backoff = GNUNET_TIME_UNIT_ZERO; sh->hello = hello; } update (h); @@ -543,11 +1043,24 @@ simple_session_del (void *cls, { struct SimpleHandle *h = cls; struct Peer *p = sh->peer; + struct Hello *hello = sh->hello; - // FIXME: tear down session - // del peer if otherwise dead - - + /* clean up sh */ + GNUNET_CONTAINER_DLL_remove (p->sh_head, + p->sh_tail, + sh); + if (NULL != hello) + { + GNUNET_assert (sh == hello->sh); + hello->sh = NULL; + /* session went down, if necessary restart suggesting + addresses */ + if (NULL == p->task) + p->task = GNUNET_SCHEDULER_add_now (&suggest_start_cb, + p); + } + GNUNET_free (sh); + /* del peer if otherwise dead */ if ( (NULL == p->sh_head) && (GNUNET_YES == peer_test_dead (p)) ) peer_free (p); @@ -619,7 +1132,8 @@ libgnunet_plugin_ats2_simple_done (void *cls) struct GNUNET_ATS_SolverFunctions *sf = cls; struct SimpleHandle *s = sf->cls; - // FIXME: iterate over peers and clean up! + GNUNET_break (0 == + GNUNET_CONTAINER_multipeermap_size (s->peers)); GNUNET_CONTAINER_multipeermap_destroy (s->peers); GNUNET_PEERSTORE_disconnect (s->ps, GNUNET_NO); diff --git a/src/hello/hello-ng.c b/src/hello/hello-ng.c index a16ceb944..723ec0eaa 100644 --- a/src/hello/hello-ng.c +++ b/src/hello/hello-ng.c @@ -101,7 +101,7 @@ GNUNET_HELLO_sign_address (const char *address, * * @param raw raw signed address * @param raw_size size of @a raw - * @param public_key public key to use for signature verification + * @param pid public key to use for signature verification * @param nt[out] set to network type * @param expiration[out] how long is the address valid * @return NULL on error, otherwise the address @@ -109,10 +109,11 @@ GNUNET_HELLO_sign_address (const char *address, char * GNUNET_HELLO_extract_address (const void *raw, size_t raw_size, - const struct GNUNET_CRYPTO_EddsaPublicKey *public_key, + const struct GNUNET_PeerIdentity *pid, enum GNUNET_NetworkType *nt, struct GNUNET_TIME_Absolute *expiration) { + const struct GNUNET_CRYPTO_EddsaPublicKey *public_key = &pid->public_key; const char *raws = raw; unsigned long long raw_us; unsigned int raw_nt; diff --git a/src/include/gnunet_hello_lib.h b/src/include/gnunet_hello_lib.h index 8a405a25e..a47162f99 100644 --- a/src/include/gnunet_hello_lib.h +++ b/src/include/gnunet_hello_lib.h @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ @@ -476,6 +476,10 @@ GNUNET_HELLO_parse_uri (const char *uri, /* NG API */ #include "gnunet_nt_lib.h" +/** + * Key used for storing HELLOs in the peerstore + */ +#define GNUNET_HELLO_PEERSTORE_KEY "hello" /** * Build address record by signing raw information with private key. @@ -501,7 +505,7 @@ GNUNET_HELLO_sign_address (const char *address, * * @param raw raw signed address * @param raw_size size of @a raw - * @param public_key public key to use for signature verification + * @param pid public key to use for signature verification * @param nt[out] set to network type * @param expiration[out] how long is the address valid * @return NULL on error, otherwise the address @@ -509,7 +513,7 @@ GNUNET_HELLO_sign_address (const char *address, char * GNUNET_HELLO_extract_address (const void *raw, size_t raw_size, - const struct GNUNET_CRYPTO_EddsaPublicKey *public_key, + const struct GNUNET_PeerIdentity *pid, enum GNUNET_NetworkType *nt, struct GNUNET_TIME_Absolute *expiration); diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index feaa0cfff..3630e6af0 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -1196,7 +1196,7 @@ store_pi (void *cls) ale->sc = GNUNET_PEERSTORE_store (peerstore, "transport", &GST_my_identity, - "hello", + GNUNET_HELLO_PEERSTORE_KEY, addr, addr_len, expiration, -- cgit v1.2.3