summaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
authorng0 <ng0@n0.is>2019-04-09 16:55:39 +0000
committerng0 <ng0@n0.is>2019-04-09 16:55:39 +0000
commit6e626937fd5133188d2bd06f280a1b889219eef2 (patch)
tree5cce69b3fe6f74297b8246656c8bf611d98022fb /src/rps
parentc2e1ab0e17a7b76e66be1b4fff0d02f196cba34a (diff)
parentac1134ca9fb7ff972cf5e5f7e0e070368f77789b (diff)
downloadgnunet-6e626937fd5133188d2bd06f280a1b889219eef2.tar.gz
gnunet-6e626937fd5133188d2bd06f280a1b889219eef2.zip
Merge branch 'master' of gnunet.org:gnunet
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-rps-profiler.c93
-rw-r--r--src/rps/gnunet-service-rps.c11
-rw-r--r--src/rps/gnunet-service-rps_sampler.c2
-rw-r--r--src/rps/rps-sampler_client.c70
-rw-r--r--src/rps/rps-sampler_client.h8
-rw-r--r--src/rps/rps-sampler_common.c187
-rw-r--r--src/rps/rps-sampler_common.h56
-rw-r--r--src/rps/rps-test_util.h22
-rw-r--r--src/rps/rps_api.c218
9 files changed, 631 insertions, 36 deletions
diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c
index a852d94b1..a13ee4078 100644
--- a/src/rps/gnunet-rps-profiler.c
+++ b/src/rps/gnunet-rps-profiler.c
@@ -429,7 +429,7 @@ struct PendingReply
429 /** 429 /**
430 * Handle to the request we are waiting for 430 * Handle to the request we are waiting for
431 */ 431 */
432 struct GNUNET_RPS_Request_Handle *req_handle; 432 struct GNUNET_RPS_Request_Handle_Single_Info *req_handle;
433 433
434 /** 434 /**
435 * The peer that requested 435 * The peer that requested
@@ -1040,7 +1040,7 @@ cancel_request (struct PendingReply *pending_rep)
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041 "Cancelling rps get reply\n"); 1041 "Cancelling rps get reply\n");
1042 GNUNET_assert (NULL != pending_rep->req_handle); 1042 GNUNET_assert (NULL != pending_rep->req_handle);
1043 GNUNET_RPS_request_cancel (pending_rep->req_handle); 1043 GNUNET_RPS_request_single_info_cancel (pending_rep->req_handle);
1044 pending_rep->req_handle = NULL; 1044 pending_rep->req_handle = NULL;
1045 GNUNET_free (pending_rep); 1045 GNUNET_free (pending_rep);
1046 pending_rep = NULL; 1046 pending_rep = NULL;
@@ -1489,6 +1489,13 @@ default_reply_handle (void *cls,
1489 } 1489 }
1490} 1490}
1491 1491
1492
1493static void
1494profiler_reply_handle_info (void *cls,
1495 const struct GNUNET_PeerIdentity *recv_peer,
1496 double probability,
1497 uint32_t num_observed);
1498
1492/** 1499/**
1493 * Request random peers. 1500 * Request random peers.
1494 */ 1501 */
@@ -1510,9 +1517,12 @@ request_peers (void *cls)
1510 "Requesting one peer\n"); 1517 "Requesting one peer\n");
1511 pending_rep = GNUNET_new (struct PendingReply); 1518 pending_rep = GNUNET_new (struct PendingReply);
1512 pending_rep->rps_peer = rps_peer; 1519 pending_rep->rps_peer = rps_peer;
1513 pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, 1520 //pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
1514 1, 1521 // 1,
1515 cur_test_run.reply_handle, 1522 // cur_test_run.reply_handle,
1523 // pending_rep);
1524 pending_rep->req_handle = GNUNET_RPS_request_peer_info (rps_peer->rps_handle,
1525 profiler_reply_handle_info,
1516 pending_rep); 1526 pending_rep);
1517 GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head, 1527 GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
1518 rps_peer->pending_rep_tail, 1528 rps_peer->pending_rep_tail,
@@ -1979,6 +1989,77 @@ profiler_reply_handle (void *cls,
1979} 1989}
1980 1990
1981 1991
1992/**
1993 * Callback to call on receipt of a reply
1994 *
1995 * @param cls closure
1996 * @param n number of peers
1997 * @param recv_peers the received peers
1998 */
1999static void
2000profiler_reply_handle_info (void *cls,
2001 const struct GNUNET_PeerIdentity *recv_peer,
2002 double probability,
2003 uint32_t num_observed)
2004{
2005 struct RPSPeer *rps_peer;
2006 struct RPSPeer *rcv_rps_peer;
2007 char file_name_buf[128];
2008 char file_name_dh_buf[128];
2009 char file_name_dhr_buf[128];
2010 char file_name_dhru_buf[128];
2011 char *file_name = file_name_buf;
2012 char *file_name_dh = file_name_dh_buf;
2013 char *file_name_dhr = file_name_dhr_buf;
2014 char *file_name_dhru = file_name_dhru_buf;
2015 unsigned int i;
2016 struct PendingReply *pending_rep = (struct PendingReply *) cls;
2017
2018 pending_rep->req_handle = NULL;
2019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n");
2020 rps_peer = pending_rep->rps_peer;
2021 (void) GNUNET_asprintf (&file_name,
2022 "/tmp/rps/received_ids-%u",
2023 rps_peer->index);
2024
2025 (void) GNUNET_asprintf (&file_name_dh,
2026 "/tmp/rps/diehard_input-%u",
2027 rps_peer->index);
2028 (void) GNUNET_asprintf (&file_name_dhr,
2029 "/tmp/rps/diehard_input_raw-%u",
2030 rps_peer->index);
2031 (void) GNUNET_asprintf (&file_name_dhru,
2032 "/tmp/rps/diehard_input_raw_aligned-%u",
2033 rps_peer->index);
2034 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2035 "[%s] got peer with info:\n",
2036 GNUNET_i2s (rps_peer->peer_id));
2037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038 " %s\n",
2039 GNUNET_i2s (recv_peer));
2040 tofile (file_name,
2041 "%s %d %" PRIu32 " \n",
2042 GNUNET_i2s_full (recv_peer),
2043 probability,
2044 num_observed);
2045 rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, recv_peer);
2046 GNUNET_assert (NULL != rcv_rps_peer);
2047 tofile (file_name_dh,
2048 "%" PRIu32 "\n",
2049 (uint32_t) rcv_rps_peer->index);
2050#ifdef TO_FILE
2051 to_file_raw (file_name_dhr,
2052 (char *) &rcv_rps_peer->index,
2053 sizeof (uint32_t));
2054 to_file_raw_unaligned (file_name_dhru,
2055 (char *) &rcv_rps_peer->index,
2056 sizeof (uint32_t),
2057 bits_needed);
2058#endif /* TO_FILE */
2059 default_reply_handle (cls, 1, recv_peer);
2060}
2061
2062
1982static void 2063static void
1983profiler_cb (struct RPSPeer *rps_peer) 2064profiler_cb (struct RPSPeer *rps_peer)
1984{ 2065{
@@ -2141,7 +2222,7 @@ static void compute_probabilities (uint32_t peer_idx)
2141{ 2222{
2142 //double probs[num_peers] = { 0 }; 2223 //double probs[num_peers] = { 0 };
2143 double probs[num_peers]; 2224 double probs[num_peers];
2144 size_t probs_as_str_size = (num_peers * 10 + 1) * sizeof (char); 2225 size_t probs_as_str_size = (num_peers * 10 + 2) * sizeof (char);
2145 char *probs_as_str = GNUNET_malloc (probs_as_str_size); 2226 char *probs_as_str = GNUNET_malloc (probs_as_str_size);
2146 char *probs_as_str_cpy; 2227 char *probs_as_str_cpy;
2147 uint32_t i; 2228 uint32_t i;
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index f6fe17589..e929c89de 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -2602,6 +2602,13 @@ insert_in_sampler (void *cls,
2602 * messages to it */ 2602 * messages to it */
2603 //indicate_sending_intention (peer); 2603 //indicate_sending_intention (peer);
2604 } 2604 }
2605 if (sub == msub)
2606 {
2607 GNUNET_STATISTICS_update (stats,
2608 "# observed peers in gossip",
2609 1,
2610 GNUNET_NO);
2611 }
2605#ifdef TO_FILE 2612#ifdef TO_FILE
2606 sub->num_observed_peers++; 2613 sub->num_observed_peers++;
2607 GNUNET_CONTAINER_multipeermap_put 2614 GNUNET_CONTAINER_multipeermap_put
@@ -2611,6 +2618,10 @@ insert_in_sampler (void *cls,
2611 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 2618 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2612 uint32_t num_observed_unique_peers = 2619 uint32_t num_observed_unique_peers =
2613 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers); 2620 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2621 GNUNET_STATISTICS_set (stats,
2622 "# unique peers in gossip",
2623 num_observed_unique_peers,
2624 GNUNET_NO);
2614#ifdef TO_FILE_FULL 2625#ifdef TO_FILE_FULL
2615 to_file (sub->file_name_observed_log, 2626 to_file (sub->file_name_observed_log,
2616 "%" PRIu32 " %" PRIu32 " %f\n", 2627 "%" PRIu32 " %" PRIu32 " %f\n",
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index a95ac82d4..e17b154ca 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -257,7 +257,7 @@ sampler_get_rand_peer (void *cls)
257 gpc->req_handle->gpc_tail, 257 gpc->req_handle->gpc_tail,
258 gpc); 258 gpc);
259 *gpc->id = sampler->sampler_elements[r_index]->peer_id; 259 *gpc->id = sampler->sampler_elements[r_index]->peer_id;
260 gpc->cont (gpc->cont_cls, gpc->id); 260 gpc->cont (gpc->cont_cls, gpc->id, 0, sampler->sampler_elements[r_index]->num_peers);
261 261
262 GNUNET_free (gpc); 262 GNUNET_free (gpc);
263} 263}
diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c
index 0de25df07..20cd9d0c4 100644
--- a/src/rps/rps-sampler_client.c
+++ b/src/rps/rps-sampler_client.c
@@ -158,6 +158,46 @@ struct RPS_SamplerRequestHandle
158 void *cls; 158 void *cls;
159}; 159};
160 160
161
162/**
163 * Closure to _get_rand_peer_info()
164 */
165struct RPS_SamplerRequestHandleSingleInfo
166{
167 /**
168 * DLL
169 */
170 struct RPS_SamplerRequestHandleSingleInfo *next;
171 struct RPS_SamplerRequestHandleSingleInfo *prev;
172
173 /**
174 * Pointer to the id
175 */
176 struct GNUNET_PeerIdentity *id;
177
178 /**
179 * Head and tail for the DLL to store the tasks for single requests
180 */
181 struct GetPeerCls *gpc_head;
182 struct GetPeerCls *gpc_tail;
183
184 /**
185 * Sampler.
186 */
187 struct RPS_Sampler *sampler;
188
189 /**
190 * Callback to be called when all ids are available.
191 */
192 RPS_sampler_sinlge_info_ready_cb callback;
193
194 /**
195 * Closure given to the callback
196 */
197 void *cls;
198};
199
200
161///** 201///**
162// * Global sampler variable. 202// * Global sampler variable.
163// */ 203// */
@@ -266,10 +306,16 @@ sampler_mod_get_rand_peer (void *cls)
266 struct GNUNET_TIME_Relative last_request_diff; 306 struct GNUNET_TIME_Relative last_request_diff;
267 struct RPS_Sampler *sampler; 307 struct RPS_Sampler *sampler;
268 double prob_observed_n; 308 double prob_observed_n;
309 uint32_t num_observed;
269 310
270 gpc->get_peer_task = NULL; 311 gpc->get_peer_task = NULL;
271 gpc->notify_ctx = NULL; 312 gpc->notify_ctx = NULL;
272 sampler = gpc->req_handle->sampler; 313 GNUNET_assert ( (NULL != gpc->req_handle) ||
314 (NULL != gpc->req_single_info_handle) );
315 if (NULL != gpc->req_handle)
316 sampler = gpc->req_handle->sampler;
317 else
318 sampler = gpc->req_single_info_handle->sampler;
273 319
274 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); 320 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
275 321
@@ -335,10 +381,10 @@ sampler_mod_get_rand_peer (void *cls)
335 s_elem->num_peers, 381 s_elem->num_peers,
336 sampler->deficiency_factor); 382 sampler->deficiency_factor);
337 /* check if probability is above desired */ 383 /* check if probability is above desired */
338 if (prob_observed_n >= sampler->desired_probability) 384 if (prob_observed_n < sampler->desired_probability)
339 { 385 {
340 LOG (GNUNET_ERROR_TYPE_DEBUG, 386 LOG (GNUNET_ERROR_TYPE_DEBUG,
341 "Probability of having observed all peers (%d) too small ( < %d).\n", 387 "Probability of having observed all peers (%f) too small ( < %f).\n",
342 prob_observed_n, 388 prob_observed_n,
343 sampler->desired_probability); 389 sampler->desired_probability);
344 GNUNET_assert (NULL == gpc->notify_ctx); 390 GNUNET_assert (NULL == gpc->notify_ctx);
@@ -359,13 +405,23 @@ sampler_mod_get_rand_peer (void *cls)
359// s_elem->num_change, 405// s_elem->num_change,
360// GNUNET_NO); 406// GNUNET_NO);
361 407
408 num_observed = s_elem->num_peers;
362 RPS_sampler_elem_reinit (s_elem); 409 RPS_sampler_elem_reinit (s_elem);
363 s_elem->last_client_request = GNUNET_TIME_absolute_get (); 410 s_elem->last_client_request = GNUNET_TIME_absolute_get ();
364 411
365 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, 412 if (NULL != gpc->req_handle)
366 gpc->req_handle->gpc_tail, 413 {
367 gpc); 414 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
368 gpc->cont (gpc->cont_cls, gpc->id); 415 gpc->req_handle->gpc_tail,
416 gpc);
417 }
418 else
419 {
420 GNUNET_CONTAINER_DLL_remove (gpc->req_single_info_handle->gpc_head,
421 gpc->req_single_info_handle->gpc_tail,
422 gpc);
423 }
424 gpc->cont (gpc->cont_cls, gpc->id, prob_observed_n, num_observed);
369 GNUNET_free (gpc); 425 GNUNET_free (gpc);
370} 426}
371 427
diff --git a/src/rps/rps-sampler_client.h b/src/rps/rps-sampler_client.h
index 1b425b754..680fabfda 100644
--- a/src/rps/rps-sampler_client.h
+++ b/src/rps/rps-sampler_client.h
@@ -40,6 +40,11 @@ struct RPS_Sampler;
40 */ 40 */
41struct RPS_SamplerRequestHandle; 41struct RPS_SamplerRequestHandle;
42 42
43/**
44 * Closure to _get_rand_peer_info()
45 */
46struct RPS_SamplerRequestHandleSingleInfo;
47
43 48
44/** 49/**
45 * Get the size of the sampler. 50 * Get the size of the sampler.
@@ -108,8 +113,6 @@ RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
108 * @param sampler the sampler to get peers from. 113 * @param sampler the sampler to get peers from.
109 * @param cb callback that will be called once the ids are ready. 114 * @param cb callback that will be called once the ids are ready.
110 * @param cls closure given to @a cb 115 * @param cls closure given to @a cb
111 * @param for_client #GNUNET_YES if result is used for client,
112 * #GNUNET_NO if used internally
113 * @param num_peers the number of peers requested 116 * @param num_peers the number of peers requested
114 */ 117 */
115struct RPS_SamplerRequestHandle * 118struct RPS_SamplerRequestHandle *
@@ -118,6 +121,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
118 RPS_sampler_n_rand_peers_ready_cb cb, 121 RPS_sampler_n_rand_peers_ready_cb cb,
119 void *cls); 122 void *cls);
120 123
124
121/** 125/**
122 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. 126 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
123 * 127 *
diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c
index f54de9014..adb69e1b5 100644
--- a/src/rps/rps-sampler_common.c
+++ b/src/rps/rps-sampler_common.c
@@ -116,6 +116,45 @@ struct RPS_SamplerRequestHandle
116 116
117 117
118/** 118/**
119 * Closure to _get_rand_peer_info()
120 */
121struct RPS_SamplerRequestHandleSingleInfo
122{
123 /**
124 * DLL
125 */
126 struct RPS_SamplerRequestHandleSingleInfo *next;
127 struct RPS_SamplerRequestHandleSingleInfo *prev;
128
129 /**
130 * Pointer to the id
131 */
132 struct GNUNET_PeerIdentity *id;
133
134 /**
135 * Head and tail for the DLL to store the tasks for single requests
136 */
137 struct GetPeerCls *gpc_head;
138 struct GetPeerCls *gpc_tail;
139
140 /**
141 * Sampler.
142 */
143 struct RPS_Sampler *sampler;
144
145 /**
146 * Callback to be called when all ids are available.
147 */
148 RPS_sampler_sinlge_info_ready_cb callback;
149
150 /**
151 * Closure given to the callback
152 */
153 void *cls;
154};
155
156
157/**
119 * @brief Update the current estimate of the network size stored at the sampler 158 * @brief Update the current estimate of the network size stored at the sampler
120 * 159 *
121 * Used for computing the condition when to return elements to the client 160 * Used for computing the condition when to return elements to the client
@@ -415,12 +454,20 @@ sampler_empty (struct RPS_Sampler *sampler)
415/** 454/**
416 * Callback to _get_rand_peer() used by _get_n_rand_peers(). 455 * Callback to _get_rand_peer() used by _get_n_rand_peers().
417 * 456 *
457 * Implements #RPS_sampler_rand_peer_ready_cont
458 *
418 * Checks whether all n peers are available. If they are, 459 * Checks whether all n peers are available. If they are,
419 * give those back. 460 * give those back.
461 * @param cls Closure
462 * @param id Peer ID
463 * @param probability The probability with which this sampler has seen all ids
464 * @param num_observed How many ids this sampler has observed
420 */ 465 */
421static void 466static void
422check_n_peers_ready (void *cls, 467check_n_peers_ready (void *cls,
423 const struct GNUNET_PeerIdentity *id) 468 const struct GNUNET_PeerIdentity *id,
469 double probability,
470 uint32_t num_observed)
424{ 471{
425 struct RPS_SamplerRequestHandle *req_handle = cls; 472 struct RPS_SamplerRequestHandle *req_handle = cls;
426 (void) id; 473 (void) id;
@@ -428,6 +475,8 @@ check_n_peers_ready (void *cls,
428 struct GNUNET_PeerIdentity *peers; 475 struct GNUNET_PeerIdentity *peers;
429 uint32_t num_peers; 476 uint32_t num_peers;
430 void *cb_cls; 477 void *cb_cls;
478 (void) probability;
479 (void) num_observed;
431 480
432 req_handle->cur_num_peers++; 481 req_handle->cur_num_peers++;
433 LOG (GNUNET_ERROR_TYPE_DEBUG, 482 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -460,6 +509,53 @@ check_n_peers_ready (void *cls,
460 509
461 510
462/** 511/**
512 * Callback to _get_rand_peer() used by _get_rand_peer_info().
513 *
514 * Implements #RPS_sampler_rand_peer_ready_cont
515 *
516 * @param cls Closure
517 * @param id Peer ID
518 * @param probability The probability with which this sampler has seen all ids
519 * @param num_observed How many ids this sampler has observed
520 */
521static void
522check_peer_info_ready (void *cls,
523 const struct GNUNET_PeerIdentity *id,
524 double probability,
525 uint32_t num_observed)
526{
527 struct RPS_SamplerRequestHandleSingleInfo *req_handle = cls;
528 (void) id;
529 RPS_sampler_sinlge_info_ready_cb tmp_cb;
530 struct GNUNET_PeerIdentity *peer;
531 void *cb_cls;
532 (void) probability;
533 (void) num_observed;
534
535 LOG (GNUNET_ERROR_TYPE_DEBUG,
536 "Got single peer with additional info\n");
537
538 GNUNET_assert (NULL != req_handle->callback);
539
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "returning single peer with info to the client\n");
542
543 /* Copy pointers and peers temporarily as they
544 * might be deleted from within the callback */
545 tmp_cb = req_handle->callback;
546 peer = GNUNET_new (struct GNUNET_PeerIdentity);
547 GNUNET_memcpy (peer,
548 req_handle->id,
549 sizeof (struct GNUNET_PeerIdentity));
550 cb_cls = req_handle->cls;
551 RPS_sampler_request_single_info_cancel (req_handle);
552 req_handle = NULL;
553 tmp_cb (peer, cb_cls, probability, num_observed);
554 GNUNET_free (peer);
555}
556
557
558/**
463 * Get n random peers out of the sampled peers. 559 * Get n random peers out of the sampled peers.
464 * 560 *
465 * We might want to reinitialise this sampler after giving the 561 * We might want to reinitialise this sampler after giving the
@@ -469,8 +565,6 @@ check_n_peers_ready (void *cls,
469 * @param sampler the sampler to get peers from. 565 * @param sampler the sampler to get peers from.
470 * @param cb callback that will be called once the ids are ready. 566 * @param cb callback that will be called once the ids are ready.
471 * @param cls closure given to @a cb 567 * @param cls closure given to @a cb
472 * @param for_client #GNUNET_YES if result is used for client,
473 * #GNUNET_NO if used internally
474 * @param num_peers the number of peers requested 568 * @param num_peers the number of peers requested
475 */ 569 */
476struct RPS_SamplerRequestHandle * 570struct RPS_SamplerRequestHandle *
@@ -506,6 +600,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
506 { 600 {
507 gpc = GNUNET_new (struct GetPeerCls); 601 gpc = GNUNET_new (struct GetPeerCls);
508 gpc->req_handle = req_handle; 602 gpc->req_handle = req_handle;
603 gpc->req_single_info_handle = NULL;
509 gpc->cont = check_n_peers_ready; 604 gpc->cont = check_n_peers_ready;
510 gpc->cont_cls = req_handle; 605 gpc->cont_cls = req_handle;
511 gpc->id = &req_handle->ids[i]; 606 gpc->id = &req_handle->ids[i];
@@ -515,11 +610,56 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
515 gpc); 610 gpc);
516 // maybe add a little delay 611 // maybe add a little delay
517 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, 612 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
518 gpc); 613 gpc);
519 } 614 }
520 return req_handle; 615 return req_handle;
521} 616}
522 617
618
619/**
620 * Get one random peer with additional information.
621 *
622 * @param sampler the sampler to get peers from.
623 * @param cb callback that will be called once the ids are ready.
624 * @param cls closure given to @a cb
625 */
626struct RPS_SamplerRequestHandleSingleInfo *
627RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler,
628 RPS_sampler_sinlge_info_ready_cb cb,
629 void *cls)
630{
631 struct RPS_SamplerRequestHandleSingleInfo *req_handle;
632 struct GetPeerCls *gpc;
633
634 GNUNET_assert (0 != sampler->sampler_size);
635
636 // TODO check if we have too much (distinct) sampled peers
637 req_handle = GNUNET_new (struct RPS_SamplerRequestHandleSingleInfo);
638 req_handle->id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
639 req_handle->sampler = sampler;
640 req_handle->callback = cb;
641 req_handle->cls = cls;
642 GNUNET_CONTAINER_DLL_insert (sampler->req_handle_single_head,
643 sampler->req_handle_single_tail,
644 req_handle);
645
646 gpc = GNUNET_new (struct GetPeerCls);
647 gpc->req_handle = NULL;
648 gpc->req_single_info_handle = req_handle;
649 gpc->cont = check_peer_info_ready;
650 gpc->cont_cls = req_handle;
651 gpc->id = req_handle->id;
652
653 GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
654 req_handle->gpc_tail,
655 gpc);
656 // maybe add a little delay
657 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
658 gpc);
659 return req_handle;
660}
661
662
523/** 663/**
524 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. 664 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
525 * 665 *
@@ -559,6 +699,45 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
559 699
560 700
561/** 701/**
702 * Cancle a request issued through #RPS_sampler_sinlge_info_ready_cb.
703 *
704 * @param req_handle the handle to the request
705 */
706void
707RPS_sampler_request_single_info_cancel (
708 struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle)
709{
710 struct GetPeerCls *i;
711
712 while (NULL != (i = req_single_info_handle->gpc_head) )
713 {
714 GNUNET_CONTAINER_DLL_remove (req_single_info_handle->gpc_head,
715 req_single_info_handle->gpc_tail,
716 i);
717 if (NULL != i->get_peer_task)
718 {
719 GNUNET_SCHEDULER_cancel (i->get_peer_task);
720 }
721 if (NULL != i->notify_ctx)
722 {
723 GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->notify_ctx_head,
724 req_single_info_handle->sampler->notify_ctx_tail,
725 i->notify_ctx);
726 GNUNET_free (i->notify_ctx);
727 i->notify_ctx = NULL;
728 }
729 GNUNET_free (i);
730 }
731 GNUNET_free (req_single_info_handle->id);
732 req_single_info_handle->id = NULL;
733 GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->req_handle_single_head,
734 req_single_info_handle->sampler->req_handle_single_tail,
735 req_single_info_handle);
736 GNUNET_free (req_single_info_handle);
737}
738
739
740/**
562 * Cleans the sampler. 741 * Cleans the sampler.
563 */ 742 */
564 void 743 void
diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h
index 1abe43720..321efaf1e 100644
--- a/src/rps/rps-sampler_common.h
+++ b/src/rps/rps-sampler_common.h
@@ -44,10 +44,14 @@
44 * 44 *
45 * @param cls the closure given alongside this function. 45 * @param cls the closure given alongside this function.
46 * @param id the PeerID that was returned 46 * @param id the PeerID that was returned
47 * @param probability The probability with which this sampler has seen all ids
48 * @param num_observed How many ids this sampler has observed
47 */ 49 */
48typedef void 50typedef void
49(*RPS_sampler_rand_peer_ready_cont) (void *cls, 51(*RPS_sampler_rand_peer_ready_cont) (void *cls,
50 const struct GNUNET_PeerIdentity *id); 52 const struct GNUNET_PeerIdentity *id,
53 double probability,
54 uint32_t num_observed);
51 55
52 56
53/** 57/**
@@ -72,6 +76,22 @@ typedef void
72 76
73 77
74/** 78/**
79 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
80 *
81 * @param cls the closure given alongside this function.
82 * @param probability Probability with which all IDs have been observed
83 * @param num_observed Number of observed IDs
84 * @param ids the PeerIDs that were returned
85 * to be freed
86 */
87 typedef void
88(*RPS_sampler_sinlge_info_ready_cb) (const struct GNUNET_PeerIdentity *ids,
89 void *cls,
90 double probability,
91 uint32_t num_observed);
92
93
94/**
75 * @brief Callback called each time a new peer was put into the sampler 95 * @brief Callback called each time a new peer was put into the sampler
76 * 96 *
77 * @param cls A possibly given closure 97 * @param cls A possibly given closure
@@ -97,6 +117,11 @@ struct GetPeerCls
97 struct RPS_SamplerRequestHandle *req_handle; 117 struct RPS_SamplerRequestHandle *req_handle;
98 118
99 /** 119 /**
120 * The #RPS_SamplerRequestHandleSingleInfo this single request belongs to.
121 */
122 struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle;
123
124 /**
100 * The task for this function. 125 * The task for this function.
101 */ 126 */
102 struct GNUNET_SCHEDULER_Task *get_peer_task; 127 struct GNUNET_SCHEDULER_Task *get_peer_task;
@@ -177,6 +202,12 @@ struct RPS_Sampler
177 struct RPS_SamplerRequestHandle *req_handle_head; 202 struct RPS_SamplerRequestHandle *req_handle_head;
178 struct RPS_SamplerRequestHandle *req_handle_tail; 203 struct RPS_SamplerRequestHandle *req_handle_tail;
179 204
205 /**
206 * Head and tail for the DLL to store the #RPS_SamplerRequestHandleSingleInfo
207 */
208 struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_head;
209 struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_tail;
210
180 struct SamplerNotifyUpdateCTX *notify_ctx_head; 211 struct SamplerNotifyUpdateCTX *notify_ctx_head;
181 struct SamplerNotifyUpdateCTX *notify_ctx_tail; 212 struct SamplerNotifyUpdateCTX *notify_ctx_tail;
182}; 213};
@@ -306,6 +337,19 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
306 337
307 338
308/** 339/**
340 * Get one random peer with additional information.
341 *
342 * @param sampler the sampler to get peers from.
343 * @param cb callback that will be called once the ids are ready.
344 * @param cls closure given to @a cb
345 */
346struct RPS_SamplerRequestHandleSingleInfo *
347RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler,
348 RPS_sampler_sinlge_info_ready_cb cb,
349 void *cls);
350
351
352/**
309 * Counts how many Samplers currently hold a given PeerID. 353 * Counts how many Samplers currently hold a given PeerID.
310 * 354 *
311 * @param sampler the sampler to count ids in. 355 * @param sampler the sampler to count ids in.
@@ -328,6 +372,16 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
328 372
329 373
330/** 374/**
375 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
376 *
377 * @param req_handle the handle to the request
378 */
379void
380RPS_sampler_request_single_info_cancel (
381 struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle);
382
383
384/**
331 * Cleans the sampler. 385 * Cleans the sampler.
332 */ 386 */
333 void 387 void
diff --git a/src/rps/rps-test_util.h b/src/rps/rps-test_util.h
index 6b5f568d7..d5a2db9de 100644
--- a/src/rps/rps-test_util.h
+++ b/src/rps/rps-test_util.h
@@ -68,12 +68,15 @@ close_all_files ();
68 if (NULL == file_name) break; \ 68 if (NULL == file_name) break; \
69 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\ 69 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
70 if (0 > size)\ 70 if (0 > size)\
71 {\
71 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\ 72 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
72 "Failed to create tmp_buf\n");\ 73 "Failed to create tmp_buf\n");\
73 else\ 74 break;\
74 GNUNET_DISK_file_write (get_file_handle (file_name),\ 75 }\
75 tmp_buf,\ 76 (void) strncat(tmp_buf,"\n",512);\
76 strnlen (tmp_buf, 512));\ 77 GNUNET_DISK_file_write (get_file_handle (file_name),\
78 tmp_buf,\
79 strnlen (tmp_buf, 512));\
77 } while (0); 80 } while (0);
78 81
79 82
@@ -82,12 +85,15 @@ close_all_files ();
82 memset (tmp_buf, 0, len);\ 85 memset (tmp_buf, 0, len);\
83 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\ 86 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
84 if (0 > size)\ 87 if (0 > size)\
88 {\
85 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\ 89 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
86 "Failed to create tmp_buf\n");\ 90 "Failed to create tmp_buf\n");\
87 else\ 91 break;\
88 GNUNET_DISK_file_write (get_file_handle (file_name),\ 92 }\
89 tmp_buf,\ 93 (void) strncat(tmp_buf,"\n",len);\
90 strnlen (tmp_buf, 512));\ 94 GNUNET_DISK_file_write (get_file_handle (file_name),\
95 tmp_buf,\
96 strnlen (tmp_buf, len));\
91 } while (0); 97 } while (0);
92#else /* TO_FILE */ 98#else /* TO_FILE */
93# define to_file(file_name, ...) 99# define to_file(file_name, ...)
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 7a3adfa94..9e405fdef 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -128,6 +128,16 @@ struct GNUNET_RPS_Handle
128 struct GNUNET_RPS_Request_Handle *rh_tail; 128 struct GNUNET_RPS_Request_Handle *rh_tail;
129 129
130 /** 130 /**
131 * @brief Pointer to the head element in DLL of single request handles
132 */
133 struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head;
134
135 /**
136 * @brief Pointer to the tail element in DLL of single request handles
137 */
138 struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail;
139
140 /**
131 * @brief The desired probability with which we want to have observed all 141 * @brief The desired probability with which we want to have observed all
132 * peers. 142 * peers.
133 */ 143 */
@@ -197,6 +207,54 @@ struct GNUNET_RPS_Request_Handle
197 207
198 208
199/** 209/**
210 * Handler for a single request from a client.
211 */
212struct GNUNET_RPS_Request_Handle_Single_Info
213{
214 /**
215 * The client issuing the request.
216 */
217 struct GNUNET_RPS_Handle *rps_handle;
218
219 /**
220 * @brief The Sampler for the client request
221 */
222 struct RPS_Sampler *sampler;
223
224 /**
225 * @brief Request handle of the request to the sampler - needed to cancel the request
226 */
227 struct RPS_SamplerRequestHandleSingleInfo *sampler_rh;
228
229 /**
230 * @brief Request handle of the request of the biased stream of peers -
231 * needed to cancel the request
232 */
233 struct GNUNET_RPS_StreamRequestHandle *srh;
234
235 /**
236 * The callback to be called when we receive an answer.
237 */
238 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb;
239
240 /**
241 * The closure for the callback.
242 */
243 void *ready_cb_cls;
244
245 /**
246 * @brief Pointer to next element in DLL
247 */
248 struct GNUNET_RPS_Request_Handle_Single_Info *next;
249
250 /**
251 * @brief Pointer to previous element in DLL
252 */
253 struct GNUNET_RPS_Request_Handle_Single_Info *prev;
254};
255
256
257/**
200 * Struct used to pack the callback, its closure (provided by the caller) 258 * Struct used to pack the callback, its closure (provided by the caller)
201 * and the connection handler to the service to pass it to a callback function. 259 * and the connection handler to the service to pass it to a callback function.
202 */ 260 */
@@ -309,6 +367,34 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
309 367
310 368
311/** 369/**
370 * @brief Called once the sampler has collected the requested peer.
371 *
372 * Calls the callback provided by the client with the corresponding cls.
373 *
374 * @param peers The array of @a num_peers that has been returned.
375 * @param num_peers The number of peers that have been returned
376 * @param cls The #GNUNET_RPS_Request_Handle
377 * @param probability Probability with which all IDs have been observed
378 * @param num_observed Number of observed IDs
379 */
380static void
381peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
382 void *cls,
383 double probability,
384 uint32_t num_observed)
385{
386 struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls;
387
388 rh->sampler_rh = NULL;
389 rh->ready_cb (rh->ready_cb_cls,
390 peers,
391 probability,
392 num_observed);
393 GNUNET_RPS_request_single_info_cancel (rh);
394}
395
396
397/**
312 * @brief Callback to collect the peers from the biased stream and put those 398 * @brief Callback to collect the peers from the biased stream and put those
313 * into the sampler. 399 * into the sampler.
314 * 400 *
@@ -333,6 +419,33 @@ collect_peers_cb (void *cls,
333} 419}
334 420
335 421
422/**
423 * @brief Callback to collect the peers from the biased stream and put those
424 * into the sampler.
425 *
426 * This version is for the modified #GNUNET_RPS_Request_Handle_Single_Info
427 *
428 * @param cls The #GNUNET_RPS_Request_Handle
429 * @param num_peers The number of peer that have been returned
430 * @param peers The array of @a num_peers that have been returned
431 */
432static void
433collect_peers_info_cb (void *cls,
434 uint64_t num_peers,
435 const struct GNUNET_PeerIdentity *peers)
436{
437 struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls;
438
439 LOG (GNUNET_ERROR_TYPE_DEBUG,
440 "Service sent %" PRIu64 " peers from stream\n",
441 num_peers);
442 for (uint64_t i = 0; i < num_peers; i++)
443 {
444 RPS_sampler_update (rhs->sampler, &peers[i]);
445 }
446}
447
448
336/* Get internals for debugging/profiling purposes */ 449/* Get internals for debugging/profiling purposes */
337 450
338/** 451/**
@@ -632,15 +745,15 @@ mq_error_handler (void *cls,
632 */ 745 */
633static void 746static void
634hash_from_share_val (const char *share_val, 747hash_from_share_val (const char *share_val,
635 struct GNUNET_HashCode *hash) 748 struct GNUNET_HashCode *hash)
636{ 749{
637 GNUNET_CRYPTO_kdf (hash, 750 GNUNET_CRYPTO_kdf (hash,
638 sizeof (struct GNUNET_HashCode), 751 sizeof (struct GNUNET_HashCode),
639 "rps", 752 "rps",
640 strlen ("rps"), 753 strlen ("rps"),
641 share_val, 754 share_val,
642 strlen (share_val), 755 strlen (share_val),
643 NULL, 0); 756 NULL, 0);
644} 757}
645 758
646 759
@@ -672,6 +785,13 @@ nse_cb (void *cls,
672 RPS_sampler_update_with_nw_size (rh_iter->sampler, 785 RPS_sampler_update_with_nw_size (rh_iter->sampler,
673 GNUNET_NSE_log_estimate_to_n (logestimate)); 786 GNUNET_NSE_log_estimate_to_n (logestimate));
674 } 787 }
788 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
789 NULL != rhs_iter && NULL != rhs_iter->next;
790 rhs_iter = rhs_iter->next)
791 {
792 RPS_sampler_update_with_nw_size (rhs_iter->sampler,
793 GNUNET_NSE_log_estimate_to_n (logestimate));
794 }
675} 795}
676 796
677 797
@@ -857,6 +977,48 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
857 977
858 978
859/** 979/**
980 * Request one random peer, getting additional information.
981 *
982 * @param rps_handle handle to the rps service
983 * @param ready_cb the callback called when the peers are available
984 * @param cls closure given to the callback
985 * @return a handle to cancel this request
986 */
987struct GNUNET_RPS_Request_Handle_Single_Info *
988GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
989 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
990 void *cls)
991{
992 struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
993 uint32_t num_req_peers = 1;
994
995 LOG (GNUNET_ERROR_TYPE_INFO,
996 "Client requested peer with additional info\n");
997 rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info);
998 rhs->rps_handle = rps_handle;
999 rhs->sampler = RPS_sampler_mod_init (num_req_peers,
1000 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
1001 RPS_sampler_set_desired_probability (rhs->sampler,
1002 rps_handle->desired_probability);
1003 RPS_sampler_set_deficiency_factor (rhs->sampler,
1004 rps_handle->deficiency_factor);
1005 rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler,
1006 peer_info_ready_cb,
1007 rhs);
1008 rhs->srh = GNUNET_RPS_stream_request (rps_handle,
1009 collect_peers_info_cb,
1010 rhs); /* cls */
1011 rhs->ready_cb = ready_cb;
1012 rhs->ready_cb_cls = cls;
1013 GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head,
1014 rps_handle->rhs_tail,
1015 rhs);
1016
1017 return rhs;
1018}
1019
1020
1021/**
860 * Seed rps service with peerIDs. 1022 * Seed rps service with peerIDs.
861 * 1023 *
862 * @param h handle to the rps service 1024 * @param h handle to the rps service
@@ -1047,6 +1209,37 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1047 1209
1048 1210
1049/** 1211/**
1212 * Cancle an issued single info request.
1213 *
1214 * @param rhs request handle of request to cancle
1215 */
1216void
1217GNUNET_RPS_request_single_info_cancel (
1218 struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
1219{
1220 struct GNUNET_RPS_Handle *h;
1221
1222 h = rhs->rps_handle;
1223 GNUNET_assert (NULL != rhs);
1224 GNUNET_assert (NULL != rhs->srh);
1225 GNUNET_assert (h == rhs->srh->rps_handle);
1226 GNUNET_RPS_stream_cancel (rhs->srh);
1227 rhs->srh = NULL;
1228 if (NULL == h->stream_requests_head) cancel_stream(h);
1229 if (NULL != rhs->sampler_rh)
1230 {
1231 RPS_sampler_request_single_info_cancel (rhs->sampler_rh);
1232 }
1233 RPS_sampler_destroy (rhs->sampler);
1234 rhs->sampler = NULL;
1235 GNUNET_CONTAINER_DLL_remove (h->rhs_head,
1236 h->rhs_tail,
1237 rhs);
1238 GNUNET_free (rhs);
1239}
1240
1241
1242/**
1050 * Disconnect from the rps service 1243 * Disconnect from the rps service
1051 * 1244 *
1052 * @param h the handle to the rps service 1245 * @param h the handle to the rps service
@@ -1079,6 +1272,17 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1079 GNUNET_RPS_request_cancel (rh_iter); 1272 GNUNET_RPS_request_cancel (rh_iter);
1080 } 1273 }
1081 } 1274 }
1275 if (NULL != h->rhs_head)
1276 {
1277 LOG (GNUNET_ERROR_TYPE_WARNING,
1278 "Not all requests were cancelled!\n");
1279 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1280 h->rhs_head != NULL;
1281 rhs_iter = h->rhs_head)
1282 {
1283 GNUNET_RPS_request_single_info_cancel (rhs_iter);
1284 }
1285 }
1082 if (NULL != srh_callback_peers) 1286 if (NULL != srh_callback_peers)
1083 { 1287 {
1084 GNUNET_free (srh_callback_peers); 1288 GNUNET_free (srh_callback_peers);