diff options
author | Julius Bünger <buenger@mytum.de> | 2015-08-02 14:48:28 +0000 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2015-08-02 14:48:28 +0000 |
commit | bae0066688e7571b4abdebfb914dba6df0578a6b (patch) | |
tree | 4ee2b2171170d169ff8cd55cfe5ad9adf640d5d2 /src/rps/gnunet-service-rps_sampler.c | |
parent | 847e9575ed85eadb979bc416afec7cf898cf00d1 (diff) | |
download | gnunet-bae0066688e7571b4abdebfb914dba6df0578a6b.tar.gz gnunet-bae0066688e7571b4abdebfb914dba6df0578a6b.zip |
cancellation of request and according test improvements
Diffstat (limited to 'src/rps/gnunet-service-rps_sampler.c')
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 170 |
1 files changed, 101 insertions, 69 deletions
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 6857c80ba..b65dd7c47 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c | |||
@@ -73,16 +73,12 @@ struct GetPeerCls | |||
73 | * DLL | 73 | * DLL |
74 | */ | 74 | */ |
75 | struct GetPeerCls *next; | 75 | struct GetPeerCls *next; |
76 | |||
77 | /** | ||
78 | * DLL | ||
79 | */ | ||
80 | struct GetPeerCls *prev; | 76 | struct GetPeerCls *prev; |
81 | 77 | ||
82 | /** | 78 | /** |
83 | * The sampler this function operates on. | 79 | * The #RPS_SamplerRequestHandle this single request belongs to. |
84 | */ | 80 | */ |
85 | struct RPS_Sampler *sampler; | 81 | struct RPS_SamplerRequestHandle *req_handle; |
86 | 82 | ||
87 | /** | 83 | /** |
88 | * The task for this function. | 84 | * The task for this function. |
@@ -166,14 +162,10 @@ struct RPS_Sampler | |||
166 | RPS_get_peers_type get_peers; | 162 | RPS_get_peers_type get_peers; |
167 | 163 | ||
168 | /** | 164 | /** |
169 | * Head for the DLL to store the closures to pending requests. | 165 | * Head and tail for the DLL to store the #RPS_SamplerRequestHandle |
170 | */ | 166 | */ |
171 | struct GetPeerCls *gpc_head; | 167 | struct RPS_SamplerRequestHandle *req_handle_head; |
172 | 168 | struct RPS_SamplerRequestHandle *req_handle_tail; | |
173 | /** | ||
174 | * Tail for the DLL to store the closures to pending requests. | ||
175 | */ | ||
176 | struct GetPeerCls *gpc_tail; | ||
177 | 169 | ||
178 | #ifdef TO_FILE | 170 | #ifdef TO_FILE |
179 | /** | 171 | /** |
@@ -186,9 +178,15 @@ struct RPS_Sampler | |||
186 | /** | 178 | /** |
187 | * Closure to _get_n_rand_peers_ready_cb() | 179 | * Closure to _get_n_rand_peers_ready_cb() |
188 | */ | 180 | */ |
189 | struct NRandPeersReadyCls | 181 | struct RPS_SamplerRequestHandle |
190 | { | 182 | { |
191 | /** | 183 | /** |
184 | * DLL | ||
185 | */ | ||
186 | struct RPS_SamplerRequestHandle *next; | ||
187 | struct RPS_SamplerRequestHandle *prev; | ||
188 | |||
189 | /** | ||
192 | * Number of peers we are waiting for. | 190 | * Number of peers we are waiting for. |
193 | */ | 191 | */ |
194 | uint32_t num_peers; | 192 | uint32_t num_peers; |
@@ -204,6 +202,17 @@ struct NRandPeersReadyCls | |||
204 | struct GNUNET_PeerIdentity *ids; | 202 | struct GNUNET_PeerIdentity *ids; |
205 | 203 | ||
206 | /** | 204 | /** |
205 | * Head and tail for the DLL to store the tasks for single requests | ||
206 | */ | ||
207 | struct GetPeerCls *gpc_head; | ||
208 | struct GetPeerCls *gpc_tail; | ||
209 | |||
210 | /** | ||
211 | * Sampler. | ||
212 | */ | ||
213 | struct RPS_Sampler *sampler; | ||
214 | |||
215 | /** | ||
207 | * Callback to be called when all ids are available. | 216 | * Callback to be called when all ids are available. |
208 | */ | 217 | */ |
209 | RPS_sampler_n_rand_peers_ready_cb callback; | 218 | RPS_sampler_n_rand_peers_ready_cb callback; |
@@ -251,23 +260,23 @@ static void | |||
251 | check_n_peers_ready (void *cls, | 260 | check_n_peers_ready (void *cls, |
252 | const struct GNUNET_PeerIdentity *id) | 261 | const struct GNUNET_PeerIdentity *id) |
253 | { | 262 | { |
254 | struct NRandPeersReadyCls *n_peers_cls = cls; | 263 | struct RPS_SamplerRequestHandle *req_handle = cls; |
255 | 264 | ||
256 | n_peers_cls->cur_num_peers++; | 265 | req_handle->cur_num_peers++; |
257 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 266 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
258 | "Got %" PRIX32 ". of %" PRIX32 " peers\n", | 267 | "Got %" PRIX32 ". of %" PRIX32 " peers\n", |
259 | n_peers_cls->cur_num_peers, n_peers_cls->num_peers); | 268 | req_handle->cur_num_peers, req_handle->num_peers); |
260 | 269 | ||
261 | if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) | 270 | if (req_handle->num_peers == req_handle->cur_num_peers) |
262 | { /* All peers are ready -- return those to the client */ | 271 | { /* All peers are ready -- return those to the client */ |
263 | GNUNET_assert (NULL != n_peers_cls->callback); | 272 | GNUNET_assert (NULL != req_handle->callback); |
264 | 273 | ||
265 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 274 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
266 | "returning %" PRIX32 " peers to the client\n", | 275 | "returning %" PRIX32 " peers to the client\n", |
267 | n_peers_cls->num_peers); | 276 | req_handle->num_peers); |
268 | n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); | 277 | req_handle->callback (req_handle->cls, req_handle->ids, req_handle->num_peers); |
269 | 278 | ||
270 | GNUNET_free (n_peers_cls); | 279 | RPS_sampler_request_cancel (req_handle); |
271 | } | 280 | } |
272 | } | 281 | } |
273 | 282 | ||
@@ -420,12 +429,8 @@ RPS_sampler_init (size_t init_size, | |||
420 | sampler->file_name); | 429 | sampler->file_name); |
421 | #endif /* TO_FILE */ | 430 | #endif /* TO_FILE */ |
422 | 431 | ||
423 | sampler->sampler_size = 0; | ||
424 | sampler->sampler_elements = NULL; | ||
425 | sampler->max_round_interval = max_round_interval; | 432 | sampler->max_round_interval = max_round_interval; |
426 | sampler->get_peers = sampler_get_rand_peer; | 433 | sampler->get_peers = sampler_get_rand_peer; |
427 | sampler->gpc_head = NULL; | ||
428 | sampler->gpc_tail = NULL; | ||
429 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); | 434 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); |
430 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); | 435 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); |
431 | RPS_sampler_resize (sampler, init_size); | 436 | RPS_sampler_resize (sampler, init_size); |
@@ -530,19 +535,21 @@ sampler_get_rand_peer (void *cls, | |||
530 | { | 535 | { |
531 | struct GetPeerCls *gpc = cls; | 536 | struct GetPeerCls *gpc = cls; |
532 | uint32_t r_index; | 537 | uint32_t r_index; |
538 | struct RPS_Sampler *sampler; | ||
533 | 539 | ||
534 | gpc->get_peer_task = NULL; | 540 | gpc->get_peer_task = NULL; |
535 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 541 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
536 | return; | 542 | return; |
543 | sampler = gpc->req_handle->sampler; | ||
537 | 544 | ||
538 | /**; | 545 | /**; |
539 | * Choose the r_index of the peer we want to return | 546 | * Choose the r_index of the peer we want to return |
540 | * at random from the interval of the gossip list | 547 | * at random from the interval of the gossip list |
541 | */ | 548 | */ |
542 | r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, | 549 | r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, |
543 | gpc->sampler->sampler_size); | 550 | sampler->sampler_size); |
544 | 551 | ||
545 | if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty) | 552 | if (EMPTY == sampler->sampler_elements[r_index]->is_empty) |
546 | { | 553 | { |
547 | //LOG (GNUNET_ERROR_TYPE_DEBUG, | 554 | //LOG (GNUNET_ERROR_TYPE_DEBUG, |
548 | // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); | 555 | // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); |
@@ -552,20 +559,18 @@ sampler_get_rand_peer (void *cls, | |||
552 | * Counter? | 559 | * Counter? |
553 | */ | 560 | */ |
554 | gpc->get_peer_task = | 561 | gpc->get_peer_task = |
555 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( | 562 | GNUNET_SCHEDULER_add_delayed ( |
556 | GNUNET_TIME_UNIT_SECONDS, 0.1), | 563 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), |
557 | &sampler_get_rand_peer, | 564 | &sampler_get_rand_peer, |
558 | cls); | 565 | cls); |
559 | return; | 566 | return; |
560 | } | 567 | } |
561 | 568 | ||
562 | *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id; | 569 | GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, |
563 | 570 | gpc->req_handle->gpc_tail, | |
564 | gpc->cont (gpc->cont_cls, gpc->id); | ||
565 | |||
566 | GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, | ||
567 | gpc->sampler->gpc_tail, | ||
568 | gpc); | 571 | gpc); |
572 | *gpc->id = sampler->sampler_elements[r_index]->peer_id; | ||
573 | gpc->cont (gpc->cont_cls, gpc->id); | ||
569 | 574 | ||
570 | GNUNET_free (gpc); | 575 | GNUNET_free (gpc); |
571 | } | 576 | } |
@@ -584,17 +589,19 @@ sampler_mod_get_rand_peer (void *cls, | |||
584 | struct GetPeerCls *gpc = cls; | 589 | struct GetPeerCls *gpc = cls; |
585 | struct RPS_SamplerElement *s_elem; | 590 | struct RPS_SamplerElement *s_elem; |
586 | struct GNUNET_TIME_Relative last_request_diff; | 591 | struct GNUNET_TIME_Relative last_request_diff; |
592 | struct RPS_Sampler *sampler; | ||
587 | 593 | ||
588 | gpc->get_peer_task = NULL; | 594 | gpc->get_peer_task = NULL; |
589 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 595 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
590 | return; | 596 | return; |
597 | sampler = gpc->req_handle->sampler; | ||
591 | 598 | ||
592 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); | 599 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); |
593 | 600 | ||
594 | /* Cycle the #client_get_index one step further */ | 601 | /* Cycle the #client_get_index one step further */ |
595 | client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size; | 602 | client_get_index = (client_get_index + 1) % sampler->sampler_size; |
596 | 603 | ||
597 | s_elem = gpc->sampler->sampler_elements[client_get_index]; | 604 | s_elem = sampler->sampler_elements[client_get_index]; |
598 | *gpc->id = s_elem->peer_id; | 605 | *gpc->id = s_elem->peer_id; |
599 | GNUNET_assert (NULL != s_elem); | 606 | GNUNET_assert (NULL != s_elem); |
600 | 607 | ||
@@ -603,7 +610,7 @@ sampler_mod_get_rand_peer (void *cls, | |||
603 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); | 610 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); |
604 | GNUNET_assert (NULL == gpc->get_peer_task); | 611 | GNUNET_assert (NULL == gpc->get_peer_task); |
605 | gpc->get_peer_task = | 612 | gpc->get_peer_task = |
606 | GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, | 613 | GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, |
607 | &sampler_mod_get_rand_peer, | 614 | &sampler_mod_get_rand_peer, |
608 | cls); | 615 | cls); |
609 | return; | 616 | return; |
@@ -617,7 +624,7 @@ sampler_mod_get_rand_peer (void *cls, | |||
617 | GNUNET_TIME_absolute_get ()); | 624 | GNUNET_TIME_absolute_get ()); |
618 | /* We're not going to give it back now if it was | 625 | /* We're not going to give it back now if it was |
619 | * already requested by a client this round */ | 626 | * already requested by a client this round */ |
620 | if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us) | 627 | if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) |
621 | { | 628 | { |
622 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 629 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
623 | "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); | 630 | "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); |
@@ -629,7 +636,7 @@ sampler_mod_get_rand_peer (void *cls, | |||
629 | /* Schedule it one round later */ | 636 | /* Schedule it one round later */ |
630 | GNUNET_assert (NULL == gpc->get_peer_task); | 637 | GNUNET_assert (NULL == gpc->get_peer_task); |
631 | gpc->get_peer_task = | 638 | gpc->get_peer_task = |
632 | GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, | 639 | GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, |
633 | &sampler_mod_get_rand_peer, | 640 | &sampler_mod_get_rand_peer, |
634 | cls); | 641 | cls); |
635 | return; | 642 | return; |
@@ -639,8 +646,8 @@ sampler_mod_get_rand_peer (void *cls, | |||
639 | 646 | ||
640 | s_elem->last_client_request = GNUNET_TIME_absolute_get (); | 647 | s_elem->last_client_request = GNUNET_TIME_absolute_get (); |
641 | 648 | ||
642 | GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, | 649 | GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, |
643 | gpc->sampler->gpc_tail, | 650 | gpc->req_handle->gpc_tail, |
644 | gpc); | 651 | gpc); |
645 | gpc->cont (gpc->cont_cls, gpc->id); | 652 | gpc->cont (gpc->cont_cls, gpc->id); |
646 | GNUNET_free (gpc); | 653 | GNUNET_free (gpc); |
@@ -661,26 +668,30 @@ sampler_mod_get_rand_peer (void *cls, | |||
661 | * #GNUNET_NO if used internally | 668 | * #GNUNET_NO if used internally |
662 | * @param num_peers the number of peers requested | 669 | * @param num_peers the number of peers requested |
663 | */ | 670 | */ |
664 | void | 671 | struct RPS_SamplerRequestHandle * |
665 | RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, | 672 | RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, |
666 | RPS_sampler_n_rand_peers_ready_cb cb, | 673 | RPS_sampler_n_rand_peers_ready_cb cb, |
667 | void *cls, uint32_t num_peers) | 674 | void *cls, uint32_t num_peers) |
668 | { | 675 | { |
669 | GNUNET_assert (0 != sampler->sampler_size); | 676 | GNUNET_assert (0 != sampler->sampler_size); |
670 | if (0 == num_peers) | 677 | if (0 == num_peers) |
671 | return; | 678 | return NULL; |
672 | 679 | ||
673 | // TODO check if we have too much (distinct) sampled peers | 680 | // TODO check if we have too much (distinct) sampled peers |
674 | uint32_t i; | 681 | uint32_t i; |
675 | struct NRandPeersReadyCls *cb_cls; | 682 | struct RPS_SamplerRequestHandle *req_handle; |
676 | struct GetPeerCls *gpc; | 683 | struct GetPeerCls *gpc; |
677 | 684 | ||
678 | cb_cls = GNUNET_new (struct NRandPeersReadyCls); | 685 | req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); |
679 | cb_cls->num_peers = num_peers; | 686 | req_handle->num_peers = num_peers; |
680 | cb_cls->cur_num_peers = 0; | 687 | req_handle->cur_num_peers = 0; |
681 | cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); | 688 | req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); |
682 | cb_cls->callback = cb; | 689 | req_handle->sampler = sampler; |
683 | cb_cls->cls = cls; | 690 | req_handle->callback = cb; |
691 | req_handle->cls = cls; | ||
692 | GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, | ||
693 | sampler->req_handle_tail, | ||
694 | req_handle); | ||
684 | 695 | ||
685 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 696 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
686 | "Scheduling requests for %" PRIu32 " peers\n", num_peers); | 697 | "Scheduling requests for %" PRIu32 " peers\n", num_peers); |
@@ -688,18 +699,43 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, | |||
688 | for (i = 0 ; i < num_peers ; i++) | 699 | for (i = 0 ; i < num_peers ; i++) |
689 | { | 700 | { |
690 | gpc = GNUNET_new (struct GetPeerCls); | 701 | gpc = GNUNET_new (struct GetPeerCls); |
691 | gpc->sampler = sampler; | 702 | gpc->req_handle = req_handle; |
692 | gpc->cont = check_n_peers_ready; | 703 | gpc->cont = check_n_peers_ready; |
693 | gpc->cont_cls = cb_cls; | 704 | gpc->cont_cls = req_handle; |
694 | gpc->id = &cb_cls->ids[i]; | 705 | gpc->id = &req_handle->ids[i]; |
695 | 706 | ||
707 | GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, | ||
708 | req_handle->gpc_tail, | ||
709 | gpc); | ||
696 | // maybe add a little delay | 710 | // maybe add a little delay |
697 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); | 711 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); |
712 | } | ||
713 | return req_handle; | ||
714 | } | ||
698 | 715 | ||
699 | GNUNET_CONTAINER_DLL_insert (sampler->gpc_head, | 716 | /** |
700 | sampler->gpc_tail, | 717 | * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. |
701 | gpc); | 718 | * |
719 | * @param req_handle the handle to the request | ||
720 | */ | ||
721 | void | ||
722 | RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) | ||
723 | { | ||
724 | struct GetPeerCls *i; | ||
725 | |||
726 | while (NULL != (i = req_handle->gpc_head) ) | ||
727 | { | ||
728 | GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, | ||
729 | req_handle->gpc_tail, | ||
730 | i); | ||
731 | if (NULL != i->get_peer_task) | ||
732 | GNUNET_SCHEDULER_cancel (i->get_peer_task); | ||
733 | GNUNET_free (i); | ||
702 | } | 734 | } |
735 | GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, | ||
736 | req_handle->sampler->req_handle_tail, | ||
737 | req_handle); | ||
738 | GNUNET_free (req_handle); | ||
703 | } | 739 | } |
704 | 740 | ||
705 | 741 | ||
@@ -735,17 +771,13 @@ RPS_sampler_count_id (struct RPS_Sampler *sampler, | |||
735 | void | 771 | void |
736 | RPS_sampler_destroy (struct RPS_Sampler *sampler) | 772 | RPS_sampler_destroy (struct RPS_Sampler *sampler) |
737 | { | 773 | { |
738 | struct GetPeerCls *i; | 774 | if (NULL != sampler->req_handle_head) |
739 | |||
740 | for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head) | ||
741 | { | 775 | { |
742 | GNUNET_CONTAINER_DLL_remove (sampler->gpc_head, | 776 | LOG (GNUNET_ERROR_TYPE_WARNING, |
743 | sampler->gpc_tail, | 777 | "There are still pending requests. Going to remove them.\n"); |
744 | i); | 778 | while (NULL != sampler->req_handle_head) |
745 | GNUNET_SCHEDULER_cancel (i->get_peer_task); | 779 | RPS_sampler_request_cancel (sampler->req_handle_head); |
746 | GNUNET_free (i); | ||
747 | } | 780 | } |
748 | |||
749 | sampler_empty (sampler); | 781 | sampler_empty (sampler); |
750 | GNUNET_free (sampler); | 782 | GNUNET_free (sampler); |
751 | } | 783 | } |