aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2015-02-07 16:19:34 +0000
committerJulius Bünger <buenger@mytum.de>2015-02-07 16:19:34 +0000
commit9faea17ef30144323b311e282238f5caaabe451e (patch)
treeb66df9c6bff36959b44889f18a5414dae2e0df66
parentb85b68adb5960859e735319eb27a0d1594020d52 (diff)
downloadgnunet-9faea17ef30144323b311e282238f5caaabe451e.tar.gz
gnunet-9faea17ef30144323b311e282238f5caaabe451e.zip
distinct samplers for client and Brahms protocol
-rw-r--r--src/rps/gnunet-service-rps.c100
-rw-r--r--src/rps/gnunet-service-rps_sampler.c157
-rw-r--r--src/rps/gnunet-service-rps_sampler.h50
3 files changed, 201 insertions, 106 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 7d443e5d1..5e57e902e 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -190,6 +190,16 @@ struct PeerContext
190***********************************************************************/ 190***********************************************************************/
191 191
192/** 192/**
193 * Sampler used for the Brahms protocol itself.
194 */
195static struct RPS_Sampler *prot_sampler;
196
197/**
198 * Sampler used for the clients.
199 */
200static struct RPS_Sampler *client_sampler;
201
202/**
193 * Set of all peers to keep track of them. 203 * Set of all peers to keep track of them.
194 */ 204 */
195static struct GNUNET_CONTAINER_MultiPeerMap *peer_map; 205static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
@@ -695,14 +705,15 @@ insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
695 void 705 void
696insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer) 706insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
697{ 707{
698 RPS_sampler_update_list (peer); 708 RPS_sampler_update (prot_sampler, peer);
709 RPS_sampler_update (client_sampler, peer);
699} 710}
700 711
701 712
702/** 713/**
703 * Check whether #insert_in_sampler was already scheduled 714 * Check whether #insert_in_sampler was already scheduled
704 */ 715 */
705 int 716static int
706insert_in_sampler_scheduled (const struct PeerContext *peer_ctx) 717insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
707{ 718{
708 unsigned int i; 719 unsigned int i;
@@ -716,30 +727,50 @@ insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
716 727
717/** 728/**
718 * Wrapper around #RPS_sampler_resize() 729 * Wrapper around #RPS_sampler_resize()
730 *
731 * If we do not have enough sampler elements, double current sampler size
732 * If we have more than enough sampler elements, halv current sampler size
719 */ 733 */
720 void 734static void
721resize_wrapper () 735resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
736{
737 unsigned int sampler_size;
738
739 // TODO statistics
740 // TODO respect the min, max
741 sampler_size = RPS_sampler_get_size (sampler);
742 if (sampler_size > new_size * 4)
743 { /* Shrinking */
744 RPS_sampler_resize (sampler, sampler_size / 2);
745 }
746 else if (sampler_size < new_size)
747 { /* Growing */
748 RPS_sampler_resize (sampler, sampler_size * 2);
749 }
750 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
751}
752
753
754/**
755 * Wrapper around #RPS_sampler_resize() resizing the client sampler
756 */
757static void
758client_resize_wrapper ()
722{ 759{
723 uint32_t bigger_size; 760 uint32_t bigger_size;
724 unsigned int sampler_size; 761 unsigned int sampler_size;
725 762
726 // TODO statistics 763 // TODO statistics
727 764
765 sampler_size = RPS_sampler_get_size (client_sampler);
766
728 if (sampler_size_est_need > sampler_size_client_need) 767 if (sampler_size_est_need > sampler_size_client_need)
729 bigger_size = sampler_size_est_need; 768 bigger_size = sampler_size_est_need;
730 else 769 else
731 bigger_size = sampler_size_client_need; 770 bigger_size = sampler_size_client_need;
732 771
733 // TODO respect the min, max 772 // TODO respect the min, max
734 sampler_size = RPS_sampler_get_size (); 773 resize_wrapper (client_sampler, bigger_size);
735 if (sampler_size > bigger_size * 4)
736 { /* Shrinking */
737 RPS_sampler_resize (sampler_size / 2);
738 }
739 else if (sampler_size < bigger_size)
740 { /* Growing */
741 RPS_sampler_resize (sampler_size * 2);
742 }
743 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size); 774 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
744} 775}
745 776
@@ -780,7 +811,7 @@ est_request_rate()
780 max_round_duration.rel_value_us / request_rate.rel_value_us; 811 max_round_duration.rel_value_us / request_rate.rel_value_us;
781 812
782 /* Resize the sampler */ 813 /* Resize the sampler */
783 resize_wrapper (); 814 client_resize_wrapper ();
784 } 815 }
785 last_request = GNUNET_TIME_absolute_get (); 816 last_request = GNUNET_TIME_absolute_get ();
786} 817}
@@ -790,6 +821,10 @@ est_request_rate()
790 * /Util functions 821 * /Util functions
791***********************************************************************/ 822***********************************************************************/
792 823
824
825
826
827
793/** 828/**
794 * Function called by NSE. 829 * Function called by NSE.
795 * 830 *
@@ -805,7 +840,7 @@ nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
805 840
806 LOG (GNUNET_ERROR_TYPE_DEBUG, 841 LOG (GNUNET_ERROR_TYPE_DEBUG,
807 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", 842 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
808 logestimate, std_dev, RPS_sampler_get_size ()); 843 logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
809 //scale = .01; 844 //scale = .01;
810 estimate = GNUNET_NSE_log_estimate_to_n (logestimate); 845 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
811 // GNUNET_NSE_log_estimate_to_n (logestimate); 846 // GNUNET_NSE_log_estimate_to_n (logestimate);
@@ -820,7 +855,8 @@ nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
820 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 855 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
821 856
822 /* If the NSE has changed adapt the lists accordingly */ 857 /* If the NSE has changed adapt the lists accordingly */
823 resize_wrapper (); 858 resize_wrapper (prot_sampler, sampler_size_est_need);
859 client_resize_wrapper ();
824} 860}
825 861
826 862
@@ -901,7 +937,8 @@ handle_client_request (void *cls,
901 937
902 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers); 938 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
903 939
904 RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES); 940 RPS_sampler_get_n_rand_peers (client_sampler, client_respond,
941 client, num_peers, GNUNET_YES);
905 942
906 GNUNET_SERVER_receive_done (client, 943 GNUNET_SERVER_receive_done (client,
907 GNUNET_OK); 944 GNUNET_OK);
@@ -943,7 +980,8 @@ handle_client_seed (void *cls,
943 peers = (struct GNUNET_PeerIdentity *) &message[1]; 980 peers = (struct GNUNET_PeerIdentity *) &message[1];
944 981
945 for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ ) 982 for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
946 RPS_sampler_update_list (&peers[i]); 983 RPS_sampler_update (prot_sampler, &peers[i]);
984 RPS_sampler_update (client_sampler, &peers[i]);
947 985
948 GNUNET_SERVER_receive_done (client, 986 GNUNET_SERVER_receive_done (client,
949 GNUNET_OK); 987 GNUNET_OK);
@@ -1246,7 +1284,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1246 for (i = second_border ; i < sampler_size_est_need ; i++) 1284 for (i = second_border ; i < sampler_size_est_need ; i++)
1247 { 1285 {
1248 /* Update gossip list with peers from history */ 1286 /* Update gossip list with peers from history */
1249 RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO); 1287 RPS_sampler_get_n_rand_peers (prot_sampler, hist_update, NULL, 1, GNUNET_NO);
1250 num_hist_update_tasks++; 1288 num_hist_update_tasks++;
1251 // TODO change the peer_flags accordingly 1289 // TODO change the peer_flags accordingly
1252 } 1290 }
@@ -1263,13 +1301,15 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1263 1301
1264 for ( i = 0 ; i < push_list_size ; i++ ) 1302 for ( i = 0 ; i < push_list_size ; i++ )
1265 { 1303 {
1266 RPS_sampler_update_list (&push_list[i]); 1304 RPS_sampler_update (prot_sampler, &push_list[i]);
1305 RPS_sampler_update (client_sampler, &push_list[i]);
1267 // TODO set in_flag? 1306 // TODO set in_flag?
1268 } 1307 }
1269 1308
1270 for ( i = 0 ; i < pull_list_size ; i++ ) 1309 for ( i = 0 ; i < pull_list_size ; i++ )
1271 { 1310 {
1272 RPS_sampler_update_list (&pull_list[i]); 1311 RPS_sampler_update (prot_sampler, &push_list[i]);
1312 RPS_sampler_update (client_sampler, &push_list[i]);
1273 // TODO set in_flag? 1313 // TODO set in_flag?
1274 } 1314 }
1275 1315
@@ -1306,7 +1346,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1306 * Open a connection to given peer and store channel and mq. 1346 * Open a connection to given peer and store channel and mq.
1307 */ 1347 */
1308 void 1348 void
1309insertCB (void *cls, const struct GNUNET_PeerIdentity *id) 1349insertCB (void *cls, struct RPS_Sampler *sampler,
1350 const struct GNUNET_PeerIdentity *id)
1310{ 1351{
1311 // We open a channel to be notified when this peer goes down. 1352 // We open a channel to be notified when this peer goes down.
1312 (void) get_channel (peer_map, id); 1353 (void) get_channel (peer_map, id);
@@ -1317,12 +1358,13 @@ insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
1317 * Close the connection to given peer and delete channel and mq. 1358 * Close the connection to given peer and delete channel and mq.
1318 */ 1359 */
1319 void 1360 void
1320removeCB (void *cls, const struct GNUNET_PeerIdentity *id) 1361removeCB (void *cls, struct RPS_Sampler *sampler,
1362 const struct GNUNET_PeerIdentity *id)
1321{ 1363{
1322 size_t s; 1364 size_t s;
1323 struct PeerContext *ctx; 1365 struct PeerContext *ctx;
1324 1366
1325 s = RPS_sampler_count_id (id); 1367 s = RPS_sampler_count_id (sampler, id);
1326 if ( 1 >= s ) 1368 if ( 1 >= s )
1327 { 1369 {
1328 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id)) 1370 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
@@ -1474,7 +1516,8 @@ shutdown_task (void *cls,
1474 1516
1475 GNUNET_NSE_disconnect (nse); 1517 GNUNET_NSE_disconnect (nse);
1476 GNUNET_CADET_disconnect (cadet_handle); 1518 GNUNET_CADET_disconnect (cadet_handle);
1477 RPS_sampler_destroy (); 1519 RPS_sampler_destroy (prot_sampler);
1520 RPS_sampler_destroy (client_sampler);
1478 GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map)); 1521 GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
1479 GNUNET_CONTAINER_multipeermap_destroy (peer_map); 1522 GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1480 GNUNET_array_grow (gossip_list, gossip_list_size, 0); 1523 GNUNET_array_grow (gossip_list, gossip_list_size, 0);
@@ -1562,7 +1605,8 @@ cleanup_channel (void *cls,
1562 LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up channel to peer %s\n", 1605 LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up channel to peer %s\n",
1563 GNUNET_i2s (peer)); 1606 GNUNET_i2s (peer));
1564 1607
1565 RPS_sampler_reinitialise_by_value (peer); 1608 RPS_sampler_reinitialise_by_value (prot_sampler, peer);
1609 RPS_sampler_reinitialise_by_value (client_sampler, peer);
1566 1610
1567 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) 1611 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1568 { 1612 {
@@ -1698,7 +1742,9 @@ run (void *cls,
1698 half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5); 1742 half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
1699 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); 1743 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
1700 1744
1701 RPS_sampler_init (sampler_size_est_need, max_round_interval, 1745 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
1746 insertCB, NULL, removeCB, NULL);
1747 client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
1702 insertCB, NULL, removeCB, NULL); 1748 insertCB, NULL, removeCB, NULL);
1703 1749
1704 /* Initialise push and pull maps */ 1750 /* Initialise push and pull maps */
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 92826d875..ceb3dacd1 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -199,6 +199,11 @@ struct GetPeerCls
199 struct GetPeerCls *prev; 199 struct GetPeerCls *prev;
200 200
201 /** 201 /**
202 * The sampler this function operates on.
203 */
204 struct RPS_Sampler *sampler;
205
206 /**
202 * The task for this function. 207 * The task for this function.
203 */ 208 */
204 struct GNUNET_SCHEDULER_Task *get_peer_task; 209 struct GNUNET_SCHEDULER_Task *get_peer_task;
@@ -220,10 +225,10 @@ struct GetPeerCls
220}; 225};
221 226
222 227
223/** 228///**
224 * Global sampler variable. 229// * Global sampler variable.
225 */ 230// */
226struct RPS_Sampler *sampler; 231//struct RPS_Sampler *sampler;
227 232
228 233
229/** 234/**
@@ -331,12 +336,17 @@ RPS_sampler_elem_create (void)
331 336
332 337
333/** 338/**
334 * Input an PeerID into the given sampler. 339 * Input an PeerID into the given sampler element.
340 *
341 * @param sampler the sampler the @a s_elem belongs to.
342 * Needed to know the
335 */ 343 */
336 static void 344static void
337RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other, 345RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem,
338 RPS_sampler_insert_cb insert_cb, void *insert_cls, 346 struct RPS_Sampler *sampler,
339 RPS_sampler_remove_cb remove_cb, void *remove_cls) 347 const struct GNUNET_PeerIdentity *other,
348 RPS_sampler_insert_cb insert_cb, void *insert_cls,
349 RPS_sampler_remove_cb remove_cb, void *remove_cls)
340{ 350{
341 struct GNUNET_HashCode other_hash; 351 struct GNUNET_HashCode other_hash;
342 352
@@ -363,8 +373,8 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
363 s_elem->peer_id = *other; 373 s_elem->peer_id = *other;
364 s_elem->peer_id_hash = other_hash; 374 s_elem->peer_id_hash = other_hash;
365 375
366 if (NULL != sampler->insert_cb) 376 if (NULL != insert_cb)
367 sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id)); 377 insert_cb (insert_cls, sampler, &(s_elem->peer_id));
368 378
369 s_elem->num_change++; 379 s_elem->num_change++;
370 } 380 }
@@ -375,21 +385,21 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
375 LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n", 385 LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n",
376 GNUNET_i2s (&s_elem->peer_id)); 386 GNUNET_i2s (&s_elem->peer_id));
377 387
378 if ( NULL != sampler->remove_cb ) 388 if ( NULL != remove_cb )
379 { 389 {
380 LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n", 390 LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n",
381 GNUNET_i2s (&s_elem->peer_id)); 391 GNUNET_i2s (&s_elem->peer_id));
382 sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id); 392 remove_cb (remove_cls, sampler, &s_elem->peer_id);
383 } 393 }
384 394
385 s_elem->peer_id = *other; 395 s_elem->peer_id = *other;
386 s_elem->peer_id_hash = other_hash; 396 s_elem->peer_id_hash = other_hash;
387 397
388 if ( NULL != sampler->insert_cb ) 398 if ( NULL != insert_cb )
389 { 399 {
390 LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n", 400 LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n",
391 GNUNET_i2s (&s_elem->peer_id)); 401 GNUNET_i2s (&s_elem->peer_id));
392 sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id); 402 insert_cb (insert_cls, sampler, &s_elem->peer_id);
393 } 403 }
394 404
395 s_elem->num_change++; 405 s_elem->num_change++;
@@ -405,13 +415,15 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe
405 s_elem->is_empty = NOT_EMPTY; 415 s_elem->is_empty = NOT_EMPTY;
406} 416}
407 417
418
408/** 419/**
409 * Get the size of the sampler. 420 * Get the size of the sampler.
410 * 421 *
422 * @param sampler the sampler to return the size of.
411 * @return the size of the sampler 423 * @return the size of the sampler
412 */ 424 */
413unsigned int 425unsigned int
414RPS_sampler_get_size () 426RPS_sampler_get_size (struct RPS_Sampler *sampler)
415{ 427{
416 return sampler->sampler_size; 428 return sampler->sampler_size;
417} 429}
@@ -420,10 +432,11 @@ RPS_sampler_get_size ()
420/** 432/**
421 * Grow or shrink the size of the sampler. 433 * Grow or shrink the size of the sampler.
422 * 434 *
435 * @param sampler the sampler to resize.
423 * @param new_size the new size of the sampler 436 * @param new_size the new size of the sampler
424 */ 437 */
425static void 438static void
426sampler_resize (unsigned int new_size) 439sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
427{ 440{
428 unsigned int old_size; 441 unsigned int old_size;
429 uint32_t i; 442 uint32_t i;
@@ -451,7 +464,7 @@ sampler_resize (unsigned int new_size)
451 {/* Remove unneeded rest */ 464 {/* Remove unneeded rest */
452 LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i); 465 LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i);
453 if (NULL != sampler->remove_cb) 466 if (NULL != sampler->remove_cb)
454 sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id); 467 sampler->remove_cb (sampler->remove_cls, sampler, &rem_list[i]->peer_id);
455 GNUNET_free (rem_list[i]); 468 GNUNET_free (rem_list[i]);
456 } 469 }
457 GNUNET_free (rem_list); 470 GNUNET_free (rem_list);
@@ -468,7 +481,7 @@ sampler_resize (unsigned int new_size)
468 { /* Add new sampler elements */ 481 { /* Add new sampler elements */
469 sampler->sampler_elements[i] = RPS_sampler_elem_create (); 482 sampler->sampler_elements[i] = RPS_sampler_elem_create ();
470 if (NULL != sampler->insert_cb) 483 if (NULL != sampler->insert_cb)
471 sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id); 484 sampler->insert_cb (sampler->insert_cls, sampler, &sampler->sampler_elements[i]->peer_id);
472 LOG (GNUNET_ERROR_TYPE_DEBUG, 485 LOG (GNUNET_ERROR_TYPE_DEBUG,
473 "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n", 486 "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
474 i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id)); 487 i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
@@ -488,25 +501,27 @@ sampler_resize (unsigned int new_size)
488/** 501/**
489 * Grow or shrink the size of the sampler. 502 * Grow or shrink the size of the sampler.
490 * 503 *
504 * @param sampler the sampler to resize.
491 * @param new_size the new size of the sampler 505 * @param new_size the new size of the sampler
492 */ 506 */
493void 507void
494RPS_sampler_resize (unsigned int new_size) 508RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
495{ 509{
496 GNUNET_assert (0 < new_size); 510 GNUNET_assert (0 < new_size);
497 sampler_resize (new_size); 511 sampler_resize (sampler, new_size);
498} 512}
499 513
500 514
501/** 515/**
502 * Empty the sampler. 516 * Empty the sampler.
503 * 517 *
518 * @param sampler the sampler to empty.
504 * @param new_size the new size of the sampler 519 * @param new_size the new size of the sampler
505 */ 520 */
506static void 521static void
507sampler_empty () 522sampler_empty (struct RPS_Sampler *sampler)
508{ 523{
509 sampler_resize (0); 524 sampler_resize (sampler, 0);
510} 525}
511 526
512 527
@@ -520,14 +535,15 @@ sampler_empty ()
520 * @param rem_cb the callback that will be called on every PeerID that is 535 * @param rem_cb the callback that will be called on every PeerID that is
521 * removed from a sampler element 536 * removed from a sampler element
522 * @param rem_cls the closure given to #rem_cb 537 * @param rem_cls the closure given to #rem_cb
538 * @return a handle to a sampler that consists of sampler elements.
523 */ 539 */
524 void 540struct RPS_Sampler *
525RPS_sampler_init (size_t init_size, 541RPS_sampler_init (size_t init_size,
526 struct GNUNET_TIME_Relative max_round_interval, 542 struct GNUNET_TIME_Relative max_round_interval,
527 RPS_sampler_insert_cb ins_cb, void *ins_cls, 543 RPS_sampler_insert_cb ins_cb, void *ins_cls,
528 RPS_sampler_remove_cb rem_cb, void *rem_cls) 544 RPS_sampler_remove_cb rem_cb, void *rem_cls)
529{ 545{
530 //struct RPS_Sampler *sampler; 546 struct RPS_Sampler *sampler;
531 //uint32_t i; 547 //uint32_t i;
532 548
533 /* Initialise context around extended sampler */ 549 /* Initialise context around extended sampler */
@@ -544,26 +560,30 @@ RPS_sampler_init (size_t init_size,
544 sampler->remove_cls = rem_cls; 560 sampler->remove_cls = rem_cls;
545 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); 561 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
546 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); 562 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
547 RPS_sampler_resize (init_size); 563 RPS_sampler_resize (sampler, init_size);
548 564
549 client_get_index = 0; 565 client_get_index = 0;
550 566
551 //GNUNET_assert (init_size == sampler->sampler_size); 567 //GNUNET_assert (init_size == sampler->sampler_size);
568 return sampler;
552} 569}
553 570
554 571
555/** 572/**
556 * A fuction to update every sampler in the given list 573 * A fuction to update every sampler in the given list
557 * 574 *
575 * @param sampler the sampler to update.
558 * @param id the PeerID that is put in the sampler 576 * @param id the PeerID that is put in the sampler
559 */ 577 */
560 void 578 void
561RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id) 579RPS_sampler_update (struct RPS_Sampler *sampler,
580 const struct GNUNET_PeerIdentity *id)
562{ 581{
563 uint32_t i; 582 uint32_t i;
564 583
565 for ( i = 0 ; i < sampler->sampler_size ; i++ ) 584 for ( i = 0 ; i < sampler->sampler_size ; i++ )
566 RPS_sampler_elem_next (sampler->sampler_elements[i], id, 585 RPS_sampler_elem_next (sampler->sampler_elements[i],
586 sampler, id,
567 sampler->insert_cb, sampler->insert_cls, 587 sampler->insert_cb, sampler->insert_cls,
568 sampler->remove_cb, sampler->remove_cls); 588 sampler->remove_cb, sampler->remove_cls);
569} 589}
@@ -574,10 +594,12 @@ RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
574 * 594 *
575 * Used to get rid of a PeerID. 595 * Used to get rid of a PeerID.
576 * 596 *
597 * @param sampler the sampler to reinitialise a sampler element in.
577 * @param id the id of the sampler elements to update. 598 * @param id the id of the sampler elements to update.
578 */ 599 */
579 void 600 void
580RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) 601RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
602 const struct GNUNET_PeerIdentity *id)
581{ 603{
582 uint32_t i; 604 uint32_t i;
583 605
@@ -614,10 +636,10 @@ sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
614 * Choose the r_index of the peer we want to return 636 * Choose the r_index of the peer we want to return
615 * at random from the interval of the gossip list 637 * at random from the interval of the gossip list
616 */ 638 */
617 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 639 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
618 sampler->sampler_size); 640 gpc->sampler->sampler_size);
619 641
620 if ( EMPTY == sampler->sampler_elements[r_index]->is_empty ) 642 if ( EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty )
621 { 643 {
622 gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply( 644 gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
623 GNUNET_TIME_UNIT_SECONDS, 645 GNUNET_TIME_UNIT_SECONDS,
@@ -627,7 +649,7 @@ sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
627 return; 649 return;
628 } 650 }
629 651
630 *gpc->id = sampler->sampler_elements[r_index]->peer_id; 652 *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id;
631 653
632 gpc->cont (gpc->cont_cls, gpc->id); 654 gpc->cont (gpc->cont_cls, gpc->id);
633 GNUNET_free (gpc); 655 GNUNET_free (gpc);
@@ -639,8 +661,6 @@ sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
639 * 661 *
640 * We might want to reinitialise this sampler after giving the 662 * We might want to reinitialise this sampler after giving the
641 * corrsponding peer to the client. 663 * corrsponding peer to the client.
642 *
643 * @return a random PeerID of the PeerIDs previously put into the sampler.
644 */ 664 */
645static void 665static void
646sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 666sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -663,60 +683,65 @@ sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
663 if (0 < client_get_index) 683 if (0 < client_get_index)
664 tmp_client_get_index = client_get_index - 1; 684 tmp_client_get_index = client_get_index - 1;
665 else 685 else
666 tmp_client_get_index = sampler->sampler_size - 1; 686 tmp_client_get_index = gpc->sampler->sampler_size - 1;
667 687
668 LOG (GNUNET_ERROR_TYPE_DEBUG, 688 LOG (GNUNET_ERROR_TYPE_DEBUG,
669 "scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n", 689 "sched for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
670 tmp_client_get_index, sampler->sampler_size); 690 tmp_client_get_index, gpc->sampler->sampler_size);
671 691
672 do 692 do
673 { /* Get first non empty sampler */ 693 { /* Get first non empty sampler */
674 if (tmp_client_get_index == client_get_index) 694 if (tmp_client_get_index == client_get_index)
675 { 695 {
676 LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n", client_get_index); 696 LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n",
697 client_get_index);
677 GNUNET_assert (NULL == gpc->get_peer_task); 698 GNUNET_assert (NULL == gpc->get_peer_task);
678 gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, 699 gpc->get_peer_task =
679 &sampler_get_rand_peer, 700 GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
680 cls); 701 &sampler_get_rand_peer, cls);
681 return; 702 return;
682 } 703 }
683 704
684 tmp_id = sampler->sampler_elements[client_get_index]->peer_id; 705 tmp_id = gpc->sampler->sampler_elements[client_get_index]->peer_id;
685 RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); 706 RPS_sampler_elem_reinit (gpc->sampler->sampler_elements[client_get_index]);
686 RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id, 707 RPS_sampler_elem_next (gpc->sampler->sampler_elements[client_get_index],
687 NULL, NULL, NULL, NULL); 708 gpc->sampler, &tmp_id, NULL, NULL, NULL, NULL);
688 709
689 /* Cycle the #client_get_index one step further */ 710 /* Cycle the #client_get_index one step further */
690 if ( client_get_index == sampler->sampler_size - 1 ) 711 if ( client_get_index == gpc->sampler->sampler_size - 1 )
691 client_get_index = 0; 712 client_get_index = 0;
692 else 713 else
693 client_get_index++; 714 client_get_index++;
694 715
695 LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n", client_get_index); 716 LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n",
696 } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); 717 client_get_index);
718 } while (EMPTY == gpc->sampler->sampler_elements[client_get_index]->is_empty);
697 719
698 s_elem = sampler->sampler_elements[client_get_index]; 720 s_elem = gpc->sampler->sampler_elements[client_get_index];
699 *gpc->id = s_elem->peer_id; 721 *gpc->id = s_elem->peer_id;
700 722
701 /* Check whether we may use this sampler to give it back to the client */ 723 /* Check whether we may use this sampler to give it back to the client */
702 if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) 724 if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
703 { 725 {
704 last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, 726 last_request_diff =
705 GNUNET_TIME_absolute_get ()); 727 GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
706 /* We're not going to give it back now if it was already requested by a client this round */ 728 GNUNET_TIME_absolute_get ());
707 if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) 729 /* We're not going to give it back now if it was
730 * already requested by a client this round */
731 if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us)
708 { 732 {
709 LOG (GNUNET_ERROR_TYPE_DEBUG, 733 LOG (GNUNET_ERROR_TYPE_DEBUG,
710 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); 734 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
711 ///* How many time remains untile the next round has started? */ 735 ///* How many time remains untile the next round has started? */
712 //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff, 736 //inv_last_request_diff =
713 // sampler->max_round_interval); 737 // GNUNET_TIME_absolute_get_difference (last_request_diff,
738 // sampler->max_round_interval);
714 // add a little delay 739 // add a little delay
715 /* Schedule it one round later */ 740 /* Schedule it one round later */
716 GNUNET_assert (NULL == gpc->get_peer_task); 741 GNUNET_assert (NULL == gpc->get_peer_task);
717 gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, 742 gpc->get_peer_task =
718 &sampler_get_rand_peer, 743 GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
719 cls); 744 &sampler_get_rand_peer, cls);
720 return; 745 return;
721 } 746 }
722 // TODO add other reasons to wait here 747 // TODO add other reasons to wait here
@@ -736,6 +761,7 @@ sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
736 * corrsponding peer to the client. 761 * corrsponding peer to the client.
737 * Random with or without consumption? 762 * Random with or without consumption?
738 * 763 *
764 * @param sampler the sampler to get peers from.
739 * @param cb callback that will be called once the ids are ready. 765 * @param cb callback that will be called once the ids are ready.
740 * @param cls closure given to @a cb 766 * @param cls closure given to @a cb
741 * @param for_client #GNUNET_YES if result is used for client, 767 * @param for_client #GNUNET_YES if result is used for client,
@@ -743,7 +769,8 @@ sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
743 * @param num_peers the number of peers requested 769 * @param num_peers the number of peers requested
744 */ 770 */
745 void 771 void
746RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, 772RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
773 RPS_sampler_n_rand_peers_ready_cb cb,
747 void *cls, uint32_t num_peers, int for_client) 774 void *cls, uint32_t num_peers, int for_client)
748{ 775{
749 GNUNET_assert (0 != sampler->sampler_size); 776 GNUNET_assert (0 != sampler->sampler_size);
@@ -766,6 +793,7 @@ RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
766 for ( i = 0 ; i < num_peers ; i++ ) 793 for ( i = 0 ; i < num_peers ; i++ )
767 { 794 {
768 gpc = GNUNET_new (struct GetPeerCls); 795 gpc = GNUNET_new (struct GetPeerCls);
796 gpc->sampler = sampler;
769 gpc->cont = check_n_peers_ready; 797 gpc->cont = check_n_peers_ready;
770 gpc->cont_cls = cb_cls; 798 gpc->cont_cls = cb_cls;
771 gpc->id = &cb_cls->ids[i]; 799 gpc->id = &cb_cls->ids[i];
@@ -786,12 +814,14 @@ RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
786/** 814/**
787 * Counts how many Samplers currently hold a given PeerID. 815 * Counts how many Samplers currently hold a given PeerID.
788 * 816 *
817 * @param sampler the sampler to count ids in.
789 * @param id the PeerID to count. 818 * @param id the PeerID to count.
790 * 819 *
791 * @return the number of occurrences of id. 820 * @return the number of occurrences of id.
792 */ 821 */
793 uint32_t 822 uint32_t
794RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id) 823RPS_sampler_count_id (struct RPS_Sampler *sampler,
824 const struct GNUNET_PeerIdentity *id)
795{ 825{
796 uint32_t count; 826 uint32_t count;
797 uint32_t i; 827 uint32_t i;
@@ -811,7 +841,7 @@ RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
811 * Cleans the sampler. 841 * Cleans the sampler.
812 */ 842 */
813 void 843 void
814RPS_sampler_destroy () 844RPS_sampler_destroy (struct RPS_Sampler *sampler)
815{ 845{
816 struct GetPeerCls *i; 846 struct GetPeerCls *i;
817 847
@@ -822,7 +852,8 @@ RPS_sampler_destroy ()
822 GNUNET_free (i); 852 GNUNET_free (i);
823 } 853 }
824 854
825 sampler_empty (); 855 sampler_empty (sampler);
856 GNUNET_free (sampler);
826} 857}
827 858
828/* end of gnunet-service-rps.c */ 859/* end of gnunet-service-rps.c */
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
index bc3994e36..3bbfc2a40 100644
--- a/src/rps/gnunet-service-rps_sampler.h
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -28,6 +28,13 @@
28#define RPS_SAMPLER_H 28#define RPS_SAMPLER_H
29#include <inttypes.h> 29#include <inttypes.h>
30 30
31
32/**
33 * A sampler sampling a stream of PeerIDs.
34 */
35struct RPS_Sampler;
36
37
31/** 38/**
32 * Callback that is called when a new PeerID is inserted into a sampler. 39 * Callback that is called when a new PeerID is inserted into a sampler.
33 * 40 *
@@ -36,6 +43,7 @@
36 */ 43 */
37typedef void 44typedef void
38(*RPS_sampler_insert_cb) (void *cls, 45(*RPS_sampler_insert_cb) (void *cls,
46 struct RPS_Sampler *sampler,
39 const struct GNUNET_PeerIdentity *id); 47 const struct GNUNET_PeerIdentity *id);
40 48
41/** 49/**
@@ -46,6 +54,7 @@ typedef void
46 */ 54 */
47typedef void 55typedef void
48(*RPS_sampler_remove_cb) (void *cls, 56(*RPS_sampler_remove_cb) (void *cls,
57 struct RPS_Sampler *sampler,
49 const struct GNUNET_PeerIdentity *id); 58 const struct GNUNET_PeerIdentity *id);
50 59
51/** 60/**
@@ -61,26 +70,23 @@ typedef void
61 70
62 71
63/** 72/**
64 * A sampler sampling a stream of PeerIDs.
65 */
66//struct RPS_Sampler;
67
68/**
69 * Get the size of the sampler. 73 * Get the size of the sampler.
70 * 74 *
75 * @param sampler the sampler to return the size of.
71 * @return the size of the sampler 76 * @return the size of the sampler
72 */ 77 */
73unsigned int 78unsigned int
74RPS_sampler_get_size (); 79RPS_sampler_get_size (struct RPS_Sampler *sampler);
75 80
76 81
77/** 82/**
78 * Grow or shrink the size of the sampler. 83 * Grow or shrink the size of the sampler.
79 * 84 *
85 * @param sampler the sampler to resize.
80 * @param new_size the new size of the sampler (not 0) 86 * @param new_size the new size of the sampler (not 0)
81 */ 87 */
82 void 88void
83RPS_sampler_resize (unsigned int new_size); 89RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size);
84 90
85 91
86/** 92/**
@@ -94,8 +100,9 @@ RPS_sampler_resize (unsigned int new_size);
94 * @param rem_cb the callback that will be called on every PeerID that is 100 * @param rem_cb the callback that will be called on every PeerID that is
95 * removed from a sampler element 101 * removed from a sampler element
96 * @param rem_cls the closure given to #rem_cb 102 * @param rem_cls the closure given to #rem_cb
103 * @return a handle to a sampler that consists of sampler elements.
97 */ 104 */
98 void 105struct RPS_Sampler *
99RPS_sampler_init (size_t init_size, 106RPS_sampler_init (size_t init_size,
100 struct GNUNET_TIME_Relative max_round_interval, 107 struct GNUNET_TIME_Relative max_round_interval,
101 RPS_sampler_insert_cb ins_cb, void *ins_cls, 108 RPS_sampler_insert_cb ins_cb, void *ins_cls,
@@ -105,21 +112,26 @@ RPS_sampler_init (size_t init_size,
105/** 112/**
106 * A fuction to update every sampler in the given list 113 * A fuction to update every sampler in the given list
107 * 114 *
115 * @param sampler the sampler to update.
108 * @param id the PeerID that is put in the sampler 116 * @param id the PeerID that is put in the sampler
109 */ 117 */
110 void 118 void
111RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id); 119RPS_sampler_update (struct RPS_Sampler *sampler,
120 const struct GNUNET_PeerIdentity *id);
112 121
113 122
114/** 123/**
115 * Reinitialise all previously initialised sampler elements with the given value. 124 * Reinitialise all previously initialised sampler elements with the given
125 * value.
116 * 126 *
117 * Used to get rid of a PeerID. 127 * Used to get rid of a PeerID.
118 * 128 *
129 * @param sampler the sampler to reinitialise a sampler in.
119 * @param id the id of the samplers to update. 130 * @param id the id of the samplers to update.
120 */ 131 */
121 void 132 void
122RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id); 133RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
134 const struct GNUNET_PeerIdentity *id);
123 135
124 136
125/** 137/**
@@ -129,6 +141,7 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
129 * corrsponding peer to the client. 141 * corrsponding peer to the client.
130 * Random with or without consumption? 142 * Random with or without consumption?
131 * 143 *
144 * @param sampler the sampler to get peers from.
132 * @param cb callback that will be called once the ids are ready. 145 * @param cb callback that will be called once the ids are ready.
133 * @param cls closure given to @a cb 146 * @param cls closure given to @a cb
134 * @param for_client #GNUNET_YES if result is used for client, 147 * @param for_client #GNUNET_YES if result is used for client,
@@ -136,26 +149,31 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
136 * @param num_peers the number of peers requested 149 * @param num_peers the number of peers requested
137 */ 150 */
138 void 151 void
139RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, 152RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
140 void *cls, uint32_t num_peers, int for_client); 153 RPS_sampler_n_rand_peers_ready_cb cb,
154 void *cls, uint32_t num_peers, int for_client);
141 155
142 156
143/** 157/**
144 * Counts how many Samplers currently hold a given PeerID. 158 * Counts how many Samplers currently hold a given PeerID.
145 * 159 *
160 * @param sampler the sampler to cound ids in.
146 * @param id the PeerID to count. 161 * @param id the PeerID to count.
147 * 162 *
148 * @return the number of occurrences of id. 163 * @return the number of occurrences of id.
149 */ 164 */
150 uint32_t 165 uint32_t
151RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id); 166RPS_sampler_count_id (struct RPS_Sampler *sampler,
167 const struct GNUNET_PeerIdentity *id);
152 168
153 169
154/** 170/**
155 * Cleans the samplers. 171 * Cleans the samplers.
172 *
173 * @param sampler the sampler to destroy.
156 */ 174 */
157 void 175 void
158RPS_sampler_destroy (); 176RPS_sampler_destroy (struct RPS_Sampler *sampler);
159 177
160#endif 178#endif
161/* end of gnunet-service-rps.c */ 179/* end of gnunet-service-rps.c */