aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rps/Makefile.am6
-rw-r--r--src/rps/gnunet-service-rps_sampler.c648
-rw-r--r--src/rps/gnunet-service-rps_sampler.h26
-rw-r--r--src/rps/rps-sampler_client.c328
-rw-r--r--src/rps/rps-sampler_client.h150
-rw-r--r--src/rps/rps-sampler_common.c527
-rw-r--r--src/rps/rps-sampler_common.h280
-rw-r--r--src/rps/rps_api.c2
8 files changed, 1294 insertions, 673 deletions
diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am
index 5e9fd09fa..e973bb7ca 100644
--- a/src/rps/Makefile.am
+++ b/src/rps/Makefile.am
@@ -21,7 +21,7 @@ bin_PROGRAMS = gnunet-rps
21 21
22gnunet_rps_SOURCES = \ 22gnunet_rps_SOURCES = \
23 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ 23 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
24 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ 24 rps-sampler_common.h rps-sampler_common.c \
25 gnunet-rps.c 25 gnunet-rps.c
26 26
27gnunet_rps_LDADD = \ 27gnunet_rps_LDADD = \
@@ -32,6 +32,7 @@ gnunet_rps_LDADD = \
32lib_LTLIBRARIES = libgnunetrps.la 32lib_LTLIBRARIES = libgnunetrps.la
33 33
34libgnunetrps_la_SOURCES = \ 34libgnunetrps_la_SOURCES = \
35 rps-sampler_client.h rps-sampler_client.c \
35 rps_api.c rps.h 36 rps_api.c rps.h
36libgnunetrps_la_LIBADD = \ 37libgnunetrps_la_LIBADD = \
37 $(top_builddir)/src/util/libgnunetutil.la \ 38 $(top_builddir)/src/util/libgnunetutil.la \
@@ -52,6 +53,7 @@ endif
52 53
53gnunet_service_rps_SOURCES = \ 54gnunet_service_rps_SOURCES = \
54 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ 55 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
56 rps-sampler_common.h rps-sampler_common.c \
55 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ 57 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \
56 gnunet-service-rps_custommap.h gnunet-service-rps_custommap.c \ 58 gnunet-service-rps_custommap.h gnunet-service-rps_custommap.c \
57 gnunet-service-rps_view.h gnunet-service-rps_view.c \ 59 gnunet-service-rps_view.h gnunet-service-rps_view.c \
@@ -91,6 +93,7 @@ rps_test_src = \
91 test_rps.c \ 93 test_rps.c \
92 rps-test_util.h rps-test_util.c \ 94 rps-test_util.h rps-test_util.c \
93 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ 95 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
96 rps-sampler_common.h rps-sampler_common.c \
94 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c 97 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c
95 98
96ld_rps_test_lib = \ 99ld_rps_test_lib = \
@@ -148,6 +151,7 @@ test_rps_churn_LDADD = $(ld_rps_test_lib)
148 151
149gnunet_rps_profiler_SOURCES = \ 152gnunet_rps_profiler_SOURCES = \
150 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ 153 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
154 rps-sampler_common.h rps-sampler_common.c \
151 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ 155 gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \
152 rps-test_util.h rps-test_util.c \ 156 rps-test_util.h rps-test_util.c \
153 gnunet-rps-profiler.c 157 gnunet-rps-profiler.c
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 2cd4cb996..9629a29a1 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -26,6 +26,7 @@
26#include "gnunet_statistics_service.h" 26#include "gnunet_statistics_service.h"
27#include "rps.h" 27#include "rps.h"
28 28
29#include "rps-sampler_common.h"
29#include "gnunet-service-rps_sampler.h" 30#include "gnunet-service-rps_sampler.h"
30#include "gnunet-service-rps_sampler_elem.h" 31#include "gnunet-service-rps_sampler_elem.h"
31 32
@@ -53,17 +54,6 @@
53// TODO care about invalid input of the caller (size 0 or less...) 54// TODO care about invalid input of the caller (size 0 or less...)
54 55
55/** 56/**
56 * Callback that is called from _get_rand_peer() when the PeerID is ready.
57 *
58 * @param cls the closure given alongside this function.
59 * @param id the PeerID that was returned
60 */
61typedef void
62(*RPS_sampler_rand_peer_ready_cont) (void *cls,
63 const struct GNUNET_PeerIdentity *id);
64
65
66/**
67 * @brief Callback called each time a new peer was put into the sampler 57 * @brief Callback called each time a new peer was put into the sampler
68 * 58 *
69 * @param cls A possibly given closure 59 * @param cls A possibly given closure
@@ -101,49 +91,6 @@ struct SamplerNotifyUpdateCTX
101 91
102 92
103/** 93/**
104 * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer
105 */
106struct GetPeerCls
107{
108 /**
109 * DLL
110 */
111 struct GetPeerCls *next;
112 struct GetPeerCls *prev;
113
114 /**
115 * The #RPS_SamplerRequestHandle this single request belongs to.
116 */
117 struct RPS_SamplerRequestHandle *req_handle;
118
119 /**
120 * The task for this function.
121 */
122 struct GNUNET_SCHEDULER_Task *get_peer_task;
123
124 /**
125 * @brief Context to the given callback.
126 */
127 struct SamplerNotifyUpdateCTX *notify_ctx;
128
129 /**
130 * The callback
131 */
132 RPS_sampler_rand_peer_ready_cont cont;
133
134 /**
135 * The closure to the callback @e cont
136 */
137 void *cont_cls;
138
139 /**
140 * The address of the id to be stored at
141 */
142 struct GNUNET_PeerIdentity *id;
143};
144
145
146/**
147 * Type of function used to differentiate between modified and not modified 94 * Type of function used to differentiate between modified and not modified
148 * Sampler. 95 * Sampler.
149 */ 96 */
@@ -162,61 +109,6 @@ sampler_get_rand_peer (void *cls);
162 109
163 110
164/** 111/**
165 * Get one random peer out of the sampled peers.
166 *
167 * We might want to reinitialise this sampler after giving the
168 * corrsponding peer to the client.
169 */
170static void
171sampler_mod_get_rand_peer (void *cls);
172
173
174/**
175 * Sampler with its own array of SamplerElements
176 */
177struct RPS_Sampler
178{
179 /**
180 * Number of sampler elements we hold.
181 */
182 unsigned int sampler_size;
183 //size_t size;
184
185 /**
186 * All sampler elements in one array.
187 */
188 struct RPS_SamplerElement **sampler_elements;
189
190 /**
191 * Maximum time a round takes
192 *
193 * Used in the context of RPS
194 */
195 struct GNUNET_TIME_Relative max_round_interval;
196
197 /**
198 * Stores the function to return peers. Which one it is depends on whether
199 * the Sampler is the modified one or not.
200 */
201 RPS_get_peers_type get_peers;
202
203 /**
204 * Head and tail for the DLL to store the #RPS_SamplerRequestHandle
205 */
206 struct RPS_SamplerRequestHandle *req_handle_head;
207 struct RPS_SamplerRequestHandle *req_handle_tail;
208
209 struct SamplerNotifyUpdateCTX *notify_ctx_head;
210 struct SamplerNotifyUpdateCTX *notify_ctx_tail;
211 #ifdef TO_FILE
212 /**
213 * File name to log to
214 */
215 char *file_name;
216 #endif /* TO_FILE */
217};
218
219/**
220 * Closure to _get_n_rand_peers_ready_cb() 112 * Closure to _get_n_rand_peers_ready_cb()
221 */ 113 */
222struct RPS_SamplerRequestHandle 114struct RPS_SamplerRequestHandle
@@ -292,189 +184,6 @@ static uint32_t client_get_index;
292 184
293 185
294/** 186/**
295 * @brief Add a callback that will be called when the next peer is inserted
296 * into the sampler
297 *
298 * @param sampler The sampler on which update it will be called
299 * @param notify_cb The callback
300 * @param cls Closure given to the callback
301 *
302 * @return The context containing callback and closure
303 */
304struct SamplerNotifyUpdateCTX *
305sampler_notify_on_update (struct RPS_Sampler *sampler,
306 SamplerNotifyUpdateCB notify_cb,
307 void *cls)
308{
309 struct SamplerNotifyUpdateCTX *notify_ctx;
310
311 LOG (GNUNET_ERROR_TYPE_DEBUG,
312 "Inserting new context for notification\n");
313 notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
314 notify_ctx->notify_cb = notify_cb;
315 notify_ctx->cls = cls;
316 GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
317 sampler->notify_ctx_tail,
318 notify_ctx);
319 return notify_ctx;
320}
321
322
323/**
324 * Callback to _get_rand_peer() used by _get_n_rand_peers().
325 *
326 * Checks whether all n peers are available. If they are,
327 * give those back.
328 */
329static void
330check_n_peers_ready (void *cls,
331 const struct GNUNET_PeerIdentity *id)
332{
333 struct RPS_SamplerRequestHandle *req_handle = cls;
334 (void) id;
335
336 req_handle->cur_num_peers++;
337 LOG (GNUNET_ERROR_TYPE_DEBUG,
338 "Got %" PRIX32 ". of %" PRIX32 " peers\n",
339 req_handle->cur_num_peers, req_handle->num_peers);
340
341 if (req_handle->num_peers == req_handle->cur_num_peers)
342 { /* All peers are ready -- return those to the client */
343 GNUNET_assert (NULL != req_handle->callback);
344
345 LOG (GNUNET_ERROR_TYPE_DEBUG,
346 "returning %" PRIX32 " peers to the client\n",
347 req_handle->num_peers);
348 req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls);
349
350 RPS_sampler_request_cancel (req_handle);
351 }
352}
353
354
355/**
356 * Get the size of the sampler.
357 *
358 * @param sampler the sampler to return the size of.
359 * @return the size of the sampler
360 */
361unsigned int
362RPS_sampler_get_size (struct RPS_Sampler *sampler)
363{
364 return sampler->sampler_size;
365}
366
367
368/**
369 * Grow or shrink the size of the sampler.
370 *
371 * @param sampler the sampler to resize.
372 * @param new_size the new size of the sampler
373 */
374static void
375sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
376{
377 unsigned int old_size;
378 uint32_t i;
379
380 // TODO check min and max size
381
382 old_size = sampler->sampler_size;
383
384 if (old_size > new_size)
385 { /* Shrinking */
386
387 LOG (GNUNET_ERROR_TYPE_DEBUG,
388 "Shrinking sampler %d -> %d\n",
389 old_size,
390 new_size);
391
392 to_file (sampler->file_name,
393 "Shrinking sampler %d -> %d",
394 old_size,
395 new_size);
396
397 for (i = new_size ; i < old_size ; i++)
398 {
399 to_file (sampler->file_name,
400 "-%" PRIu32 ": %s",
401 i,
402 sampler->sampler_elements[i]->file_name);
403 RPS_sampler_elem_destroy (sampler->sampler_elements[i]);
404 }
405
406 GNUNET_array_grow (sampler->sampler_elements,
407 sampler->sampler_size,
408 new_size);
409 LOG (GNUNET_ERROR_TYPE_DEBUG,
410 "sampler->sampler_elements now points to %p\n",
411 sampler->sampler_elements);
412
413 }
414 else if (old_size < new_size)
415 { /* Growing */
416 LOG (GNUNET_ERROR_TYPE_DEBUG,
417 "Growing sampler %d -> %d\n",
418 old_size,
419 new_size);
420
421 to_file (sampler->file_name,
422 "Growing sampler %d -> %d",
423 old_size,
424 new_size);
425
426 GNUNET_array_grow (sampler->sampler_elements,
427 sampler->sampler_size,
428 new_size);
429
430 for (i = old_size ; i < new_size ; i++)
431 { /* Add new sampler elements */
432 sampler->sampler_elements[i] = RPS_sampler_elem_create ();
433
434 to_file (sampler->file_name,
435 "+%" PRIu32 ": %s",
436 i,
437 sampler->sampler_elements[i]->file_name);
438 }
439 }
440 else
441 {
442 LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
443 return;
444 }
445
446 GNUNET_assert (sampler->sampler_size == new_size);
447}
448
449
450/**
451 * Grow or shrink the size of the sampler.
452 *
453 * @param sampler the sampler to resize.
454 * @param new_size the new size of the sampler
455 */
456void
457RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
458{
459 GNUNET_assert (0 < new_size);
460 sampler_resize (sampler, new_size);
461}
462
463
464/**
465 * Empty the sampler.
466 *
467 * @param sampler the sampler to empty.
468 * @param new_size the new size of the sampler
469 */
470static void
471sampler_empty (struct RPS_Sampler *sampler)
472{
473 sampler_resize (sampler, 0);
474}
475
476
477/**
478 * Initialise a tuple of sampler elements. 187 * Initialise a tuple of sampler elements.
479 * 188 *
480 * @param init_size the size the sampler is initialised with 189 * @param init_size the size the sampler is initialised with
@@ -514,119 +223,6 @@ RPS_sampler_init (size_t init_size,
514} 223}
515 224
516/** 225/**
517 * Initialise a modified tuple of sampler elements.
518 *
519 * @param init_size the size the sampler is initialised with
520 * @param max_round_interval maximum time a round takes
521 * @return a handle to a sampler that consists of sampler elements.
522 */
523struct RPS_Sampler *
524RPS_sampler_mod_init (size_t init_size,
525 struct GNUNET_TIME_Relative max_round_interval)
526{
527 struct RPS_Sampler *sampler;
528
529 sampler = RPS_sampler_init (init_size, max_round_interval);
530 sampler->get_peers = sampler_mod_get_rand_peer;
531
532#ifdef TO_FILE
533 LOG (GNUNET_ERROR_TYPE_DEBUG,
534 "Initialised modified sampler %s\n",
535 sampler->file_name);
536 to_file (sampler->file_name,
537 "This is a modified sampler");
538#endif /* TO_FILE */
539
540 return sampler;
541}
542
543
544/**
545 * @brief Notify about update of the sampler.
546 *
547 * Call the callbacks that are waiting for notification on updates to the
548 * sampler.
549 *
550 * @param sampler The sampler the updates are waiting for
551 */
552static void
553notify_update (struct RPS_Sampler *sampler)
554{
555 struct SamplerNotifyUpdateCTX *tmp_notify_head;
556 struct SamplerNotifyUpdateCTX *tmp_notify_tail;
557
558 LOG (GNUNET_ERROR_TYPE_DEBUG,
559 "Calling callbacks waiting for update notification.\n");
560 tmp_notify_head = sampler->notify_ctx_head;
561 tmp_notify_tail = sampler->notify_ctx_tail;
562 sampler->notify_ctx_head = NULL;
563 sampler->notify_ctx_tail = NULL;
564 for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head;
565 NULL != tmp_notify_head;
566 notify_iter = tmp_notify_head)
567 {
568 GNUNET_assert (NULL != notify_iter->notify_cb);
569 GNUNET_CONTAINER_DLL_remove (tmp_notify_head,
570 tmp_notify_tail,
571 notify_iter);
572 notify_iter->notify_cb (notify_iter->cls);
573 GNUNET_free (notify_iter);
574 }
575}
576
577
578/**
579 * Update every sampler element of this sampler with given peer
580 *
581 * @param sampler the sampler to update.
582 * @param id the PeerID that is put in the sampler
583 */
584 void
585RPS_sampler_update (struct RPS_Sampler *sampler,
586 const struct GNUNET_PeerIdentity *id)
587{
588 to_file (sampler->file_name,
589 "Got %s",
590 GNUNET_i2s_full (id));
591
592 for (uint32_t i = 0; i < sampler->sampler_size; i++)
593 {
594 RPS_sampler_elem_next (sampler->sampler_elements[i],
595 id);
596 }
597 notify_update (sampler);
598}
599
600
601/**
602 * Reinitialise all previously initialised sampler elements with the given value.
603 *
604 * Used to get rid of a PeerID.
605 *
606 * @param sampler the sampler to reinitialise a sampler element in.
607 * @param id the id of the sampler elements to update.
608 */
609 void
610RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
611 const struct GNUNET_PeerIdentity *id)
612{
613 uint32_t i;
614
615 for (i = 0; i < sampler->sampler_size; i++)
616 {
617 if (0 == GNUNET_CRYPTO_cmp_peer_identity(id,
618 &(sampler->sampler_elements[i]->peer_id)) )
619 {
620 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
621 to_file (sampler->sampler_elements[i]->file_name,
622 "--- non-active");
623 RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
624 }
625 }
626}
627
628
629/**
630 * Get one random peer out of the sampled peers. 226 * Get one random peer out of the sampled peers.
631 * 227 *
632 * We might want to reinitialise this sampler after giving the 228 * We might want to reinitialise this sampler after giving the
@@ -658,7 +254,7 @@ sampler_get_rand_peer (void *cls)
658 254
659 gpc->notify_ctx = 255 gpc->notify_ctx =
660 sampler_notify_on_update (sampler, 256 sampler_notify_on_update (sampler,
661 &sampler_mod_get_rand_peer, 257 &sampler_get_rand_peer,
662 gpc); 258 gpc);
663 return; 259 return;
664 } 260 }
@@ -673,244 +269,4 @@ sampler_get_rand_peer (void *cls)
673} 269}
674 270
675 271
676/**
677 * Get one random peer out of the sampled peers.
678 *
679 * This reinitialises the queried sampler element.
680 */
681static void
682sampler_mod_get_rand_peer (void *cls)
683{
684 struct GetPeerCls *gpc = cls;
685 struct RPS_SamplerElement *s_elem;
686 struct GNUNET_TIME_Relative last_request_diff;
687 struct RPS_Sampler *sampler;
688
689 gpc->get_peer_task = NULL;
690 gpc->notify_ctx = NULL;
691 sampler = gpc->req_handle->sampler;
692
693 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
694
695 /* Cycle the #client_get_index one step further */
696 client_get_index = (client_get_index + 1) % sampler->sampler_size;
697
698 s_elem = sampler->sampler_elements[client_get_index];
699 *gpc->id = s_elem->peer_id;
700 GNUNET_assert (NULL != s_elem);
701
702 if (EMPTY == s_elem->is_empty)
703 {
704 LOG (GNUNET_ERROR_TYPE_DEBUG,
705 "Sampler_mod element empty, rescheduling.\n");
706 GNUNET_assert (NULL == gpc->notify_ctx);
707 gpc->notify_ctx =
708 sampler_notify_on_update (sampler,
709 &sampler_mod_get_rand_peer,
710 gpc);
711 return;
712 }
713
714 /* Check whether we may use this sampler to give it back to the client */
715 if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
716 {
717 // TODO remove this condition at least for the client sampler
718 last_request_diff =
719 GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
720 GNUNET_TIME_absolute_get ());
721 /* We're not going to give it back now if it was
722 * already requested by a client this round */
723 if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
724 {
725 LOG (GNUNET_ERROR_TYPE_DEBUG,
726 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
727 ///* How many time remains untile the next round has started? */
728 //inv_last_request_diff =
729 // GNUNET_TIME_absolute_get_difference (last_request_diff,
730 // sampler->max_round_interval);
731 // add a little delay
732 /* Schedule it one round later */
733 GNUNET_assert (NULL == gpc->notify_ctx);
734 gpc->notify_ctx =
735 sampler_notify_on_update (sampler,
736 &sampler_mod_get_rand_peer,
737 gpc);
738 return;
739 }
740 }
741 if (2 > s_elem->num_peers)
742 {
743 LOG (GNUNET_ERROR_TYPE_DEBUG,
744 "This s_elem saw less than two peers -- scheduling for later\n");
745 GNUNET_assert (NULL == gpc->notify_ctx);
746 gpc->notify_ctx =
747 sampler_notify_on_update (sampler,
748 &sampler_mod_get_rand_peer,
749 gpc);
750 return;
751 }
752 /* More reasons to wait could be added here */
753
754// GNUNET_STATISTICS_set (stats,
755// "# client sampler element input",
756// s_elem->num_peers,
757// GNUNET_NO);
758// GNUNET_STATISTICS_set (stats,
759// "# client sampler element change",
760// s_elem->num_change,
761// GNUNET_NO);
762
763 RPS_sampler_elem_reinit (s_elem);
764 s_elem->last_client_request = GNUNET_TIME_absolute_get ();
765
766 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
767 gpc->req_handle->gpc_tail,
768 gpc);
769 gpc->cont (gpc->cont_cls, gpc->id);
770 GNUNET_free (gpc);
771}
772
773
774/**
775 * Get n random peers out of the sampled peers.
776 *
777 * We might want to reinitialise this sampler after giving the
778 * corrsponding peer to the client.
779 * Random with or without consumption?
780 *
781 * @param sampler the sampler to get peers from.
782 * @param cb callback that will be called once the ids are ready.
783 * @param cls closure given to @a cb
784 * @param for_client #GNUNET_YES if result is used for client,
785 * #GNUNET_NO if used internally
786 * @param num_peers the number of peers requested
787 */
788struct RPS_SamplerRequestHandle *
789RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
790 uint32_t num_peers,
791 RPS_sampler_n_rand_peers_ready_cb cb,
792 void *cls)
793{
794 uint32_t i;
795 struct RPS_SamplerRequestHandle *req_handle;
796 struct GetPeerCls *gpc;
797
798 GNUNET_assert (0 != sampler->sampler_size);
799 if (0 == num_peers)
800 return NULL;
801
802 // TODO check if we have too much (distinct) sampled peers
803 req_handle = GNUNET_new (struct RPS_SamplerRequestHandle);
804 req_handle->num_peers = num_peers;
805 req_handle->cur_num_peers = 0;
806 req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
807 req_handle->sampler = sampler;
808 req_handle->callback = cb;
809 req_handle->cls = cls;
810 GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head,
811 sampler->req_handle_tail,
812 req_handle);
813
814 LOG (GNUNET_ERROR_TYPE_DEBUG,
815 "Scheduling requests for %" PRIu32 " peers\n", num_peers);
816
817 for (i = 0; i < num_peers; i++)
818 {
819 gpc = GNUNET_new (struct GetPeerCls);
820 gpc->req_handle = req_handle;
821 gpc->cont = check_n_peers_ready;
822 gpc->cont_cls = req_handle;
823 gpc->id = &req_handle->ids[i];
824
825 GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
826 req_handle->gpc_tail,
827 gpc);
828 // maybe add a little delay
829 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
830 gpc);
831 }
832 return req_handle;
833}
834
835/**
836 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
837 *
838 * @param req_handle the handle to the request
839 */
840void
841RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
842{
843 struct GetPeerCls *i;
844
845 while (NULL != (i = req_handle->gpc_head) )
846 {
847 GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head,
848 req_handle->gpc_tail,
849 i);
850 if (NULL != i->get_peer_task)
851 {
852 GNUNET_SCHEDULER_cancel (i->get_peer_task);
853 }
854 if (NULL != i->notify_ctx)
855 {
856 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head,
857 req_handle->sampler->notify_ctx_tail,
858 i->notify_ctx);
859 GNUNET_free (i->notify_ctx);
860 }
861 GNUNET_free (i);
862 }
863 GNUNET_free (req_handle->ids);
864 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
865 req_handle->sampler->req_handle_tail,
866 req_handle);
867 GNUNET_free (req_handle);
868}
869
870
871/**
872 * Counts how many Samplers currently hold a given PeerID.
873 *
874 * @param sampler the sampler to count ids in.
875 * @param id the PeerID to count.
876 *
877 * @return the number of occurrences of id.
878 */
879 uint32_t
880RPS_sampler_count_id (struct RPS_Sampler *sampler,
881 const struct GNUNET_PeerIdentity *id)
882{
883 uint32_t count;
884 uint32_t i;
885
886 count = 0;
887 for ( i = 0 ; i < sampler->sampler_size ; i++ )
888 {
889 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
890 && EMPTY != sampler->sampler_elements[i]->is_empty)
891 count++;
892 }
893 return count;
894}
895
896
897/**
898 * Cleans the sampler.
899 */
900 void
901RPS_sampler_destroy (struct RPS_Sampler *sampler)
902{
903 if (NULL != sampler->req_handle_head)
904 {
905 LOG (GNUNET_ERROR_TYPE_WARNING,
906 "There are still pending requests. Going to remove them.\n");
907 while (NULL != sampler->req_handle_head)
908 {
909 RPS_sampler_request_cancel (sampler->req_handle_head);
910 }
911 }
912 sampler_empty (sampler);
913 GNUNET_free (sampler);
914}
915
916/* end of gnunet-service-rps.c */ 272/* end of gnunet-service-rps.c */
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
index f33aa6eb1..ab4a6bbbb 100644
--- a/src/rps/gnunet-service-rps_sampler.h
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -25,6 +25,7 @@
25#ifndef RPS_SAMPLER_H 25#ifndef RPS_SAMPLER_H
26#define RPS_SAMPLER_H 26#define RPS_SAMPLER_H
27#include <inttypes.h> 27#include <inttypes.h>
28#include "rps-sampler_common.h"
28 29
29 30
30/** 31/**
@@ -39,19 +40,6 @@ struct RPS_SamplerRequestHandle;
39 40
40 41
41/** 42/**
42 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
43 *
44 * @param cls the closure given alongside this function.
45 * @param ids the PeerIDs that were returned
46 * to be freed
47 */
48 typedef void
49(*RPS_sampler_n_rand_peers_ready_cb) (const struct GNUNET_PeerIdentity *ids,
50 uint32_t num_peers,
51 void *cls);
52
53
54/**
55 * Get the size of the sampler. 43 * Get the size of the sampler.
56 * 44 *
57 * @param sampler the sampler to return the size of. 45 * @param sampler the sampler to return the size of.
@@ -84,18 +72,6 @@ RPS_sampler_init (size_t init_size,
84 72
85 73
86/** 74/**
87 * Initialise a modified tuple of sampler elements.
88 *
89 * @param init_size the size the sampler is initialised with
90 * @param max_round_interval maximum time a round takes
91 * @return a handle to a sampler that consists of sampler elements.
92 */
93struct RPS_Sampler *
94RPS_sampler_mod_init (size_t init_size,
95 struct GNUNET_TIME_Relative max_round_interval);
96
97
98/**
99 * Update every sampler element of this sampler with given peer 75 * Update every sampler element of this sampler with given peer
100 * 76 *
101 * @param sampler the sampler to update. 77 * @param sampler the sampler to update.
diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c
new file mode 100644
index 000000000..da832a323
--- /dev/null
+++ b/src/rps/rps-sampler_client.c
@@ -0,0 +1,328 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C)
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file rps/gnunet-service-rps_sampler.c
21 * @brief sampler implementation
22 * @author Julius Bünger
23 */
24#include "platform.h"
25#include "gnunet_util_lib.h"
26#include "gnunet_statistics_service.h"
27#include "rps.h"
28
29#include "rps-sampler_common.h"
30#include "gnunet-service-rps_sampler.h"
31#include "gnunet-service-rps_sampler_elem.h"
32
33#include <math.h>
34#include <inttypes.h>
35
36#include "rps-test_util.h"
37
38#define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__)
39
40
41// multiple 'clients'?
42
43// TODO check for overflows
44
45// TODO align message structs
46
47// hist_size_init, hist_size_max
48
49/***********************************************************************
50 * WARNING: This section needs to be reviewed regarding the use of
51 * functions providing (pseudo)randomness!
52***********************************************************************/
53
54// TODO care about invalid input of the caller (size 0 or less...)
55
56/**
57 * @brief Callback called each time a new peer was put into the sampler
58 *
59 * @param cls A possibly given closure
60 */
61typedef void
62(*SamplerNotifyUpdateCB) (void *cls);
63
64/**
65 * @brief Context for a callback. Contains callback and closure.
66 *
67 * Meant to be an entry in an DLL.
68 */
69struct SamplerNotifyUpdateCTX
70{
71 /**
72 * @brief The Callback to call on updates
73 */
74 SamplerNotifyUpdateCB notify_cb;
75
76 /**
77 * @brief The according closure.
78 */
79 void *cls;
80
81 /**
82 * @brief Next element in DLL.
83 */
84 struct SamplerNotifyUpdateCTX *next;
85
86 /**
87 * @brief Previous element in DLL.
88 */
89 struct SamplerNotifyUpdateCTX *prev;
90};
91
92
93/**
94 * Type of function used to differentiate between modified and not modified
95 * Sampler.
96 */
97typedef void
98(*RPS_get_peers_type) (void *cls);
99
100
101/**
102 * Get one random peer out of the sampled peers.
103 *
104 * We might want to reinitialise this sampler after giving the
105 * corrsponding peer to the client.
106 */
107static void
108sampler_mod_get_rand_peer (void *cls);
109
110
111/**
112 * Closure to _get_n_rand_peers_ready_cb()
113 */
114struct RPS_SamplerRequestHandle
115{
116 /**
117 * DLL
118 */
119 struct RPS_SamplerRequestHandle *next;
120 struct RPS_SamplerRequestHandle *prev;
121
122 /**
123 * Number of peers we are waiting for.
124 */
125 uint32_t num_peers;
126
127 /**
128 * Number of peers we currently have.
129 */
130 uint32_t cur_num_peers;
131
132 /**
133 * Pointer to the array holding the ids.
134 */
135 struct GNUNET_PeerIdentity *ids;
136
137 /**
138 * Head and tail for the DLL to store the tasks for single requests
139 */
140 struct GetPeerCls *gpc_head;
141 struct GetPeerCls *gpc_tail;
142
143 /**
144 * Sampler.
145 */
146 struct RPS_Sampler *sampler;
147
148 /**
149 * Callback to be called when all ids are available.
150 */
151 RPS_sampler_n_rand_peers_ready_cb callback;
152
153 /**
154 * Closure given to the callback
155 */
156 void *cls;
157};
158
159///**
160// * Global sampler variable.
161// */
162//struct RPS_Sampler *sampler;
163
164
165/**
166 * The minimal size for the extended sampler elements.
167 */
168static size_t min_size;
169
170/**
171 * The maximal size the extended sampler elements should grow to.
172 */
173static size_t max_size;
174
175/**
176 * The size the extended sampler elements currently have.
177 */
178//static size_t extra_size;
179
180/**
181 * Inedex to the sampler element that is the next to be returned
182 */
183static uint32_t client_get_index;
184
185
186/**
187 * Initialise a modified tuple of sampler elements.
188 *
189 * @param init_size the size the sampler is initialised with
190 * @param max_round_interval maximum time a round takes
191 * @return a handle to a sampler that consists of sampler elements.
192 */
193struct RPS_Sampler *
194RPS_sampler_mod_init (size_t init_size,
195 struct GNUNET_TIME_Relative max_round_interval)
196{
197 struct RPS_Sampler *sampler;
198
199 /* Initialise context around extended sampler */
200 min_size = 10; // TODO make input to _samplers_init()
201 max_size = 1000; // TODO make input to _samplers_init()
202
203 sampler = GNUNET_new (struct RPS_Sampler);
204
205 sampler->max_round_interval = max_round_interval;
206 sampler->get_peers = sampler_mod_get_rand_peer;
207 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
208 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
209 RPS_sampler_resize (sampler, init_size);
210
211 client_get_index = 0;
212
213 //GNUNET_assert (init_size == sampler->sampler_size);
214
215#ifdef TO_FILE
216 sampler->file_name = create_file ("sampler-");
217
218 LOG (GNUNET_ERROR_TYPE_DEBUG,
219 "Initialised modified sampler %s\n",
220 sampler->file_name);
221 to_file (sampler->file_name,
222 "This is a modified sampler");
223#endif /* TO_FILE */
224
225 return sampler;
226}
227
228
229/**
230 * Get one random peer out of the sampled peers.
231 *
232 * This reinitialises the queried sampler element.
233 */
234static void
235sampler_mod_get_rand_peer (void *cls)
236{
237 struct GetPeerCls *gpc = cls;
238 struct RPS_SamplerElement *s_elem;
239 struct GNUNET_TIME_Relative last_request_diff;
240 struct RPS_Sampler *sampler;
241
242 gpc->get_peer_task = NULL;
243 gpc->notify_ctx = NULL;
244 sampler = gpc->req_handle->sampler;
245
246 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
247
248 /* Cycle the #client_get_index one step further */
249 client_get_index = (client_get_index + 1) % sampler->sampler_size;
250
251 s_elem = sampler->sampler_elements[client_get_index];
252 *gpc->id = s_elem->peer_id;
253 GNUNET_assert (NULL != s_elem);
254
255 if (EMPTY == s_elem->is_empty)
256 {
257 LOG (GNUNET_ERROR_TYPE_DEBUG,
258 "Sampler_mod element empty, rescheduling.\n");
259 GNUNET_assert (NULL == gpc->notify_ctx);
260 gpc->notify_ctx =
261 sampler_notify_on_update (sampler,
262 &sampler_mod_get_rand_peer,
263 gpc);
264 return;
265 }
266
267 /* Check whether we may use this sampler to give it back to the client */
268 if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
269 {
270 // TODO remove this condition at least for the client sampler
271 last_request_diff =
272 GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
273 GNUNET_TIME_absolute_get ());
274 /* We're not going to give it back now if it was
275 * already requested by a client this round */
276 if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
277 {
278 LOG (GNUNET_ERROR_TYPE_DEBUG,
279 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
280 ///* How many time remains untile the next round has started? */
281 //inv_last_request_diff =
282 // GNUNET_TIME_absolute_get_difference (last_request_diff,
283 // sampler->max_round_interval);
284 // add a little delay
285 /* Schedule it one round later */
286 GNUNET_assert (NULL == gpc->notify_ctx);
287 gpc->notify_ctx =
288 sampler_notify_on_update (sampler,
289 &sampler_mod_get_rand_peer,
290 gpc);
291 return;
292 }
293 }
294 if (2 > s_elem->num_peers)
295 {
296 LOG (GNUNET_ERROR_TYPE_DEBUG,
297 "This s_elem saw less than two peers -- scheduling for later\n");
298 GNUNET_assert (NULL == gpc->notify_ctx);
299 gpc->notify_ctx =
300 sampler_notify_on_update (sampler,
301 &sampler_mod_get_rand_peer,
302 gpc);
303 return;
304 }
305 /* More reasons to wait could be added here */
306
307// GNUNET_STATISTICS_set (stats,
308// "# client sampler element input",
309// s_elem->num_peers,
310// GNUNET_NO);
311// GNUNET_STATISTICS_set (stats,
312// "# client sampler element change",
313// s_elem->num_change,
314// GNUNET_NO);
315
316 RPS_sampler_elem_reinit (s_elem);
317 s_elem->last_client_request = GNUNET_TIME_absolute_get ();
318
319 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
320 gpc->req_handle->gpc_tail,
321 gpc);
322 gpc->cont (gpc->cont_cls, gpc->id);
323 GNUNET_free (gpc);
324}
325
326
327/* end of gnunet-service-rps.c */
328
diff --git a/src/rps/rps-sampler_client.h b/src/rps/rps-sampler_client.h
new file mode 100644
index 000000000..fd0538efa
--- /dev/null
+++ b/src/rps/rps-sampler_client.h
@@ -0,0 +1,150 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C)
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file rps/rps-sampler_client.h
21 * @brief client sampler implementation
22 * @author Julius Bünger
23 */
24
25#ifndef RPS_SAMPLER_CLIENT_H
26#define RPS_SAMPLER_CLIENT_H
27#include <inttypes.h>
28#include "rps-sampler_common.h"
29
30
31/**
32 * A sampler sampling a stream of PeerIDs.
33 */
34struct RPS_Sampler;
35
36/**
37 * A handle to cancel a request.
38 */
39struct RPS_SamplerRequestHandle;
40
41
42/**
43 * Get the size of the sampler.
44 *
45 * @param sampler the sampler to return the size of.
46 * @return the size of the sampler
47 */
48unsigned int
49RPS_sampler_get_size (struct RPS_Sampler *sampler);
50
51
52/**
53 * Grow or shrink the size of the sampler.
54 *
55 * @param sampler the sampler to resize.
56 * @param new_size the new size of the sampler (not 0)
57 */
58void
59RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size);
60
61
62/**
63 * Initialise a modified tuple of sampler elements.
64 *
65 * @param init_size the size the sampler is initialised with
66 * @param max_round_interval maximum time a round takes
67 * @return a handle to a sampler that consists of sampler elements.
68 */
69struct RPS_Sampler *
70RPS_sampler_mod_init (size_t init_size,
71 struct GNUNET_TIME_Relative max_round_interval);
72
73
74/**
75 * Update every sampler element of this sampler with given peer
76 *
77 * @param sampler the sampler to update.
78 * @param id the PeerID that is put in the sampler
79 */
80 void
81RPS_sampler_update (struct RPS_Sampler *sampler,
82 const struct GNUNET_PeerIdentity *id);
83
84
85/**
86 * Reinitialise all previously initialised sampler elements with the given
87 * value.
88 *
89 * Used to get rid of a PeerID.
90 *
91 * @param sampler the sampler to reinitialise a sampler in.
92 * @param id the id of the samplers to update.
93 */
94 void
95RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
96 const struct GNUNET_PeerIdentity *id);
97
98
99/**
100 * Get n random peers out of the sampled peers.
101 *
102 * We might want to reinitialise this sampler after giving the
103 * corrsponding peer to the client.
104 * Random with or without consumption?
105 *
106 * @param sampler the sampler to get peers from.
107 * @param cb callback that will be called once the ids are ready.
108 * @param cls closure given to @a cb
109 * @param for_client #GNUNET_YES if result is used for client,
110 * #GNUNET_NO if used internally
111 * @param num_peers the number of peers requested
112 */
113struct RPS_SamplerRequestHandle *
114RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
115 uint32_t num_peers,
116 RPS_sampler_n_rand_peers_ready_cb cb,
117 void *cls);
118
119/**
120 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
121 *
122 * @param req_handle the handle to the request
123 */
124void
125RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
126
127
128/**
129 * Counts how many Samplers currently hold a given PeerID.
130 *
131 * @param sampler the sampler to cound ids in.
132 * @param id the PeerID to count.
133 *
134 * @return the number of occurrences of id.
135 */
136 uint32_t
137RPS_sampler_count_id (struct RPS_Sampler *sampler,
138 const struct GNUNET_PeerIdentity *id);
139
140
141/**
142 * Cleans the samplers.
143 *
144 * @param sampler the sampler to destroy.
145 */
146 void
147RPS_sampler_destroy (struct RPS_Sampler *sampler);
148
149#endif /* RPS_SAMPLER_CLIENT_H */
150/* end of gnunet-service-rps.c */
diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c
new file mode 100644
index 000000000..d004c06a5
--- /dev/null
+++ b/src/rps/rps-sampler_common.c
@@ -0,0 +1,527 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C)
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file rps/rps-sampler_common.c
21 * @brief Code common to client and service sampler
22 * @author Julius Bünger
23 */
24#include "platform.h"
25#include "gnunet_util_lib.h"
26#include "gnunet_statistics_service.h"
27
28#include "rps-sampler_common.h"
29#include "gnunet-service-rps_sampler_elem.h"
30
31#include <math.h>
32#include <inttypes.h>
33
34#include "rps-test_util.h"
35
36#define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler_common",__VA_ARGS__)
37
38/**
39 * @brief Context for a callback. Contains callback and closure.
40 *
41 * Meant to be an entry in an DLL.
42 */
43struct SamplerNotifyUpdateCTX
44{
45 /**
46 * @brief The Callback to call on updates
47 */
48 SamplerNotifyUpdateCB notify_cb;
49
50 /**
51 * @brief The according closure.
52 */
53 void *cls;
54
55 /**
56 * @brief Next element in DLL.
57 */
58 struct SamplerNotifyUpdateCTX *next;
59
60 /**
61 * @brief Previous element in DLL.
62 */
63 struct SamplerNotifyUpdateCTX *prev;
64};
65
66
67/**
68 * Closure to _get_n_rand_peers_ready_cb()
69 */
70struct RPS_SamplerRequestHandle
71{
72 /**
73 * DLL
74 */
75 struct RPS_SamplerRequestHandle *next;
76 struct RPS_SamplerRequestHandle *prev;
77
78 /**
79 * Number of peers we are waiting for.
80 */
81 uint32_t num_peers;
82
83 /**
84 * Number of peers we currently have.
85 */
86 uint32_t cur_num_peers;
87
88 /**
89 * Pointer to the array holding the ids.
90 */
91 struct GNUNET_PeerIdentity *ids;
92
93 /**
94 * Head and tail for the DLL to store the tasks for single requests
95 */
96 struct GetPeerCls *gpc_head;
97 struct GetPeerCls *gpc_tail;
98
99 /**
100 * Sampler.
101 */
102 struct RPS_Sampler *sampler;
103
104 /**
105 * Callback to be called when all ids are available.
106 */
107 RPS_sampler_n_rand_peers_ready_cb callback;
108
109 /**
110 * Closure given to the callback
111 */
112 void *cls;
113};
114
115
116/**
117 * @brief Add a callback that will be called when the next peer is inserted
118 * into the sampler
119 *
120 * @param sampler The sampler on which update it will be called
121 * @param notify_cb The callback
122 * @param cls Closure given to the callback
123 *
124 * @return The context containing callback and closure
125 */
126struct SamplerNotifyUpdateCTX *
127sampler_notify_on_update (struct RPS_Sampler *sampler,
128 SamplerNotifyUpdateCB notify_cb,
129 void *cls)
130{
131 struct SamplerNotifyUpdateCTX *notify_ctx;
132
133 LOG (GNUNET_ERROR_TYPE_DEBUG,
134 "Inserting new context for notification\n");
135 notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
136 notify_ctx->notify_cb = notify_cb;
137 notify_ctx->cls = cls;
138 GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
139 sampler->notify_ctx_tail,
140 notify_ctx);
141 return notify_ctx;
142}
143
144
145/**
146 * Get the size of the sampler.
147 *
148 * @param sampler the sampler to return the size of.
149 * @return the size of the sampler
150 */
151unsigned int
152RPS_sampler_get_size (struct RPS_Sampler *sampler)
153{
154 return sampler->sampler_size;
155}
156
157
158/**
159 * @brief Notify about update of the sampler.
160 *
161 * Call the callbacks that are waiting for notification on updates to the
162 * sampler.
163 *
164 * @param sampler The sampler the updates are waiting for
165 */
166static void
167notify_update (struct RPS_Sampler *sampler)
168{
169 struct SamplerNotifyUpdateCTX *tmp_notify_head;
170 struct SamplerNotifyUpdateCTX *tmp_notify_tail;
171
172 LOG (GNUNET_ERROR_TYPE_DEBUG,
173 "Calling callbacks waiting for update notification.\n");
174 tmp_notify_head = sampler->notify_ctx_head;
175 tmp_notify_tail = sampler->notify_ctx_tail;
176 sampler->notify_ctx_head = NULL;
177 sampler->notify_ctx_tail = NULL;
178 for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head;
179 NULL != tmp_notify_head;
180 notify_iter = tmp_notify_head)
181 {
182 GNUNET_assert (NULL != notify_iter->notify_cb);
183 GNUNET_CONTAINER_DLL_remove (tmp_notify_head,
184 tmp_notify_tail,
185 notify_iter);
186 notify_iter->notify_cb (notify_iter->cls);
187 GNUNET_free (notify_iter);
188 }
189}
190
191
192/**
193 * Update every sampler element of this sampler with given peer
194 *
195 * @param sampler the sampler to update.
196 * @param id the PeerID that is put in the sampler
197 */
198 void
199RPS_sampler_update (struct RPS_Sampler *sampler,
200 const struct GNUNET_PeerIdentity *id)
201{
202 to_file (sampler->file_name,
203 "Got %s",
204 GNUNET_i2s_full (id));
205
206 for (uint32_t i = 0; i < sampler->sampler_size; i++)
207 {
208 RPS_sampler_elem_next (sampler->sampler_elements[i],
209 id);
210 }
211 notify_update (sampler);
212}
213
214
215/**
216 * Reinitialise all previously initialised sampler elements with the given value.
217 *
218 * Used to get rid of a PeerID.
219 *
220 * @param sampler the sampler to reinitialise a sampler element in.
221 * @param id the id of the sampler elements to update.
222 */
223 void
224RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
225 const struct GNUNET_PeerIdentity *id)
226{
227 uint32_t i;
228
229 for (i = 0; i < sampler->sampler_size; i++)
230 {
231 if (0 == GNUNET_CRYPTO_cmp_peer_identity(id,
232 &(sampler->sampler_elements[i]->peer_id)) )
233 {
234 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
235 to_file (sampler->sampler_elements[i]->file_name,
236 "--- non-active");
237 RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
238 }
239 }
240}
241
242
243/**
244 * Counts how many Samplers currently hold a given PeerID.
245 *
246 * @param sampler the sampler to count ids in.
247 * @param id the PeerID to count.
248 *
249 * @return the number of occurrences of id.
250 */
251 uint32_t
252RPS_sampler_count_id (struct RPS_Sampler *sampler,
253 const struct GNUNET_PeerIdentity *id)
254{
255 uint32_t count;
256 uint32_t i;
257
258 count = 0;
259 for ( i = 0 ; i < sampler->sampler_size ; i++ )
260 {
261 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
262 && EMPTY != sampler->sampler_elements[i]->is_empty)
263 count++;
264 }
265 return count;
266}
267
268
269/**
270 * Grow or shrink the size of the sampler.
271 *
272 * @param sampler the sampler to resize.
273 * @param new_size the new size of the sampler
274 */
275static void
276sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
277{
278 unsigned int old_size;
279 uint32_t i;
280
281 // TODO check min and max size
282
283 old_size = sampler->sampler_size;
284
285 if (old_size > new_size)
286 { /* Shrinking */
287
288 LOG (GNUNET_ERROR_TYPE_DEBUG,
289 "Shrinking sampler %d -> %d\n",
290 old_size,
291 new_size);
292
293 to_file (sampler->file_name,
294 "Shrinking sampler %d -> %d",
295 old_size,
296 new_size);
297
298 for (i = new_size ; i < old_size ; i++)
299 {
300 to_file (sampler->file_name,
301 "-%" PRIu32 ": %s",
302 i,
303 sampler->sampler_elements[i]->file_name);
304 RPS_sampler_elem_destroy (sampler->sampler_elements[i]);
305 }
306
307 GNUNET_array_grow (sampler->sampler_elements,
308 sampler->sampler_size,
309 new_size);
310 LOG (GNUNET_ERROR_TYPE_DEBUG,
311 "sampler->sampler_elements now points to %p\n",
312 sampler->sampler_elements);
313
314 }
315 else if (old_size < new_size)
316 { /* Growing */
317 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "Growing sampler %d -> %d\n",
319 old_size,
320 new_size);
321
322 to_file (sampler->file_name,
323 "Growing sampler %d -> %d",
324 old_size,
325 new_size);
326
327 GNUNET_array_grow (sampler->sampler_elements,
328 sampler->sampler_size,
329 new_size);
330
331 for (i = old_size ; i < new_size ; i++)
332 { /* Add new sampler elements */
333 sampler->sampler_elements[i] = RPS_sampler_elem_create ();
334
335 to_file (sampler->file_name,
336 "+%" PRIu32 ": %s",
337 i,
338 sampler->sampler_elements[i]->file_name);
339 }
340 }
341 else
342 {
343 LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
344 return;
345 }
346
347 GNUNET_assert (sampler->sampler_size == new_size);
348}
349
350
351/**
352 * Grow or shrink the size of the sampler.
353 *
354 * @param sampler the sampler to resize.
355 * @param new_size the new size of the sampler
356 */
357void
358RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
359{
360 GNUNET_assert (0 < new_size);
361 sampler_resize (sampler, new_size);
362}
363
364
365/**
366 * Empty the sampler.
367 *
368 * @param sampler the sampler to empty.
369 * @param new_size the new size of the sampler
370 */
371static void
372sampler_empty (struct RPS_Sampler *sampler)
373{
374 sampler_resize (sampler, 0);
375}
376
377
378/**
379 * Callback to _get_rand_peer() used by _get_n_rand_peers().
380 *
381 * Checks whether all n peers are available. If they are,
382 * give those back.
383 */
384static void
385check_n_peers_ready (void *cls,
386 const struct GNUNET_PeerIdentity *id)
387{
388 struct RPS_SamplerRequestHandle *req_handle = cls;
389 (void) id;
390
391 req_handle->cur_num_peers++;
392 LOG (GNUNET_ERROR_TYPE_DEBUG,
393 "Got %" PRIX32 ". of %" PRIX32 " peers\n",
394 req_handle->cur_num_peers, req_handle->num_peers);
395
396 if (req_handle->num_peers == req_handle->cur_num_peers)
397 { /* All peers are ready -- return those to the client */
398 GNUNET_assert (NULL != req_handle->callback);
399
400 LOG (GNUNET_ERROR_TYPE_DEBUG,
401 "returning %" PRIX32 " peers to the client\n",
402 req_handle->num_peers);
403 req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls);
404
405 RPS_sampler_request_cancel (req_handle);
406 }
407}
408
409
410/**
411 * Get n random peers out of the sampled peers.
412 *
413 * We might want to reinitialise this sampler after giving the
414 * corrsponding peer to the client.
415 * Random with or without consumption?
416 *
417 * @param sampler the sampler to get peers from.
418 * @param cb callback that will be called once the ids are ready.
419 * @param cls closure given to @a cb
420 * @param for_client #GNUNET_YES if result is used for client,
421 * #GNUNET_NO if used internally
422 * @param num_peers the number of peers requested
423 */
424struct RPS_SamplerRequestHandle *
425RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
426 uint32_t num_peers,
427 RPS_sampler_n_rand_peers_ready_cb cb,
428 void *cls)
429{
430 uint32_t i;
431 struct RPS_SamplerRequestHandle *req_handle;
432 struct GetPeerCls *gpc;
433
434 GNUNET_assert (0 != sampler->sampler_size);
435 if (0 == num_peers)
436 return NULL;
437
438 // TODO check if we have too much (distinct) sampled peers
439 req_handle = GNUNET_new (struct RPS_SamplerRequestHandle);
440 req_handle->num_peers = num_peers;
441 req_handle->cur_num_peers = 0;
442 req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
443 req_handle->sampler = sampler;
444 req_handle->callback = cb;
445 req_handle->cls = cls;
446 GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head,
447 sampler->req_handle_tail,
448 req_handle);
449
450 LOG (GNUNET_ERROR_TYPE_DEBUG,
451 "Scheduling requests for %" PRIu32 " peers\n", num_peers);
452
453 for (i = 0; i < num_peers; i++)
454 {
455 gpc = GNUNET_new (struct GetPeerCls);
456 gpc->req_handle = req_handle;
457 gpc->cont = check_n_peers_ready;
458 gpc->cont_cls = req_handle;
459 gpc->id = &req_handle->ids[i];
460
461 GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
462 req_handle->gpc_tail,
463 gpc);
464 // maybe add a little delay
465 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
466 gpc);
467 }
468 return req_handle;
469}
470
471/**
472 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
473 *
474 * @param req_handle the handle to the request
475 */
476void
477RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
478{
479 struct GetPeerCls *i;
480
481 while (NULL != (i = req_handle->gpc_head) )
482 {
483 GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head,
484 req_handle->gpc_tail,
485 i);
486 if (NULL != i->get_peer_task)
487 {
488 GNUNET_SCHEDULER_cancel (i->get_peer_task);
489 }
490 if (NULL != i->notify_ctx)
491 {
492 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head,
493 req_handle->sampler->notify_ctx_tail,
494 i->notify_ctx);
495 GNUNET_free (i->notify_ctx);
496 }
497 GNUNET_free (i);
498 }
499 GNUNET_free (req_handle->ids);
500 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
501 req_handle->sampler->req_handle_tail,
502 req_handle);
503 GNUNET_free (req_handle);
504}
505
506
507/**
508 * Cleans the sampler.
509 */
510 void
511RPS_sampler_destroy (struct RPS_Sampler *sampler)
512{
513 if (NULL != sampler->req_handle_head)
514 {
515 LOG (GNUNET_ERROR_TYPE_WARNING,
516 "There are still pending requests. Going to remove them.\n");
517 while (NULL != sampler->req_handle_head)
518 {
519 RPS_sampler_request_cancel (sampler->req_handle_head);
520 }
521 }
522 sampler_empty (sampler);
523 GNUNET_free (sampler);
524}
525
526
527/* end of rps-sampler_common.c */
diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h
new file mode 100644
index 000000000..68f5865a9
--- /dev/null
+++ b/src/rps/rps-sampler_common.h
@@ -0,0 +1,280 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C)
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file rps/rps-sampler_common.h
21 * @brief Code common to client and service sampler
22 * @author Julius Bünger
23 */
24
25#ifndef RPS_SAMPLER_COMMON_H
26#define RPS_SAMPLER_COMMON_H
27
28#include "platform.h"
29#include "gnunet_util_lib.h"
30#include "gnunet_statistics_service.h"
31
32#include "gnunet-service-rps_sampler_elem.h"
33
34#include <math.h>
35#include <inttypes.h>
36
37#include "rps-test_util.h"
38
39
40/**
41 * Callback that is called from _get_rand_peer() when the PeerID is ready.
42 *
43 * @param cls the closure given alongside this function.
44 * @param id the PeerID that was returned
45 */
46typedef void
47(*RPS_sampler_rand_peer_ready_cont) (void *cls,
48 const struct GNUNET_PeerIdentity *id);
49
50
51/**
52 * Type of function used to differentiate between modified and not modified
53 * Sampler.
54 */
55typedef void
56(*RPS_get_peers_type) (void *cls);
57
58
59/**
60 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
61 *
62 * @param cls the closure given alongside this function.
63 * @param ids the PeerIDs that were returned
64 * to be freed
65 */
66 typedef void
67(*RPS_sampler_n_rand_peers_ready_cb) (const struct GNUNET_PeerIdentity *ids,
68 uint32_t num_peers,
69 void *cls);
70
71
72/**
73 * @brief Callback called each time a new peer was put into the sampler
74 *
75 * @param cls A possibly given closure
76 */
77typedef void
78(*SamplerNotifyUpdateCB) (void *cls);
79
80
81/**
82 * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer
83 */
84struct GetPeerCls
85{
86 /**
87 * DLL
88 */
89 struct GetPeerCls *next;
90 struct GetPeerCls *prev;
91
92 /**
93 * The #RPS_SamplerRequestHandle this single request belongs to.
94 */
95 struct RPS_SamplerRequestHandle *req_handle;
96
97 /**
98 * The task for this function.
99 */
100 struct GNUNET_SCHEDULER_Task *get_peer_task;
101
102 /**
103 * @brief Context to the given callback.
104 */
105 struct SamplerNotifyUpdateCTX *notify_ctx;
106
107 /**
108 * The callback
109 */
110 RPS_sampler_rand_peer_ready_cont cont;
111
112 /**
113 * The closure to the callback @e cont
114 */
115 void *cont_cls;
116
117 /**
118 * The address of the id to be stored at
119 */
120 struct GNUNET_PeerIdentity *id;
121};
122
123
124/**
125 * Sampler with its own array of SamplerElements
126 */
127struct RPS_Sampler
128{
129 /**
130 * Number of sampler elements we hold.
131 */
132 unsigned int sampler_size;
133 //size_t size;
134
135 /**
136 * All sampler elements in one array.
137 */
138 struct RPS_SamplerElement **sampler_elements;
139
140 /**
141 * Maximum time a round takes
142 *
143 * Used in the context of RPS
144 */
145 struct GNUNET_TIME_Relative max_round_interval;
146
147 /**
148 * Stores the function to return peers. Which one it is depends on whether
149 * the Sampler is the modified one or not.
150 */
151 RPS_get_peers_type get_peers;
152
153 /**
154 * Head and tail for the DLL to store the #RPS_SamplerRequestHandle
155 */
156 struct RPS_SamplerRequestHandle *req_handle_head;
157 struct RPS_SamplerRequestHandle *req_handle_tail;
158
159 struct SamplerNotifyUpdateCTX *notify_ctx_head;
160 struct SamplerNotifyUpdateCTX *notify_ctx_tail;
161 #ifdef TO_FILE
162 /**
163 * File name to log to
164 */
165 char *file_name;
166 #endif /* TO_FILE */
167};
168
169
170/**
171 * @brief Add a callback that will be called when the next peer is inserted
172 * into the sampler
173 *
174 * @param sampler The sampler on which update it will be called
175 * @param notify_cb The callback
176 * @param cls Closure given to the callback
177 *
178 * @return The context containing callback and closure
179 */
180struct SamplerNotifyUpdateCTX *
181sampler_notify_on_update (struct RPS_Sampler *sampler,
182 SamplerNotifyUpdateCB notify_cb,
183 void *cls);
184
185
186/**
187 * Update every sampler element of this sampler with given peer
188 *
189 * @param sampler the sampler to update.
190 * @param id the PeerID that is put in the sampler
191 */
192 void
193RPS_sampler_update (struct RPS_Sampler *sampler,
194 const struct GNUNET_PeerIdentity *id);
195
196
197/**
198 * Reinitialise all previously initialised sampler elements with the given value.
199 *
200 * Used to get rid of a PeerID.
201 *
202 * @param sampler the sampler to reinitialise a sampler element in.
203 * @param id the id of the sampler elements to update.
204 */
205 void
206RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
207 const struct GNUNET_PeerIdentity *id);
208
209
210/**
211 * Get the size of the sampler.
212 *
213 * @param sampler the sampler to return the size of.
214 * @return the size of the sampler
215 */
216unsigned int
217RPS_sampler_get_size (struct RPS_Sampler *sampler);
218
219
220/**
221 * Grow or shrink the size of the sampler.
222 *
223 * @param sampler the sampler to resize.
224 * @param new_size the new size of the sampler
225 */
226void
227RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size);
228
229
230/**
231 * Get n random peers out of the sampled peers.
232 *
233 * We might want to reinitialise this sampler after giving the
234 * corrsponding peer to the client.
235 * Random with or without consumption?
236 *
237 * @param sampler the sampler to get peers from.
238 * @param cb callback that will be called once the ids are ready.
239 * @param cls closure given to @a cb
240 * @param for_client #GNUNET_YES if result is used for client,
241 * #GNUNET_NO if used internally
242 * @param num_peers the number of peers requested
243 */
244struct RPS_SamplerRequestHandle *
245RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
246 uint32_t num_peers,
247 RPS_sampler_n_rand_peers_ready_cb cb,
248 void *cls);
249
250
251/**
252 * Counts how many Samplers currently hold a given PeerID.
253 *
254 * @param sampler the sampler to count ids in.
255 * @param id the PeerID to count.
256 *
257 * @return the number of occurrences of id.
258 */
259 uint32_t
260RPS_sampler_count_id (struct RPS_Sampler *sampler,
261 const struct GNUNET_PeerIdentity *id);
262
263
264/**
265 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
266 *
267 * @param req_handle the handle to the request
268 */
269void
270RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
271
272
273/**
274 * Cleans the sampler.
275 */
276 void
277RPS_sampler_destroy (struct RPS_Sampler *sampler);
278
279#endif /* RPS_SAMPLER_COMMON_H */
280/* end of rps-sampler_common.h */
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index ee65c2a82..6e124644d 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -25,7 +25,7 @@
25#include "gnunet_util_lib.h" 25#include "gnunet_util_lib.h"
26#include "rps.h" 26#include "rps.h"
27#include "gnunet_rps_service.h" 27#include "gnunet_rps_service.h"
28#include "gnunet-service-rps_sampler.h" 28#include "rps-sampler_client.h"
29 29
30#include <inttypes.h> 30#include <inttypes.h>
31 31