From bae0066688e7571b4abdebfb914dba6df0578a6b Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Sun, 2 Aug 2015 14:48:28 +0000 Subject: cancellation of request and according test improvements --- src/rps/gnunet-service-rps_sampler.c | 170 +++++++++++++++++++++-------------- 1 file changed, 101 insertions(+), 69 deletions(-) (limited to 'src/rps/gnunet-service-rps_sampler.c') diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 6857c80ba..b65dd7c47 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -73,16 +73,12 @@ struct GetPeerCls * DLL */ struct GetPeerCls *next; - - /** - * DLL - */ struct GetPeerCls *prev; /** - * The sampler this function operates on. + * The #RPS_SamplerRequestHandle this single request belongs to. */ - struct RPS_Sampler *sampler; + struct RPS_SamplerRequestHandle *req_handle; /** * The task for this function. @@ -166,14 +162,10 @@ struct RPS_Sampler RPS_get_peers_type get_peers; /** - * Head for the DLL to store the closures to pending requests. + * Head and tail for the DLL to store the #RPS_SamplerRequestHandle */ - struct GetPeerCls *gpc_head; - - /** - * Tail for the DLL to store the closures to pending requests. - */ - struct GetPeerCls *gpc_tail; + struct RPS_SamplerRequestHandle *req_handle_head; + struct RPS_SamplerRequestHandle *req_handle_tail; #ifdef TO_FILE /** @@ -186,8 +178,14 @@ struct RPS_Sampler /** * Closure to _get_n_rand_peers_ready_cb() */ -struct NRandPeersReadyCls +struct RPS_SamplerRequestHandle { + /** + * DLL + */ + struct RPS_SamplerRequestHandle *next; + struct RPS_SamplerRequestHandle *prev; + /** * Number of peers we are waiting for. */ @@ -203,6 +201,17 @@ struct NRandPeersReadyCls */ struct GNUNET_PeerIdentity *ids; + /** + * Head and tail for the DLL to store the tasks for single requests + */ + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; + + /** + * Sampler. + */ + struct RPS_Sampler *sampler; + /** * Callback to be called when all ids are available. */ @@ -251,23 +260,23 @@ static void check_n_peers_ready (void *cls, const struct GNUNET_PeerIdentity *id) { - struct NRandPeersReadyCls *n_peers_cls = cls; + struct RPS_SamplerRequestHandle *req_handle = cls; - n_peers_cls->cur_num_peers++; + req_handle->cur_num_peers++; LOG (GNUNET_ERROR_TYPE_DEBUG, "Got %" PRIX32 ". of %" PRIX32 " peers\n", - n_peers_cls->cur_num_peers, n_peers_cls->num_peers); + req_handle->cur_num_peers, req_handle->num_peers); - if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) + if (req_handle->num_peers == req_handle->cur_num_peers) { /* All peers are ready -- return those to the client */ - GNUNET_assert (NULL != n_peers_cls->callback); + GNUNET_assert (NULL != req_handle->callback); LOG (GNUNET_ERROR_TYPE_DEBUG, "returning %" PRIX32 " peers to the client\n", - n_peers_cls->num_peers); - n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); + req_handle->num_peers); + req_handle->callback (req_handle->cls, req_handle->ids, req_handle->num_peers); - GNUNET_free (n_peers_cls); + RPS_sampler_request_cancel (req_handle); } } @@ -420,12 +429,8 @@ RPS_sampler_init (size_t init_size, sampler->file_name); #endif /* TO_FILE */ - sampler->sampler_size = 0; - sampler->sampler_elements = NULL; sampler->max_round_interval = max_round_interval; sampler->get_peers = sampler_get_rand_peer; - sampler->gpc_head = NULL; - sampler->gpc_tail = NULL; //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); RPS_sampler_resize (sampler, init_size); @@ -530,19 +535,21 @@ sampler_get_rand_peer (void *cls, { struct GetPeerCls *gpc = cls; uint32_t r_index; + struct RPS_Sampler *sampler; gpc->get_peer_task = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + sampler = gpc->req_handle->sampler; /**; * Choose the r_index of the peer we want to return * at random from the interval of the gossip list */ r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, - gpc->sampler->sampler_size); + sampler->sampler_size); - if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty) + if (EMPTY == sampler->sampler_elements[r_index]->is_empty) { //LOG (GNUNET_ERROR_TYPE_DEBUG, // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); @@ -552,20 +559,18 @@ sampler_get_rand_peer (void *cls, * Counter? */ gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( - GNUNET_TIME_UNIT_SECONDS, 0.1), - &sampler_get_rand_peer, - cls); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), + &sampler_get_rand_peer, + cls); return; } - *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id; - - gpc->cont (gpc->cont_cls, gpc->id); - - GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, - gpc->sampler->gpc_tail, + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, gpc); + *gpc->id = sampler->sampler_elements[r_index]->peer_id; + gpc->cont (gpc->cont_cls, gpc->id); GNUNET_free (gpc); } @@ -584,17 +589,19 @@ sampler_mod_get_rand_peer (void *cls, struct GetPeerCls *gpc = cls; struct RPS_SamplerElement *s_elem; struct GNUNET_TIME_Relative last_request_diff; + struct RPS_Sampler *sampler; gpc->get_peer_task = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + sampler = gpc->req_handle->sampler; LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); /* Cycle the #client_get_index one step further */ - client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size; + client_get_index = (client_get_index + 1) % sampler->sampler_size; - s_elem = gpc->sampler->sampler_elements[client_get_index]; + s_elem = sampler->sampler_elements[client_get_index]; *gpc->id = s_elem->peer_id; GNUNET_assert (NULL != s_elem); @@ -603,7 +610,7 @@ sampler_mod_get_rand_peer (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); GNUNET_assert (NULL == gpc->get_peer_task); gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, + GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, &sampler_mod_get_rand_peer, cls); return; @@ -617,7 +624,7 @@ sampler_mod_get_rand_peer (void *cls, GNUNET_TIME_absolute_get ()); /* We're not going to give it back now if it was * already requested by a client this round */ - if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us) + if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); @@ -629,7 +636,7 @@ sampler_mod_get_rand_peer (void *cls, /* Schedule it one round later */ GNUNET_assert (NULL == gpc->get_peer_task); gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, + GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, &sampler_mod_get_rand_peer, cls); return; @@ -639,8 +646,8 @@ sampler_mod_get_rand_peer (void *cls, s_elem->last_client_request = GNUNET_TIME_absolute_get (); - GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, - gpc->sampler->gpc_tail, + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, gpc); gpc->cont (gpc->cont_cls, gpc->id); GNUNET_free (gpc); @@ -661,26 +668,30 @@ sampler_mod_get_rand_peer (void *cls, * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ - void +struct RPS_SamplerRequestHandle * RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, RPS_sampler_n_rand_peers_ready_cb cb, void *cls, uint32_t num_peers) { GNUNET_assert (0 != sampler->sampler_size); if (0 == num_peers) - return; + return NULL; // TODO check if we have too much (distinct) sampled peers uint32_t i; - struct NRandPeersReadyCls *cb_cls; + struct RPS_SamplerRequestHandle *req_handle; struct GetPeerCls *gpc; - cb_cls = GNUNET_new (struct NRandPeersReadyCls); - cb_cls->num_peers = num_peers; - cb_cls->cur_num_peers = 0; - cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); - cb_cls->callback = cb; - cb_cls->cls = cls; + req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); + req_handle->num_peers = num_peers; + req_handle->cur_num_peers = 0; + req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + req_handle->sampler = sampler; + req_handle->callback = cb; + req_handle->cls = cls; + GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, + sampler->req_handle_tail, + req_handle); LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling requests for %" PRIu32 " peers\n", num_peers); @@ -688,18 +699,43 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, for (i = 0 ; i < num_peers ; i++) { gpc = GNUNET_new (struct GetPeerCls); - gpc->sampler = sampler; + gpc->req_handle = req_handle; gpc->cont = check_n_peers_ready; - gpc->cont_cls = cb_cls; - gpc->id = &cb_cls->ids[i]; + gpc->cont_cls = req_handle; + gpc->id = &req_handle->ids[i]; + GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, + req_handle->gpc_tail, + gpc); // maybe add a little delay gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); + } + return req_handle; +} - GNUNET_CONTAINER_DLL_insert (sampler->gpc_head, - sampler->gpc_tail, - gpc); +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) +{ + struct GetPeerCls *i; + + while (NULL != (i = req_handle->gpc_head) ) + { + GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, + req_handle->gpc_tail, + i); + if (NULL != i->get_peer_task) + GNUNET_SCHEDULER_cancel (i->get_peer_task); + GNUNET_free (i); } + GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, + req_handle->sampler->req_handle_tail, + req_handle); + GNUNET_free (req_handle); } @@ -735,17 +771,13 @@ RPS_sampler_count_id (struct RPS_Sampler *sampler, void RPS_sampler_destroy (struct RPS_Sampler *sampler) { - struct GetPeerCls *i; - - for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head) + if (NULL != sampler->req_handle_head) { - GNUNET_CONTAINER_DLL_remove (sampler->gpc_head, - sampler->gpc_tail, - i); - GNUNET_SCHEDULER_cancel (i->get_peer_task); - GNUNET_free (i); + LOG (GNUNET_ERROR_TYPE_WARNING, + "There are still pending requests. Going to remove them.\n"); + while (NULL != sampler->req_handle_head) + RPS_sampler_request_cancel (sampler->req_handle_head); } - sampler_empty (sampler); GNUNET_free (sampler); } -- cgit v1.2.3