aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2015-01-16 00:57:58 +0000
committerJulius Bünger <buenger@mytum.de>2015-01-16 00:57:58 +0000
commit3b45d8909429849f9c65a1397787a53a26b46488 (patch)
treec8b75477fa9593519041d6c9d146fc57e667180e /src/rps
parentd5c3fac617325809186260b58f736eeaba00e357 (diff)
downloadgnunet-3b45d8909429849f9c65a1397787a53a26b46488.tar.gz
gnunet-3b45d8909429849f9c65a1397787a53a26b46488.zip
resizing lists implemented, fixed type error
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-service-rps.c120
1 files changed, 77 insertions, 43 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 41db537a6..a4ad20121 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -173,15 +173,12 @@ static struct GNUNET_PeerIdentity *gossip_list;
173/** 173/**
174 * Size of the gossiped list 174 * Size of the gossiped list
175 */ 175 */
176static unsigned int gossip_list_size; 176//static unsigned int gossip_list_size;
177static uint32_t gossip_list_size;
177 178
178 179
179/** 180/**
180 * The size Brahms needs according to the network size. 181 * The actual size of the sampler
181 *
182 * This is directly taken as the #gossip_list_size on update of the
183 * #gossip_list
184 * This is the minimum size the sampler grows to.
185 */ 182 */
186static unsigned int sampler_size; 183static unsigned int sampler_size;
187//size_t sampler_size; 184//size_t sampler_size;
@@ -190,7 +187,18 @@ static unsigned int sampler_size;
190 * The size of sampler we need to be able to satisfy the client's need of 187 * The size of sampler we need to be able to satisfy the client's need of
191 * random peers. 188 * random peers.
192 */ 189 */
193//static unsigned int sampler_size_client_need; 190static unsigned int sampler_size_client_need;
191
192/**
193 * The size of sampler we need to be able to satisfy the Brahms protocol's
194 * need of random peers.
195 *
196 * This is directly taken as the #gossip_list_size on update of the
197 * #gossip_list
198 *
199 * This is one minimum size the sampler grows to.
200 */
201static unsigned int sampler_size_est_need;
194 202
195 203
196/** 204/**
@@ -469,6 +477,33 @@ T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
469***********************************************************************/ 477***********************************************************************/
470 478
471/** 479/**
480 * Wrapper around _sampler_resize()
481 */
482 void
483resize_wrapper()
484{
485 uint64_t bigger_size;
486
487 // TODO statistics
488
489 if (sampler_size_est_need > sampler_size_client_need)
490 bigger_size = sampler_size_client_need;
491 else
492 bigger_size = sampler_size_est_need;
493
494 // TODO respect the request rate, min, max
495 if (sampler_size > bigger_size*4)
496 { /* Shrinking */
497 RPS_sampler_resize (sampler_size/2);
498 }
499 else if (sampler_size < bigger_size)
500 { /* Growing */
501 RPS_sampler_resize (sampler_size*2);
502 }
503}
504
505
506/**
472 * Function called by NSE. 507 * Function called by NSE.
473 * 508 *
474 * Updates sizes of sampler list and gossip list and adapt those lists 509 * Updates sizes of sampler list and gossip list and adapt those lists
@@ -478,14 +513,11 @@ T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
478nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev) 513nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
479{ 514{
480 double estimate; 515 double estimate;
481 unsigned int old_est;
482 //double scale; // TODO this might go gloabal/config 516 //double scale; // TODO this might go gloabal/config
483 517
484 old_est = sampler_size;
485
486 LOG (GNUNET_ERROR_TYPE_DEBUG, 518 LOG (GNUNET_ERROR_TYPE_DEBUG,
487 "Received a ns estimate - logest: %f, std_dev: %f (old_est: %f)\n", 519 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %f)\n",
488 logestimate, std_dev, old_est); 520 logestimate, std_dev, sampler_size);
489 //scale = .01; 521 //scale = .01;
490 estimate = GNUNET_NSE_log_estimate_to_n (logestimate); 522 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
491 // GNUNET_NSE_log_estimate_to_n (logestimate); 523 // GNUNET_NSE_log_estimate_to_n (logestimate);
@@ -494,23 +526,12 @@ nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimat
494 // estimate += (std_dev * scale); 526 // estimate += (std_dev * scale);
495 if ( 0 < estimate ) { 527 if ( 0 < estimate ) {
496 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); 528 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
497 sampler_size = estimate; 529 sampler_size_est_need = estimate;
498 } else 530 } else
499 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 531 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
500 532
501 /* If the NSE has changed adapt the lists accordingly */ 533 /* If the NSE has changed adapt the lists accordingly */
502 // TODO respect the request rate, min, max 534 resize_wrapper ();
503 if (old_est > sampler_size*4)
504 { /* Shrinking */
505 RPS_sampler_resize (old_est/2);
506 }
507 else if (old_est < sampler_size)
508 { /* Growing */
509 if (sampler_size < old_est*2)
510 RPS_sampler_resize (old_est*2);
511 else
512 RPS_sampler_resize (sampler_size);
513 }
514} 535}
515 536
516 537
@@ -566,6 +587,7 @@ handle_client_request (void *cls,
566 587
567 struct GNUNET_RPS_CS_RequestMessage *msg; 588 struct GNUNET_RPS_CS_RequestMessage *msg;
568 uint64_t num_peers; 589 uint64_t num_peers;
590 struct GNUNET_TIME_Relative max_round_duration;
569 591
570 592
571 /* Estimate request rate */ 593 /* Estimate request rate */
@@ -581,15 +603,20 @@ handle_client_request (void *cls,
581 request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request, 603 request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request,
582 GNUNET_TIME_absolute_get ()); 604 GNUNET_TIME_absolute_get ());
583 request_rate = T_relative_avg (request_deltas, req_counter); 605 request_rate = T_relative_avg (request_deltas, req_counter);
606
607 max_round_duration = GNUNET_TIME_relative_add (round_interval,
608 GNUNET_TIME_relative_divide (round_interval, 2));
609 sampler_size_client_need = max_round_duration.rel_value_us / request_rate.rel_value_us;
610
611 resize_wrapper();
584 } 612 }
585 last_request = GNUNET_TIME_absolute_get (); 613 last_request = GNUNET_TIME_absolute_get ();
586 // TODO resize the size of the extended_samplers
587 614
588 615
589 // TODO check message size 616 // TODO check message size
590 msg = (struct GNUNET_RPS_CS_RequestMessage *) message; 617 msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
591 618
592 num_peers = GNUNET_ntohll (msg->num_peers); 619 num_peers = ntohl (msg->num_peers);
593 620
594 RPS_sampler_get_n_rand_peers (client_respond, client, num_peers); 621 RPS_sampler_get_n_rand_peers (client_respond, client, num_peers);
595 622
@@ -622,7 +649,7 @@ handle_client_seed (void *cls,
622 } 649 }
623 in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message; 650 in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
624 if (ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage) / 651 if (ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage) /
625 sizeof (struct GNUNET_PeerIdentity) != GNUNET_ntohll (in_msg->num_peers)) 652 sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
626 { 653 {
627 GNUNET_break_op (0); 654 GNUNET_break_op (0);
628 GNUNET_SERVER_receive_done (client, 655 GNUNET_SERVER_receive_done (client,
@@ -632,7 +659,7 @@ handle_client_seed (void *cls,
632 in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message; 659 in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
633 peers = (struct GNUNET_PeerIdentity *) &message[1]; 660 peers = (struct GNUNET_PeerIdentity *) &message[1];
634 661
635 for ( i = 0 ; i < GNUNET_ntohll (in_msg->num_peers) ; i++ ) 662 for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
636 RPS_sampler_update_list (&peers[i]); 663 RPS_sampler_update_list (&peers[i]);
637 664
638 GNUNET_SERVER_receive_done (client, 665 GNUNET_SERVER_receive_done (client,
@@ -711,14 +738,17 @@ handle_peer_pull_request (void *cls,
711 738
712 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER); 739 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
713 // FIXME wait for cadet to change this function 740 // FIXME wait for cadet to change this function
714 LOG (GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s (peer)); 741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "PULL REQUEST from peer %s received, going to send %u peers\n",
743 GNUNET_i2s (peer), gossip_list_size);
715 744
716 mq = get_mq (peer_map, peer); 745 mq = get_mq (peer_map, peer);
717 746
718 ev = GNUNET_MQ_msg_extra (out_msg, 747 ev = GNUNET_MQ_msg_extra (out_msg,
719 gossip_list_size * sizeof (struct GNUNET_PeerIdentity), 748 gossip_list_size * sizeof (struct GNUNET_PeerIdentity),
720 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY); 749 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
721 out_msg->num_peers = GNUNET_htonll (gossip_list_size); 750 //out_msg->num_peers = GNUNET_htonll (gossip_list_size);
751 out_msg->num_peers = htonl (gossip_list_size);
722 memcpy (&out_msg[1], gossip_list, 752 memcpy (&out_msg[1], gossip_list,
723 gossip_list_size * sizeof (struct GNUNET_PeerIdentity)); 753 gossip_list_size * sizeof (struct GNUNET_PeerIdentity));
724 754
@@ -750,14 +780,17 @@ handle_peer_pull_reply (void *cls,
750 struct GNUNET_PeerIdentity *peers; 780 struct GNUNET_PeerIdentity *peers;
751 uint64_t i; 781 uint64_t i;
752 782
753 if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) < ntohs (msg->size)) 783 if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
754 { 784 {
755 GNUNET_break_op (0); // At the moment our own implementation seems to break that. 785 GNUNET_break_op (0); // At the moment our own implementation seems to break that.
756 return GNUNET_SYSERR; 786 return GNUNET_SYSERR;
757 } 787 }
758 in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg; 788 in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
759 if (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) / sizeof (struct GNUNET_PeerIdentity) != GNUNET_ntohll (in_msg->num_peers)) 789 if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) / sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
760 { 790 {
791 LOG (GNUNET_ERROR_TYPE_ERROR, "message says it sends %" PRIu64 " peers, have space for %i peers\n",
792 ntohl (in_msg->num_peers),
793 (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) / sizeof (struct GNUNET_PeerIdentity));
761 GNUNET_break_op (0); 794 GNUNET_break_op (0);
762 return GNUNET_SYSERR; 795 return GNUNET_SYSERR;
763 } 796 }
@@ -765,9 +798,9 @@ handle_peer_pull_reply (void *cls,
765 // TODO check that we sent a request and that it is the first reply 798 // TODO check that we sent a request and that it is the first reply
766 799
767 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 800 peers = (struct GNUNET_PeerIdentity *) &msg[1];
768 for ( i = 0 ; i < GNUNET_ntohll (in_msg->num_peers) ; i++ ) 801 for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
769 { 802 {
770 if (GNUNET_NO == in_arr(pull_list, pull_list_size, &peers[i])) 803 if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
771 GNUNET_array_append (pull_list, pull_list_size, peers[i]); 804 GNUNET_array_append (pull_list, pull_list_size, peers[i]);
772 } 805 }
773 806
@@ -862,7 +895,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
862 uint64_t first_border; 895 uint64_t first_border;
863 uint64_t second_border; 896 uint64_t second_border;
864 897
865 GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size); 898 GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
866 899
867 first_border = round (alpha * gossip_list_size); 900 first_border = round (alpha * gossip_list_size);
868 for ( i = 0 ; i < first_border ; i++ ) 901 for ( i = 0 ; i < first_border ; i++ )
@@ -1210,25 +1243,25 @@ run (void *cls,
1210 /* Get initial size of sampler/gossip list from the configuration */ 1243 /* Get initial size of sampler/gossip list from the configuration */
1211 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", 1244 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
1212 "INITSIZE", 1245 "INITSIZE",
1213 (long long unsigned int *) &sampler_size)) 1246 (long long unsigned int *) &sampler_size_est_need))
1214 { 1247 {
1215 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n"); 1248 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
1216 GNUNET_SCHEDULER_shutdown (); 1249 GNUNET_SCHEDULER_shutdown ();
1217 return; 1250 return;
1218 } 1251 }
1219 LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size); 1252 LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
1220 1253
1221 //gossip_list_size = sampler_size; // TODO rename sampler_size 1254 //gossip_list_size = sampler_size; // TODO rename sampler_size
1222 1255
1223 gossip_list = NULL; 1256 gossip_list = NULL;
1224 GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size); 1257 GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
1225 1258
1226 1259
1227 /* connect to NSE */ 1260 /* connect to NSE */
1228 nse = GNUNET_NSE_connect(cfg, nse_callback, NULL); 1261 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
1229 // TODO check whether that was successful 1262 // TODO check whether that was successful
1230 // TODO disconnect on shutdown 1263 // TODO disconnect on shutdown
1231 LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n"); 1264 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
1232 1265
1233 1266
1234 alpha = 0.45; 1267 alpha = 0.45;
@@ -1255,7 +1288,7 @@ run (void *cls,
1255 1288
1256 // TODO check that alpha + beta < 1 1289 // TODO check that alpha + beta < 1
1257 1290
1258 peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size, GNUNET_NO); 1291 peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size_est_need, GNUNET_NO);
1259 1292
1260 1293
1261 /* Initialise cadet */ 1294 /* Initialise cadet */
@@ -1277,7 +1310,8 @@ run (void *cls,
1277 1310
1278 1311
1279 /* Initialise sampler */ 1312 /* Initialise sampler */
1280 RPS_sampler_init (sampler_size, own_identity, insertCB, NULL, removeCB, NULL); 1313 RPS_sampler_init (sampler_size_est_need, own_identity, insertCB, NULL, removeCB, NULL);
1314 sampler_size = sampler_size_est_need;
1281 1315
1282 /* Initialise push and pull maps */ 1316 /* Initialise push and pull maps */
1283 push_list = NULL; 1317 push_list = NULL;