aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2015-01-16 00:57:50 +0000
committerJulius Bünger <buenger@mytum.de>2015-01-16 00:57:50 +0000
commitd9a59c38bc4928dceacb2cacbc6aa361e0381996 (patch)
tree111d8f93a91ecf0c8aed742d13b53ad6fe7232bf /src/rps
parent13faf361bba55fda139ce33d7e57af02ec4c9ce0 (diff)
downloadgnunet-d9a59c38bc4928dceacb2cacbc6aa361e0381996.tar.gz
gnunet-d9a59c38bc4928dceacb2cacbc6aa361e0381996.zip
restructured the sampler
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-service-rps.c82
-rw-r--r--src/rps/gnunet-service-rps_sampler.c112
-rw-r--r--src/rps/gnunet-service-rps_sampler.h31
3 files changed, 160 insertions, 65 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 0896e126b..96fcdacf3 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -518,6 +518,43 @@ nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimat
518 } 518 }
519} 519}
520 520
521
522/**
523 * Callback called once the requested PeerIDs are ready.
524 *
525 * Sends those to the requesting client.
526 */
527void client_respond (void *cls,
528 struct GNUNET_PeerIdentity *ids, uint64_t num_peers)
529{
530 struct GNUNET_MQ_Envelope *ev;
531 struct GNUNET_RPS_CS_ReplyMessage *out_msg;
532 struct GNUNET_SERVER_Client *client;
533 struct client_ctx *cli_ctx;
534
535 client = (struct GNUNET_SERVER_Client *) cls;
536
537 ev = GNUNET_MQ_msg_extra (out_msg,
538 num_peers * sizeof (struct GNUNET_PeerIdentity),
539 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
540 out_msg->num_peers = GNUNET_htonll (num_peers);
541
542 memcpy(&out_msg[1],
543 ids,
544 num_peers * sizeof (struct GNUNET_PeerIdentity));
545 GNUNET_free (ids);
546
547 cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
548 if ( NULL == cli_ctx ) {
549 cli_ctx = GNUNET_new (struct client_ctx);
550 cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
551 GNUNET_SERVER_client_set_user_context (client, cli_ctx);
552 }
553
554 GNUNET_MQ_send (cli_ctx->mq, ev);
555}
556
557
521/** 558/**
522 * Handle RPS request from the client. 559 * Handle RPS request from the client.
523 * 560 *
@@ -533,15 +570,7 @@ handle_client_request (void *cls,
533 LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n"); 570 LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n");
534 571
535 struct GNUNET_RPS_CS_RequestMessage *msg; 572 struct GNUNET_RPS_CS_RequestMessage *msg;
536 //unsigned int n_arr[sampler_list->size];// =
537 //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
538 //struct GNUNET_MQ_Handle *mq;
539 struct client_ctx *cli_ctx;
540 struct GNUNET_MQ_Envelope *ev;
541 struct GNUNET_RPS_CS_ReplyMessage *out_msg;
542 uint64_t num_peers; 573 uint64_t num_peers;
543 const struct GNUNET_PeerIdentity *peers;
544 //uint64_t i;
545 574
546 575
547 /* Estimate request rate */ 576 /* Estimate request rate */
@@ -558,37 +587,16 @@ handle_client_request (void *cls,
558 GNUNET_TIME_absolute_get ()); 587 GNUNET_TIME_absolute_get ());
559 request_rate = T_relative_avg (request_deltas, req_counter); 588 request_rate = T_relative_avg (request_deltas, req_counter);
560 } 589 }
561 last_request = GNUNET_TIME_absolute_get(); 590 last_request = GNUNET_TIME_absolute_get ();
562 // TODO resize the size of the extended_samplers 591 // TODO resize the size of the extended_samplers
563 592
564 593
565 // TODO check message size 594 // TODO check message size
566 msg = (struct GNUNET_RPS_CS_RequestMessage *) message; 595 msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
567 cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
568 if ( NULL == cli_ctx ) {
569 cli_ctx = GNUNET_new(struct client_ctx);
570 cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
571 GNUNET_SERVER_client_set_user_context (client, cli_ctx);
572 }
573
574 // How many peers do we give back?
575 // Wait until we have enough random peers?
576 596
577 num_peers = GNUNET_ntohll (msg->num_peers); 597 num_peers = GNUNET_ntohll (msg->num_peers);
578 598
579 ev = GNUNET_MQ_msg_extra (out_msg, 599 RPS_sampler_get_n_rand_peers (client_respond, client, num_peers);
580 num_peers * sizeof (struct GNUNET_PeerIdentity),
581 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
582 out_msg->num_peers = msg->num_peers; // No conversion between network and network order
583
584 //&out_msg[1] = RPS_sampler_get_n_rand_peers (num_peers);
585 peers = RPS_sampler_get_n_rand_peers (num_peers);
586 memcpy(&out_msg[1],
587 peers,
588 num_peers * sizeof (struct GNUNET_PeerIdentity));
589
590 GNUNET_MQ_send (cli_ctx->mq, ev);
591 //GNUNET_MQ_destroy(mq);
592 600
593 GNUNET_SERVER_receive_done (client, 601 GNUNET_SERVER_receive_done (client,
594 GNUNET_OK); 602 GNUNET_OK);
@@ -859,13 +867,13 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
859 uint64_t first_border; 867 uint64_t first_border;
860 uint64_t second_border; 868 uint64_t second_border;
861 869
862 GNUNET_array_grow(gossip_list, gossip_list_size, sampler_size); 870 GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size);
863 871
864 first_border = round(alpha * gossip_list_size); 872 first_border = round (alpha * gossip_list_size);
865 for ( i = 0 ; i < first_border ; i++ ) 873 for ( i = 0 ; i < first_border ; i++ )
866 { // TODO use RPS_sampler_get_n_rand_peers 874 { // TODO use RPS_sampler_get_n_rand_peers
867 /* Update gossip list with peers received through PUSHes */ 875 /* Update gossip list with peers received through PUSHes */
868 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 876 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
869 push_list_size); 877 push_list_size);
870 gossip_list[i] = push_list[r_index]; 878 gossip_list[i] = push_list[r_index];
871 // TODO change the in_flags accordingly 879 // TODO change the in_flags accordingly
@@ -875,7 +883,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
875 for ( i = first_border ; i < second_border ; i++ ) 883 for ( i = first_border ; i < second_border ; i++ )
876 { 884 {
877 /* Update gossip list with peers received through PULLs */ 885 /* Update gossip list with peers received through PULLs */
878 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 886 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
879 pull_list_size); 887 pull_list_size);
880 gossip_list[i] = pull_list[r_index]; 888 gossip_list[i] = pull_list[r_index];
881 // TODO change the in_flags accordingly 889 // TODO change the in_flags accordingly
@@ -884,7 +892,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
884 for ( i = second_border ; i < gossip_list_size ; i++ ) 892 for ( i = second_border ; i < gossip_list_size ; i++ )
885 { 893 {
886 /* Update gossip list with peers from history */ 894 /* Update gossip list with peers from history */
887 peer = RPS_sampler_get_n_rand_peers (1), 895 peer = RPS_sampler_get_n_rand_peers_ (1);
888 gossip_list[i] = *peer; 896 gossip_list[i] = *peer;
889 // TODO change the in_flags accordingly 897 // TODO change the in_flags accordingly
890 } 898 }
@@ -1023,7 +1031,7 @@ init_peer_cb (void *cls,
1023 if (ipc->i < gossip_list_size) 1031 if (ipc->i < gossip_list_size)
1024 { 1032 {
1025 memcpy(&gossip_list[ipc->i], 1033 memcpy(&gossip_list[ipc->i],
1026 RPS_sampler_get_n_rand_peers (1), 1034 RPS_sampler_get_n_rand_peers_ (1),
1027 (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity)); 1035 (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity));
1028 } 1036 }
1029 rps_start (ipc->server); 1037 rps_start (ipc->server);
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 93b6b1263..1ab00aaf5 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -133,6 +133,47 @@ struct RPS_Sampler
133}; 133};
134 134
135/** 135/**
136 * Closure to _get_n_rand_peers_ready_cb()
137 */
138struct RPS_GetNRandPeersReadyCls
139{
140 /**
141 * Number of peers we are waiting for.
142 */
143 uint64_t num_peers;
144
145 /**
146 * Number of peers we currently have.
147 */
148 uint64_t cur_num_peers;
149
150 /**
151 * Pointer to the array holding the ids.
152 */
153 struct GNUNET_PeerIdentity *ids;
154
155 /**
156 * Callback to be called when all ids are available.
157 */
158 RPS_sampler_n_rand_peers_ready_cb callback;
159
160 /**
161 * Closure given to the callback
162 */
163 void *cls;
164};
165
166/**
167 * Callback that is called from _get_rand_peer() when the PeerID is ready.
168 *
169 * @param cls the closure given alongside this function.
170 * @param id the PeerID that was returned
171 */
172typedef void
173(*RPS_sampler_rand_peer_ready_cb) (void *cls,
174 const struct GNUNET_PeerIdentity *id);
175
176/**
136 * Global sampler variable. 177 * Global sampler variable.
137 */ 178 */
138struct RPS_Sampler *sampler; 179struct RPS_Sampler *sampler;
@@ -160,6 +201,31 @@ static uint64_t client_get_index;
160 201
161 202
162/** 203/**
204 * Callback to _get_rand_peer() used by _get_n_rand_peers().
205 *
206 * Checks whether all n peers are available. If they are,
207 * give those back.
208 */
209 void
210RPS_sampler_get_n_rand_peers_ready_cb (void *cls,
211 const struct GNUNET_PeerIdentity *id)
212{
213 struct RPS_GetNRandPeersReadyCls *n_peers_cls;
214
215 n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls;
216
217 if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
218 {
219 GNUNET_assert (NULL != n_peers_cls->callback);
220
221 n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
222
223 GNUNET_free (n_peers_cls);
224 }
225}
226
227
228/**
163 * Reinitialise a previously initialised sampler element. 229 * Reinitialise a previously initialised sampler element.
164 * 230 *
165 * @param sampler pointer to the memory that keeps the value. 231 * @param sampler pointer to the memory that keeps the value.
@@ -507,20 +573,22 @@ RPS_sampler_get_n_rand_peers_ (uint64_t n)
507 * 573 *
508 * @return a random PeerID of the PeerIDs previously put into the sampler. 574 * @return a random PeerID of the PeerIDs previously put into the sampler.
509 */ 575 */
510 const struct GNUNET_PeerIdentity * 576 void
511RPS_sampler_get_rand_peer () 577RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb,
578 void *cls, struct GNUNET_PeerIdentity *id)
512{ 579{
513 struct GNUNET_PeerIdentity *peer; 580 do
581 {
582 *id = sampler->sampler_elements[client_get_index]->peer_id;
514 583
515 // use _get_rand_peer_ ?
516 peer = GNUNET_new (struct GNUNET_PeerIdentity);
517 *peer = sampler->sampler_elements[client_get_index]->peer_id;
518 RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); 584 RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
519 if ( client_get_index == sampler->sampler_size ) 585 if ( client_get_index == sampler->sampler_size )
520 client_get_index = 0; 586 client_get_index = 0;
521 else 587 else
522 client_get_index++; 588 client_get_index++;
523 return peer; 589 } while (NOT_EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
590
591 cb (cls, id);
524} 592}
525 593
526 594
@@ -531,35 +599,39 @@ RPS_sampler_get_rand_peer ()
531 * corrsponding peer to the client. 599 * corrsponding peer to the client.
532 * Random with or without consumption? 600 * Random with or without consumption?
533 * 601 *
534 * @return n random PeerIDs of the PeerIDs previously put into the sampler. 602 * @param cb callback that will be called once the ids are ready.
603 * @param cls closure given to @a cb
604 * @param num_peers the number of peers requested
535 */ 605 */
536 const struct GNUNET_PeerIdentity * 606 void
537RPS_sampler_get_n_rand_peers (uint64_t n) 607RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
608 void *cls, uint64_t num_peers)
538{ 609{
539 // use _get_rand_peers_ ? 610 // use _get_rand_peers_ ?
540 if ( 0 == sampler->sampler_size ) 611 if ( 0 == sampler->sampler_size )
541 { 612 {
542 LOG (GNUNET_ERROR_TYPE_DEBUG, 613 LOG (GNUNET_ERROR_TYPE_DEBUG,
543 "Sgrp: List empty - Returning NULL\n"); 614 "Sgrp: List empty - Returning NULL\n");
544 return NULL;
545 } 615 }
546 else 616 else
547 { 617 {
548 // TODO check if we have too much (distinct) sampled peers 618 // TODO check if we have too much (distinct) sampled peers
549 // If we are not ready yet maybe schedule for later 619 // If we are not ready yet maybe schedule for later
550 struct GNUNET_PeerIdentity *peers; 620 struct GNUNET_PeerIdentity *peers;
551 const struct GNUNET_PeerIdentity *peer;
552 uint64_t i; 621 uint64_t i;
622 struct RPS_GetNRandPeersReadyCls *cb_cls;
553 623
554 peers = GNUNET_malloc (n * sizeof (struct GNUNET_PeerIdentity)); 624 peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
555 625
556 for ( i = 0 ; i < n ; i++ ) { 626 cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls);
557 //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements); 627 cb_cls->num_peers = num_peers;
558 peer = RPS_sampler_get_rand_peer (); 628 cb_cls->cur_num_peers = 0;
559 memcpy (&peers[i], peer, sizeof (struct GNUNET_PeerIdentity)); 629 cb_cls->callback = NULL;
560 //GNUNET_free (peer); 630 cb_cls->cls = NULL;
561 } 631
562 return peers; 632 for ( i = 0 ; i < num_peers ; i++ )
633 RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb,
634 cb_cls, &peers[i]);
563 } 635 }
564} 636}
565 637
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
index 3772f9f0b..b6aaf1b41 100644
--- a/src/rps/gnunet-service-rps_sampler.h
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -49,6 +49,18 @@ typedef void
49 const struct GNUNET_PeerIdentity *id); 49 const struct GNUNET_PeerIdentity *id);
50 50
51/** 51/**
52 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
53 *
54 * @param cls the closure given alongside this function.
55 * @param ids the PeerIDs that were returned
56 * to be freed
57 */
58 typedef void
59(*RPS_sampler_n_rand_peers_ready_cb) (void *cls,
60 struct GNUNET_PeerIdentity *ids, uint64_t num_peers);
61
62
63/**
52 * A sampler sampling a stream of PeerIDs. 64 * A sampler sampling a stream of PeerIDs.
53 */ 65 */
54//struct RPS_Sampler; 66//struct RPS_Sampler;
@@ -102,15 +114,15 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
102 114
103 115
104/** 116/**
105 * Get one random peer out of the sampled peers. 117 * Get n random peers out of the sampled peers.
106 * 118 *
107 * We might want to reinitialise this sampler after giving the 119 * We might want to reinitialise this sampler after giving the
108 * corrsponding peer to the client. 120 * corrsponding peer to the client.
109 * 121 * Random with or without consumption?
110 * @return a random PeerID of the PeerIDs previously put into the sampler. 122 * Only used internally
111 */ 123 */
112 const struct GNUNET_PeerIdentity * 124 const struct GNUNET_PeerIdentity *
113RPS_sampler_get_rand_peer (); 125RPS_sampler_get_n_rand_peers_ (uint64_t n);
114 126
115 127
116/** 128/**
@@ -120,10 +132,13 @@ RPS_sampler_get_rand_peer ();
120 * corrsponding peer to the client. 132 * corrsponding peer to the client.
121 * Random with or without consumption? 133 * Random with or without consumption?
122 * 134 *
123 * @return n random PeerIDs of the PeerIDs previously put into the sampler. 135 * @param cb callback that will be called once the ids are ready.
136 * @param cls closure given to @a cb
137 * @param num_peers the number of peers requested
124 */ 138 */
125 const struct GNUNET_PeerIdentity * 139 void
126RPS_sampler_get_n_rand_peers (uint64_t n); 140RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
141 void *cls, uint64_t num_peers);
127 142
128 143
129/** 144/**