From 0ce2284452e86e3fb6991e573bed2d37b0be8331 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Thu, 20 Sep 2018 22:34:18 +0200 Subject: Move from timer-based to callback-based updates in sampler --- src/rps/gnunet-service-rps_sampler.c | 164 +++++++++++++++++++++++++++++------ 1 file changed, 137 insertions(+), 27 deletions(-) (limited to 'src/rps/gnunet-service-rps_sampler.c') diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 0de15bbc0..ff4bc9e42 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -63,6 +63,43 @@ typedef void const struct GNUNET_PeerIdentity *id); +/** + * @brief Callback called each time a new peer was put into the sampler + * + * @param cls A possibly given closure + */ +typedef void +(*SamplerNotifyUpdateCB) (void *cls); + +/** + * @brief Context for a callback. Contains callback and closure. + * + * Meant to be an entry in an DLL. + */ +struct SamplerNotifyUpdateCTX +{ + /** + * @brief The Callback to call on updates + */ + SamplerNotifyUpdateCB notify_cb; + + /** + * @brief The according closure. + */ + void *cls; + + /** + * @brief Next element in DLL. + */ + struct SamplerNotifyUpdateCTX *next; + + /** + * @brief Previous element in DLL. + */ + struct SamplerNotifyUpdateCTX *prev; +}; + + /** * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer */ @@ -84,6 +121,11 @@ struct GetPeerCls */ struct GNUNET_SCHEDULER_Task *get_peer_task; + /** + * @brief Context to the given callback. + */ + struct SamplerNotifyUpdateCTX *notify_ctx; + /** * The callback */ @@ -164,6 +206,8 @@ struct RPS_Sampler struct RPS_SamplerRequestHandle *req_handle_head; struct RPS_SamplerRequestHandle *req_handle_tail; + struct SamplerNotifyUpdateCTX *notify_ctx_head; + struct SamplerNotifyUpdateCTX *notify_ctx_tail; #ifdef TO_FILE /** * File name to log to @@ -247,6 +291,52 @@ static size_t max_size; static uint32_t client_get_index; +/** + * @brief Add a callback that will be called when the next peer is inserted + * into the sampler + * + * @param sampler The sampler on which update it will be called + * @param notify_cb The callback + * @param cls Closure given to the callback + * + * @return The context containing callback and closure + */ +struct SamplerNotifyUpdateCTX * +sampler_notify_on_update (struct RPS_Sampler *sampler, + SamplerNotifyUpdateCB notify_cb, + void *cls) +{ + struct SamplerNotifyUpdateCTX *notify_ctx; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Inserting new context for notification\n"); + notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); + notify_ctx->notify_cb = notify_cb; + notify_ctx->cls = cls; + if (NULL != sampler->notify_ctx_head) + { + for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head; + NULL != notify_iter->next; + notify_iter = notify_iter->next) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Pre: Context\n"); + } + } + GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, + sampler->notify_ctx_tail, + notify_ctx); + for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head; + NULL != notify_iter; + notify_iter = notify_iter->next) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Post: Context\n"); + } + return notify_ctx; +} + + /** * Callback to _get_rand_peer() used by _get_n_rand_peers(). * @@ -469,7 +559,7 @@ RPS_sampler_mod_init (size_t init_size, /** - * A fuction to update every sampler in the given list + * Update every sampler element of this sampler with given peer * * @param sampler the sampler to update. * @param id the PeerID that is put in the sampler @@ -478,17 +568,33 @@ RPS_sampler_mod_init (size_t init_size, RPS_sampler_update (struct RPS_Sampler *sampler, const struct GNUNET_PeerIdentity *id) { - uint32_t i; + struct SamplerNotifyUpdateCTX *tmp_notify_head; + struct SamplerNotifyUpdateCTX *tmp_notify_tail; to_file (sampler->file_name, "Got %s", GNUNET_i2s_full (id)); - for (i = 0; i < sampler->sampler_size; i++) + for (uint32_t i = 0; i < sampler->sampler_size; i++) { RPS_sampler_elem_next (sampler->sampler_elements[i], id); } + tmp_notify_head = sampler->notify_ctx_head; + tmp_notify_tail = sampler->notify_ctx_tail; + sampler->notify_ctx_head = NULL; + sampler->notify_ctx_tail = NULL; + for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head; + NULL != tmp_notify_head; + notify_iter = tmp_notify_head) + { + GNUNET_assert (NULL != notify_iter->notify_cb); + GNUNET_CONTAINER_DLL_remove (tmp_notify_head, + tmp_notify_tail, + notify_iter); + notify_iter->notify_cb (notify_iter->cls); + GNUNET_free (notify_iter); + } } @@ -535,6 +641,7 @@ sampler_get_rand_peer (void *cls) struct RPS_Sampler *sampler; gpc->get_peer_task = NULL; + gpc->notify_ctx = NULL; sampler = gpc->req_handle->sampler; /**; @@ -549,15 +656,10 @@ sampler_get_rand_peer (void *cls) //LOG (GNUNET_ERROR_TYPE_DEBUG, // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); - /* FIXME no active wait - get notified, when new id arrives? - * Might also be a freshly emptied one. Others might still contain ids. - * Counter? - */ - gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), - &sampler_get_rand_peer, - cls); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); return; } @@ -585,6 +687,7 @@ sampler_mod_get_rand_peer (void *cls) struct RPS_Sampler *sampler; gpc->get_peer_task = NULL; + gpc->notify_ctx = NULL; sampler = gpc->req_handle->sampler; LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); @@ -600,11 +703,11 @@ sampler_mod_get_rand_peer (void *cls) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); - GNUNET_assert (NULL == gpc->get_peer_task); - gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, - &sampler_mod_get_rand_peer, - cls); + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); return; } @@ -626,11 +729,11 @@ sampler_mod_get_rand_peer (void *cls) // sampler->max_round_interval); // add a little delay /* Schedule it one round later */ - GNUNET_assert (NULL == gpc->get_peer_task); - gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, - &sampler_mod_get_rand_peer, - cls); + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); return; } } @@ -638,11 +741,11 @@ sampler_mod_get_rand_peer (void *cls) { LOG (GNUNET_ERROR_TYPE_DEBUG, "This s_elem saw less than two peers -- scheduling for later\n"); - GNUNET_assert (NULL == gpc->get_peer_task); - gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, - &sampler_mod_get_rand_peer, - cls); + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); return; } /* More reasons to wait could be added here */ @@ -747,6 +850,13 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) { GNUNET_SCHEDULER_cancel (i->get_peer_task); } + if (NULL != i->notify_ctx) + { + GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head, + req_handle->sampler->notify_ctx_tail, + i->notify_ctx); + GNUNET_free (i->notify_ctx); + } GNUNET_free (i); } GNUNET_free (req_handle->ids); -- cgit v1.2.3