aboutsummaryrefslogtreecommitdiff
path: root/src/rps/gnunet-service-rps_sampler.c
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2015-08-02 14:48:28 +0000
committerJulius Bünger <buenger@mytum.de>2015-08-02 14:48:28 +0000
commitbae0066688e7571b4abdebfb914dba6df0578a6b (patch)
tree4ee2b2171170d169ff8cd55cfe5ad9adf640d5d2 /src/rps/gnunet-service-rps_sampler.c
parent847e9575ed85eadb979bc416afec7cf898cf00d1 (diff)
downloadgnunet-bae0066688e7571b4abdebfb914dba6df0578a6b.tar.gz
gnunet-bae0066688e7571b4abdebfb914dba6df0578a6b.zip
cancellation of request and according test improvements
Diffstat (limited to 'src/rps/gnunet-service-rps_sampler.c')
-rw-r--r--src/rps/gnunet-service-rps_sampler.c170
1 files changed, 101 insertions, 69 deletions
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 6857c80ba..b65dd7c47 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -73,16 +73,12 @@ struct GetPeerCls
73 * DLL 73 * DLL
74 */ 74 */
75 struct GetPeerCls *next; 75 struct GetPeerCls *next;
76
77 /**
78 * DLL
79 */
80 struct GetPeerCls *prev; 76 struct GetPeerCls *prev;
81 77
82 /** 78 /**
83 * The sampler this function operates on. 79 * The #RPS_SamplerRequestHandle this single request belongs to.
84 */ 80 */
85 struct RPS_Sampler *sampler; 81 struct RPS_SamplerRequestHandle *req_handle;
86 82
87 /** 83 /**
88 * The task for this function. 84 * The task for this function.
@@ -166,14 +162,10 @@ struct RPS_Sampler
166 RPS_get_peers_type get_peers; 162 RPS_get_peers_type get_peers;
167 163
168 /** 164 /**
169 * Head for the DLL to store the closures to pending requests. 165 * Head and tail for the DLL to store the #RPS_SamplerRequestHandle
170 */ 166 */
171 struct GetPeerCls *gpc_head; 167 struct RPS_SamplerRequestHandle *req_handle_head;
172 168 struct RPS_SamplerRequestHandle *req_handle_tail;
173 /**
174 * Tail for the DLL to store the closures to pending requests.
175 */
176 struct GetPeerCls *gpc_tail;
177 169
178 #ifdef TO_FILE 170 #ifdef TO_FILE
179 /** 171 /**
@@ -186,9 +178,15 @@ struct RPS_Sampler
186/** 178/**
187 * Closure to _get_n_rand_peers_ready_cb() 179 * Closure to _get_n_rand_peers_ready_cb()
188 */ 180 */
189struct NRandPeersReadyCls 181struct RPS_SamplerRequestHandle
190{ 182{
191 /** 183 /**
184 * DLL
185 */
186 struct RPS_SamplerRequestHandle *next;
187 struct RPS_SamplerRequestHandle *prev;
188
189 /**
192 * Number of peers we are waiting for. 190 * Number of peers we are waiting for.
193 */ 191 */
194 uint32_t num_peers; 192 uint32_t num_peers;
@@ -204,6 +202,17 @@ struct NRandPeersReadyCls
204 struct GNUNET_PeerIdentity *ids; 202 struct GNUNET_PeerIdentity *ids;
205 203
206 /** 204 /**
205 * Head and tail for the DLL to store the tasks for single requests
206 */
207 struct GetPeerCls *gpc_head;
208 struct GetPeerCls *gpc_tail;
209
210 /**
211 * Sampler.
212 */
213 struct RPS_Sampler *sampler;
214
215 /**
207 * Callback to be called when all ids are available. 216 * Callback to be called when all ids are available.
208 */ 217 */
209 RPS_sampler_n_rand_peers_ready_cb callback; 218 RPS_sampler_n_rand_peers_ready_cb callback;
@@ -251,23 +260,23 @@ static void
251check_n_peers_ready (void *cls, 260check_n_peers_ready (void *cls,
252 const struct GNUNET_PeerIdentity *id) 261 const struct GNUNET_PeerIdentity *id)
253{ 262{
254 struct NRandPeersReadyCls *n_peers_cls = cls; 263 struct RPS_SamplerRequestHandle *req_handle = cls;
255 264
256 n_peers_cls->cur_num_peers++; 265 req_handle->cur_num_peers++;
257 LOG (GNUNET_ERROR_TYPE_DEBUG, 266 LOG (GNUNET_ERROR_TYPE_DEBUG,
258 "Got %" PRIX32 ". of %" PRIX32 " peers\n", 267 "Got %" PRIX32 ". of %" PRIX32 " peers\n",
259 n_peers_cls->cur_num_peers, n_peers_cls->num_peers); 268 req_handle->cur_num_peers, req_handle->num_peers);
260 269
261 if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) 270 if (req_handle->num_peers == req_handle->cur_num_peers)
262 { /* All peers are ready -- return those to the client */ 271 { /* All peers are ready -- return those to the client */
263 GNUNET_assert (NULL != n_peers_cls->callback); 272 GNUNET_assert (NULL != req_handle->callback);
264 273
265 LOG (GNUNET_ERROR_TYPE_DEBUG, 274 LOG (GNUNET_ERROR_TYPE_DEBUG,
266 "returning %" PRIX32 " peers to the client\n", 275 "returning %" PRIX32 " peers to the client\n",
267 n_peers_cls->num_peers); 276 req_handle->num_peers);
268 n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); 277 req_handle->callback (req_handle->cls, req_handle->ids, req_handle->num_peers);
269 278
270 GNUNET_free (n_peers_cls); 279 RPS_sampler_request_cancel (req_handle);
271 } 280 }
272} 281}
273 282
@@ -420,12 +429,8 @@ RPS_sampler_init (size_t init_size,
420 sampler->file_name); 429 sampler->file_name);
421 #endif /* TO_FILE */ 430 #endif /* TO_FILE */
422 431
423 sampler->sampler_size = 0;
424 sampler->sampler_elements = NULL;
425 sampler->max_round_interval = max_round_interval; 432 sampler->max_round_interval = max_round_interval;
426 sampler->get_peers = sampler_get_rand_peer; 433 sampler->get_peers = sampler_get_rand_peer;
427 sampler->gpc_head = NULL;
428 sampler->gpc_tail = NULL;
429 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); 434 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
430 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); 435 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
431 RPS_sampler_resize (sampler, init_size); 436 RPS_sampler_resize (sampler, init_size);
@@ -530,19 +535,21 @@ sampler_get_rand_peer (void *cls,
530{ 535{
531 struct GetPeerCls *gpc = cls; 536 struct GetPeerCls *gpc = cls;
532 uint32_t r_index; 537 uint32_t r_index;
538 struct RPS_Sampler *sampler;
533 539
534 gpc->get_peer_task = NULL; 540 gpc->get_peer_task = NULL;
535 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 541 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
536 return; 542 return;
543 sampler = gpc->req_handle->sampler;
537 544
538 /**; 545 /**;
539 * Choose the r_index of the peer we want to return 546 * Choose the r_index of the peer we want to return
540 * at random from the interval of the gossip list 547 * at random from the interval of the gossip list
541 */ 548 */
542 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, 549 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
543 gpc->sampler->sampler_size); 550 sampler->sampler_size);
544 551
545 if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty) 552 if (EMPTY == sampler->sampler_elements[r_index]->is_empty)
546 { 553 {
547 //LOG (GNUNET_ERROR_TYPE_DEBUG, 554 //LOG (GNUNET_ERROR_TYPE_DEBUG,
548 // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); 555 // "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
@@ -552,20 +559,18 @@ sampler_get_rand_peer (void *cls,
552 * Counter? 559 * Counter?
553 */ 560 */
554 gpc->get_peer_task = 561 gpc->get_peer_task =
555 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( 562 GNUNET_SCHEDULER_add_delayed (
556 GNUNET_TIME_UNIT_SECONDS, 0.1), 563 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
557 &sampler_get_rand_peer, 564 &sampler_get_rand_peer,
558 cls); 565 cls);
559 return; 566 return;
560 } 567 }
561 568
562 *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id; 569 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
563 570 gpc->req_handle->gpc_tail,
564 gpc->cont (gpc->cont_cls, gpc->id);
565
566 GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head,
567 gpc->sampler->gpc_tail,
568 gpc); 571 gpc);
572 *gpc->id = sampler->sampler_elements[r_index]->peer_id;
573 gpc->cont (gpc->cont_cls, gpc->id);
569 574
570 GNUNET_free (gpc); 575 GNUNET_free (gpc);
571} 576}
@@ -584,17 +589,19 @@ sampler_mod_get_rand_peer (void *cls,
584 struct GetPeerCls *gpc = cls; 589 struct GetPeerCls *gpc = cls;
585 struct RPS_SamplerElement *s_elem; 590 struct RPS_SamplerElement *s_elem;
586 struct GNUNET_TIME_Relative last_request_diff; 591 struct GNUNET_TIME_Relative last_request_diff;
592 struct RPS_Sampler *sampler;
587 593
588 gpc->get_peer_task = NULL; 594 gpc->get_peer_task = NULL;
589 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 595 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
590 return; 596 return;
597 sampler = gpc->req_handle->sampler;
591 598
592 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); 599 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
593 600
594 /* Cycle the #client_get_index one step further */ 601 /* Cycle the #client_get_index one step further */
595 client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size; 602 client_get_index = (client_get_index + 1) % sampler->sampler_size;
596 603
597 s_elem = gpc->sampler->sampler_elements[client_get_index]; 604 s_elem = sampler->sampler_elements[client_get_index];
598 *gpc->id = s_elem->peer_id; 605 *gpc->id = s_elem->peer_id;
599 GNUNET_assert (NULL != s_elem); 606 GNUNET_assert (NULL != s_elem);
600 607
@@ -603,7 +610,7 @@ sampler_mod_get_rand_peer (void *cls,
603 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); 610 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n");
604 GNUNET_assert (NULL == gpc->get_peer_task); 611 GNUNET_assert (NULL == gpc->get_peer_task);
605 gpc->get_peer_task = 612 gpc->get_peer_task =
606 GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, 613 GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
607 &sampler_mod_get_rand_peer, 614 &sampler_mod_get_rand_peer,
608 cls); 615 cls);
609 return; 616 return;
@@ -617,7 +624,7 @@ sampler_mod_get_rand_peer (void *cls,
617 GNUNET_TIME_absolute_get ()); 624 GNUNET_TIME_absolute_get ());
618 /* We're not going to give it back now if it was 625 /* We're not going to give it back now if it was
619 * already requested by a client this round */ 626 * already requested by a client this round */
620 if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us) 627 if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
621 { 628 {
622 LOG (GNUNET_ERROR_TYPE_DEBUG, 629 LOG (GNUNET_ERROR_TYPE_DEBUG,
623 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); 630 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
@@ -629,7 +636,7 @@ sampler_mod_get_rand_peer (void *cls,
629 /* Schedule it one round later */ 636 /* Schedule it one round later */
630 GNUNET_assert (NULL == gpc->get_peer_task); 637 GNUNET_assert (NULL == gpc->get_peer_task);
631 gpc->get_peer_task = 638 gpc->get_peer_task =
632 GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, 639 GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
633 &sampler_mod_get_rand_peer, 640 &sampler_mod_get_rand_peer,
634 cls); 641 cls);
635 return; 642 return;
@@ -639,8 +646,8 @@ sampler_mod_get_rand_peer (void *cls,
639 646
640 s_elem->last_client_request = GNUNET_TIME_absolute_get (); 647 s_elem->last_client_request = GNUNET_TIME_absolute_get ();
641 648
642 GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, 649 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
643 gpc->sampler->gpc_tail, 650 gpc->req_handle->gpc_tail,
644 gpc); 651 gpc);
645 gpc->cont (gpc->cont_cls, gpc->id); 652 gpc->cont (gpc->cont_cls, gpc->id);
646 GNUNET_free (gpc); 653 GNUNET_free (gpc);
@@ -661,26 +668,30 @@ sampler_mod_get_rand_peer (void *cls,
661 * #GNUNET_NO if used internally 668 * #GNUNET_NO if used internally
662 * @param num_peers the number of peers requested 669 * @param num_peers the number of peers requested
663 */ 670 */
664 void 671struct RPS_SamplerRequestHandle *
665RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, 672RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
666 RPS_sampler_n_rand_peers_ready_cb cb, 673 RPS_sampler_n_rand_peers_ready_cb cb,
667 void *cls, uint32_t num_peers) 674 void *cls, uint32_t num_peers)
668{ 675{
669 GNUNET_assert (0 != sampler->sampler_size); 676 GNUNET_assert (0 != sampler->sampler_size);
670 if (0 == num_peers) 677 if (0 == num_peers)
671 return; 678 return NULL;
672 679
673 // TODO check if we have too much (distinct) sampled peers 680 // TODO check if we have too much (distinct) sampled peers
674 uint32_t i; 681 uint32_t i;
675 struct NRandPeersReadyCls *cb_cls; 682 struct RPS_SamplerRequestHandle *req_handle;
676 struct GetPeerCls *gpc; 683 struct GetPeerCls *gpc;
677 684
678 cb_cls = GNUNET_new (struct NRandPeersReadyCls); 685 req_handle = GNUNET_new (struct RPS_SamplerRequestHandle);
679 cb_cls->num_peers = num_peers; 686 req_handle->num_peers = num_peers;
680 cb_cls->cur_num_peers = 0; 687 req_handle->cur_num_peers = 0;
681 cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); 688 req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
682 cb_cls->callback = cb; 689 req_handle->sampler = sampler;
683 cb_cls->cls = cls; 690 req_handle->callback = cb;
691 req_handle->cls = cls;
692 GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head,
693 sampler->req_handle_tail,
694 req_handle);
684 695
685 LOG (GNUNET_ERROR_TYPE_DEBUG, 696 LOG (GNUNET_ERROR_TYPE_DEBUG,
686 "Scheduling requests for %" PRIu32 " peers\n", num_peers); 697 "Scheduling requests for %" PRIu32 " peers\n", num_peers);
@@ -688,18 +699,43 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
688 for (i = 0 ; i < num_peers ; i++) 699 for (i = 0 ; i < num_peers ; i++)
689 { 700 {
690 gpc = GNUNET_new (struct GetPeerCls); 701 gpc = GNUNET_new (struct GetPeerCls);
691 gpc->sampler = sampler; 702 gpc->req_handle = req_handle;
692 gpc->cont = check_n_peers_ready; 703 gpc->cont = check_n_peers_ready;
693 gpc->cont_cls = cb_cls; 704 gpc->cont_cls = req_handle;
694 gpc->id = &cb_cls->ids[i]; 705 gpc->id = &req_handle->ids[i];
695 706
707 GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
708 req_handle->gpc_tail,
709 gpc);
696 // maybe add a little delay 710 // maybe add a little delay
697 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); 711 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc);
712 }
713 return req_handle;
714}
698 715
699 GNUNET_CONTAINER_DLL_insert (sampler->gpc_head, 716/**
700 sampler->gpc_tail, 717 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
701 gpc); 718 *
719 * @param req_handle the handle to the request
720 */
721void
722RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
723{
724 struct GetPeerCls *i;
725
726 while (NULL != (i = req_handle->gpc_head) )
727 {
728 GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head,
729 req_handle->gpc_tail,
730 i);
731 if (NULL != i->get_peer_task)
732 GNUNET_SCHEDULER_cancel (i->get_peer_task);
733 GNUNET_free (i);
702 } 734 }
735 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
736 req_handle->sampler->req_handle_tail,
737 req_handle);
738 GNUNET_free (req_handle);
703} 739}
704 740
705 741
@@ -735,17 +771,13 @@ RPS_sampler_count_id (struct RPS_Sampler *sampler,
735 void 771 void
736RPS_sampler_destroy (struct RPS_Sampler *sampler) 772RPS_sampler_destroy (struct RPS_Sampler *sampler)
737{ 773{
738 struct GetPeerCls *i; 774 if (NULL != sampler->req_handle_head)
739
740 for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head)
741 { 775 {
742 GNUNET_CONTAINER_DLL_remove (sampler->gpc_head, 776 LOG (GNUNET_ERROR_TYPE_WARNING,
743 sampler->gpc_tail, 777 "There are still pending requests. Going to remove them.\n");
744 i); 778 while (NULL != sampler->req_handle_head)
745 GNUNET_SCHEDULER_cancel (i->get_peer_task); 779 RPS_sampler_request_cancel (sampler->req_handle_head);
746 GNUNET_free (i);
747 } 780 }
748
749 sampler_empty (sampler); 781 sampler_empty (sampler);
750 GNUNET_free (sampler); 782 GNUNET_free (sampler);
751} 783}