aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rps/gnunet-rps-profiler.c91
-rw-r--r--src/rps/gnunet-service-rps_sampler.c2
-rw-r--r--src/rps/rps-sampler_client.c64
-rw-r--r--src/rps/rps-sampler_client.h8
-rw-r--r--src/rps/rps-sampler_common.c187
-rw-r--r--src/rps/rps-sampler_common.h56
-rw-r--r--src/rps/rps_api.c193
7 files changed, 576 insertions, 25 deletions
diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c
index a852d94b1..ffc9d6f7e 100644
--- a/src/rps/gnunet-rps-profiler.c
+++ b/src/rps/gnunet-rps-profiler.c
@@ -429,7 +429,7 @@ struct PendingReply
429 /** 429 /**
430 * Handle to the request we are waiting for 430 * Handle to the request we are waiting for
431 */ 431 */
432 struct GNUNET_RPS_Request_Handle *req_handle; 432 struct GNUNET_RPS_Request_Handle_Single_Info *req_handle;
433 433
434 /** 434 /**
435 * The peer that requested 435 * The peer that requested
@@ -1040,7 +1040,7 @@ cancel_request (struct PendingReply *pending_rep)
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041 "Cancelling rps get reply\n"); 1041 "Cancelling rps get reply\n");
1042 GNUNET_assert (NULL != pending_rep->req_handle); 1042 GNUNET_assert (NULL != pending_rep->req_handle);
1043 GNUNET_RPS_request_cancel (pending_rep->req_handle); 1043 GNUNET_RPS_request_single_info_cancel (pending_rep->req_handle);
1044 pending_rep->req_handle = NULL; 1044 pending_rep->req_handle = NULL;
1045 GNUNET_free (pending_rep); 1045 GNUNET_free (pending_rep);
1046 pending_rep = NULL; 1046 pending_rep = NULL;
@@ -1489,6 +1489,13 @@ default_reply_handle (void *cls,
1489 } 1489 }
1490} 1490}
1491 1491
1492
1493static void
1494profiler_reply_handle_info (void *cls,
1495 const struct GNUNET_PeerIdentity *recv_peer,
1496 double probability,
1497 uint32_t num_observed);
1498
1492/** 1499/**
1493 * Request random peers. 1500 * Request random peers.
1494 */ 1501 */
@@ -1510,9 +1517,12 @@ request_peers (void *cls)
1510 "Requesting one peer\n"); 1517 "Requesting one peer\n");
1511 pending_rep = GNUNET_new (struct PendingReply); 1518 pending_rep = GNUNET_new (struct PendingReply);
1512 pending_rep->rps_peer = rps_peer; 1519 pending_rep->rps_peer = rps_peer;
1513 pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, 1520 //pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
1514 1, 1521 // 1,
1515 cur_test_run.reply_handle, 1522 // cur_test_run.reply_handle,
1523 // pending_rep);
1524 pending_rep->req_handle = GNUNET_RPS_request_peer_info (rps_peer->rps_handle,
1525 profiler_reply_handle_info,
1516 pending_rep); 1526 pending_rep);
1517 GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head, 1527 GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
1518 rps_peer->pending_rep_tail, 1528 rps_peer->pending_rep_tail,
@@ -1979,6 +1989,77 @@ profiler_reply_handle (void *cls,
1979} 1989}
1980 1990
1981 1991
1992/**
1993 * Callback to call on receipt of a reply
1994 *
1995 * @param cls closure
1996 * @param n number of peers
1997 * @param recv_peers the received peers
1998 */
1999static void
2000profiler_reply_handle_info (void *cls,
2001 const struct GNUNET_PeerIdentity *recv_peer,
2002 double probability,
2003 uint32_t num_observed)
2004{
2005 struct RPSPeer *rps_peer;
2006 struct RPSPeer *rcv_rps_peer;
2007 char file_name_buf[128];
2008 char file_name_dh_buf[128];
2009 char file_name_dhr_buf[128];
2010 char file_name_dhru_buf[128];
2011 char *file_name = file_name_buf;
2012 char *file_name_dh = file_name_dh_buf;
2013 char *file_name_dhr = file_name_dhr_buf;
2014 char *file_name_dhru = file_name_dhru_buf;
2015 unsigned int i;
2016 struct PendingReply *pending_rep = (struct PendingReply *) cls;
2017
2018 pending_rep->req_handle = NULL;
2019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n");
2020 rps_peer = pending_rep->rps_peer;
2021 (void) GNUNET_asprintf (&file_name,
2022 "/tmp/rps/received_ids-%u",
2023 rps_peer->index);
2024
2025 (void) GNUNET_asprintf (&file_name_dh,
2026 "/tmp/rps/diehard_input-%u",
2027 rps_peer->index);
2028 (void) GNUNET_asprintf (&file_name_dhr,
2029 "/tmp/rps/diehard_input_raw-%u",
2030 rps_peer->index);
2031 (void) GNUNET_asprintf (&file_name_dhru,
2032 "/tmp/rps/diehard_input_raw_aligned-%u",
2033 rps_peer->index);
2034 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2035 "[%s] got peer with info:\n",
2036 GNUNET_i2s (rps_peer->peer_id));
2037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038 " %s\n",
2039 GNUNET_i2s (recv_peer));
2040 tofile (file_name,
2041 "%s %d %" PRIu32 " \n",
2042 GNUNET_i2s_full (recv_peer),
2043 probability,
2044 num_observed);
2045 rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, recv_peer);
2046 GNUNET_assert (NULL != rcv_rps_peer);
2047 tofile (file_name_dh,
2048 "%" PRIu32 "\n",
2049 (uint32_t) rcv_rps_peer->index);
2050#ifdef TO_FILE
2051 to_file_raw (file_name_dhr,
2052 (char *) &rcv_rps_peer->index,
2053 sizeof (uint32_t));
2054 to_file_raw_unaligned (file_name_dhru,
2055 (char *) &rcv_rps_peer->index,
2056 sizeof (uint32_t),
2057 bits_needed);
2058#endif /* TO_FILE */
2059 default_reply_handle (cls, 1, recv_peer);
2060}
2061
2062
1982static void 2063static void
1983profiler_cb (struct RPSPeer *rps_peer) 2064profiler_cb (struct RPSPeer *rps_peer)
1984{ 2065{
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index a95ac82d4..e17b154ca 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -257,7 +257,7 @@ sampler_get_rand_peer (void *cls)
257 gpc->req_handle->gpc_tail, 257 gpc->req_handle->gpc_tail,
258 gpc); 258 gpc);
259 *gpc->id = sampler->sampler_elements[r_index]->peer_id; 259 *gpc->id = sampler->sampler_elements[r_index]->peer_id;
260 gpc->cont (gpc->cont_cls, gpc->id); 260 gpc->cont (gpc->cont_cls, gpc->id, 0, sampler->sampler_elements[r_index]->num_peers);
261 261
262 GNUNET_free (gpc); 262 GNUNET_free (gpc);
263} 263}
diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c
index 0de25df07..61f9b6385 100644
--- a/src/rps/rps-sampler_client.c
+++ b/src/rps/rps-sampler_client.c
@@ -158,6 +158,46 @@ struct RPS_SamplerRequestHandle
158 void *cls; 158 void *cls;
159}; 159};
160 160
161
162/**
163 * Closure to _get_rand_peer_info()
164 */
165struct RPS_SamplerRequestHandleSingleInfo
166{
167 /**
168 * DLL
169 */
170 struct RPS_SamplerRequestHandleSingleInfo *next;
171 struct RPS_SamplerRequestHandleSingleInfo *prev;
172
173 /**
174 * Pointer to the id
175 */
176 struct GNUNET_PeerIdentity *id;
177
178 /**
179 * Head and tail for the DLL to store the tasks for single requests
180 */
181 struct GetPeerCls *gpc_head;
182 struct GetPeerCls *gpc_tail;
183
184 /**
185 * Sampler.
186 */
187 struct RPS_Sampler *sampler;
188
189 /**
190 * Callback to be called when all ids are available.
191 */
192 RPS_sampler_sinlge_info_ready_cb callback;
193
194 /**
195 * Closure given to the callback
196 */
197 void *cls;
198};
199
200
161///** 201///**
162// * Global sampler variable. 202// * Global sampler variable.
163// */ 203// */
@@ -269,7 +309,12 @@ sampler_mod_get_rand_peer (void *cls)
269 309
270 gpc->get_peer_task = NULL; 310 gpc->get_peer_task = NULL;
271 gpc->notify_ctx = NULL; 311 gpc->notify_ctx = NULL;
272 sampler = gpc->req_handle->sampler; 312 GNUNET_assert ( (NULL != gpc->req_handle) ||
313 (NULL != gpc->req_single_info_handle) );
314 if (NULL != gpc->req_handle)
315 sampler = gpc->req_handle->sampler;
316 else
317 sampler = gpc->req_single_info_handle->sampler;
273 318
274 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); 319 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
275 320
@@ -362,10 +407,19 @@ sampler_mod_get_rand_peer (void *cls)
362 RPS_sampler_elem_reinit (s_elem); 407 RPS_sampler_elem_reinit (s_elem);
363 s_elem->last_client_request = GNUNET_TIME_absolute_get (); 408 s_elem->last_client_request = GNUNET_TIME_absolute_get ();
364 409
365 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, 410 if (NULL != gpc->req_handle)
366 gpc->req_handle->gpc_tail, 411 {
367 gpc); 412 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
368 gpc->cont (gpc->cont_cls, gpc->id); 413 gpc->req_handle->gpc_tail,
414 gpc);
415 }
416 else
417 {
418 GNUNET_CONTAINER_DLL_remove (gpc->req_single_info_handle->gpc_head,
419 gpc->req_single_info_handle->gpc_tail,
420 gpc);
421 }
422 gpc->cont (gpc->cont_cls, gpc->id, prob_observed_n, s_elem->num_peers);
369 GNUNET_free (gpc); 423 GNUNET_free (gpc);
370} 424}
371 425
diff --git a/src/rps/rps-sampler_client.h b/src/rps/rps-sampler_client.h
index 1b425b754..680fabfda 100644
--- a/src/rps/rps-sampler_client.h
+++ b/src/rps/rps-sampler_client.h
@@ -40,6 +40,11 @@ struct RPS_Sampler;
40 */ 40 */
41struct RPS_SamplerRequestHandle; 41struct RPS_SamplerRequestHandle;
42 42
43/**
44 * Closure to _get_rand_peer_info()
45 */
46struct RPS_SamplerRequestHandleSingleInfo;
47
43 48
44/** 49/**
45 * Get the size of the sampler. 50 * Get the size of the sampler.
@@ -108,8 +113,6 @@ RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
108 * @param sampler the sampler to get peers from. 113 * @param sampler the sampler to get peers from.
109 * @param cb callback that will be called once the ids are ready. 114 * @param cb callback that will be called once the ids are ready.
110 * @param cls closure given to @a cb 115 * @param cls closure given to @a cb
111 * @param for_client #GNUNET_YES if result is used for client,
112 * #GNUNET_NO if used internally
113 * @param num_peers the number of peers requested 116 * @param num_peers the number of peers requested
114 */ 117 */
115struct RPS_SamplerRequestHandle * 118struct RPS_SamplerRequestHandle *
@@ -118,6 +121,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
118 RPS_sampler_n_rand_peers_ready_cb cb, 121 RPS_sampler_n_rand_peers_ready_cb cb,
119 void *cls); 122 void *cls);
120 123
124
121/** 125/**
122 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. 126 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
123 * 127 *
diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c
index f54de9014..adb69e1b5 100644
--- a/src/rps/rps-sampler_common.c
+++ b/src/rps/rps-sampler_common.c
@@ -116,6 +116,45 @@ struct RPS_SamplerRequestHandle
116 116
117 117
118/** 118/**
119 * Closure to _get_rand_peer_info()
120 */
121struct RPS_SamplerRequestHandleSingleInfo
122{
123 /**
124 * DLL
125 */
126 struct RPS_SamplerRequestHandleSingleInfo *next;
127 struct RPS_SamplerRequestHandleSingleInfo *prev;
128
129 /**
130 * Pointer to the id
131 */
132 struct GNUNET_PeerIdentity *id;
133
134 /**
135 * Head and tail for the DLL to store the tasks for single requests
136 */
137 struct GetPeerCls *gpc_head;
138 struct GetPeerCls *gpc_tail;
139
140 /**
141 * Sampler.
142 */
143 struct RPS_Sampler *sampler;
144
145 /**
146 * Callback to be called when all ids are available.
147 */
148 RPS_sampler_sinlge_info_ready_cb callback;
149
150 /**
151 * Closure given to the callback
152 */
153 void *cls;
154};
155
156
157/**
119 * @brief Update the current estimate of the network size stored at the sampler 158 * @brief Update the current estimate of the network size stored at the sampler
120 * 159 *
121 * Used for computing the condition when to return elements to the client 160 * Used for computing the condition when to return elements to the client
@@ -415,12 +454,20 @@ sampler_empty (struct RPS_Sampler *sampler)
415/** 454/**
416 * Callback to _get_rand_peer() used by _get_n_rand_peers(). 455 * Callback to _get_rand_peer() used by _get_n_rand_peers().
417 * 456 *
457 * Implements #RPS_sampler_rand_peer_ready_cont
458 *
418 * Checks whether all n peers are available. If they are, 459 * Checks whether all n peers are available. If they are,
419 * give those back. 460 * give those back.
461 * @param cls Closure
462 * @param id Peer ID
463 * @param probability The probability with which this sampler has seen all ids
464 * @param num_observed How many ids this sampler has observed
420 */ 465 */
421static void 466static void
422check_n_peers_ready (void *cls, 467check_n_peers_ready (void *cls,
423 const struct GNUNET_PeerIdentity *id) 468 const struct GNUNET_PeerIdentity *id,
469 double probability,
470 uint32_t num_observed)
424{ 471{
425 struct RPS_SamplerRequestHandle *req_handle = cls; 472 struct RPS_SamplerRequestHandle *req_handle = cls;
426 (void) id; 473 (void) id;
@@ -428,6 +475,8 @@ check_n_peers_ready (void *cls,
428 struct GNUNET_PeerIdentity *peers; 475 struct GNUNET_PeerIdentity *peers;
429 uint32_t num_peers; 476 uint32_t num_peers;
430 void *cb_cls; 477 void *cb_cls;
478 (void) probability;
479 (void) num_observed;
431 480
432 req_handle->cur_num_peers++; 481 req_handle->cur_num_peers++;
433 LOG (GNUNET_ERROR_TYPE_DEBUG, 482 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -460,6 +509,53 @@ check_n_peers_ready (void *cls,
460 509
461 510
462/** 511/**
512 * Callback to _get_rand_peer() used by _get_rand_peer_info().
513 *
514 * Implements #RPS_sampler_rand_peer_ready_cont
515 *
516 * @param cls Closure
517 * @param id Peer ID
518 * @param probability The probability with which this sampler has seen all ids
519 * @param num_observed How many ids this sampler has observed
520 */
521static void
522check_peer_info_ready (void *cls,
523 const struct GNUNET_PeerIdentity *id,
524 double probability,
525 uint32_t num_observed)
526{
527 struct RPS_SamplerRequestHandleSingleInfo *req_handle = cls;
528 (void) id;
529 RPS_sampler_sinlge_info_ready_cb tmp_cb;
530 struct GNUNET_PeerIdentity *peer;
531 void *cb_cls;
532 (void) probability;
533 (void) num_observed;
534
535 LOG (GNUNET_ERROR_TYPE_DEBUG,
536 "Got single peer with additional info\n");
537
538 GNUNET_assert (NULL != req_handle->callback);
539
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "returning single peer with info to the client\n");
542
543 /* Copy pointers and peers temporarily as they
544 * might be deleted from within the callback */
545 tmp_cb = req_handle->callback;
546 peer = GNUNET_new (struct GNUNET_PeerIdentity);
547 GNUNET_memcpy (peer,
548 req_handle->id,
549 sizeof (struct GNUNET_PeerIdentity));
550 cb_cls = req_handle->cls;
551 RPS_sampler_request_single_info_cancel (req_handle);
552 req_handle = NULL;
553 tmp_cb (peer, cb_cls, probability, num_observed);
554 GNUNET_free (peer);
555}
556
557
558/**
463 * Get n random peers out of the sampled peers. 559 * Get n random peers out of the sampled peers.
464 * 560 *
465 * We might want to reinitialise this sampler after giving the 561 * We might want to reinitialise this sampler after giving the
@@ -469,8 +565,6 @@ check_n_peers_ready (void *cls,
469 * @param sampler the sampler to get peers from. 565 * @param sampler the sampler to get peers from.
470 * @param cb callback that will be called once the ids are ready. 566 * @param cb callback that will be called once the ids are ready.
471 * @param cls closure given to @a cb 567 * @param cls closure given to @a cb
472 * @param for_client #GNUNET_YES if result is used for client,
473 * #GNUNET_NO if used internally
474 * @param num_peers the number of peers requested 568 * @param num_peers the number of peers requested
475 */ 569 */
476struct RPS_SamplerRequestHandle * 570struct RPS_SamplerRequestHandle *
@@ -506,6 +600,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
506 { 600 {
507 gpc = GNUNET_new (struct GetPeerCls); 601 gpc = GNUNET_new (struct GetPeerCls);
508 gpc->req_handle = req_handle; 602 gpc->req_handle = req_handle;
603 gpc->req_single_info_handle = NULL;
509 gpc->cont = check_n_peers_ready; 604 gpc->cont = check_n_peers_ready;
510 gpc->cont_cls = req_handle; 605 gpc->cont_cls = req_handle;
511 gpc->id = &req_handle->ids[i]; 606 gpc->id = &req_handle->ids[i];
@@ -515,11 +610,56 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
515 gpc); 610 gpc);
516 // maybe add a little delay 611 // maybe add a little delay
517 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, 612 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
518 gpc); 613 gpc);
519 } 614 }
520 return req_handle; 615 return req_handle;
521} 616}
522 617
618
619/**
620 * Get one random peer with additional information.
621 *
622 * @param sampler the sampler to get peers from.
623 * @param cb callback that will be called once the ids are ready.
624 * @param cls closure given to @a cb
625 */
626struct RPS_SamplerRequestHandleSingleInfo *
627RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler,
628 RPS_sampler_sinlge_info_ready_cb cb,
629 void *cls)
630{
631 struct RPS_SamplerRequestHandleSingleInfo *req_handle;
632 struct GetPeerCls *gpc;
633
634 GNUNET_assert (0 != sampler->sampler_size);
635
636 // TODO check if we have too much (distinct) sampled peers
637 req_handle = GNUNET_new (struct RPS_SamplerRequestHandleSingleInfo);
638 req_handle->id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
639 req_handle->sampler = sampler;
640 req_handle->callback = cb;
641 req_handle->cls = cls;
642 GNUNET_CONTAINER_DLL_insert (sampler->req_handle_single_head,
643 sampler->req_handle_single_tail,
644 req_handle);
645
646 gpc = GNUNET_new (struct GetPeerCls);
647 gpc->req_handle = NULL;
648 gpc->req_single_info_handle = req_handle;
649 gpc->cont = check_peer_info_ready;
650 gpc->cont_cls = req_handle;
651 gpc->id = req_handle->id;
652
653 GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
654 req_handle->gpc_tail,
655 gpc);
656 // maybe add a little delay
657 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
658 gpc);
659 return req_handle;
660}
661
662
523/** 663/**
524 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. 664 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
525 * 665 *
@@ -559,6 +699,45 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
559 699
560 700
561/** 701/**
702 * Cancle a request issued through #RPS_sampler_sinlge_info_ready_cb.
703 *
704 * @param req_handle the handle to the request
705 */
706void
707RPS_sampler_request_single_info_cancel (
708 struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle)
709{
710 struct GetPeerCls *i;
711
712 while (NULL != (i = req_single_info_handle->gpc_head) )
713 {
714 GNUNET_CONTAINER_DLL_remove (req_single_info_handle->gpc_head,
715 req_single_info_handle->gpc_tail,
716 i);
717 if (NULL != i->get_peer_task)
718 {
719 GNUNET_SCHEDULER_cancel (i->get_peer_task);
720 }
721 if (NULL != i->notify_ctx)
722 {
723 GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->notify_ctx_head,
724 req_single_info_handle->sampler->notify_ctx_tail,
725 i->notify_ctx);
726 GNUNET_free (i->notify_ctx);
727 i->notify_ctx = NULL;
728 }
729 GNUNET_free (i);
730 }
731 GNUNET_free (req_single_info_handle->id);
732 req_single_info_handle->id = NULL;
733 GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->req_handle_single_head,
734 req_single_info_handle->sampler->req_handle_single_tail,
735 req_single_info_handle);
736 GNUNET_free (req_single_info_handle);
737}
738
739
740/**
562 * Cleans the sampler. 741 * Cleans the sampler.
563 */ 742 */
564 void 743 void
diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h
index 1abe43720..321efaf1e 100644
--- a/src/rps/rps-sampler_common.h
+++ b/src/rps/rps-sampler_common.h
@@ -44,10 +44,14 @@
44 * 44 *
45 * @param cls the closure given alongside this function. 45 * @param cls the closure given alongside this function.
46 * @param id the PeerID that was returned 46 * @param id the PeerID that was returned
47 * @param probability The probability with which this sampler has seen all ids
48 * @param num_observed How many ids this sampler has observed
47 */ 49 */
48typedef void 50typedef void
49(*RPS_sampler_rand_peer_ready_cont) (void *cls, 51(*RPS_sampler_rand_peer_ready_cont) (void *cls,
50 const struct GNUNET_PeerIdentity *id); 52 const struct GNUNET_PeerIdentity *id,
53 double probability,
54 uint32_t num_observed);
51 55
52 56
53/** 57/**
@@ -72,6 +76,22 @@ typedef void
72 76
73 77
74/** 78/**
79 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
80 *
81 * @param cls the closure given alongside this function.
82 * @param probability Probability with which all IDs have been observed
83 * @param num_observed Number of observed IDs
84 * @param ids the PeerIDs that were returned
85 * to be freed
86 */
87 typedef void
88(*RPS_sampler_sinlge_info_ready_cb) (const struct GNUNET_PeerIdentity *ids,
89 void *cls,
90 double probability,
91 uint32_t num_observed);
92
93
94/**
75 * @brief Callback called each time a new peer was put into the sampler 95 * @brief Callback called each time a new peer was put into the sampler
76 * 96 *
77 * @param cls A possibly given closure 97 * @param cls A possibly given closure
@@ -97,6 +117,11 @@ struct GetPeerCls
97 struct RPS_SamplerRequestHandle *req_handle; 117 struct RPS_SamplerRequestHandle *req_handle;
98 118
99 /** 119 /**
120 * The #RPS_SamplerRequestHandleSingleInfo this single request belongs to.
121 */
122 struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle;
123
124 /**
100 * The task for this function. 125 * The task for this function.
101 */ 126 */
102 struct GNUNET_SCHEDULER_Task *get_peer_task; 127 struct GNUNET_SCHEDULER_Task *get_peer_task;
@@ -177,6 +202,12 @@ struct RPS_Sampler
177 struct RPS_SamplerRequestHandle *req_handle_head; 202 struct RPS_SamplerRequestHandle *req_handle_head;
178 struct RPS_SamplerRequestHandle *req_handle_tail; 203 struct RPS_SamplerRequestHandle *req_handle_tail;
179 204
205 /**
206 * Head and tail for the DLL to store the #RPS_SamplerRequestHandleSingleInfo
207 */
208 struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_head;
209 struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_tail;
210
180 struct SamplerNotifyUpdateCTX *notify_ctx_head; 211 struct SamplerNotifyUpdateCTX *notify_ctx_head;
181 struct SamplerNotifyUpdateCTX *notify_ctx_tail; 212 struct SamplerNotifyUpdateCTX *notify_ctx_tail;
182}; 213};
@@ -306,6 +337,19 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
306 337
307 338
308/** 339/**
340 * Get one random peer with additional information.
341 *
342 * @param sampler the sampler to get peers from.
343 * @param cb callback that will be called once the ids are ready.
344 * @param cls closure given to @a cb
345 */
346struct RPS_SamplerRequestHandleSingleInfo *
347RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler,
348 RPS_sampler_sinlge_info_ready_cb cb,
349 void *cls);
350
351
352/**
309 * Counts how many Samplers currently hold a given PeerID. 353 * Counts how many Samplers currently hold a given PeerID.
310 * 354 *
311 * @param sampler the sampler to count ids in. 355 * @param sampler the sampler to count ids in.
@@ -328,6 +372,16 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
328 372
329 373
330/** 374/**
375 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
376 *
377 * @param req_handle the handle to the request
378 */
379void
380RPS_sampler_request_single_info_cancel (
381 struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle);
382
383
384/**
331 * Cleans the sampler. 385 * Cleans the sampler.
332 */ 386 */
333 void 387 void
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 7a3adfa94..83dff27e8 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -128,6 +128,16 @@ struct GNUNET_RPS_Handle
128 struct GNUNET_RPS_Request_Handle *rh_tail; 128 struct GNUNET_RPS_Request_Handle *rh_tail;
129 129
130 /** 130 /**
131 * @brief Pointer to the head element in DLL of single request handles
132 */
133 struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head;
134
135 /**
136 * @brief Pointer to the tail element in DLL of single request handles
137 */
138 struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail;
139
140 /**
131 * @brief The desired probability with which we want to have observed all 141 * @brief The desired probability with which we want to have observed all
132 * peers. 142 * peers.
133 */ 143 */
@@ -197,6 +207,54 @@ struct GNUNET_RPS_Request_Handle
197 207
198 208
199/** 209/**
210 * Handler for a single request from a client.
211 */
212struct GNUNET_RPS_Request_Handle_Single_Info
213{
214 /**
215 * The client issuing the request.
216 */
217 struct GNUNET_RPS_Handle *rps_handle;
218
219 /**
220 * @brief The Sampler for the client request
221 */
222 struct RPS_Sampler *sampler;
223
224 /**
225 * @brief Request handle of the request to the sampler - needed to cancel the request
226 */
227 struct RPS_SamplerRequestHandleSingleInfo *sampler_rh;
228
229 /**
230 * @brief Request handle of the request of the biased stream of peers -
231 * needed to cancel the request
232 */
233 struct GNUNET_RPS_StreamRequestHandle *srh;
234
235 /**
236 * The callback to be called when we receive an answer.
237 */
238 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb;
239
240 /**
241 * The closure for the callback.
242 */
243 void *ready_cb_cls;
244
245 /**
246 * @brief Pointer to next element in DLL
247 */
248 struct GNUNET_RPS_Request_Handle_Single_Info *next;
249
250 /**
251 * @brief Pointer to previous element in DLL
252 */
253 struct GNUNET_RPS_Request_Handle_Single_Info *prev;
254};
255
256
257/**
200 * Struct used to pack the callback, its closure (provided by the caller) 258 * Struct used to pack the callback, its closure (provided by the caller)
201 * and the connection handler to the service to pass it to a callback function. 259 * and the connection handler to the service to pass it to a callback function.
202 */ 260 */
@@ -309,6 +367,36 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
309 367
310 368
311/** 369/**
370 * @brief Called once the sampler has collected the requested peer.
371 *
372 * Calls the callback provided by the client with the corresponding cls.
373 *
374 * @param peers The array of @a num_peers that has been returned.
375 * @param num_peers The number of peers that have been returned
376 * @param cls The #GNUNET_RPS_Request_Handle
377 * @param probability Probability with which all IDs have been observed
378 * @param num_observed Number of observed IDs
379 */
380static void
381peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
382 void *cls,
383 double probability,
384 uint32_t num_observed)
385{
386 struct GNUNET_RPS_Request_Handle *rh = cls;
387 (void) probability;
388 (void) num_observed;
389 uint32_t num_peers = 1;
390
391 rh->sampler_rh = NULL;
392 rh->ready_cb (rh->ready_cb_cls,
393 num_peers,
394 peers);
395 GNUNET_RPS_request_cancel (rh);
396}
397
398
399/**
312 * @brief Callback to collect the peers from the biased stream and put those 400 * @brief Callback to collect the peers from the biased stream and put those
313 * into the sampler. 401 * into the sampler.
314 * 402 *
@@ -632,15 +720,15 @@ mq_error_handler (void *cls,
632 */ 720 */
633static void 721static void
634hash_from_share_val (const char *share_val, 722hash_from_share_val (const char *share_val,
635 struct GNUNET_HashCode *hash) 723 struct GNUNET_HashCode *hash)
636{ 724{
637 GNUNET_CRYPTO_kdf (hash, 725 GNUNET_CRYPTO_kdf (hash,
638 sizeof (struct GNUNET_HashCode), 726 sizeof (struct GNUNET_HashCode),
639 "rps", 727 "rps",
640 strlen ("rps"), 728 strlen ("rps"),
641 share_val, 729 share_val,
642 strlen (share_val), 730 strlen (share_val),
643 NULL, 0); 731 NULL, 0);
644} 732}
645 733
646 734
@@ -672,6 +760,13 @@ nse_cb (void *cls,
672 RPS_sampler_update_with_nw_size (rh_iter->sampler, 760 RPS_sampler_update_with_nw_size (rh_iter->sampler,
673 GNUNET_NSE_log_estimate_to_n (logestimate)); 761 GNUNET_NSE_log_estimate_to_n (logestimate));
674 } 762 }
763 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
764 NULL != rhs_iter && NULL != rhs_iter->next;
765 rhs_iter = rhs_iter->next)
766 {
767 RPS_sampler_update_with_nw_size (rhs_iter->sampler,
768 GNUNET_NSE_log_estimate_to_n (logestimate));
769 }
675} 770}
676 771
677 772
@@ -857,6 +952,48 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
857 952
858 953
859/** 954/**
955 * Request one random peer, getting additional information.
956 *
957 * @param rps_handle handle to the rps service
958 * @param ready_cb the callback called when the peers are available
959 * @param cls closure given to the callback
960 * @return a handle to cancel this request
961 */
962struct GNUNET_RPS_Request_Handle_Single_Info *
963GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
964 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
965 void *cls)
966{
967 struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
968 uint32_t num_req_peers = 1;
969
970 LOG (GNUNET_ERROR_TYPE_INFO,
971 "Client requested peer with additional info\n");
972 rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info);
973 rhs->rps_handle = rps_handle;
974 rhs->sampler = RPS_sampler_mod_init (num_req_peers,
975 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
976 RPS_sampler_set_desired_probability (rhs->sampler,
977 rps_handle->desired_probability);
978 RPS_sampler_set_deficiency_factor (rhs->sampler,
979 rps_handle->deficiency_factor);
980 rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler,
981 peer_info_ready_cb,
982 rhs);
983 rhs->srh = GNUNET_RPS_stream_request (rps_handle,
984 collect_peers_cb,
985 rhs); /* cls */
986 rhs->ready_cb = ready_cb;
987 rhs->ready_cb_cls = cls;
988 GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head,
989 rps_handle->rhs_tail,
990 rhs);
991
992 return rhs;
993}
994
995
996/**
860 * Seed rps service with peerIDs. 997 * Seed rps service with peerIDs.
861 * 998 *
862 * @param h handle to the rps service 999 * @param h handle to the rps service
@@ -1047,6 +1184,37 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1047 1184
1048 1185
1049/** 1186/**
1187 * Cancle an issued single info request.
1188 *
1189 * @param rhs request handle of request to cancle
1190 */
1191void
1192GNUNET_RPS_request_single_info_cancel (
1193 struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
1194{
1195 struct GNUNET_RPS_Handle *h;
1196
1197 h = rhs->rps_handle;
1198 GNUNET_assert (NULL != rhs);
1199 GNUNET_assert (NULL != rhs->srh);
1200 GNUNET_assert (h == rhs->srh->rps_handle);
1201 GNUNET_RPS_stream_cancel (rhs->srh);
1202 rhs->srh = NULL;
1203 if (NULL == h->stream_requests_head) cancel_stream(h);
1204 if (NULL != rhs->sampler_rh)
1205 {
1206 RPS_sampler_request_single_info_cancel (rhs->sampler_rh);
1207 }
1208 RPS_sampler_destroy (rhs->sampler);
1209 rhs->sampler = NULL;
1210 GNUNET_CONTAINER_DLL_remove (h->rhs_head,
1211 h->rhs_tail,
1212 rhs);
1213 GNUNET_free (rhs);
1214}
1215
1216
1217/**
1050 * Disconnect from the rps service 1218 * Disconnect from the rps service
1051 * 1219 *
1052 * @param h the handle to the rps service 1220 * @param h the handle to the rps service
@@ -1079,6 +1247,17 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1079 GNUNET_RPS_request_cancel (rh_iter); 1247 GNUNET_RPS_request_cancel (rh_iter);
1080 } 1248 }
1081 } 1249 }
1250 if (NULL != h->rhs_head)
1251 {
1252 LOG (GNUNET_ERROR_TYPE_WARNING,
1253 "Not all requests were cancelled!\n");
1254 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1255 h->rhs_head != NULL;
1256 rhs_iter = h->rhs_head)
1257 {
1258 GNUNET_RPS_request_single_info_cancel (rhs_iter);
1259 }
1260 }
1082 if (NULL != srh_callback_peers) 1261 if (NULL != srh_callback_peers)
1083 { 1262 {
1084 GNUNET_free (srh_callback_peers); 1263 GNUNET_free (srh_callback_peers);