diff options
author | Julius Bünger <buenger@mytum.de> | 2015-01-16 00:57:50 +0000 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2015-01-16 00:57:50 +0000 |
commit | d9a59c38bc4928dceacb2cacbc6aa361e0381996 (patch) | |
tree | 111d8f93a91ecf0c8aed742d13b53ad6fe7232bf /src/rps | |
parent | 13faf361bba55fda139ce33d7e57af02ec4c9ce0 (diff) | |
download | gnunet-d9a59c38bc4928dceacb2cacbc6aa361e0381996.tar.gz gnunet-d9a59c38bc4928dceacb2cacbc6aa361e0381996.zip |
restructured the sampler
Diffstat (limited to 'src/rps')
-rw-r--r-- | src/rps/gnunet-service-rps.c | 82 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 112 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.h | 31 |
3 files changed, 160 insertions, 65 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 0896e126b..96fcdacf3 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -518,6 +518,43 @@ nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimat | |||
518 | } | 518 | } |
519 | } | 519 | } |
520 | 520 | ||
521 | |||
522 | /** | ||
523 | * Callback called once the requested PeerIDs are ready. | ||
524 | * | ||
525 | * Sends those to the requesting client. | ||
526 | */ | ||
527 | void client_respond (void *cls, | ||
528 | struct GNUNET_PeerIdentity *ids, uint64_t num_peers) | ||
529 | { | ||
530 | struct GNUNET_MQ_Envelope *ev; | ||
531 | struct GNUNET_RPS_CS_ReplyMessage *out_msg; | ||
532 | struct GNUNET_SERVER_Client *client; | ||
533 | struct client_ctx *cli_ctx; | ||
534 | |||
535 | client = (struct GNUNET_SERVER_Client *) cls; | ||
536 | |||
537 | ev = GNUNET_MQ_msg_extra (out_msg, | ||
538 | num_peers * sizeof (struct GNUNET_PeerIdentity), | ||
539 | GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); | ||
540 | out_msg->num_peers = GNUNET_htonll (num_peers); | ||
541 | |||
542 | memcpy(&out_msg[1], | ||
543 | ids, | ||
544 | num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
545 | GNUNET_free (ids); | ||
546 | |||
547 | cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx); | ||
548 | if ( NULL == cli_ctx ) { | ||
549 | cli_ctx = GNUNET_new (struct client_ctx); | ||
550 | cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client); | ||
551 | GNUNET_SERVER_client_set_user_context (client, cli_ctx); | ||
552 | } | ||
553 | |||
554 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
555 | } | ||
556 | |||
557 | |||
521 | /** | 558 | /** |
522 | * Handle RPS request from the client. | 559 | * Handle RPS request from the client. |
523 | * | 560 | * |
@@ -533,15 +570,7 @@ handle_client_request (void *cls, | |||
533 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n"); | 570 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n"); |
534 | 571 | ||
535 | struct GNUNET_RPS_CS_RequestMessage *msg; | 572 | struct GNUNET_RPS_CS_RequestMessage *msg; |
536 | //unsigned int n_arr[sampler_list->size];// = | ||
537 | //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size); | ||
538 | //struct GNUNET_MQ_Handle *mq; | ||
539 | struct client_ctx *cli_ctx; | ||
540 | struct GNUNET_MQ_Envelope *ev; | ||
541 | struct GNUNET_RPS_CS_ReplyMessage *out_msg; | ||
542 | uint64_t num_peers; | 573 | uint64_t num_peers; |
543 | const struct GNUNET_PeerIdentity *peers; | ||
544 | //uint64_t i; | ||
545 | 574 | ||
546 | 575 | ||
547 | /* Estimate request rate */ | 576 | /* Estimate request rate */ |
@@ -558,37 +587,16 @@ handle_client_request (void *cls, | |||
558 | GNUNET_TIME_absolute_get ()); | 587 | GNUNET_TIME_absolute_get ()); |
559 | request_rate = T_relative_avg (request_deltas, req_counter); | 588 | request_rate = T_relative_avg (request_deltas, req_counter); |
560 | } | 589 | } |
561 | last_request = GNUNET_TIME_absolute_get(); | 590 | last_request = GNUNET_TIME_absolute_get (); |
562 | // TODO resize the size of the extended_samplers | 591 | // TODO resize the size of the extended_samplers |
563 | 592 | ||
564 | 593 | ||
565 | // TODO check message size | 594 | // TODO check message size |
566 | msg = (struct GNUNET_RPS_CS_RequestMessage *) message; | 595 | msg = (struct GNUNET_RPS_CS_RequestMessage *) message; |
567 | cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx); | ||
568 | if ( NULL == cli_ctx ) { | ||
569 | cli_ctx = GNUNET_new(struct client_ctx); | ||
570 | cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client); | ||
571 | GNUNET_SERVER_client_set_user_context (client, cli_ctx); | ||
572 | } | ||
573 | |||
574 | // How many peers do we give back? | ||
575 | // Wait until we have enough random peers? | ||
576 | 596 | ||
577 | num_peers = GNUNET_ntohll (msg->num_peers); | 597 | num_peers = GNUNET_ntohll (msg->num_peers); |
578 | 598 | ||
579 | ev = GNUNET_MQ_msg_extra (out_msg, | 599 | RPS_sampler_get_n_rand_peers (client_respond, client, num_peers); |
580 | num_peers * sizeof (struct GNUNET_PeerIdentity), | ||
581 | GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); | ||
582 | out_msg->num_peers = msg->num_peers; // No conversion between network and network order | ||
583 | |||
584 | //&out_msg[1] = RPS_sampler_get_n_rand_peers (num_peers); | ||
585 | peers = RPS_sampler_get_n_rand_peers (num_peers); | ||
586 | memcpy(&out_msg[1], | ||
587 | peers, | ||
588 | num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
589 | |||
590 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
591 | //GNUNET_MQ_destroy(mq); | ||
592 | 600 | ||
593 | GNUNET_SERVER_receive_done (client, | 601 | GNUNET_SERVER_receive_done (client, |
594 | GNUNET_OK); | 602 | GNUNET_OK); |
@@ -859,13 +867,13 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
859 | uint64_t first_border; | 867 | uint64_t first_border; |
860 | uint64_t second_border; | 868 | uint64_t second_border; |
861 | 869 | ||
862 | GNUNET_array_grow(gossip_list, gossip_list_size, sampler_size); | 870 | GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size); |
863 | 871 | ||
864 | first_border = round(alpha * gossip_list_size); | 872 | first_border = round (alpha * gossip_list_size); |
865 | for ( i = 0 ; i < first_border ; i++ ) | 873 | for ( i = 0 ; i < first_border ; i++ ) |
866 | { // TODO use RPS_sampler_get_n_rand_peers | 874 | { // TODO use RPS_sampler_get_n_rand_peers |
867 | /* Update gossip list with peers received through PUSHes */ | 875 | /* Update gossip list with peers received through PUSHes */ |
868 | r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, | 876 | r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, |
869 | push_list_size); | 877 | push_list_size); |
870 | gossip_list[i] = push_list[r_index]; | 878 | gossip_list[i] = push_list[r_index]; |
871 | // TODO change the in_flags accordingly | 879 | // TODO change the in_flags accordingly |
@@ -875,7 +883,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
875 | for ( i = first_border ; i < second_border ; i++ ) | 883 | for ( i = first_border ; i < second_border ; i++ ) |
876 | { | 884 | { |
877 | /* Update gossip list with peers received through PULLs */ | 885 | /* Update gossip list with peers received through PULLs */ |
878 | r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, | 886 | r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, |
879 | pull_list_size); | 887 | pull_list_size); |
880 | gossip_list[i] = pull_list[r_index]; | 888 | gossip_list[i] = pull_list[r_index]; |
881 | // TODO change the in_flags accordingly | 889 | // TODO change the in_flags accordingly |
@@ -884,7 +892,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
884 | for ( i = second_border ; i < gossip_list_size ; i++ ) | 892 | for ( i = second_border ; i < gossip_list_size ; i++ ) |
885 | { | 893 | { |
886 | /* Update gossip list with peers from history */ | 894 | /* Update gossip list with peers from history */ |
887 | peer = RPS_sampler_get_n_rand_peers (1), | 895 | peer = RPS_sampler_get_n_rand_peers_ (1); |
888 | gossip_list[i] = *peer; | 896 | gossip_list[i] = *peer; |
889 | // TODO change the in_flags accordingly | 897 | // TODO change the in_flags accordingly |
890 | } | 898 | } |
@@ -1023,7 +1031,7 @@ init_peer_cb (void *cls, | |||
1023 | if (ipc->i < gossip_list_size) | 1031 | if (ipc->i < gossip_list_size) |
1024 | { | 1032 | { |
1025 | memcpy(&gossip_list[ipc->i], | 1033 | memcpy(&gossip_list[ipc->i], |
1026 | RPS_sampler_get_n_rand_peers (1), | 1034 | RPS_sampler_get_n_rand_peers_ (1), |
1027 | (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity)); | 1035 | (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity)); |
1028 | } | 1036 | } |
1029 | rps_start (ipc->server); | 1037 | rps_start (ipc->server); |
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 93b6b1263..1ab00aaf5 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c | |||
@@ -133,6 +133,47 @@ struct RPS_Sampler | |||
133 | }; | 133 | }; |
134 | 134 | ||
135 | /** | 135 | /** |
136 | * Closure to _get_n_rand_peers_ready_cb() | ||
137 | */ | ||
138 | struct RPS_GetNRandPeersReadyCls | ||
139 | { | ||
140 | /** | ||
141 | * Number of peers we are waiting for. | ||
142 | */ | ||
143 | uint64_t num_peers; | ||
144 | |||
145 | /** | ||
146 | * Number of peers we currently have. | ||
147 | */ | ||
148 | uint64_t cur_num_peers; | ||
149 | |||
150 | /** | ||
151 | * Pointer to the array holding the ids. | ||
152 | */ | ||
153 | struct GNUNET_PeerIdentity *ids; | ||
154 | |||
155 | /** | ||
156 | * Callback to be called when all ids are available. | ||
157 | */ | ||
158 | RPS_sampler_n_rand_peers_ready_cb callback; | ||
159 | |||
160 | /** | ||
161 | * Closure given to the callback | ||
162 | */ | ||
163 | void *cls; | ||
164 | }; | ||
165 | |||
166 | /** | ||
167 | * Callback that is called from _get_rand_peer() when the PeerID is ready. | ||
168 | * | ||
169 | * @param cls the closure given alongside this function. | ||
170 | * @param id the PeerID that was returned | ||
171 | */ | ||
172 | typedef void | ||
173 | (*RPS_sampler_rand_peer_ready_cb) (void *cls, | ||
174 | const struct GNUNET_PeerIdentity *id); | ||
175 | |||
176 | /** | ||
136 | * Global sampler variable. | 177 | * Global sampler variable. |
137 | */ | 178 | */ |
138 | struct RPS_Sampler *sampler; | 179 | struct RPS_Sampler *sampler; |
@@ -160,6 +201,31 @@ static uint64_t client_get_index; | |||
160 | 201 | ||
161 | 202 | ||
162 | /** | 203 | /** |
204 | * Callback to _get_rand_peer() used by _get_n_rand_peers(). | ||
205 | * | ||
206 | * Checks whether all n peers are available. If they are, | ||
207 | * give those back. | ||
208 | */ | ||
209 | void | ||
210 | RPS_sampler_get_n_rand_peers_ready_cb (void *cls, | ||
211 | const struct GNUNET_PeerIdentity *id) | ||
212 | { | ||
213 | struct RPS_GetNRandPeersReadyCls *n_peers_cls; | ||
214 | |||
215 | n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls; | ||
216 | |||
217 | if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) | ||
218 | { | ||
219 | GNUNET_assert (NULL != n_peers_cls->callback); | ||
220 | |||
221 | n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); | ||
222 | |||
223 | GNUNET_free (n_peers_cls); | ||
224 | } | ||
225 | } | ||
226 | |||
227 | |||
228 | /** | ||
163 | * Reinitialise a previously initialised sampler element. | 229 | * Reinitialise a previously initialised sampler element. |
164 | * | 230 | * |
165 | * @param sampler pointer to the memory that keeps the value. | 231 | * @param sampler pointer to the memory that keeps the value. |
@@ -507,20 +573,22 @@ RPS_sampler_get_n_rand_peers_ (uint64_t n) | |||
507 | * | 573 | * |
508 | * @return a random PeerID of the PeerIDs previously put into the sampler. | 574 | * @return a random PeerID of the PeerIDs previously put into the sampler. |
509 | */ | 575 | */ |
510 | const struct GNUNET_PeerIdentity * | 576 | void |
511 | RPS_sampler_get_rand_peer () | 577 | RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb, |
578 | void *cls, struct GNUNET_PeerIdentity *id) | ||
512 | { | 579 | { |
513 | struct GNUNET_PeerIdentity *peer; | 580 | do |
581 | { | ||
582 | *id = sampler->sampler_elements[client_get_index]->peer_id; | ||
514 | 583 | ||
515 | // use _get_rand_peer_ ? | ||
516 | peer = GNUNET_new (struct GNUNET_PeerIdentity); | ||
517 | *peer = sampler->sampler_elements[client_get_index]->peer_id; | ||
518 | RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); | 584 | RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); |
519 | if ( client_get_index == sampler->sampler_size ) | 585 | if ( client_get_index == sampler->sampler_size ) |
520 | client_get_index = 0; | 586 | client_get_index = 0; |
521 | else | 587 | else |
522 | client_get_index++; | 588 | client_get_index++; |
523 | return peer; | 589 | } while (NOT_EMPTY == sampler->sampler_elements[client_get_index]->is_empty); |
590 | |||
591 | cb (cls, id); | ||
524 | } | 592 | } |
525 | 593 | ||
526 | 594 | ||
@@ -531,35 +599,39 @@ RPS_sampler_get_rand_peer () | |||
531 | * corrsponding peer to the client. | 599 | * corrsponding peer to the client. |
532 | * Random with or without consumption? | 600 | * Random with or without consumption? |
533 | * | 601 | * |
534 | * @return n random PeerIDs of the PeerIDs previously put into the sampler. | 602 | * @param cb callback that will be called once the ids are ready. |
603 | * @param cls closure given to @a cb | ||
604 | * @param num_peers the number of peers requested | ||
535 | */ | 605 | */ |
536 | const struct GNUNET_PeerIdentity * | 606 | void |
537 | RPS_sampler_get_n_rand_peers (uint64_t n) | 607 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, |
608 | void *cls, uint64_t num_peers) | ||
538 | { | 609 | { |
539 | // use _get_rand_peers_ ? | 610 | // use _get_rand_peers_ ? |
540 | if ( 0 == sampler->sampler_size ) | 611 | if ( 0 == sampler->sampler_size ) |
541 | { | 612 | { |
542 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 613 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
543 | "Sgrp: List empty - Returning NULL\n"); | 614 | "Sgrp: List empty - Returning NULL\n"); |
544 | return NULL; | ||
545 | } | 615 | } |
546 | else | 616 | else |
547 | { | 617 | { |
548 | // TODO check if we have too much (distinct) sampled peers | 618 | // TODO check if we have too much (distinct) sampled peers |
549 | // If we are not ready yet maybe schedule for later | 619 | // If we are not ready yet maybe schedule for later |
550 | struct GNUNET_PeerIdentity *peers; | 620 | struct GNUNET_PeerIdentity *peers; |
551 | const struct GNUNET_PeerIdentity *peer; | ||
552 | uint64_t i; | 621 | uint64_t i; |
622 | struct RPS_GetNRandPeersReadyCls *cb_cls; | ||
553 | 623 | ||
554 | peers = GNUNET_malloc (n * sizeof (struct GNUNET_PeerIdentity)); | 624 | peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); |
555 | 625 | ||
556 | for ( i = 0 ; i < n ; i++ ) { | 626 | cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls); |
557 | //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements); | 627 | cb_cls->num_peers = num_peers; |
558 | peer = RPS_sampler_get_rand_peer (); | 628 | cb_cls->cur_num_peers = 0; |
559 | memcpy (&peers[i], peer, sizeof (struct GNUNET_PeerIdentity)); | 629 | cb_cls->callback = NULL; |
560 | //GNUNET_free (peer); | 630 | cb_cls->cls = NULL; |
561 | } | 631 | |
562 | return peers; | 632 | for ( i = 0 ; i < num_peers ; i++ ) |
633 | RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb, | ||
634 | cb_cls, &peers[i]); | ||
563 | } | 635 | } |
564 | } | 636 | } |
565 | 637 | ||
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h index 3772f9f0b..b6aaf1b41 100644 --- a/src/rps/gnunet-service-rps_sampler.h +++ b/src/rps/gnunet-service-rps_sampler.h | |||
@@ -49,6 +49,18 @@ typedef void | |||
49 | const struct GNUNET_PeerIdentity *id); | 49 | const struct GNUNET_PeerIdentity *id); |
50 | 50 | ||
51 | /** | 51 | /** |
52 | * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. | ||
53 | * | ||
54 | * @param cls the closure given alongside this function. | ||
55 | * @param ids the PeerIDs that were returned | ||
56 | * to be freed | ||
57 | */ | ||
58 | typedef void | ||
59 | (*RPS_sampler_n_rand_peers_ready_cb) (void *cls, | ||
60 | struct GNUNET_PeerIdentity *ids, uint64_t num_peers); | ||
61 | |||
62 | |||
63 | /** | ||
52 | * A sampler sampling a stream of PeerIDs. | 64 | * A sampler sampling a stream of PeerIDs. |
53 | */ | 65 | */ |
54 | //struct RPS_Sampler; | 66 | //struct RPS_Sampler; |
@@ -102,15 +114,15 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id); | |||
102 | 114 | ||
103 | 115 | ||
104 | /** | 116 | /** |
105 | * Get one random peer out of the sampled peers. | 117 | * Get n random peers out of the sampled peers. |
106 | * | 118 | * |
107 | * We might want to reinitialise this sampler after giving the | 119 | * We might want to reinitialise this sampler after giving the |
108 | * corrsponding peer to the client. | 120 | * corrsponding peer to the client. |
109 | * | 121 | * Random with or without consumption? |
110 | * @return a random PeerID of the PeerIDs previously put into the sampler. | 122 | * Only used internally |
111 | */ | 123 | */ |
112 | const struct GNUNET_PeerIdentity * | 124 | const struct GNUNET_PeerIdentity * |
113 | RPS_sampler_get_rand_peer (); | 125 | RPS_sampler_get_n_rand_peers_ (uint64_t n); |
114 | 126 | ||
115 | 127 | ||
116 | /** | 128 | /** |
@@ -120,10 +132,13 @@ RPS_sampler_get_rand_peer (); | |||
120 | * corrsponding peer to the client. | 132 | * corrsponding peer to the client. |
121 | * Random with or without consumption? | 133 | * Random with or without consumption? |
122 | * | 134 | * |
123 | * @return n random PeerIDs of the PeerIDs previously put into the sampler. | 135 | * @param cb callback that will be called once the ids are ready. |
136 | * @param cls closure given to @a cb | ||
137 | * @param num_peers the number of peers requested | ||
124 | */ | 138 | */ |
125 | const struct GNUNET_PeerIdentity * | 139 | void |
126 | RPS_sampler_get_n_rand_peers (uint64_t n); | 140 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, |
141 | void *cls, uint64_t num_peers); | ||
127 | 142 | ||
128 | 143 | ||
129 | /** | 144 | /** |