From a611b66e1c0ec87c7f3dfb72da1e223379d735f9 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Wed, 26 Sep 2018 17:40:17 +0200 Subject: Split sampler into common, sampler and client part Now there is rps-sampler_client.{c|h} and gnunet-service-rps_sampler.{c|h} to better reflect which parts are really in the implementation of brahms (only the service) and which are used in the client. The parts both samplers share are now in rps-sampler_common.{c|h}. --- src/rps/Makefile.am | 6 +- src/rps/gnunet-service-rps_sampler.c | 648 +---------------------------------- src/rps/gnunet-service-rps_sampler.h | 26 +- src/rps/rps-sampler_client.c | 328 ++++++++++++++++++ src/rps/rps-sampler_client.h | 150 ++++++++ src/rps/rps-sampler_common.c | 527 ++++++++++++++++++++++++++++ src/rps/rps-sampler_common.h | 280 +++++++++++++++ src/rps/rps_api.c | 2 +- 8 files changed, 1294 insertions(+), 673 deletions(-) create mode 100644 src/rps/rps-sampler_client.c create mode 100644 src/rps/rps-sampler_client.h create mode 100644 src/rps/rps-sampler_common.c create mode 100644 src/rps/rps-sampler_common.h (limited to 'src') diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am index 5e9fd09fa..e973bb7ca 100644 --- a/src/rps/Makefile.am +++ b/src/rps/Makefile.am @@ -21,7 +21,7 @@ bin_PROGRAMS = gnunet-rps gnunet_rps_SOURCES = \ gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ - gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ + rps-sampler_common.h rps-sampler_common.c \ gnunet-rps.c gnunet_rps_LDADD = \ @@ -32,6 +32,7 @@ gnunet_rps_LDADD = \ lib_LTLIBRARIES = libgnunetrps.la libgnunetrps_la_SOURCES = \ + rps-sampler_client.h rps-sampler_client.c \ rps_api.c rps.h libgnunetrps_la_LIBADD = \ $(top_builddir)/src/util/libgnunetutil.la \ @@ -52,6 +53,7 @@ endif gnunet_service_rps_SOURCES = \ gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ + rps-sampler_common.h rps-sampler_common.c \ gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ gnunet-service-rps_custommap.h gnunet-service-rps_custommap.c \ gnunet-service-rps_view.h gnunet-service-rps_view.c \ @@ -91,6 +93,7 @@ rps_test_src = \ test_rps.c \ rps-test_util.h rps-test_util.c \ gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ + rps-sampler_common.h rps-sampler_common.c \ gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c ld_rps_test_lib = \ @@ -148,6 +151,7 @@ test_rps_churn_LDADD = $(ld_rps_test_lib) gnunet_rps_profiler_SOURCES = \ gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ + rps-sampler_common.h rps-sampler_common.c \ gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ rps-test_util.h rps-test_util.c \ gnunet-rps-profiler.c diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 2cd4cb996..9629a29a1 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -26,6 +26,7 @@ #include "gnunet_statistics_service.h" #include "rps.h" +#include "rps-sampler_common.h" #include "gnunet-service-rps_sampler.h" #include "gnunet-service-rps_sampler_elem.h" @@ -52,17 +53,6 @@ // TODO care about invalid input of the caller (size 0 or less...) -/** - * Callback that is called from _get_rand_peer() when the PeerID is ready. - * - * @param cls the closure given alongside this function. - * @param id the PeerID that was returned - */ -typedef void -(*RPS_sampler_rand_peer_ready_cont) (void *cls, - const struct GNUNET_PeerIdentity *id); - - /** * @brief Callback called each time a new peer was put into the sampler * @@ -100,49 +90,6 @@ struct SamplerNotifyUpdateCTX }; -/** - * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer - */ -struct GetPeerCls -{ - /** - * DLL - */ - struct GetPeerCls *next; - struct GetPeerCls *prev; - - /** - * The #RPS_SamplerRequestHandle this single request belongs to. - */ - struct RPS_SamplerRequestHandle *req_handle; - - /** - * The task for this function. - */ - struct GNUNET_SCHEDULER_Task *get_peer_task; - - /** - * @brief Context to the given callback. - */ - struct SamplerNotifyUpdateCTX *notify_ctx; - - /** - * The callback - */ - RPS_sampler_rand_peer_ready_cont cont; - - /** - * The closure to the callback @e cont - */ - void *cont_cls; - - /** - * The address of the id to be stored at - */ - struct GNUNET_PeerIdentity *id; -}; - - /** * Type of function used to differentiate between modified and not modified * Sampler. @@ -161,61 +108,6 @@ static void sampler_get_rand_peer (void *cls); -/** - * Get one random peer out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - */ -static void -sampler_mod_get_rand_peer (void *cls); - - -/** - * Sampler with its own array of SamplerElements - */ -struct RPS_Sampler -{ - /** - * Number of sampler elements we hold. - */ - unsigned int sampler_size; - //size_t size; - - /** - * All sampler elements in one array. - */ - struct RPS_SamplerElement **sampler_elements; - - /** - * Maximum time a round takes - * - * Used in the context of RPS - */ - struct GNUNET_TIME_Relative max_round_interval; - - /** - * Stores the function to return peers. Which one it is depends on whether - * the Sampler is the modified one or not. - */ - RPS_get_peers_type get_peers; - - /** - * Head and tail for the DLL to store the #RPS_SamplerRequestHandle - */ - struct RPS_SamplerRequestHandle *req_handle_head; - struct RPS_SamplerRequestHandle *req_handle_tail; - - struct SamplerNotifyUpdateCTX *notify_ctx_head; - struct SamplerNotifyUpdateCTX *notify_ctx_tail; - #ifdef TO_FILE - /** - * File name to log to - */ - char *file_name; - #endif /* TO_FILE */ -}; - /** * Closure to _get_n_rand_peers_ready_cb() */ @@ -291,189 +183,6 @@ static size_t max_size; static uint32_t client_get_index; -/** - * @brief Add a callback that will be called when the next peer is inserted - * into the sampler - * - * @param sampler The sampler on which update it will be called - * @param notify_cb The callback - * @param cls Closure given to the callback - * - * @return The context containing callback and closure - */ -struct SamplerNotifyUpdateCTX * -sampler_notify_on_update (struct RPS_Sampler *sampler, - SamplerNotifyUpdateCB notify_cb, - void *cls) -{ - struct SamplerNotifyUpdateCTX *notify_ctx; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Inserting new context for notification\n"); - notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); - notify_ctx->notify_cb = notify_cb; - notify_ctx->cls = cls; - GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, - sampler->notify_ctx_tail, - notify_ctx); - return notify_ctx; -} - - -/** - * Callback to _get_rand_peer() used by _get_n_rand_peers(). - * - * Checks whether all n peers are available. If they are, - * give those back. - */ -static void -check_n_peers_ready (void *cls, - const struct GNUNET_PeerIdentity *id) -{ - struct RPS_SamplerRequestHandle *req_handle = cls; - (void) id; - - req_handle->cur_num_peers++; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got %" PRIX32 ". of %" PRIX32 " peers\n", - req_handle->cur_num_peers, req_handle->num_peers); - - if (req_handle->num_peers == req_handle->cur_num_peers) - { /* All peers are ready -- return those to the client */ - GNUNET_assert (NULL != req_handle->callback); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "returning %" PRIX32 " peers to the client\n", - req_handle->num_peers); - req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls); - - RPS_sampler_request_cancel (req_handle); - } -} - - -/** - * Get the size of the sampler. - * - * @param sampler the sampler to return the size of. - * @return the size of the sampler - */ -unsigned int -RPS_sampler_get_size (struct RPS_Sampler *sampler) -{ - return sampler->sampler_size; -} - - -/** - * Grow or shrink the size of the sampler. - * - * @param sampler the sampler to resize. - * @param new_size the new size of the sampler - */ -static void -sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) -{ - unsigned int old_size; - uint32_t i; - - // TODO check min and max size - - old_size = sampler->sampler_size; - - if (old_size > new_size) - { /* Shrinking */ - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Shrinking sampler %d -> %d\n", - old_size, - new_size); - - to_file (sampler->file_name, - "Shrinking sampler %d -> %d", - old_size, - new_size); - - for (i = new_size ; i < old_size ; i++) - { - to_file (sampler->file_name, - "-%" PRIu32 ": %s", - i, - sampler->sampler_elements[i]->file_name); - RPS_sampler_elem_destroy (sampler->sampler_elements[i]); - } - - GNUNET_array_grow (sampler->sampler_elements, - sampler->sampler_size, - new_size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "sampler->sampler_elements now points to %p\n", - sampler->sampler_elements); - - } - else if (old_size < new_size) - { /* Growing */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Growing sampler %d -> %d\n", - old_size, - new_size); - - to_file (sampler->file_name, - "Growing sampler %d -> %d", - old_size, - new_size); - - GNUNET_array_grow (sampler->sampler_elements, - sampler->sampler_size, - new_size); - - for (i = old_size ; i < new_size ; i++) - { /* Add new sampler elements */ - sampler->sampler_elements[i] = RPS_sampler_elem_create (); - - to_file (sampler->file_name, - "+%" PRIu32 ": %s", - i, - sampler->sampler_elements[i]->file_name); - } - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); - return; - } - - GNUNET_assert (sampler->sampler_size == new_size); -} - - -/** - * Grow or shrink the size of the sampler. - * - * @param sampler the sampler to resize. - * @param new_size the new size of the sampler - */ -void -RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) -{ - GNUNET_assert (0 < new_size); - sampler_resize (sampler, new_size); -} - - -/** - * Empty the sampler. - * - * @param sampler the sampler to empty. - * @param new_size the new size of the sampler - */ -static void -sampler_empty (struct RPS_Sampler *sampler) -{ - sampler_resize (sampler, 0); -} - - /** * Initialise a tuple of sampler elements. * @@ -513,119 +222,6 @@ RPS_sampler_init (size_t init_size, return sampler; } -/** - * Initialise a modified tuple of sampler elements. - * - * @param init_size the size the sampler is initialised with - * @param max_round_interval maximum time a round takes - * @return a handle to a sampler that consists of sampler elements. - */ -struct RPS_Sampler * -RPS_sampler_mod_init (size_t init_size, - struct GNUNET_TIME_Relative max_round_interval) -{ - struct RPS_Sampler *sampler; - - sampler = RPS_sampler_init (init_size, max_round_interval); - sampler->get_peers = sampler_mod_get_rand_peer; - -#ifdef TO_FILE - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Initialised modified sampler %s\n", - sampler->file_name); - to_file (sampler->file_name, - "This is a modified sampler"); -#endif /* TO_FILE */ - - return sampler; -} - - -/** - * @brief Notify about update of the sampler. - * - * Call the callbacks that are waiting for notification on updates to the - * sampler. - * - * @param sampler The sampler the updates are waiting for - */ -static void -notify_update (struct RPS_Sampler *sampler) -{ - struct SamplerNotifyUpdateCTX *tmp_notify_head; - struct SamplerNotifyUpdateCTX *tmp_notify_tail; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Calling callbacks waiting for update notification.\n"); - tmp_notify_head = sampler->notify_ctx_head; - tmp_notify_tail = sampler->notify_ctx_tail; - sampler->notify_ctx_head = NULL; - sampler->notify_ctx_tail = NULL; - for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head; - NULL != tmp_notify_head; - notify_iter = tmp_notify_head) - { - GNUNET_assert (NULL != notify_iter->notify_cb); - GNUNET_CONTAINER_DLL_remove (tmp_notify_head, - tmp_notify_tail, - notify_iter); - notify_iter->notify_cb (notify_iter->cls); - GNUNET_free (notify_iter); - } -} - - -/** - * Update every sampler element of this sampler with given peer - * - * @param sampler the sampler to update. - * @param id the PeerID that is put in the sampler - */ - void -RPS_sampler_update (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) -{ - to_file (sampler->file_name, - "Got %s", - GNUNET_i2s_full (id)); - - for (uint32_t i = 0; i < sampler->sampler_size; i++) - { - RPS_sampler_elem_next (sampler->sampler_elements[i], - id); - } - notify_update (sampler); -} - - -/** - * Reinitialise all previously initialised sampler elements with the given value. - * - * Used to get rid of a PeerID. - * - * @param sampler the sampler to reinitialise a sampler element in. - * @param id the id of the sampler elements to update. - */ - void -RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) -{ - uint32_t i; - - for (i = 0; i < sampler->sampler_size; i++) - { - if (0 == GNUNET_CRYPTO_cmp_peer_identity(id, - &(sampler->sampler_elements[i]->peer_id)) ) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n"); - to_file (sampler->sampler_elements[i]->file_name, - "--- non-active"); - RPS_sampler_elem_reinit (sampler->sampler_elements[i]); - } - } -} - - /** * Get one random peer out of the sampled peers. * @@ -658,7 +254,7 @@ sampler_get_rand_peer (void *cls) gpc->notify_ctx = sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, + &sampler_get_rand_peer, gpc); return; } @@ -673,244 +269,4 @@ sampler_get_rand_peer (void *cls) } -/** - * Get one random peer out of the sampled peers. - * - * This reinitialises the queried sampler element. - */ -static void -sampler_mod_get_rand_peer (void *cls) -{ - struct GetPeerCls *gpc = cls; - struct RPS_SamplerElement *s_elem; - struct GNUNET_TIME_Relative last_request_diff; - struct RPS_Sampler *sampler; - - gpc->get_peer_task = NULL; - gpc->notify_ctx = NULL; - sampler = gpc->req_handle->sampler; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); - - /* Cycle the #client_get_index one step further */ - client_get_index = (client_get_index + 1) % sampler->sampler_size; - - s_elem = sampler->sampler_elements[client_get_index]; - *gpc->id = s_elem->peer_id; - GNUNET_assert (NULL != s_elem); - - if (EMPTY == s_elem->is_empty) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sampler_mod element empty, rescheduling.\n"); - GNUNET_assert (NULL == gpc->notify_ctx); - gpc->notify_ctx = - sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, - gpc); - return; - } - - /* Check whether we may use this sampler to give it back to the client */ - if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) - { - // TODO remove this condition at least for the client sampler - last_request_diff = - GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, - GNUNET_TIME_absolute_get ()); - /* We're not going to give it back now if it was - * already requested by a client this round */ - if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); - ///* How many time remains untile the next round has started? */ - //inv_last_request_diff = - // GNUNET_TIME_absolute_get_difference (last_request_diff, - // sampler->max_round_interval); - // add a little delay - /* Schedule it one round later */ - GNUNET_assert (NULL == gpc->notify_ctx); - gpc->notify_ctx = - sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, - gpc); - return; - } - } - if (2 > s_elem->num_peers) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "This s_elem saw less than two peers -- scheduling for later\n"); - GNUNET_assert (NULL == gpc->notify_ctx); - gpc->notify_ctx = - sampler_notify_on_update (sampler, - &sampler_mod_get_rand_peer, - gpc); - return; - } - /* More reasons to wait could be added here */ - -// GNUNET_STATISTICS_set (stats, -// "# client sampler element input", -// s_elem->num_peers, -// GNUNET_NO); -// GNUNET_STATISTICS_set (stats, -// "# client sampler element change", -// s_elem->num_change, -// GNUNET_NO); - - RPS_sampler_elem_reinit (s_elem); - s_elem->last_client_request = GNUNET_TIME_absolute_get (); - - GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, - gpc->req_handle->gpc_tail, - gpc); - gpc->cont (gpc->cont_cls, gpc->id); - GNUNET_free (gpc); -} - - -/** - * Get n random peers out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - * Random with or without consumption? - * - * @param sampler the sampler to get peers from. - * @param cb callback that will be called once the ids are ready. - * @param cls closure given to @a cb - * @param for_client #GNUNET_YES if result is used for client, - * #GNUNET_NO if used internally - * @param num_peers the number of peers requested - */ -struct RPS_SamplerRequestHandle * -RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, - uint32_t num_peers, - RPS_sampler_n_rand_peers_ready_cb cb, - void *cls) -{ - uint32_t i; - struct RPS_SamplerRequestHandle *req_handle; - struct GetPeerCls *gpc; - - GNUNET_assert (0 != sampler->sampler_size); - if (0 == num_peers) - return NULL; - - // TODO check if we have too much (distinct) sampled peers - req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); - req_handle->num_peers = num_peers; - req_handle->cur_num_peers = 0; - req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); - req_handle->sampler = sampler; - req_handle->callback = cb; - req_handle->cls = cls; - GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, - sampler->req_handle_tail, - req_handle); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling requests for %" PRIu32 " peers\n", num_peers); - - for (i = 0; i < num_peers; i++) - { - gpc = GNUNET_new (struct GetPeerCls); - gpc->req_handle = req_handle; - gpc->cont = check_n_peers_ready; - gpc->cont_cls = req_handle; - gpc->id = &req_handle->ids[i]; - - GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, - req_handle->gpc_tail, - gpc); - // maybe add a little delay - gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, - gpc); - } - return req_handle; -} - -/** - * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. - * - * @param req_handle the handle to the request - */ -void -RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) -{ - struct GetPeerCls *i; - - while (NULL != (i = req_handle->gpc_head) ) - { - GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, - req_handle->gpc_tail, - i); - if (NULL != i->get_peer_task) - { - GNUNET_SCHEDULER_cancel (i->get_peer_task); - } - if (NULL != i->notify_ctx) - { - GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head, - req_handle->sampler->notify_ctx_tail, - i->notify_ctx); - GNUNET_free (i->notify_ctx); - } - GNUNET_free (i); - } - GNUNET_free (req_handle->ids); - GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, - req_handle->sampler->req_handle_tail, - req_handle); - GNUNET_free (req_handle); -} - - -/** - * Counts how many Samplers currently hold a given PeerID. - * - * @param sampler the sampler to count ids in. - * @param id the PeerID to count. - * - * @return the number of occurrences of id. - */ - uint32_t -RPS_sampler_count_id (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) -{ - uint32_t count; - uint32_t i; - - count = 0; - for ( i = 0 ; i < sampler->sampler_size ; i++ ) - { - if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) - && EMPTY != sampler->sampler_elements[i]->is_empty) - count++; - } - return count; -} - - -/** - * Cleans the sampler. - */ - void -RPS_sampler_destroy (struct RPS_Sampler *sampler) -{ - if (NULL != sampler->req_handle_head) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "There are still pending requests. Going to remove them.\n"); - while (NULL != sampler->req_handle_head) - { - RPS_sampler_request_cancel (sampler->req_handle_head); - } - } - sampler_empty (sampler); - GNUNET_free (sampler); -} - /* end of gnunet-service-rps.c */ diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h index f33aa6eb1..ab4a6bbbb 100644 --- a/src/rps/gnunet-service-rps_sampler.h +++ b/src/rps/gnunet-service-rps_sampler.h @@ -25,6 +25,7 @@ #ifndef RPS_SAMPLER_H #define RPS_SAMPLER_H #include +#include "rps-sampler_common.h" /** @@ -38,19 +39,6 @@ struct RPS_Sampler; struct RPS_SamplerRequestHandle; -/** - * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. - * - * @param cls the closure given alongside this function. - * @param ids the PeerIDs that were returned - * to be freed - */ - typedef void -(*RPS_sampler_n_rand_peers_ready_cb) (const struct GNUNET_PeerIdentity *ids, - uint32_t num_peers, - void *cls); - - /** * Get the size of the sampler. * @@ -83,18 +71,6 @@ RPS_sampler_init (size_t init_size, struct GNUNET_TIME_Relative max_round_interval); -/** - * Initialise a modified tuple of sampler elements. - * - * @param init_size the size the sampler is initialised with - * @param max_round_interval maximum time a round takes - * @return a handle to a sampler that consists of sampler elements. - */ -struct RPS_Sampler * -RPS_sampler_mod_init (size_t init_size, - struct GNUNET_TIME_Relative max_round_interval); - - /** * Update every sampler element of this sampler with given peer * diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c new file mode 100644 index 000000000..da832a323 --- /dev/null +++ b/src/rps/rps-sampler_client.c @@ -0,0 +1,328 @@ +/* + This file is part of GNUnet. + Copyright (C) + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +/** + * @file rps/gnunet-service-rps_sampler.c + * @brief sampler implementation + * @author Julius Bünger + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" +#include "rps.h" + +#include "rps-sampler_common.h" +#include "gnunet-service-rps_sampler.h" +#include "gnunet-service-rps_sampler_elem.h" + +#include +#include + +#include "rps-test_util.h" + +#define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__) + + +// multiple 'clients'? + +// TODO check for overflows + +// TODO align message structs + +// hist_size_init, hist_size_max + +/*********************************************************************** + * WARNING: This section needs to be reviewed regarding the use of + * functions providing (pseudo)randomness! +***********************************************************************/ + +// TODO care about invalid input of the caller (size 0 or less...) + +/** + * @brief Callback called each time a new peer was put into the sampler + * + * @param cls A possibly given closure + */ +typedef void +(*SamplerNotifyUpdateCB) (void *cls); + +/** + * @brief Context for a callback. Contains callback and closure. + * + * Meant to be an entry in an DLL. + */ +struct SamplerNotifyUpdateCTX +{ + /** + * @brief The Callback to call on updates + */ + SamplerNotifyUpdateCB notify_cb; + + /** + * @brief The according closure. + */ + void *cls; + + /** + * @brief Next element in DLL. + */ + struct SamplerNotifyUpdateCTX *next; + + /** + * @brief Previous element in DLL. + */ + struct SamplerNotifyUpdateCTX *prev; +}; + + +/** + * Type of function used to differentiate between modified and not modified + * Sampler. + */ +typedef void +(*RPS_get_peers_type) (void *cls); + + +/** + * Get one random peer out of the sampled peers. + * + * We might want to reinitialise this sampler after giving the + * corrsponding peer to the client. + */ +static void +sampler_mod_get_rand_peer (void *cls); + + +/** + * Closure to _get_n_rand_peers_ready_cb() + */ +struct RPS_SamplerRequestHandle +{ + /** + * DLL + */ + struct RPS_SamplerRequestHandle *next; + struct RPS_SamplerRequestHandle *prev; + + /** + * Number of peers we are waiting for. + */ + uint32_t num_peers; + + /** + * Number of peers we currently have. + */ + uint32_t cur_num_peers; + + /** + * Pointer to the array holding the ids. + */ + struct GNUNET_PeerIdentity *ids; + + /** + * Head and tail for the DLL to store the tasks for single requests + */ + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; + + /** + * Sampler. + */ + struct RPS_Sampler *sampler; + + /** + * Callback to be called when all ids are available. + */ + RPS_sampler_n_rand_peers_ready_cb callback; + + /** + * Closure given to the callback + */ + void *cls; +}; + +///** +// * Global sampler variable. +// */ +//struct RPS_Sampler *sampler; + + +/** + * The minimal size for the extended sampler elements. + */ +static size_t min_size; + +/** + * The maximal size the extended sampler elements should grow to. + */ +static size_t max_size; + +/** + * The size the extended sampler elements currently have. + */ +//static size_t extra_size; + +/** + * Inedex to the sampler element that is the next to be returned + */ +static uint32_t client_get_index; + + +/** + * Initialise a modified tuple of sampler elements. + * + * @param init_size the size the sampler is initialised with + * @param max_round_interval maximum time a round takes + * @return a handle to a sampler that consists of sampler elements. + */ +struct RPS_Sampler * +RPS_sampler_mod_init (size_t init_size, + struct GNUNET_TIME_Relative max_round_interval) +{ + struct RPS_Sampler *sampler; + + /* Initialise context around extended sampler */ + min_size = 10; // TODO make input to _samplers_init() + max_size = 1000; // TODO make input to _samplers_init() + + sampler = GNUNET_new (struct RPS_Sampler); + + sampler->max_round_interval = max_round_interval; + sampler->get_peers = sampler_mod_get_rand_peer; + //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); + //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); + RPS_sampler_resize (sampler, init_size); + + client_get_index = 0; + + //GNUNET_assert (init_size == sampler->sampler_size); + +#ifdef TO_FILE + sampler->file_name = create_file ("sampler-"); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initialised modified sampler %s\n", + sampler->file_name); + to_file (sampler->file_name, + "This is a modified sampler"); +#endif /* TO_FILE */ + + return sampler; +} + + +/** + * Get one random peer out of the sampled peers. + * + * This reinitialises the queried sampler element. + */ +static void +sampler_mod_get_rand_peer (void *cls) +{ + struct GetPeerCls *gpc = cls; + struct RPS_SamplerElement *s_elem; + struct GNUNET_TIME_Relative last_request_diff; + struct RPS_Sampler *sampler; + + gpc->get_peer_task = NULL; + gpc->notify_ctx = NULL; + sampler = gpc->req_handle->sampler; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); + + /* Cycle the #client_get_index one step further */ + client_get_index = (client_get_index + 1) % sampler->sampler_size; + + s_elem = sampler->sampler_elements[client_get_index]; + *gpc->id = s_elem->peer_id; + GNUNET_assert (NULL != s_elem); + + if (EMPTY == s_elem->is_empty) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sampler_mod element empty, rescheduling.\n"); + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); + return; + } + + /* Check whether we may use this sampler to give it back to the client */ + if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) + { + // TODO remove this condition at least for the client sampler + last_request_diff = + GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, + GNUNET_TIME_absolute_get ()); + /* We're not going to give it back now if it was + * already requested by a client this round */ + if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); + ///* How many time remains untile the next round has started? */ + //inv_last_request_diff = + // GNUNET_TIME_absolute_get_difference (last_request_diff, + // sampler->max_round_interval); + // add a little delay + /* Schedule it one round later */ + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); + return; + } + } + if (2 > s_elem->num_peers) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "This s_elem saw less than two peers -- scheduling for later\n"); + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); + return; + } + /* More reasons to wait could be added here */ + +// GNUNET_STATISTICS_set (stats, +// "# client sampler element input", +// s_elem->num_peers, +// GNUNET_NO); +// GNUNET_STATISTICS_set (stats, +// "# client sampler element change", +// s_elem->num_change, +// GNUNET_NO); + + RPS_sampler_elem_reinit (s_elem); + s_elem->last_client_request = GNUNET_TIME_absolute_get (); + + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, + gpc); + gpc->cont (gpc->cont_cls, gpc->id); + GNUNET_free (gpc); +} + + +/* end of gnunet-service-rps.c */ + diff --git a/src/rps/rps-sampler_client.h b/src/rps/rps-sampler_client.h new file mode 100644 index 000000000..fd0538efa --- /dev/null +++ b/src/rps/rps-sampler_client.h @@ -0,0 +1,150 @@ +/* + This file is part of GNUnet. + Copyright (C) + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +/** + * @file rps/rps-sampler_client.h + * @brief client sampler implementation + * @author Julius Bünger + */ + +#ifndef RPS_SAMPLER_CLIENT_H +#define RPS_SAMPLER_CLIENT_H +#include +#include "rps-sampler_common.h" + + +/** + * A sampler sampling a stream of PeerIDs. + */ +struct RPS_Sampler; + +/** + * A handle to cancel a request. + */ +struct RPS_SamplerRequestHandle; + + +/** + * Get the size of the sampler. + * + * @param sampler the sampler to return the size of. + * @return the size of the sampler + */ +unsigned int +RPS_sampler_get_size (struct RPS_Sampler *sampler); + + +/** + * Grow or shrink the size of the sampler. + * + * @param sampler the sampler to resize. + * @param new_size the new size of the sampler (not 0) + */ +void +RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size); + + +/** + * Initialise a modified tuple of sampler elements. + * + * @param init_size the size the sampler is initialised with + * @param max_round_interval maximum time a round takes + * @return a handle to a sampler that consists of sampler elements. + */ +struct RPS_Sampler * +RPS_sampler_mod_init (size_t init_size, + struct GNUNET_TIME_Relative max_round_interval); + + +/** + * Update every sampler element of this sampler with given peer + * + * @param sampler the sampler to update. + * @param id the PeerID that is put in the sampler + */ + void +RPS_sampler_update (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id); + + +/** + * Reinitialise all previously initialised sampler elements with the given + * value. + * + * Used to get rid of a PeerID. + * + * @param sampler the sampler to reinitialise a sampler in. + * @param id the id of the samplers to update. + */ + void +RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id); + + +/** + * Get n random peers out of the sampled peers. + * + * We might want to reinitialise this sampler after giving the + * corrsponding peer to the client. + * Random with or without consumption? + * + * @param sampler the sampler to get peers from. + * @param cb callback that will be called once the ids are ready. + * @param cls closure given to @a cb + * @param for_client #GNUNET_YES if result is used for client, + * #GNUNET_NO if used internally + * @param num_peers the number of peers requested + */ +struct RPS_SamplerRequestHandle * +RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, + uint32_t num_peers, + RPS_sampler_n_rand_peers_ready_cb cb, + void *cls); + +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle); + + +/** + * Counts how many Samplers currently hold a given PeerID. + * + * @param sampler the sampler to cound ids in. + * @param id the PeerID to count. + * + * @return the number of occurrences of id. + */ + uint32_t +RPS_sampler_count_id (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id); + + +/** + * Cleans the samplers. + * + * @param sampler the sampler to destroy. + */ + void +RPS_sampler_destroy (struct RPS_Sampler *sampler); + +#endif /* RPS_SAMPLER_CLIENT_H */ +/* end of gnunet-service-rps.c */ diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c new file mode 100644 index 000000000..d004c06a5 --- /dev/null +++ b/src/rps/rps-sampler_common.c @@ -0,0 +1,527 @@ +/* + This file is part of GNUnet. + Copyright (C) + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +/** + * @file rps/rps-sampler_common.c + * @brief Code common to client and service sampler + * @author Julius Bünger + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" + +#include "rps-sampler_common.h" +#include "gnunet-service-rps_sampler_elem.h" + +#include +#include + +#include "rps-test_util.h" + +#define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler_common",__VA_ARGS__) + +/** + * @brief Context for a callback. Contains callback and closure. + * + * Meant to be an entry in an DLL. + */ +struct SamplerNotifyUpdateCTX +{ + /** + * @brief The Callback to call on updates + */ + SamplerNotifyUpdateCB notify_cb; + + /** + * @brief The according closure. + */ + void *cls; + + /** + * @brief Next element in DLL. + */ + struct SamplerNotifyUpdateCTX *next; + + /** + * @brief Previous element in DLL. + */ + struct SamplerNotifyUpdateCTX *prev; +}; + + +/** + * Closure to _get_n_rand_peers_ready_cb() + */ +struct RPS_SamplerRequestHandle +{ + /** + * DLL + */ + struct RPS_SamplerRequestHandle *next; + struct RPS_SamplerRequestHandle *prev; + + /** + * Number of peers we are waiting for. + */ + uint32_t num_peers; + + /** + * Number of peers we currently have. + */ + uint32_t cur_num_peers; + + /** + * Pointer to the array holding the ids. + */ + struct GNUNET_PeerIdentity *ids; + + /** + * Head and tail for the DLL to store the tasks for single requests + */ + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; + + /** + * Sampler. + */ + struct RPS_Sampler *sampler; + + /** + * Callback to be called when all ids are available. + */ + RPS_sampler_n_rand_peers_ready_cb callback; + + /** + * Closure given to the callback + */ + void *cls; +}; + + +/** + * @brief Add a callback that will be called when the next peer is inserted + * into the sampler + * + * @param sampler The sampler on which update it will be called + * @param notify_cb The callback + * @param cls Closure given to the callback + * + * @return The context containing callback and closure + */ +struct SamplerNotifyUpdateCTX * +sampler_notify_on_update (struct RPS_Sampler *sampler, + SamplerNotifyUpdateCB notify_cb, + void *cls) +{ + struct SamplerNotifyUpdateCTX *notify_ctx; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Inserting new context for notification\n"); + notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); + notify_ctx->notify_cb = notify_cb; + notify_ctx->cls = cls; + GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, + sampler->notify_ctx_tail, + notify_ctx); + return notify_ctx; +} + + +/** + * Get the size of the sampler. + * + * @param sampler the sampler to return the size of. + * @return the size of the sampler + */ +unsigned int +RPS_sampler_get_size (struct RPS_Sampler *sampler) +{ + return sampler->sampler_size; +} + + +/** + * @brief Notify about update of the sampler. + * + * Call the callbacks that are waiting for notification on updates to the + * sampler. + * + * @param sampler The sampler the updates are waiting for + */ +static void +notify_update (struct RPS_Sampler *sampler) +{ + struct SamplerNotifyUpdateCTX *tmp_notify_head; + struct SamplerNotifyUpdateCTX *tmp_notify_tail; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Calling callbacks waiting for update notification.\n"); + tmp_notify_head = sampler->notify_ctx_head; + tmp_notify_tail = sampler->notify_ctx_tail; + sampler->notify_ctx_head = NULL; + sampler->notify_ctx_tail = NULL; + for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head; + NULL != tmp_notify_head; + notify_iter = tmp_notify_head) + { + GNUNET_assert (NULL != notify_iter->notify_cb); + GNUNET_CONTAINER_DLL_remove (tmp_notify_head, + tmp_notify_tail, + notify_iter); + notify_iter->notify_cb (notify_iter->cls); + GNUNET_free (notify_iter); + } +} + + +/** + * Update every sampler element of this sampler with given peer + * + * @param sampler the sampler to update. + * @param id the PeerID that is put in the sampler + */ + void +RPS_sampler_update (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) +{ + to_file (sampler->file_name, + "Got %s", + GNUNET_i2s_full (id)); + + for (uint32_t i = 0; i < sampler->sampler_size; i++) + { + RPS_sampler_elem_next (sampler->sampler_elements[i], + id); + } + notify_update (sampler); +} + + +/** + * Reinitialise all previously initialised sampler elements with the given value. + * + * Used to get rid of a PeerID. + * + * @param sampler the sampler to reinitialise a sampler element in. + * @param id the id of the sampler elements to update. + */ + void +RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) +{ + uint32_t i; + + for (i = 0; i < sampler->sampler_size; i++) + { + if (0 == GNUNET_CRYPTO_cmp_peer_identity(id, + &(sampler->sampler_elements[i]->peer_id)) ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n"); + to_file (sampler->sampler_elements[i]->file_name, + "--- non-active"); + RPS_sampler_elem_reinit (sampler->sampler_elements[i]); + } + } +} + + +/** + * Counts how many Samplers currently hold a given PeerID. + * + * @param sampler the sampler to count ids in. + * @param id the PeerID to count. + * + * @return the number of occurrences of id. + */ + uint32_t +RPS_sampler_count_id (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) +{ + uint32_t count; + uint32_t i; + + count = 0; + for ( i = 0 ; i < sampler->sampler_size ; i++ ) + { + if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) + && EMPTY != sampler->sampler_elements[i]->is_empty) + count++; + } + return count; +} + + +/** + * Grow or shrink the size of the sampler. + * + * @param sampler the sampler to resize. + * @param new_size the new size of the sampler + */ +static void +sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) +{ + unsigned int old_size; + uint32_t i; + + // TODO check min and max size + + old_size = sampler->sampler_size; + + if (old_size > new_size) + { /* Shrinking */ + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Shrinking sampler %d -> %d\n", + old_size, + new_size); + + to_file (sampler->file_name, + "Shrinking sampler %d -> %d", + old_size, + new_size); + + for (i = new_size ; i < old_size ; i++) + { + to_file (sampler->file_name, + "-%" PRIu32 ": %s", + i, + sampler->sampler_elements[i]->file_name); + RPS_sampler_elem_destroy (sampler->sampler_elements[i]); + } + + GNUNET_array_grow (sampler->sampler_elements, + sampler->sampler_size, + new_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sampler->sampler_elements now points to %p\n", + sampler->sampler_elements); + + } + else if (old_size < new_size) + { /* Growing */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Growing sampler %d -> %d\n", + old_size, + new_size); + + to_file (sampler->file_name, + "Growing sampler %d -> %d", + old_size, + new_size); + + GNUNET_array_grow (sampler->sampler_elements, + sampler->sampler_size, + new_size); + + for (i = old_size ; i < new_size ; i++) + { /* Add new sampler elements */ + sampler->sampler_elements[i] = RPS_sampler_elem_create (); + + to_file (sampler->file_name, + "+%" PRIu32 ": %s", + i, + sampler->sampler_elements[i]->file_name); + } + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); + return; + } + + GNUNET_assert (sampler->sampler_size == new_size); +} + + +/** + * Grow or shrink the size of the sampler. + * + * @param sampler the sampler to resize. + * @param new_size the new size of the sampler + */ +void +RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) +{ + GNUNET_assert (0 < new_size); + sampler_resize (sampler, new_size); +} + + +/** + * Empty the sampler. + * + * @param sampler the sampler to empty. + * @param new_size the new size of the sampler + */ +static void +sampler_empty (struct RPS_Sampler *sampler) +{ + sampler_resize (sampler, 0); +} + + +/** + * Callback to _get_rand_peer() used by _get_n_rand_peers(). + * + * Checks whether all n peers are available. If they are, + * give those back. + */ +static void +check_n_peers_ready (void *cls, + const struct GNUNET_PeerIdentity *id) +{ + struct RPS_SamplerRequestHandle *req_handle = cls; + (void) id; + + req_handle->cur_num_peers++; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got %" PRIX32 ". of %" PRIX32 " peers\n", + req_handle->cur_num_peers, req_handle->num_peers); + + if (req_handle->num_peers == req_handle->cur_num_peers) + { /* All peers are ready -- return those to the client */ + GNUNET_assert (NULL != req_handle->callback); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "returning %" PRIX32 " peers to the client\n", + req_handle->num_peers); + req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls); + + RPS_sampler_request_cancel (req_handle); + } +} + + +/** + * Get n random peers out of the sampled peers. + * + * We might want to reinitialise this sampler after giving the + * corrsponding peer to the client. + * Random with or without consumption? + * + * @param sampler the sampler to get peers from. + * @param cb callback that will be called once the ids are ready. + * @param cls closure given to @a cb + * @param for_client #GNUNET_YES if result is used for client, + * #GNUNET_NO if used internally + * @param num_peers the number of peers requested + */ +struct RPS_SamplerRequestHandle * +RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, + uint32_t num_peers, + RPS_sampler_n_rand_peers_ready_cb cb, + void *cls) +{ + uint32_t i; + struct RPS_SamplerRequestHandle *req_handle; + struct GetPeerCls *gpc; + + GNUNET_assert (0 != sampler->sampler_size); + if (0 == num_peers) + return NULL; + + // TODO check if we have too much (distinct) sampled peers + req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); + req_handle->num_peers = num_peers; + req_handle->cur_num_peers = 0; + req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + req_handle->sampler = sampler; + req_handle->callback = cb; + req_handle->cls = cls; + GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, + sampler->req_handle_tail, + req_handle); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling requests for %" PRIu32 " peers\n", num_peers); + + for (i = 0; i < num_peers; i++) + { + gpc = GNUNET_new (struct GetPeerCls); + gpc->req_handle = req_handle; + gpc->cont = check_n_peers_ready; + gpc->cont_cls = req_handle; + gpc->id = &req_handle->ids[i]; + + GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, + req_handle->gpc_tail, + gpc); + // maybe add a little delay + gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, + gpc); + } + return req_handle; +} + +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) +{ + struct GetPeerCls *i; + + while (NULL != (i = req_handle->gpc_head) ) + { + GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, + req_handle->gpc_tail, + i); + if (NULL != i->get_peer_task) + { + GNUNET_SCHEDULER_cancel (i->get_peer_task); + } + if (NULL != i->notify_ctx) + { + GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head, + req_handle->sampler->notify_ctx_tail, + i->notify_ctx); + GNUNET_free (i->notify_ctx); + } + GNUNET_free (i); + } + GNUNET_free (req_handle->ids); + GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, + req_handle->sampler->req_handle_tail, + req_handle); + GNUNET_free (req_handle); +} + + +/** + * Cleans the sampler. + */ + void +RPS_sampler_destroy (struct RPS_Sampler *sampler) +{ + if (NULL != sampler->req_handle_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "There are still pending requests. Going to remove them.\n"); + while (NULL != sampler->req_handle_head) + { + RPS_sampler_request_cancel (sampler->req_handle_head); + } + } + sampler_empty (sampler); + GNUNET_free (sampler); +} + + +/* end of rps-sampler_common.c */ diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h new file mode 100644 index 000000000..68f5865a9 --- /dev/null +++ b/src/rps/rps-sampler_common.h @@ -0,0 +1,280 @@ +/* + This file is part of GNUnet. + Copyright (C) + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +/** + * @file rps/rps-sampler_common.h + * @brief Code common to client and service sampler + * @author Julius Bünger + */ + +#ifndef RPS_SAMPLER_COMMON_H +#define RPS_SAMPLER_COMMON_H + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" + +#include "gnunet-service-rps_sampler_elem.h" + +#include +#include + +#include "rps-test_util.h" + + +/** + * Callback that is called from _get_rand_peer() when the PeerID is ready. + * + * @param cls the closure given alongside this function. + * @param id the PeerID that was returned + */ +typedef void +(*RPS_sampler_rand_peer_ready_cont) (void *cls, + const struct GNUNET_PeerIdentity *id); + + +/** + * Type of function used to differentiate between modified and not modified + * Sampler. + */ +typedef void +(*RPS_get_peers_type) (void *cls); + + +/** + * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. + * + * @param cls the closure given alongside this function. + * @param ids the PeerIDs that were returned + * to be freed + */ + typedef void +(*RPS_sampler_n_rand_peers_ready_cb) (const struct GNUNET_PeerIdentity *ids, + uint32_t num_peers, + void *cls); + + +/** + * @brief Callback called each time a new peer was put into the sampler + * + * @param cls A possibly given closure + */ +typedef void +(*SamplerNotifyUpdateCB) (void *cls); + + +/** + * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer + */ +struct GetPeerCls +{ + /** + * DLL + */ + struct GetPeerCls *next; + struct GetPeerCls *prev; + + /** + * The #RPS_SamplerRequestHandle this single request belongs to. + */ + struct RPS_SamplerRequestHandle *req_handle; + + /** + * The task for this function. + */ + struct GNUNET_SCHEDULER_Task *get_peer_task; + + /** + * @brief Context to the given callback. + */ + struct SamplerNotifyUpdateCTX *notify_ctx; + + /** + * The callback + */ + RPS_sampler_rand_peer_ready_cont cont; + + /** + * The closure to the callback @e cont + */ + void *cont_cls; + + /** + * The address of the id to be stored at + */ + struct GNUNET_PeerIdentity *id; +}; + + +/** + * Sampler with its own array of SamplerElements + */ +struct RPS_Sampler +{ + /** + * Number of sampler elements we hold. + */ + unsigned int sampler_size; + //size_t size; + + /** + * All sampler elements in one array. + */ + struct RPS_SamplerElement **sampler_elements; + + /** + * Maximum time a round takes + * + * Used in the context of RPS + */ + struct GNUNET_TIME_Relative max_round_interval; + + /** + * Stores the function to return peers. Which one it is depends on whether + * the Sampler is the modified one or not. + */ + RPS_get_peers_type get_peers; + + /** + * Head and tail for the DLL to store the #RPS_SamplerRequestHandle + */ + struct RPS_SamplerRequestHandle *req_handle_head; + struct RPS_SamplerRequestHandle *req_handle_tail; + + struct SamplerNotifyUpdateCTX *notify_ctx_head; + struct SamplerNotifyUpdateCTX *notify_ctx_tail; + #ifdef TO_FILE + /** + * File name to log to + */ + char *file_name; + #endif /* TO_FILE */ +}; + + +/** + * @brief Add a callback that will be called when the next peer is inserted + * into the sampler + * + * @param sampler The sampler on which update it will be called + * @param notify_cb The callback + * @param cls Closure given to the callback + * + * @return The context containing callback and closure + */ +struct SamplerNotifyUpdateCTX * +sampler_notify_on_update (struct RPS_Sampler *sampler, + SamplerNotifyUpdateCB notify_cb, + void *cls); + + +/** + * Update every sampler element of this sampler with given peer + * + * @param sampler the sampler to update. + * @param id the PeerID that is put in the sampler + */ + void +RPS_sampler_update (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id); + + +/** + * Reinitialise all previously initialised sampler elements with the given value. + * + * Used to get rid of a PeerID. + * + * @param sampler the sampler to reinitialise a sampler element in. + * @param id the id of the sampler elements to update. + */ + void +RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id); + + +/** + * Get the size of the sampler. + * + * @param sampler the sampler to return the size of. + * @return the size of the sampler + */ +unsigned int +RPS_sampler_get_size (struct RPS_Sampler *sampler); + + +/** + * Grow or shrink the size of the sampler. + * + * @param sampler the sampler to resize. + * @param new_size the new size of the sampler + */ +void +RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size); + + +/** + * Get n random peers out of the sampled peers. + * + * We might want to reinitialise this sampler after giving the + * corrsponding peer to the client. + * Random with or without consumption? + * + * @param sampler the sampler to get peers from. + * @param cb callback that will be called once the ids are ready. + * @param cls closure given to @a cb + * @param for_client #GNUNET_YES if result is used for client, + * #GNUNET_NO if used internally + * @param num_peers the number of peers requested + */ +struct RPS_SamplerRequestHandle * +RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, + uint32_t num_peers, + RPS_sampler_n_rand_peers_ready_cb cb, + void *cls); + + +/** + * Counts how many Samplers currently hold a given PeerID. + * + * @param sampler the sampler to count ids in. + * @param id the PeerID to count. + * + * @return the number of occurrences of id. + */ + uint32_t +RPS_sampler_count_id (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id); + + +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle); + + +/** + * Cleans the sampler. + */ + void +RPS_sampler_destroy (struct RPS_Sampler *sampler); + +#endif /* RPS_SAMPLER_COMMON_H */ +/* end of rps-sampler_common.h */ diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index ee65c2a82..6e124644d 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -25,7 +25,7 @@ #include "gnunet_util_lib.h" #include "rps.h" #include "gnunet_rps_service.h" -#include "gnunet-service-rps_sampler.h" +#include "rps-sampler_client.h" #include -- cgit v1.2.3