/* This file is part of GNUnet. Copyright (C) GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . SPDX-License-Identifier: AGPL3.0-or-later */ /** * @file rps/gnunet-service-rps_sampler.c * @brief sampler implementation * @author Julius Bünger */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_statistics_service.h" #include "rps.h" #include "rps-sampler_common.h" #include "gnunet-service-rps_sampler.h" #include "gnunet-service-rps_sampler_elem.h" #include #include #include "rps-test_util.h" #define LOG(kind, ...) GNUNET_log_from (kind, "rps-sampler", __VA_ARGS__) // multiple 'clients'? // TODO check for overflows // TODO align message structs // hist_size_init, hist_size_max /*********************************************************************** * WARNING: This section needs to be reviewed regarding the use of * functions providing (pseudo)randomness! ***********************************************************************/ // TODO care about invalid input of the caller (size 0 or less...) /** * @brief Callback called each time a new peer was put into the sampler * * @param cls A possibly given closure */ typedef void (*SamplerNotifyUpdateCB) (void *cls); /** * @brief Context for a callback. Contains callback and closure. * * Meant to be an entry in an DLL. */ struct SamplerNotifyUpdateCTX { /** * @brief The Callback to call on updates */ SamplerNotifyUpdateCB notify_cb; /** * @brief The according closure. */ void *cls; /** * @brief Next element in DLL. */ struct SamplerNotifyUpdateCTX *next; /** * @brief Previous element in DLL. */ struct SamplerNotifyUpdateCTX *prev; }; /** * Type of function used to differentiate between modified and not modified * Sampler. */ typedef void (*RPS_get_peers_type) (void *cls); /** * Get one random peer out of the sampled peers. * * We might want to reinitialise this sampler after giving the * corrsponding peer to the client. */ static void sampler_mod_get_rand_peer (void *cls); /** * Closure to _get_n_rand_peers_ready_cb() */ struct RPS_SamplerRequestHandle { /** * DLL */ struct RPS_SamplerRequestHandle *next; struct RPS_SamplerRequestHandle *prev; /** * Number of peers we are waiting for. */ uint32_t num_peers; /** * Number of peers we currently have. */ uint32_t cur_num_peers; /** * Pointer to the array holding the ids. */ struct GNUNET_PeerIdentity *ids; /** * Head and tail for the DLL to store the tasks for single requests */ struct GetPeerCls *gpc_head; struct GetPeerCls *gpc_tail; /** * Sampler. */ struct RPS_Sampler *sampler; /** * Callback to be called when all ids are available. */ RPS_sampler_n_rand_peers_ready_cb callback; /** * Closure given to the callback */ void *cls; }; /** * Closure to _get_rand_peer_info() */ struct RPS_SamplerRequestHandleSingleInfo { /** * DLL */ struct RPS_SamplerRequestHandleSingleInfo *next; struct RPS_SamplerRequestHandleSingleInfo *prev; /** * Pointer to the id */ struct GNUNET_PeerIdentity *id; /** * Head and tail for the DLL to store the tasks for single requests */ struct GetPeerCls *gpc_head; struct GetPeerCls *gpc_tail; /** * Sampler. */ struct RPS_Sampler *sampler; /** * Callback to be called when all ids are available. */ RPS_sampler_sinlge_info_ready_cb callback; /** * Closure given to the callback */ void *cls; }; ///** // * Global sampler variable. // */ // struct RPS_Sampler *sampler; /** * The minimal size for the extended sampler elements. */ static size_t min_size; /** * The maximal size the extended sampler elements should grow to. */ static size_t max_size; /** * The size the extended sampler elements currently have. */ // static size_t extra_size; /** * Inedex to the sampler element that is the next to be returned */ static uint32_t client_get_index; /** * Initialise a modified tuple of sampler elements. * * @param init_size the size the sampler is initialised with * @param max_round_interval maximum time a round takes * @return a handle to a sampler that consists of sampler elements. */ struct RPS_Sampler * RPS_sampler_mod_init (size_t init_size, struct GNUNET_TIME_Relative max_round_interval) { struct RPS_Sampler *sampler; /* Initialise context around extended sampler */ min_size = 10; // TODO make input to _samplers_init() max_size = 1000; // TODO make input to _samplers_init() sampler = GNUNET_new (struct RPS_Sampler); sampler->max_round_interval = max_round_interval; sampler->get_peers = sampler_mod_get_rand_peer; // sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); // GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); client_get_index = 0; // GNUNET_assert (init_size == sampler->sampler_size); RPS_sampler_resize (sampler, init_size); return sampler; } /** * @brief Compute the probability that we already observed all peers from a * biased stream of peer ids. * * Deficiency factor: * As introduced by Brahms: Factor between the number of unique ids in a * truly random stream and number of unique ids in the gossip stream. * * @param num_peers_estim The estimated number of peers in the network * @param num_peers_observed The number of peers the given element has observed * @param deficiency_factor A factor that catches the 'bias' of a random stream * of peer ids * * @return The estimated probability */ static double prob_observed_n_peers (uint32_t num_peers_estim, uint32_t num_peers_observed, double deficiency_factor) { uint32_t num_peers = num_peers_estim * (1 / deficiency_factor); uint64_t sum = 0; for (uint32_t i = 0; i < num_peers; i++) { uint64_t a = pow (-1, num_peers - i); uint64_t b = binom (num_peers, i); uint64_t c = pow (i, num_peers_observed); sum += a * b * c; } return sum / (double) pow (num_peers, num_peers_observed); } /** * Get one random peer out of the sampled peers. * * This reinitialises the queried sampler element. */ static void sampler_mod_get_rand_peer (void *cls) { struct GetPeerCls *gpc = cls; struct RPS_SamplerElement *s_elem; struct GNUNET_TIME_Relative last_request_diff; struct RPS_Sampler *sampler; double prob_observed_n; uint32_t num_observed; gpc->get_peer_task = NULL; gpc->notify_ctx = NULL; GNUNET_assert ((NULL != gpc->req_handle) || (NULL != gpc->req_single_info_handle)); if (NULL != gpc->req_handle) sampler = gpc->req_handle->sampler; else sampler = gpc->req_single_info_handle->sampler; LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); /* Cycle the #client_get_index one step further */ client_get_index = (client_get_index + 1) % sampler->sampler_size; s_elem = sampler->sampler_elements[client_get_index]; *gpc->id = s_elem->peer_id; GNUNET_assert (NULL != s_elem); if (EMPTY == s_elem->is_empty) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); GNUNET_assert (NULL == gpc->notify_ctx); gpc->notify_ctx = sampler_notify_on_update (sampler, &sampler_mod_get_rand_peer, gpc); return; } /* Check whether we may use this sampler to give it back to the client */ if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) { // TODO remove this condition at least for the client sampler last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, GNUNET_TIME_absolute_get ()); /* We're not going to give it back now if it was * already requested by a client this round */ if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); ///* How many time remains untile the next round has started? */ // inv_last_request_diff = // GNUNET_TIME_absolute_get_difference (last_request_diff, // sampler->max_round_interval); // add a little delay /* Schedule it one round later */ GNUNET_assert (NULL == gpc->notify_ctx); gpc->notify_ctx = sampler_notify_on_update (sampler, &sampler_mod_get_rand_peer, gpc); return; } } if (2 > s_elem->num_peers) { LOG (GNUNET_ERROR_TYPE_DEBUG, "This s_elem saw less than two peers -- scheduling for later\n"); GNUNET_assert (NULL == gpc->notify_ctx); gpc->notify_ctx = sampler_notify_on_update (sampler, &sampler_mod_get_rand_peer, gpc); return; } /* compute probability */ /* Currently disabled due to numerical limitations */ // prob_observed_n = prob_observed_n_peers (sampler->num_peers_estim, // s_elem->num_peers, // sampler->deficiency_factor); // LOG (GNUNET_ERROR_TYPE_DEBUG, // "Computed sample - prob %f, %" PRIu32 " peers, n: %" PRIu32 ", roh: %f\n", // prob_observed_n, // s_elem->num_peers, // sampler->num_peers_estim, // sampler->deficiency_factor); ///* check if probability is above desired */ // if (prob_observed_n < sampler->desired_probability) // { // LOG (GNUNET_ERROR_TYPE_DEBUG, // "Probability of having observed all peers (%f) too small ( < %f).\n", // prob_observed_n, // sampler->desired_probability); // GNUNET_assert (NULL == gpc->notify_ctx); // gpc->notify_ctx = // sampler_notify_on_update (sampler, // &sampler_mod_get_rand_peer, // gpc); // return; // } /* More reasons to wait could be added here */ // GNUNET_STATISTICS_set (stats, // "# client sampler element input", // s_elem->num_peers, // GNUNET_NO); // GNUNET_STATISTICS_set (stats, // "# client sampler element change", // s_elem->num_change, // GNUNET_NO); num_observed = s_elem->num_peers; RPS_sampler_elem_reinit (s_elem); s_elem->last_client_request = GNUNET_TIME_absolute_get (); if (NULL != gpc->req_handle) { GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, gpc->req_handle->gpc_tail, gpc); } else { GNUNET_CONTAINER_DLL_remove (gpc->req_single_info_handle->gpc_head, gpc->req_single_info_handle->gpc_tail, gpc); } gpc->cont (gpc->cont_cls, gpc->id, prob_observed_n, num_observed); GNUNET_free (gpc); } /* end of gnunet-service-rps.c */