From a611b66e1c0ec87c7f3dfb72da1e223379d735f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julius=20B=C3=BCnger?= Date: Wed, 26 Sep 2018 17:40:17 +0200 Subject: Split sampler into common, sampler and client part Now there is rps-sampler_client.{c|h} and gnunet-service-rps_sampler.{c|h} to better reflect which parts are really in the implementation of brahms (only the service) and which are used in the client. The parts both samplers share are now in rps-sampler_common.{c|h}. --- src/rps/gnunet-service-rps_sampler.c | 648 +---------------------------------- 1 file changed, 2 insertions(+), 646 deletions(-) (limited to 'src/rps/gnunet-service-rps_sampler.c') diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 2cd4cb996..9629a29a1 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -26,6 +26,7 @@ #include "gnunet_statistics_service.h" #include "rps.h" +#include "rps-sampler_common.h" #include "gnunet-service-rps_sampler.h" #include "gnunet-service-rps_sampler_elem.h" @@ -52,17 +53,6 @@ // TODO care about invalid input of the caller (size 0 or less...) -/** - * Callback that is called from _get_rand_peer() when the PeerID is ready. - * - * @param cls the closure given alongside this function. - * @param id the PeerID that was returned - */ -typedef void -(*RPS_sampler_rand_peer_ready_cont) (void *cls, - const struct GNUNET_PeerIdentity *id); - - /** * @brief Callback called each time a new peer was put into the sampler * @@ -100,49 +90,6 @@ struct SamplerNotifyUpdateCTX }; -/** - * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer - */ -struct GetPeerCls -{ - /** - * DLL - */ - struct GetPeerCls *next; - struct GetPeerCls *prev; - - /** - * The #RPS_SamplerRequestHandle this single request belongs to. - */ - struct RPS_SamplerRequestHandle *req_handle; - - /** - * The task for this function. - */ - struct GNUNET_SCHEDULER_Task *get_peer_task; - - /** - * @brief Context to the given callback. - */ - struct SamplerNotifyUpdateCTX *notify_ctx; - - /** - * The callback - */ - RPS_sampler_rand_peer_ready_cont cont; - - /** - * The closure to the callback @e cont - */ - void *cont_cls; - - /** - * The address of the id to be stored at - */ - struct GNUNET_PeerIdentity *id; -}; - - /** * Type of function used to differentiate between modified and not modified * Sampler. @@ -161,61 +108,6 @@ static void sampler_get_rand_peer (void *cls); -/** - * Get one random peer out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - */ -static void -sampler_mod_get_rand_peer (void *cls); - - -/** - * Sampler with its own array of SamplerElements - */ -struct RPS_Sampler -{ - /** - * Number of sampler elements we hold. - */ - unsigned int sampler_size; - //size_t size; - - /** - * All sampler elements in one array. - */ - struct RPS_SamplerElement **sampler_elements; - - /** - * Maximum time a round takes - * - * Used in the context of RPS - */ - struct GNUNET_TIME_Relative max_round_interval; - - /** - * Stores the function to return peers. Which one it is depends on whether - * the Sampler is the modified one or not. - */ - RPS_get_peers_type get_peers; - - /** - * Head and tail for the DLL to store the #RPS_SamplerRequestHandle - */ - struct RPS_SamplerRequestHandle *req_handle_head; - struct RPS_SamplerRequestHandle *req_handle_tail; - - struct SamplerNotifyUpdateCTX *notify_ctx_head; - struct SamplerNotifyUpdateCTX *notify_ctx_tail; - #ifdef TO_FILE - /** - * File name to log to - */ - char *file_name; - #endif /* TO_FILE */ -}; - /** * Closure to _get_n_rand_peers_ready_cb() */ @@ -291,189 +183,6 @@ static size_t max_size; static uint32_t client_get_index; -/** - * @brief Add a callback that will be called when the next peer is inserted - * into the sampler - * - * @param sampler The sampler on which update it will be called - * @param notify_cb The callback - * @param cls Closure given to the callback - * - * @return The context containing callback and closure - */ -struct SamplerNotifyUpdateCTX * -sampler_notify_on_update (struct RPS_Sampler *sampler, - SamplerNotifyUpdateCB notify_cb, - void *cls) -{ - struct SamplerNotifyUpdateCTX *notify_ctx; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Inserting new context for notification\n"); - notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); - notify_ctx->notify_cb = notify_cb; - notify_ctx->cls = cls; - GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, - sampler->notify_ctx_tail, - notify_ctx); - return notify_ctx; -} - - -/** - * Callback to _get_rand_peer() used by _get_n_rand_peers(). - * - * Checks whether all n peers are available. If they are, - * give those back. - */ -static void -check_n_peers_ready (void *cls, - const struct GNUNET_PeerIdentity *id) -{ - struct RPS_SamplerRequestHandle *req_handle = cls; - (void) id; - - req_handle->cur_num_peers++; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got %" PRIX32 ". of %" PRIX32 " peers\n", - req_handle->cur_num_peers, req_handle->num_peers); - - if (req_handle->num_peers == req_handle->cur_num_peers) - { /* All peers are ready -- return those to the client */ - GNUNET_assert (NULL != req_handle->callback); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "returning %" PRIX32 " peers to the client\n", - req_handle->num_peers); - req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls); - - RPS_sampler_request_cancel (req_handle); - } -} - - -/** - * Get the size of the sampler. - * - * @param sampler the sampler to return the size of. - * @return the size of the sampler - */ -unsigned int -RPS_sampler_get_size (struct RPS_Sampler *sampler) -{ - return sampler->sampler_size; -} - - -/** - * Grow or shrink the size of the sampler. - * - * @param sampler the sampler to resize. - * @param new_size the new size of the sampler - */ -static void -sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) -{ - unsigned int old_size; - uint32_t i; - - // TODO check min and max size - - old_size = sampler->sampler_size; - - if (old_size > new_size) - { /* Shrinking */ - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Shrinking sampler %d -> %d\n", - old_size, - new_size); - - to_file (sampler->file_name, - "Shrinking sampler %d -> %d", - old_size, - new_size); - - for (i = new_size ; i < old_size ; i++) - { - to_file (sampler->file_name, - "-%" PRIu32 ": %s", - i, - sampler->sampler_elements[i]->file_name); - RPS_sampler_elem_destroy (sampler->sampler_elements[i]); - } - - GNUNET_array_grow (sampler->sampler_elements, - sampler->sampler_size, - new_size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "sampler->sampler_elements now points to %p\n", - sampler->sampler_elements); - - } - else if (old_size < new_size) - { /* Growing */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Growing sampler %d -> %d\n", - old_size, - new_size); - - to_file (sampler->file_name, - "Growing sampler %d -> %d", - old_size, - new_size); - - GNUNET_array_grow (sampler->sampler_elements, - sampler->sampler_size, - new_size); - - for (i = old_size ; i < new_size ; i++) - { /* Add new sampler elements */ - sampler->sampler_elements[i] = RPS_sampler_elem_create (); - - to_file (sampler->file_name, - "+%" PRIu32 ": %s", - i, - sampler->sampler_elements[i]->file_name); - } - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); - return; - } - - GNUNET_assert (sampler->sampler_size == new_size); -} - - -/** - * Grow or shrink the size of the sampler. - * - * @param sampler the sampler to resize. - * @param new_size the new size of the sampler - */ -void -RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) -{ - GNUNET_assert (0 < new_size); - sampler_resize (sampler, new_size); -} - - -/** - * Empty the sampler. - * - * @param sampler the sampler to empty. - * @param new_size the new size of the sampler - */ -static void -sampler_empty (struct RPS_Sampler *sampler) -{ - sampler_resize (sampler, 0); -} - - /** * Initialise a tuple of sampler elements. * @@ -513,119 +222,6 @@ RPS_sampler_init (size_t init_size, return sampler; } -/** - * Initialise a modified tuple of sampler elements. - * - * @param init_size the size the sampler is initialised with - * @param max_round_interval maximum time a round takes - * @return a handle to a sampler that consists of sampler elements. - */ -struct RPS_Sampler * -RPS_sampler_mod_init (size_t init_size, - struct GNUNET_TIME_Relative max_round_interval) -{ - struct RPS_Sampler *sampler; - - sampler = RPS_sampler_init (init_size, max_round_interval); - sampler->get_peers = sampler_mod_get_rand_peer; - -#ifdef TO_FILE - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Initialised modified sampler %s\n", - sampler->file_name); - to_file (sampler->file_name, - "This is a modified sampler"); -#endif /* TO_FILE */ - - return sampler; -} - - -/** - * @brief Notify about update of the sampler. - * - * Call the callbacks that are waiting for notification on updates to the - * sampler. - * - * @param sampler The sampler the updates are waiting for - */ -static void -notify_update (struct RPS_Sampler *sampler) -{ - struct SamplerNotifyUpdateCTX *tmp_notify_head; - struct SamplerNotifyUpdateCTX *tmp_notify_tail; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Calling callbacks waiting for update notification.\n"); - tmp_notify_head = sampler->notify_ctx_head; - tmp_notify_tail = sampler->notify_ctx_tail; - sampler->notify_ctx_head = NULL; - sampler->notify_ctx_tail = NULL; - for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head; - NULL != tmp_notify_head; - notify_iter = tmp_notify_head) - { - GNUNET_assert (NULL != notify_iter->notify_cb); - GNUNET_CONTAINER_DLL_remove (tmp_notify_head, - tmp_notify_tail, - notify_iter); - notify_iter->notify_cb (notify_iter->cls); - GNUNET_free (notify_iter); - } -} - - -/** - * Update every sampler element of this sampler with given peer - * - * @param sampler the sampler to update. - * @param id the PeerID that is put in the sampler - */ - void -RPS_sampler_update (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) -{ - to_file (sampler->file_name, - "Got %s", - GNUNET_i2s_full (id)); - - for (uint32_t i = 0; i < sampler->sampler_size; i++) - { - RPS_sampler_elem_next (sampler->sampler_elements[i], - id); - } - notify_update (sampler); -} - - -/** - * Reinitialise all previously initialised sampler elements with the given value. - * - * Used to get rid of a PeerID. - * - * @param sampler the sampler to reinitialise a sampler element in. - * @param id the id of the sampler elements to update. - */ - void -RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) -{ - uint32_t i; - - for (i = 0; i < sampler->sampler_size; i++) - { - if (0 == GNUNET_CRYPTO_cmp_peer_identity(id, - &(sampler->sampler_elements[i]->peer_id)) ) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n"); - to_file (sampler->sampler_elements[i]->file_name, - "--- non-active"); - RPS_sampler_elem_reinit (sampler->sampler_elements[i]); - } - } -} - - /** * Get one random peer out of the sampled peers. * @@ -658,7 +254,7 @@ sampler_get_rand_peer (void *cls) gpc->notify_ctx = sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, + &sampler_get_rand_peer, gpc); return; } @@ -673,244 +269,4 @@ sampler_get_rand_peer (void *cls) } -/** - * Get one random peer out of the sampled peers. - * - * This reinitialises the queried sampler element. - */ -static void -sampler_mod_get_rand_peer (void *cls) -{ - struct GetPeerCls *gpc = cls; - struct RPS_SamplerElement *s_elem; - struct GNUNET_TIME_Relative last_request_diff; - struct RPS_Sampler *sampler; - - gpc->get_peer_task = NULL; - gpc->notify_ctx = NULL; - sampler = gpc->req_handle->sampler; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); - - /* Cycle the #client_get_index one step further */ - client_get_index = (client_get_index + 1) % sampler->sampler_size; - - s_elem = sampler->sampler_elements[client_get_index]; - *gpc->id = s_elem->peer_id; - GNUNET_assert (NULL != s_elem); - - if (EMPTY == s_elem->is_empty) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sampler_mod element empty, rescheduling.\n"); - GNUNET_assert (NULL == gpc->notify_ctx); - gpc->notify_ctx = - sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, - gpc); - return; - } - - /* Check whether we may use this sampler to give it back to the client */ - if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) - { - // TODO remove this condition at least for the client sampler - last_request_diff = - GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, - GNUNET_TIME_absolute_get ()); - /* We're not going to give it back now if it was - * already requested by a client this round */ - if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); - ///* How many time remains untile the next round has started? */ - //inv_last_request_diff = - // GNUNET_TIME_absolute_get_difference (last_request_diff, - // sampler->max_round_interval); - // add a little delay - /* Schedule it one round later */ - GNUNET_assert (NULL == gpc->notify_ctx); - gpc->notify_ctx = - sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, - gpc); - return; - } - } - if (2 > s_elem->num_peers) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "This s_elem saw less than two peers -- scheduling for later\n"); - GNUNET_assert (NULL == gpc->notify_ctx); - gpc->notify_ctx = - sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, - gpc); - return; - } - /* More reasons to wait could be added here */ - -// GNUNET_STATISTICS_set (stats, -// "# client sampler element input", -// s_elem->num_peers, -// GNUNET_NO); -// GNUNET_STATISTICS_set (stats, -// "# client sampler element change", -// s_elem->num_change, -// GNUNET_NO); - - RPS_sampler_elem_reinit (s_elem); - s_elem->last_client_request = GNUNET_TIME_absolute_get (); - - GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, - gpc->req_handle->gpc_tail, - gpc); - gpc->cont (gpc->cont_cls, gpc->id); - GNUNET_free (gpc); -} - - -/** - * Get n random peers out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - * Random with or without consumption? - * - * @param sampler the sampler to get peers from. - * @param cb callback that will be called once the ids are ready. - * @param cls closure given to @a cb - * @param for_client #GNUNET_YES if result is used for client, - * #GNUNET_NO if used internally - * @param num_peers the number of peers requested - */ -struct RPS_SamplerRequestHandle * -RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, - uint32_t num_peers, - RPS_sampler_n_rand_peers_ready_cb cb, - void *cls) -{ - uint32_t i; - struct RPS_SamplerRequestHandle *req_handle; - struct GetPeerCls *gpc; - - GNUNET_assert (0 != sampler->sampler_size); - if (0 == num_peers) - return NULL; - - // TODO check if we have too much (distinct) sampled peers - req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); - req_handle->num_peers = num_peers; - req_handle->cur_num_peers = 0; - req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); - req_handle->sampler = sampler; - req_handle->callback = cb; - req_handle->cls = cls; - GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, - sampler->req_handle_tail, - req_handle); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling requests for %" PRIu32 " peers\n", num_peers); - - for (i = 0; i < num_peers; i++) - { - gpc = GNUNET_new (struct GetPeerCls); - gpc->req_handle = req_handle; - gpc->cont = check_n_peers_ready; - gpc->cont_cls = req_handle; - gpc->id = &req_handle->ids[i]; - - GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, - req_handle->gpc_tail, - gpc); - // maybe add a little delay - gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, - gpc); - } - return req_handle; -} - -/** - * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. - * - * @param req_handle the handle to the request - */ -void -RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) -{ - struct GetPeerCls *i; - - while (NULL != (i = req_handle->gpc_head) ) - { - GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, - req_handle->gpc_tail, - i); - if (NULL != i->get_peer_task) - { - GNUNET_SCHEDULER_cancel (i->get_peer_task); - } - if (NULL != i->notify_ctx) - { - GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head, - req_handle->sampler->notify_ctx_tail, - i->notify_ctx); - GNUNET_free (i->notify_ctx); - } - GNUNET_free (i); - } - GNUNET_free (req_handle->ids); - GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, - req_handle->sampler->req_handle_tail, - req_handle); - GNUNET_free (req_handle); -} - - -/** - * Counts how many Samplers currently hold a given PeerID. - * - * @param sampler the sampler to count ids in. - * @param id the PeerID to count. - * - * @return the number of occurrences of id. - */ - uint32_t -RPS_sampler_count_id (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) -{ - uint32_t count; - uint32_t i; - - count = 0; - for ( i = 0 ; i < sampler->sampler_size ; i++ ) - { - if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) - && EMPTY != sampler->sampler_elements[i]->is_empty) - count++; - } - return count; -} - - -/** - * Cleans the sampler. - */ - void -RPS_sampler_destroy (struct RPS_Sampler *sampler) -{ - if (NULL != sampler->req_handle_head) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "There are still pending requests. Going to remove them.\n"); - while (NULL != sampler->req_handle_head) - { - RPS_sampler_request_cancel (sampler->req_handle_head); - } - } - sampler_empty (sampler); - GNUNET_free (sampler); -} - /* end of gnunet-service-rps.c */ -- cgit v1.2.3