aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2015-01-06 23:48:24 +0000
committerJulius Bünger <buenger@mytum.de>2015-01-06 23:48:24 +0000
commit23660470add3cd0de11e3e599f14ec59f80ef5c3 (patch)
tree7e4998968317db71913e05d6508194843b48584a /src/rps
parent933428cef51a444d9fcba0ef2a36c626cb337fa5 (diff)
downloadgnunet-23660470add3cd0de11e3e599f14ec59f80ef5c3.tar.gz
gnunet-23660470add3cd0de11e3e599f14ec59f80ef5c3.zip
moved sampler functionality in file of its own
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-service-rps.c700
-rw-r--r--src/rps/gnunet-service-rps_sampler.c677
-rw-r--r--src/rps/gnunet-service-rps_sampler.h147
3 files changed, 946 insertions, 578 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 0265c8931..2d103ff1a 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -29,6 +29,8 @@
29#include "gnunet_nse_service.h" 29#include "gnunet_nse_service.h"
30#include "rps.h" 30#include "rps.h"
31 31
32#include "gnunet-service-rps_sampler.h"
33
32#include <math.h> 34#include <math.h>
33#include <inttypes.h> 35#include <inttypes.h>
34 36
@@ -48,6 +50,11 @@
48 50
49// TODO Change API to accept initialisation peers 51// TODO Change API to accept initialisation peers
50 52
53// TODO Change API to accept good peers 'friends'
54
55
56// hist_size_init, hist_size_max
57
51/** 58/**
52 * Our configuration. 59 * Our configuration.
53 */ 60 */
@@ -58,512 +65,25 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
58 */ 65 */
59static struct GNUNET_PeerIdentity *own_identity; 66static struct GNUNET_PeerIdentity *own_identity;
60 67
61
62 struct GNUNET_PeerIdentity *
63get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
64
65/***********************************************************************
66 * Sampler
67 *
68 * WARNING: This section needs to be reviewed regarding the use of
69 * functions providing (pseudo)randomness!
70***********************************************************************/
71
72// TODO care about invalid input of the caller (size 0 or less...)
73
74// It might be interesting to formulate this independent of PeerIDs.
75
76/** 68/**
77 * Callback that is called when a new PeerID is inserted into a sampler. 69 * Closure to the callback cadet calls on each peer it passes to us
78 *
79 * @param cls the closure given alongside this function.
80 * @param id the PeerID that is inserted
81 * @param hash the hash the sampler produced of the PeerID
82 */ 70 */
83typedef void (* SAMPLER_insertCB) (void *cls, 71struct init_peer_cls
84 const struct GNUNET_PeerIdentity *id,
85 struct GNUNET_HashCode hash);
86
87/**
88 * Callback that is called when a new PeerID is removed from a sampler.
89 *
90 * @param cls the closure given alongside this function.
91 * @param id the PeerID that is removed
92 * @param hash the hash the sampler produced of the PeerID
93 */
94typedef void (* SAMPLER_removeCB) (void *cls,
95 const struct GNUNET_PeerIdentity *id,
96 struct GNUNET_HashCode hash);
97
98/**
99 * A sampler sampling PeerIDs.
100 */
101struct Sampler
102{ 72{
103 /** 73 /**
104 * Min-wise linear permutation used by this sampler. 74 * The server handle to later listen to client requests
105 *
106 * This is an key later used by a hmac.
107 */
108 struct GNUNET_CRYPTO_AuthKey auth_key;
109
110 /**
111 * The PeerID this sampler currently samples.
112 */ 75 */
113 struct GNUNET_PeerIdentity *peer_id; 76 struct GNUNET_SERVER_Handle *server;
114 77
115 /** 78 /**
116 * The according hash value of this PeerID. 79 * Counts how many peers cadet already passed to us
117 */ 80 */
118 struct GNUNET_HashCode peer_id_hash; 81 uint32_t i;
119
120 /**
121 * Samplers are kept in a linked list.
122 */
123 struct Sampler *next;
124
125 /**
126 * Samplers are kept in a linked list.
127 */
128 struct Sampler *prev;
129
130}; 82};
131 83
132/**
133 * A n-tuple of samplers.
134 */
135struct Samplers
136{
137 /**
138 * Number of samplers we hold.
139 */
140 unsigned int size;
141 //size_t size;
142
143 /**
144 * All PeerIDs in one array.
145 */
146 struct GNUNET_PeerIdentity *peer_ids;
147
148 /**
149 * Callback to be called when a peer gets inserted into a sampler.
150 */
151 SAMPLER_insertCB insertCB;
152
153 /**
154 * Closure to the insertCB.
155 */
156 void *insertCLS;
157
158 /**
159 * Callback to be called when a peer gets inserted into a sampler.
160 */
161 SAMPLER_removeCB removeCB;
162
163 /**
164 * Closure to the removeCB.
165 */
166 void *removeCLS;
167
168 /**
169 * The head of the DLL.
170 */
171 struct Sampler *head;
172
173 /**
174 * The tail of the DLL.
175 */
176 struct Sampler *tail;
177
178};
179
180/**
181 * Reinitialise a previously initialised sampler.
182 *
183 * @param sampler the sampler element.
184 * @param id pointer to the memory that keeps the value.
185 */
186 void
187SAMPLER_reinitialise_sampler (struct Sampler *sampler, struct GNUNET_PeerIdentity *id)
188{
189 // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
190 GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
191 &(sampler->auth_key.key),
192 GNUNET_CRYPTO_HASH_LENGTH);
193
194 GNUNET_assert(NULL != id);
195 sampler->peer_id = id;
196 memcpy(sampler->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
197 // Maybe take a PeerID as second argument.
198
199 GNUNET_CRYPTO_hmac(&sampler->auth_key, sampler->peer_id,
200 sizeof(struct GNUNET_PeerIdentity),
201 &sampler->peer_id_hash);
202}
203
204
205/**
206 * (Re)Initialise given Sampler with random min-wise independent function.
207 *
208 * In this implementation this means choosing an auth_key for later use in
209 * a hmac at random.
210 *
211 * @param id pointer to the place where this sampler will store the PeerID.
212 * This will be overwritten.
213 */
214 struct Sampler *
215SAMPLER_init(struct GNUNET_PeerIdentity *id)
216{
217 struct Sampler *s;
218
219 s = GNUNET_new(struct Sampler);
220
221 SAMPLER_reinitialise_sampler (s, id);
222 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p) \n",
223 GNUNET_i2s(s->peer_id), s->peer_id);
224
225 s->prev = NULL;
226 s->next = NULL;
227
228 return s;
229}
230
231/**
232 * Input an PeerID into the given sampler.
233 */
234 static void
235SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
236 SAMPLER_insertCB insertCB, void *insertCLS,
237 SAMPLER_removeCB removeCB, void *removeCLS)
238 // TODO call update herein
239{
240 struct GNUNET_HashCode other_hash;
241
242 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
243 GNUNET_i2s(other), other);
244 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
245 GNUNET_i2s(s->peer_id), s->peer_id);
246
247 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
248 {
249 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
250 GNUNET_i2s(other));
251 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
252 GNUNET_i2s(s->peer_id));
253 }
254 else
255 {
256 GNUNET_CRYPTO_hmac(&s->auth_key,
257 other,
258 sizeof(struct GNUNET_PeerIdentity),
259 &other_hash);
260
261 if ( NULL == s->peer_id )
262 { // Or whatever is a valid way to say
263 // "we have no PeerID at the moment"
264 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
265 GNUNET_i2s(other));
266 memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
267 //s->peer_id = other;
268 s->peer_id_hash = other_hash;
269 if (NULL != insertCB)
270 {
271 insertCB(insertCLS, s->peer_id, s->peer_id_hash);
272 }
273 }
274 else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
275 {
276 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
277 GNUNET_i2s(other));
278 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
279 GNUNET_i2s(s->peer_id));
280
281 if ( NULL != removeCB )
282 {
283 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
284 GNUNET_i2s(s->peer_id));
285 removeCB(removeCLS, s->peer_id, s->peer_id_hash);
286 }
287
288 memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
289 //s->peer_id = other;
290 s->peer_id_hash = other_hash;
291
292 if ( NULL != insertCB )
293 {
294 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
295 GNUNET_i2s(s->peer_id));
296 insertCB(insertCLS, s->peer_id, s->peer_id_hash);
297 }
298 }
299 else
300 {
301 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
302 GNUNET_i2s(other));
303 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
304 GNUNET_i2s(s->peer_id));
305 }
306 }
307}
308
309/**
310 * Gow or shrink the size of the tuple of samplers.
311 *
312 * @param samplers the samplers to grow
313 * @param new_size the new size of the samplers
314 * @param fill_up_id if growing, that has to point to a
315 * valid PeerID and will be used
316 * to initialise newly created samplers
317 */
318 void
319SAMPLER_samplers_resize (struct Samplers * samplers,
320 unsigned int new_size,
321 struct GNUNET_PeerIdentity *fill_up_id)
322{
323 if ( samplers->size == new_size )
324 {
325 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
326 return;
327 }
328
329 unsigned int old_size;
330 struct Sampler *iter;
331 uint64_t i;
332 struct Sampler *tmp;
333
334 old_size = samplers->size;
335 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
336
337 iter = samplers->head;
338
339 if ( new_size < old_size )
340 {
341 for ( i = new_size ; i < old_size ; i++ )
342 {/* Remove unneeded rest */
343 tmp = iter->next;
344 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
345 if (NULL != samplers->removeCB)
346 samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
347 // FIXME When this is called and counts the amount of peer_ids in the samplers
348 // this gets a wrong number.
349 GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
350 GNUNET_free(iter);
351 iter = tmp;
352 }
353 }
354
355 GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
356 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
357
358 if ( new_size > old_size )
359 { /* Growing */
360 GNUNET_assert( NULL != fill_up_id );
361 for ( i = 0 ; i < new_size ; i++ )
362 { /* All samplers */
363 if ( i < old_size )
364 { /* Update old samplers */
365 iter->peer_id = &samplers->peer_ids[i];
366 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
367 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
368 iter = iter->next;
369 }
370 else
371 { /* Add new samplers */
372 memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
373 iter = SAMPLER_init(&samplers->peer_ids[i]);
374 if (NULL != samplers->insertCB)
375 {
376 samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
377 }
378 GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
379 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
380 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
381 }
382 }
383 }
384 else// if ( new_size < old_size )
385 { /* Shrinking */
386 for ( i = 0 ; i < new_size ; i++)
387 { /* All samplers */
388 tmp = iter->next;
389 /* Update remaining samplers */
390 iter->peer_id = &samplers->peer_ids[i];
391 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
392 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
393
394 iter = tmp;
395 }
396 }
397
398 GNUNET_assert(samplers->size == new_size);
399 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
400}
401
402
403/**
404 * Initialise a tuple of samplers.
405 */
406struct Samplers *
407SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
408 SAMPLER_insertCB insertCB, void *insertCLS,
409 SAMPLER_removeCB removeCB, void *removeCLS)
410{
411 struct Samplers *samplers;
412 //struct Sampler *s;
413 //uint64_t i;
414
415 samplers = GNUNET_new(struct Samplers);
416 samplers->size = 0;
417 samplers->peer_ids = NULL;
418 samplers->insertCB = insertCB;
419 samplers->insertCLS = insertCLS;
420 samplers->removeCB = removeCB;
421 samplers->removeCLS = removeCLS;
422 samplers->head = samplers->tail = NULL;
423 //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
424
425 SAMPLER_samplers_resize(samplers, init_size, id);
426
427 GNUNET_assert(init_size == samplers->size);
428 return samplers;
429}
430
431
432/**
433 * A fuction to update every sampler in the given list
434 */
435 static void
436SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
437{
438 struct Sampler *iter;
439
440 iter = samplers->head;
441 while ( NULL != iter->next )
442 {
443 SAMPLER_next(iter, id,
444 samplers->insertCB, samplers->insertCLS,
445 samplers->removeCB, samplers->removeCLS);
446 iter = iter->next;
447 }
448
449}
450
451/**
452 * Reinitialise all previously initialised sampler with the given value.
453 *
454 * @param samplers the sampler list.
455 * @param id the id of the samplers to update.
456 */
457 void
458SAMPLER_reinitialise_samplers_by_value (struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
459{
460 uint64_t i;
461 struct Sampler *iter;
462
463 iter = samplers->head;
464 for ( i = 0 ; i < samplers->size ; i++ )
465 {
466 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &samplers->peer_ids[i]) )
467 {
468 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
469 SAMPLER_reinitialise_sampler (iter, &samplers->peer_ids[i]);
470 }
471 if (NULL != iter->next)
472 iter = iter->next;
473 }
474}
475
476/**
477 * Get one random peer out of the sampled peers.
478 *
479 * We might want to reinitialise this sampler after giving the
480 * corrsponding peer to the client.
481 */
482 const struct GNUNET_PeerIdentity*
483SAMPLER_get_rand_peer (struct Samplers *samplers)
484{
485 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
486
487 if ( 0 == samplers->size )
488 {
489 LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID %s\n", GNUNET_i2s(own_identity));
490 return own_identity;
491 }
492 else
493 {
494 const struct GNUNET_PeerIdentity *peer;
495
496 peer = get_rand_peer(samplers->peer_ids, samplers->size);
497 LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
498 LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n", GNUNET_i2s(own_identity));
499
500 return peer;
501 }
502}
503
504/**
505 * Get n random peers out of the sampled peers.
506 *
507 * We might want to reinitialise this sampler after giving the
508 * corrsponding peer to the client.
509 * Random with or without consumption?
510 */
511 const struct GNUNET_PeerIdentity* // TODO give back simple array
512SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
513{
514 // TODO check if we have too much (distinct) sampled peers
515 // If we are not ready yet maybe schedule for later
516 struct GNUNET_PeerIdentity *peers;
517 uint64_t i;
518
519 peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
520
521 for ( i = 0 ; i < n ; i++ ) {
522 //peers[i] = SAMPLER_get_rand_peer(samplers);
523 memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct GNUNET_PeerIdentity));
524 }
525
526 // TODO something else missing?
527 return peers;
528}
529
530/**
531 * Counts how many Samplers currently hold a given PeerID.
532 */
533 uint64_t
534SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
535{
536 struct Sampler *iter;
537 uint64_t count;
538
539 iter = samplers->head;
540 count = 0;
541 while ( NULL != iter )
542 {
543 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
544 count++;
545 iter = iter->next;
546 }
547 return count;
548}
549
550
551/**
552 * Cleans the samplers.
553 *
554 * @param samplers the samplers to clean up.
555 */
556 void
557SAMPLER_samplers_destroy (struct Samplers *samplers)
558{
559 SAMPLER_samplers_resize(samplers, 0, NULL);
560 GNUNET_free(samplers);
561}
562
563/***********************************************************************
564 * /Sampler
565***********************************************************************/
566 84
85 struct GNUNET_PeerIdentity *
86get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
567 87
568 88
569/*********************************************************************** 89/***********************************************************************
@@ -637,12 +157,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
637 157
638 158
639/** 159/**
640 * The samplers.
641 */
642static struct Samplers *sampler_list;
643
644
645/**
646 * The gossiped list of peers. 160 * The gossiped list of peers.
647 */ 161 */
648static struct GNUNET_PeerIdentity *gossip_list; 162static struct GNUNET_PeerIdentity *gossip_list;
@@ -665,12 +179,16 @@ static unsigned int est_size;
665/** 179/**
666 * Percentage of total peer number in the gossip list 180 * Percentage of total peer number in the gossip list
667 * to send random PUSHes to 181 * to send random PUSHes to
182 *
183 * TODO do not read from configuration
668 */ 184 */
669static float alpha; 185static float alpha;
670 186
671/** 187/**
672 * Percentage of total peer number in the gossip list 188 * Percentage of total peer number in the gossip list
673 * to send random PULLs to 189 * to send random PULLs to
190 *
191 * TODO do not read from configuration
674 */ 192 */
675static float beta; 193static float beta;
676 194
@@ -696,6 +214,8 @@ static struct GNUNET_TIME_Relative round_interval;
696 214
697/** 215/**
698 * List to store peers received through pushes temporary. 216 * List to store peers received through pushes temporary.
217 *
218 * TODO -> multipeermap
699 */ 219 */
700static struct GNUNET_PeerIdentity *push_list; 220static struct GNUNET_PeerIdentity *push_list;
701 221
@@ -707,6 +227,8 @@ static unsigned int push_list_size;
707 227
708/** 228/**
709 * List to store peers received through pulls temporary. 229 * List to store peers received through pulls temporary.
230 *
231 * TODO -> multipeermap
710 */ 232 */
711static struct GNUNET_PeerIdentity *pull_list; 233static struct GNUNET_PeerIdentity *pull_list;
712 234
@@ -741,11 +263,12 @@ uint64_t g_i = 0;
741 * Get random peer from the gossip list. 263 * Get random peer from the gossip list.
742 */ 264 */
743 struct GNUNET_PeerIdentity * 265 struct GNUNET_PeerIdentity *
744get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size) 266get_rand_peer(const struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
745{ 267{
746 uint64_t r_index; 268 uint64_t r_index;
747 struct GNUNET_PeerIdentity *peer; 269 struct GNUNET_PeerIdentity *peer;
748 270
271 peer = GNUNET_new(struct GNUNET_PeerIdentity);
749 // FIXME if we have only NULL in gossip list this will block 272 // FIXME if we have only NULL in gossip list this will block
750 // but then we might have a problem nevertheless 273 // but then we might have a problem nevertheless
751 274
@@ -759,12 +282,13 @@ get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
759 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 282 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
760 list_size); 283 list_size);
761 284
762 peer = &(peer_list[r_index]); 285 *peer = peer_list[r_index];
763 } while (NULL == peer); 286 } while (NULL == peer);
764 287
765 return peer; 288 return peer;
766} 289}
767 290
291
768/** 292/**
769 * Make sure the context of a given peer exists in the given peer_map. 293 * Make sure the context of a given peer exists in the given peer_map.
770 */ 294 */
@@ -784,7 +308,7 @@ touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNU
784 ctx->mq = NULL; 308 ctx->mq = NULL;
785 ctx->to_channel = NULL; 309 ctx->to_channel = NULL;
786 ctx->from_channel = NULL; 310 ctx->from_channel = NULL;
787 GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 311 (void) GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
788 } 312 }
789} 313}
790 314
@@ -893,13 +417,14 @@ nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimat
893 //scale = .01; 417 //scale = .01;
894 estimate = GNUNET_NSE_log_estimate_to_n(logestimate); 418 estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
895 // GNUNET_NSE_log_estimate_to_n (logestimate); 419 // GNUNET_NSE_log_estimate_to_n (logestimate);
896 estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add 420 estimate = pow(estimate, 1./3);
421 // TODO add if std_dev is a number
422 // estimate += (std_dev * scale);
897 if ( 0 < estimate ) { 423 if ( 0 < estimate ) {
898 LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); 424 LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
899 est_size = estimate; 425 est_size = estimate;
900 } else { 426 } else
901 LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 427 LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
902 }
903} 428}
904 429
905/** 430/**
@@ -929,26 +454,26 @@ handle_cs_request (void *cls,
929 454
930 // TODO 455 // TODO
931 msg = (struct GNUNET_RPS_CS_RequestMessage *) message; 456 msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
932 cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx); 457 cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
933 if ( NULL == cli_ctx ) { 458 if ( NULL == cli_ctx ) {
934 cli_ctx = GNUNET_new(struct client_ctx); 459 cli_ctx = GNUNET_new(struct client_ctx);
935 cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client); 460 cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
936 GNUNET_SERVER_client_set_user_context(client, cli_ctx); 461 GNUNET_SERVER_client_set_user_context (client, cli_ctx);
937 } 462 }
938 463
939 // How many peers do we give back? 464 // How many peers do we give back?
940 // Wait until we have enough random peers? 465 // Wait until we have enough random peers?
941 466
942 ev = GNUNET_MQ_msg_extra(out_msg, 467 ev = GNUNET_MQ_msg_extra (out_msg,
943 GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity), 468 GNUNET_ntohll (msg->num_peers) * sizeof (struct GNUNET_PeerIdentity),
944 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); 469 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
945 out_msg->num_peers = msg->num_peers; // No conversion between network and host order 470 out_msg->num_peers = msg->num_peers; // No conversion between network and host order
946 471
947 num_peers = GNUNET_ntohll(msg->num_peers); 472 num_peers = GNUNET_ntohll (msg->num_peers);
948 //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers); 473 //&out_msg[1] = RPS_sampler_get_n_rand_peers(sampler_list, num_peers);
949 memcpy(&out_msg[1], 474 memcpy(&out_msg[1],
950 SAMPLER_get_n_rand_peers(sampler_list, num_peers), 475 RPS_sampler_get_n_rand_peers (num_peers),
951 num_peers * sizeof(struct GNUNET_PeerIdentity)); 476 num_peers * sizeof (struct GNUNET_PeerIdentity));
952 477
953 GNUNET_MQ_send(cli_ctx->mq, ev); 478 GNUNET_MQ_send(cli_ctx->mq, ev);
954 //GNUNET_MQ_destroy(mq); 479 //GNUNET_MQ_destroy(mq);
@@ -974,18 +499,19 @@ handle_peer_push (void *cls,
974 void **channel_ctx, 499 void **channel_ctx,
975 const struct GNUNET_MessageHeader *msg) 500 const struct GNUNET_MessageHeader *msg)
976{ 501{
977 LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n"); 502 LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
978 503
979 struct GNUNET_PeerIdentity *peer; 504 const struct GNUNET_PeerIdentity *peer;
980 505
981 // TODO check the proof of work 506 // TODO check the proof of work
982 507
983 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info( channel, GNUNET_CADET_OPTION_PEER ); 508 peer = (const struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
509 // FIXME wait for cadet to change this function
984 510
985 /* Add the sending peer to the push_list */ 511 /* Add the sending peer to the push_list */
986 LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size); 512 LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
987 GNUNET_array_append(push_list, push_list_size, *peer); 513 GNUNET_array_append (push_list, push_list_size, *peer);
988 LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size); 514 LOG (GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
989 515
990 return GNUNET_OK; 516 return GNUNET_OK;
991} 517}
@@ -1018,6 +544,7 @@ handle_peer_pull_request (void *cls,
1018 // otherwise remove from peerlist? 544 // otherwise remove from peerlist?
1019 545
1020 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER); 546 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER);
547 // FIXME wait for cadet to change this function
1021 LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer)); 548 LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer));
1022 549
1023 //mq = GNUNET_CADET_mq_create(channel); // without mq? 550 //mq = GNUNET_CADET_mq_create(channel); // without mq?
@@ -1062,8 +589,17 @@ handle_peer_pull_reply (void *cls,
1062 uint64_t i; 589 uint64_t i;
1063 590
1064 // TODO check that we sent a request and that it is the first reply 591 // TODO check that we sent a request and that it is the first reply
1065 592 if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) < ntohs (msg->size))
593 {
594 GNUNET_break_op (0); // At the moment our own implementation seems to break that.
595 return GNUNET_SYSERR;
596 }
1066 in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg; 597 in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
598 if (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) / sizeof (struct GNUNET_PeerIdentity) != GNUNET_ntohll (in_msg->num_peers))
599 {
600 GNUNET_break_op (0);
601 return GNUNET_SYSERR;
602 }
1067 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 603 peers = (struct GNUNET_PeerIdentity *) &msg[1];
1068 for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ ) 604 for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ )
1069 { 605 {
@@ -1089,10 +625,11 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1089 struct GNUNET_RPS_P2P_PushMessage *push_msg; 625 struct GNUNET_RPS_P2P_PushMessage *push_msg;
1090 struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message 626 struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
1091 struct GNUNET_MQ_Envelope *ev; 627 struct GNUNET_MQ_Envelope *ev;
1092 struct GNUNET_PeerIdentity *peer; 628 const struct GNUNET_PeerIdentity *peer;
629 struct GNUNET_MQ_Handle *mq;
1093 630
1094 // TODO print lists, ... 631 // TODO print lists, ...
1095 // TODO cleanup peer_map 632 // TODO rendomise and spread calls herein over time
1096 633
1097 634
1098 /* Would it make sense to have one shuffeled gossip list and then 635 /* Would it make sense to have one shuffeled gossip list and then
@@ -1117,7 +654,8 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1117 push_msg; */ 654 push_msg; */
1118 push_msg->placeholder = 0; 655 push_msg->placeholder = 0;
1119 // FIXME sometimes it returns a pointer to a freed mq 656 // FIXME sometimes it returns a pointer to a freed mq
1120 GNUNET_MQ_send (get_mq (peer_map, peer), ev); 657 mq = get_mq (peer_map, peer);
658 GNUNET_MQ_send (mq, ev);
1121 659
1122 // modify in_flags of respective peer? 660 // modify in_flags of respective peer?
1123 } 661 }
@@ -1137,16 +675,17 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1137 675
1138 ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); 676 ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1139 //ev = GNUNET_MQ_msg_extra(); 677 //ev = GNUNET_MQ_msg_extra();
1140 pull_msg->placeholder = 0; 678 //pull_msg->placeholder = 0;
1141 GNUNET_MQ_send( get_mq(peer_map, peer), ev ); 679 pull_msg = NULL;
680 mq = get_mq (peer_map, peer);
681 GNUNET_MQ_send (mq, ev);
1142 // modify in_flags of respective peer? 682 // modify in_flags of respective peer?
1143 } 683 }
1144 } 684 }
1145 685
1146 686
1147 /* If the NSE has changed adapt the lists accordingly */ 687 /* If the NSE has changed adapt the lists accordingly */
1148 if ( sampler_list->size != est_size ) 688 RPS_sampler_resize(est_size);
1149 SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
1150 689
1151 GNUNET_array_grow(gossip_list, gossip_list_size, est_size); 690 GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1152 691
@@ -1165,7 +704,7 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1165 704
1166 first_border = round(alpha * gossip_list_size); 705 first_border = round(alpha * gossip_list_size);
1167 for ( i = 0 ; i < first_border ; i++ ) 706 for ( i = 0 ; i < first_border ; i++ )
1168 { // TODO use SAMPLER_get_n_rand_peers 707 { // TODO use RPS_sampler_get_n_rand_peers
1169 /* Update gossip list with peers received through PUSHes */ 708 /* Update gossip list with peers received through PUSHes */
1170 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 709 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1171 push_list_size); 710 push_list_size);
@@ -1186,9 +725,8 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1186 for ( i = second_border ; i < gossip_list_size ; i++ ) 725 for ( i = second_border ; i < gossip_list_size ; i++ )
1187 { 726 {
1188 /* Update gossip list with peers from history */ 727 /* Update gossip list with peers from history */
1189 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 728 peer = RPS_sampler_get_rand_peer();
1190 sampler_list->size); 729 gossip_list[i] = *peer;
1191 gossip_list[i] = sampler_list->peer_ids[r_index];
1192 // TODO change the in_flags accordingly 730 // TODO change the in_flags accordingly
1193 } 731 }
1194 732
@@ -1204,60 +742,61 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1204 742
1205 for ( i = 0 ; i < push_list_size ; i++ ) 743 for ( i = 0 ; i < push_list_size ; i++ )
1206 { 744 {
1207 SAMPLER_update_list(sampler_list, &push_list[i]); 745 RPS_sampler_update_list (&push_list[i]);
1208 // TODO set in_flag? 746 // TODO set in_flag?
1209 } 747 }
1210 748
1211 for ( i = 0 ; i < pull_list_size ; i++ ) 749 for ( i = 0 ; i < pull_list_size ; i++ )
1212 { 750 {
1213 SAMPLER_update_list(sampler_list, &pull_list[i]); 751 RPS_sampler_update_list (&pull_list[i]);
1214 // TODO set in_flag? 752 // TODO set in_flag?
1215 } 753 }
1216 754
1217 755
1218 /* Empty push/pull lists */ 756 /* Empty push/pull lists */
1219 GNUNET_array_grow(push_list, push_list_size, 0); 757 GNUNET_array_grow (push_list, push_list_size, 0);
1220 push_list_size = 0; // I guess that's not necessary but doesn't hurt 758 push_list_size = 0; // I guess that's not necessary but doesn't hurt
1221 GNUNET_array_grow(pull_list, pull_list_size, 0); 759 GNUNET_array_grow (pull_list, pull_list_size, 0);
1222 pull_list_size = 0; // I guess that's not necessary but doesn't hurt 760 pull_list_size = 0; // I guess that's not necessary but doesn't hurt
1223 761
1224 762
1225 /* Schedule next round */ 763 /* Schedule next round */
1226 do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL ); 764 do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_round, NULL);
1227 LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 765 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1228} 766}
1229 767
1230/** 768/**
1231 * Open a connection to given peer and store channel and mq. 769 * Open a connection to given peer and store channel and mq.
1232 */ 770 */
1233 void 771 void
1234insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash) 772insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
1235{ 773{
1236 // We open a channel to be notified when this peer goes down. 774 // We open a channel to be notified when this peer goes down.
1237 touch_channel(peer_map, id); 775 touch_channel (peer_map, id);
1238} 776}
1239 777
1240/** 778/**
1241 * Close the connection to given peer and delete channel and mq. 779 * Close the connection to given peer and delete channel and mq.
1242 */ 780 */
1243 void 781 void
1244removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash) 782removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
1245{ 783{
1246 size_t s; 784 size_t s;
1247 struct peer_context *ctx; 785 struct peer_context *ctx;
1248 786
1249 s = SAMPLER_count_id(sampler_list, id); 787 s = RPS_sampler_count_id (id);
1250 if ( 1 >= s ) { 788 if ( 1 >= s )
1251 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id)) 789 {
790 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
1252 { 791 {
1253 ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id); 792 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, id);
1254 if (NULL != ctx->to_channel) 793 if (NULL != ctx->to_channel)
1255 { 794 {
1256 if (NULL != ctx->mq) 795 if (NULL != ctx->mq)
1257 { 796 {
1258 GNUNET_MQ_destroy(ctx->mq); 797 GNUNET_MQ_destroy (ctx->mq);
1259 } 798 }
1260 GNUNET_CADET_channel_destroy(ctx->to_channel); 799 GNUNET_CADET_channel_destroy (ctx->to_channel);
1261 } 800 }
1262 // TODO cleanup peer 801 // TODO cleanup peer
1263 GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id); 802 GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
@@ -1282,27 +821,29 @@ init_peer_cb (void *cls,
1282 unsigned int best_path) // "How long is the best path? 821 unsigned int best_path) // "How long is the best path?
1283 // (0 = unknown, 1 = ourselves, 2 = neighbor)" 822 // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1284{ 823{
824 struct init_peer_cls *ipc;
825
826 ipc = (struct init_peer_cls *) cls;
1285 if ( NULL != peer ) 827 if ( NULL != peer )
1286 { 828 {
1287 LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer); 829 LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
1288 SAMPLER_update_list(sampler_list, peer); 830 RPS_sampler_update_list(peer);
1289 touch_peer_ctx(peer_map, peer); // unneeded? -> insertCB 831 touch_peer_ctx(peer_map, peer); // unneeded? -> insertCB
1290 832
1291 gossip_list[g_i] = *peer; 833 gossip_list[ipc->i] = *peer;
1292 g_i++; 834 ipc->i++;
1293 // FIXME find a better way to have a global counter
1294 835
1295 // send push/pull to each of those peers? 836 // send push/pull to each of those peers?
1296 } 837 }
1297 else 838 else
1298 { 839 {
1299 if (g_i < sampler_list->size) 840 if (ipc->i < gossip_list_size)
1300 { 841 {
1301 memcpy(&gossip_list[g_i], 842 memcpy(&gossip_list[ipc->i],
1302 &(sampler_list->peer_ids[g_i]), 843 RPS_sampler_get_rand_peer(),
1303 (sampler_list->size - g_i) * sizeof(struct GNUNET_PeerIdentity)); 844 (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity));
1304 } 845 }
1305 rps_start( (struct GNUNET_SERVER_Handle *) cls); 846 rps_start (ipc->server);
1306 } 847 }
1307} 848}
1308 849
@@ -1328,7 +869,7 @@ shutdown_task (void *cls,
1328 GNUNET_NSE_disconnect(nse); 869 GNUNET_NSE_disconnect(nse);
1329 GNUNET_CADET_disconnect(cadet_handle); 870 GNUNET_CADET_disconnect(cadet_handle);
1330 GNUNET_free(own_identity); 871 GNUNET_free(own_identity);
1331 SAMPLER_samplers_destroy(sampler_list); 872 RPS_sampler_destroy();
1332 GNUNET_array_grow(gossip_list, gossip_list_size, 0); 873 GNUNET_array_grow(gossip_list, gossip_list_size, 0);
1333 GNUNET_array_grow(push_list, push_list_size, 0); 874 GNUNET_array_grow(push_list, push_list_size, 0);
1334 GNUNET_array_grow(pull_list, pull_list_size, 0); 875 GNUNET_array_grow(pull_list, pull_list_size, 0);
@@ -1405,7 +946,8 @@ cleanup_channel(void *cls,
1405 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info ( 946 peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1406 (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER); 947 (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1407 // Guess simply casting isn't the nicest way... 948 // Guess simply casting isn't the nicest way...
1408 SAMPLER_reinitialise_samplers_by_value(sampler_list, peer); 949 // FIXME wait for cadet to change this function
950 RPS_sampler_reinitialise_by_value(peer);
1409} 951}
1410 952
1411/** 953/**
@@ -1413,7 +955,7 @@ cleanup_channel(void *cls,
1413 */ 955 */
1414static void 956static void
1415rps_start (struct GNUNET_SERVER_Handle *server) 957rps_start (struct GNUNET_SERVER_Handle *server)
1416{ 958{ // TODO get msg sizes right
1417 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 959 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1418 {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0}, 960 {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
1419 {NULL, NULL, 0, 0} 961 {NULL, NULL, 0, 0}
@@ -1453,19 +995,18 @@ run (void *cls,
1453 995
1454 LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n"); 996 LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
1455 997
1456 cfg = c; 998 struct init_peer_cls *ipc;
1457 999
1000 cfg = c;
1458 1001
1459 own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
1460 1002
1003 /* Get own ID */
1004 own_identity = GNUNET_new(struct GNUNET_PeerIdentity);
1461 GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value 1005 GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value
1462
1463 GNUNET_assert(NULL != own_identity); 1006 GNUNET_assert(NULL != own_identity);
1464
1465 LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity); 1007 LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
1466 1008
1467 1009
1468
1469 /* Get time interval from the configuration */ 1010 /* Get time interval from the configuration */
1470 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS", 1011 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
1471 "ROUNDINTERVAL", 1012 "ROUNDINTERVAL",
@@ -1519,15 +1060,16 @@ run (void *cls,
1519 "BETA", 1060 "BETA",
1520 &beta)) 1061 &beta))
1521 { 1062 {
1522 LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n"); 1063 LOG (GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
1523 } 1064 }
1524 LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta); 1065 LOG (GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
1525 1066
1526 // TODO check that alpha + beta < 1 1067 // TODO check that alpha + beta < 1
1527 1068
1528 peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO); 1069 peer_map = GNUNET_CONTAINER_multipeermap_create (est_size, GNUNET_NO);
1529 1070
1530 1071
1072 /* Initialise cadet */
1531 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { 1073 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1532 {&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0}, 1074 {&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0},
1533 {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0}, 1075 {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
@@ -1536,28 +1078,30 @@ run (void *cls,
1536 }; 1078 };
1537 1079
1538 const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h 1080 const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
1539 cadet_handle = GNUNET_CADET_connect(cfg, 1081 cadet_handle = GNUNET_CADET_connect (cfg,
1540 cls, 1082 cls,
1541 &handle_inbound_channel, 1083 &handle_inbound_channel,
1542 &cleanup_channel, 1084 &cleanup_channel,
1543 cadet_handlers, 1085 cadet_handlers,
1544 ports); 1086 ports);
1545 LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n"); 1087 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1546
1547 1088
1548 /* Initialise sampler and gossip list */
1549 1089
1550 sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL); 1090 /* Initialise sampler */
1091 RPS_sampler_init (est_size, own_identity, insertCB, NULL, removeCB, NULL);
1551 1092
1093 /* Initialise push and pull maps */
1552 push_list = NULL; 1094 push_list = NULL;
1553 push_list_size = 0; 1095 push_list_size = 0;
1554 pull_list = NULL; 1096 pull_list = NULL;
1555 pull_list_size = 0; 1097 pull_list_size = 0;
1556 1098
1557 1099
1558 LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); 1100 ipc = GNUNET_new (struct init_peer_cls);
1559 GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server); 1101 ipc->server = server;
1560 // FIXME use magic 0000 PeerID to _start_ the service 1102 ipc->i = 0;
1103 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1104 GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
1561 1105
1562 // TODO send push/pull to each of those peers? 1106 // TODO send push/pull to each of those peers?
1563} 1107}
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
new file mode 100644
index 000000000..f80a70108
--- /dev/null
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -0,0 +1,677 @@
1/*
2 This file is part of GNUnet.
3 (C)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file rps/gnunet-service-rps_sampler.c
23 * @brief sampler implementation
24 * @author Julius Bünger
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "rps.h"
29
30#include "gnunet-service-rps_sampler.h"
31
32#include <math.h>
33#include <inttypes.h>
34
35#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37// multiple 'clients'?
38
39// TODO check for overflows
40
41// TODO align message structs
42
43// hist_size_init, hist_size_max
44
45/***********************************************************************
46 * WARNING: This section needs to be reviewed regarding the use of
47 * functions providing (pseudo)randomness!
48***********************************************************************/
49
50// TODO care about invalid input of the caller (size 0 or less...)
51
52enum RPS_SamplerEmpty
53{
54 NOT_EMPTY = 0x0,
55 EMPTY = 0x1
56};
57
58/**
59 * A sampler element sampling one PeerID at a time.
60 */
61struct RPS_SamplerElement
62{
63 /**
64 * Min-wise linear permutation used by this sampler.
65 *
66 * This is an key later used by a hmac.
67 */
68 struct GNUNET_CRYPTO_AuthKey auth_key;
69
70 /**
71 * The PeerID this sampler currently samples.
72 */
73 struct GNUNET_PeerIdentity peer_id;
74
75 /**
76 * The according hash value of this PeerID.
77 */
78 struct GNUNET_HashCode peer_id_hash;
79
80 /**
81 * Time of last request.
82 */
83 struct GNUNET_TIME_Absolute last_request;
84
85 /**
86 * Flag that indicates that we are not holding a valid PeerID right now.
87 */
88 enum RPS_SamplerEmpty is_empty;
89};
90
91/**
92 * Sampler with its own array of SamplerElements
93 */
94struct RPS_Sampler
95{
96 /**
97 * Number of sampler elements we hold.
98 */
99 unsigned int sampler_size;
100 //size_t size;
101
102 /**
103 * All Samplers in one array.
104 */
105 struct RPS_SamplerElement **sampler_elements;
106
107 /**
108 * Index to a sampler element.
109 *
110 * Gets cycled on every hist_request.
111 */
112 uint64_t sampler_elem_index;
113
114 /**
115 * Callback to be called when a peer gets inserted into a sampler.
116 */
117 RPS_sampler_insert_cb insert_cb;
118
119 /**
120 * Closure to the insert_cb.
121 */
122 void *insert_cls;
123
124 /**
125 * Callback to be called when a peer gets inserted into a sampler.
126 */
127 RPS_sampler_remove_cb remove_cb;
128
129 /**
130 * Closure to the remove_cb.
131 */
132 void *remove_cls;
133};
134
135/**
136 * Global sampler variable.
137 */
138struct RPS_Sampler *sampler;
139
140
141/**
142 * The minimal size for the extended sampler elements.
143 */
144static size_t min_size;
145
146/**
147 * The maximal size the extended sampler elements should grow to.
148 */
149static size_t max_size;
150
151/**
152 * The size the extended sampler elements currently have.
153 */
154static size_t extra_size;
155
156/**
157 * Inedex to the sampler element that is the next to be returned
158 */
159static struct RPS_SamplerElement **extended_samplers_index;
160
161/**
162 * Request counter.
163 *
164 * Only needed in the beginning to check how many of the 64 deltas
165 * we already have
166 */
167static unsigned int req_counter;
168
169/**
170 * Time of the last request we received.
171 *
172 * Used to compute the expected request rate.
173 */
174static struct GNUNET_TIME_Absolute last_request;
175
176/**
177 * Last 64 deltas between requests
178 */
179static struct GNUNET_TIME_Relative request_deltas[64];
180
181/**
182 * The prediction of the rate of requests
183 */
184static struct GNUNET_TIME_Relative request_rate;
185
186
187/**
188 * Sum all time relatives of an array.
189 */
190 struct GNUNET_TIME_Relative
191T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
192{
193 struct GNUNET_TIME_Relative sum;
194 uint64_t i;
195
196 sum = GNUNET_TIME_UNIT_ZERO;
197 for ( i = 0 ; i < arr_size ; i++ )
198 {
199 sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
200 }
201 return sum;
202}
203
204/**
205 * Compute the average of given time relatives.
206 */
207 struct GNUNET_TIME_Relative
208T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
209{
210 return T_relative_sum (rel_array, arr_size); // FIXME find a way to devide that by arr_size
211}
212
213
214
215/**
216 * Reinitialise a previously initialised sampler element.
217 *
218 * @param sampler pointer to the memory that keeps the value.
219 */
220 static void
221RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
222{
223 sampler_el->is_empty = EMPTY;
224
225 // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
226 GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
227 &(sampler_el->auth_key.key),
228 GNUNET_CRYPTO_HASH_LENGTH);
229
230 sampler_el->last_request = GNUNET_TIME_UNIT_FOREVER_ABS;
231
232 /* We might want to keep the previous peer */
233
234 //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id,
235 // sizeof(struct GNUNET_PeerIdentity),
236 // &sampler_el->peer_id_hash);
237}
238
239
240/**
241 * (Re)Initialise given Sampler with random min-wise independent function.
242 *
243 * In this implementation this means choosing an auth_key for later use in
244 * a hmac at random.
245 *
246 * @return a newly created RPS_SamplerElement which currently holds no id.
247 */
248 struct RPS_SamplerElement *
249RPS_sampler_elem_create (void)
250{
251 struct RPS_SamplerElement *s;
252
253 s = GNUNET_new (struct RPS_SamplerElement);
254
255 RPS_sampler_elem_reinit (s);
256 LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
257
258 return s;
259}
260
261
262/**
263 * Input an PeerID into the given sampler.
264 */
265 static void
266RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other,
267 RPS_sampler_insert_cb insert_cb, void *insert_cls,
268 RPS_sampler_remove_cb remove_cb, void *remove_cls)
269{
270 struct GNUNET_HashCode other_hash;
271
272 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, &(s_elem->peer_id)) )
273 {
274 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s_elem\n",
275 GNUNET_i2s(other));
276 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s_elem\n",
277 GNUNET_i2s(&(s_elem->peer_id)));
278 }
279 else
280 {
281 GNUNET_CRYPTO_hmac(&s_elem->auth_key,
282 other,
283 sizeof(struct GNUNET_PeerIdentity),
284 &other_hash);
285
286 if ( EMPTY == s_elem->is_empty )
287 { // Or whatever is a valid way to say
288 // "we have no PeerID at the moment"
289 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s_elem; Simply accepting (was empty previously).\n",
290 GNUNET_i2s(other));
291 s_elem->peer_id = *other;
292 //s_elem->peer_id = other;
293 s_elem->peer_id_hash = other_hash;
294 if (NULL != sampler->insert_cb)
295 {
296 sampler->insert_cb(sampler->insert_cls, &(s_elem->peer_id));
297 }
298 }
299 else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s_elem->peer_id_hash) )
300 {
301 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s_elem\n",
302 GNUNET_i2s(other));
303 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s_elem\n",
304 GNUNET_i2s(&s_elem->peer_id));
305
306 if ( NULL != sampler->remove_cb )
307 {
308 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s_elem with the remove callback.\n",
309 GNUNET_i2s(&s_elem->peer_id));
310 sampler->remove_cb(sampler->remove_cls, &s_elem->peer_id);
311 }
312
313 memcpy(&s_elem->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
314 //s_elem->peer_id = other;
315 s_elem->peer_id_hash = other_hash;
316
317 if ( NULL != sampler->insert_cb )
318 {
319 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s_elem with the insert callback.\n",
320 GNUNET_i2s(&s_elem->peer_id));
321 sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
322 }
323 }
324 else
325 {
326 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s_elem\n",
327 GNUNET_i2s(other));
328 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s_elem\n",
329 GNUNET_i2s(&s_elem->peer_id));
330 }
331 }
332 s_elem->is_empty = NOT_EMPTY;
333}
334
335
336/**
337 * Grow or shrink the size of the sampler.
338 *
339 * @param new_size the new size of the sampler
340 */
341 void
342RPS_sampler_resize (unsigned int new_size)
343{
344 unsigned int old_size;
345 uint64_t i;
346 struct RPS_SamplerElement **rem_list;
347
348 // TODO check min and max size
349
350 old_size = sampler->sampler_size;
351
352 if (old_size > new_size*4 &&
353 extra_size > new_size*4)
354 { /* Shrinking */
355
356 new_size /= 2;
357
358 /* Temporary store those to properly call the removeCB on those later */
359 rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
360 memcpy (rem_list,
361 &sampler->sampler_elements[new_size],
362 (old_size - new_size) * sizeof(struct RPS_SamplerElement *));
363
364 LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size);
365 GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
366 LOG (GNUNET_ERROR_TYPE_DEBUG,
367 "SAMPLER: sampler->sampler_elements now points to %p\n",
368 sampler->sampler_elements);
369
370 // TODO move extended_samplers_index
371 for (i = new_size ; i < old_size ; i++)
372 {/* Remove unneeded rest */
373 LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
374 if (NULL != sampler->remove_cb)
375 sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
376 GNUNET_free (rem_list[i]);
377 }
378 }
379 else if (old_size < new_size)// ||
380 //extra_size < new_size) // needed?
381 { /* Growing */
382 new_size *= 2; // TODO check overflow
383
384 LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size);
385 GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
386 LOG (GNUNET_ERROR_TYPE_DEBUG,
387 "SAMPLER: sampler->sampler_elements now points to %p\n",
388 sampler->sampler_elements);
389
390 // TODO move extended_samplers_index
391
392 for ( i = old_size ; i < new_size ; i++ )
393 { /* Add new sampler elements */
394 sampler->sampler_elements[i] = RPS_sampler_elem_create ();
395 if (NULL != sampler->insert_cb)
396 sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id);
397 LOG (GNUNET_ERROR_TYPE_DEBUG,
398 "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
399 i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
400 }
401 }
402 else
403 {
404 LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
405 return;
406 }
407
408 GNUNET_assert(sampler->sampler_size == new_size);
409 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
410}
411
412
413/**
414 * Initialise a tuple of sampler elements.
415 *
416 * @param init_size the size the sampler is initialised with
417 * @param id with which all newly created sampler elements are initialised
418 * @param ins_cb the callback that will be called on every PeerID that is
419 * newly inserted into a sampler element
420 * @param ins_cls the closure given to #ins_cb
421 * @param rem_cb the callback that will be called on every PeerID that is
422 * removed from a sampler element
423 * @param rem_cls the closure given to #rem_cb
424 */
425 void
426RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
427 RPS_sampler_insert_cb ins_cb, void *ins_cls,
428 RPS_sampler_remove_cb rem_cb, void *rem_cls)
429{
430 //struct RPS_Sampler *sampler;
431 //uint64_t i;
432
433 /* Initialise context around extended sampler */
434 min_size = 10; // TODO make input to _samplers_init()
435 max_size = 1000; // TODO make input to _samplers_init()
436 GNUNET_new_array (64, struct GNUNET_TIME_Relative);
437
438 sampler = GNUNET_new (struct RPS_Sampler);
439 sampler->sampler_size = 0;
440 sampler->sampler_elements = NULL;
441 sampler->insert_cb = ins_cb;
442 sampler->insert_cls = ins_cls;
443 sampler->remove_cb = rem_cb;
444 sampler->remove_cls = rem_cls;
445 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
446 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
447 RPS_sampler_resize (init_size);
448 RPS_sampler_update_list (id); // no super nice desing but ok for the moment
449
450 extended_samplers_index = sampler->sampler_elements;
451
452 //GNUNET_assert (init_size == sampler->sampler_size);
453}
454
455
456/**
457 * A fuction to update every sampler in the given list
458 *
459 * @param id the PeerID that is put in the sampler
460 */
461 void
462RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
463{
464 uint64_t i;
465
466 for ( i = 0 ; i < sampler->sampler_size ; i++ )
467 RPS_sampler_elem_next (sampler->sampler_elements[i], id,
468 sampler->insert_cb, sampler->insert_cls,
469 sampler->remove_cb, sampler->remove_cls);
470}
471
472
473/**
474 * Reinitialise all previously initialised sampler elements with the given value.
475 *
476 * Used to get rid of a PeerID.
477 *
478 * @param id the id of the sampler elements to update.
479 */
480 void
481RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
482{
483 uint64_t i;
484
485 for ( i = 0 ; i < sampler->sampler_size ; i++ )
486 {
487 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
488 {
489 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
490 RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
491 }
492 }
493}
494
495
496/**
497 * Get one random peer out of the sampled peers.
498 *
499 * We might want to reinitialise this sampler after giving the
500 * corrsponding peer to the client.
501 * Only used internally
502 */
503 const struct GNUNET_PeerIdentity *
504RPS_sampler_get_rand_peer_ ()
505{
506 uint64_t r_index;
507 const struct GNUNET_PeerIdentity *peer; // do we have to malloc that?
508
509 // TODO implement extra logic
510
511 /**;
512 * Choose the r_index of the peer we want to return
513 * at random from the interval of the gossip list
514 */
515 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
516 sampler->sampler_size);
517
518 //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
519 // // TODO schedule for later
520 // peer = NULL;
521 //else
522 peer = &(sampler->sampler_elements[r_index]->peer_id);
523 sampler->sampler_elements[r_index]->last_request = GNUNET_TIME_absolute_get();
524 LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
525
526
527 return peer;
528}
529
530/**
531 * Get n random peers out of the sampled peers.
532 *
533 * We might want to reinitialise this sampler after giving the
534 * corrsponding peer to the client.
535 * Random with or without consumption?
536 * Only used internally
537 */
538 const struct GNUNET_PeerIdentity *
539RPS_sampler_get_n_rand_peers_ (uint64_t n)
540{
541 if ( 0 == sampler->sampler_size )
542 {
543 LOG (GNUNET_ERROR_TYPE_DEBUG,
544 "Sgrp: List empty - Returning NULL\n");
545 return NULL;
546 }
547 else
548 {
549 // TODO check if we have too much (distinct) sampled peers
550 // If we are not ready yet maybe schedule for later
551 struct GNUNET_PeerIdentity *peers;
552 uint64_t i;
553
554 peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
555
556 for ( i = 0 ; i < n ; i++ ) {
557 //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
558 memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct GNUNET_PeerIdentity));
559 }
560 return peers;
561 }
562}
563
564
565/**
566 * Get one random peer out of the sampled peers.
567 *
568 * We might want to reinitialise this sampler after giving the
569 * corrsponding peer to the client.
570 *
571 * @return a random PeerID of the PeerIDs previously put into the sampler.
572 */
573 const struct GNUNET_PeerIdentity *
574RPS_sampler_get_rand_peer ()
575{
576 struct GNUNET_PeerIdentity *peer;
577
578 if (64 > req_counter)
579 req_counter++;
580 if (1 < req_counter)
581 {
582 memcpy (&request_deltas[1],
583 request_deltas,
584 (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
585 request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request,
586 GNUNET_TIME_absolute_get ());
587 request_rate = T_relative_avg (request_deltas, req_counter);
588 }
589 last_request = GNUNET_TIME_absolute_get();
590 // TODO resize the size of the extended_samplers
591
592 // use _get_rand_peer_ ?
593 peer = GNUNET_new (struct GNUNET_PeerIdentity);
594 *peer = (*extended_samplers_index)->peer_id;
595 RPS_sampler_elem_reinit (*extended_samplers_index);
596 if ( extended_samplers_index == &sampler->sampler_elements[sampler->sampler_size -1] )
597 extended_samplers_index = &sampler->sampler_elements[0];
598 else
599 extended_samplers_index++;
600 // TODO
601 return peer;
602}
603
604
605/**
606 * Get n random peers out of the sampled peers.
607 *
608 * We might want to reinitialise this sampler after giving the
609 * corrsponding peer to the client.
610 * Random with or without consumption?
611 *
612 * @return n random PeerIDs of the PeerIDs previously put into the sampler.
613 */
614 const struct GNUNET_PeerIdentity *
615RPS_sampler_get_n_rand_peers (uint64_t n)
616{
617 // use _get_rand_peers_ ?
618 if ( 0 == sampler->sampler_size )
619 {
620 LOG (GNUNET_ERROR_TYPE_DEBUG,
621 "Sgrp: List empty - Returning NULL\n");
622 return NULL;
623 }
624 else
625 {
626 // TODO check if we have too much (distinct) sampled peers
627 // If we are not ready yet maybe schedule for later
628 struct GNUNET_PeerIdentity *peers;
629 uint64_t i;
630
631 peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
632
633 for ( i = 0 ; i < n ; i++ ) {
634 //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
635 memcpy (&peers[i], RPS_sampler_get_rand_peer (), sizeof (struct GNUNET_PeerIdentity));
636 }
637 return peers;
638 }
639}
640
641
642/**
643 * Counts how many Samplers currently hold a given PeerID.
644 *
645 * @param id the PeerID to count.
646 *
647 * @return the number of occurrences of id.
648 */
649 uint64_t
650RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
651{
652 uint64_t count;
653 uint64_t i;
654
655 count = 0;
656 for ( i = 0 ; i < sampler->sampler_size ; i++ )
657 {
658 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
659 && EMPTY != sampler->sampler_elements[i]->is_empty)
660 count++;
661 }
662 return count;
663}
664
665
666/**
667 * Cleans the sampler.
668 */
669 void
670RPS_sampler_destroy ()
671{
672 RPS_sampler_resize (0);
673 GNUNET_free (request_deltas); // _array_grow()?
674 GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
675}
676
677/* end of gnunet-service-rps.c */
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
new file mode 100644
index 000000000..3772f9f0b
--- /dev/null
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -0,0 +1,147 @@
1/*
2 This file is part of GNUnet.
3 (C)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file rps/gnunet-service-rps_sampler.h
23 * @brief sampler implementation
24 * @author Julius Bünger
25 */
26
27#ifndef RPS_SAMPLER_H
28#define RPS_SAMPLER_H
29#include <inttypes.h>
30
31/**
32 * Callback that is called when a new PeerID is inserted into a sampler.
33 *
34 * @param cls the closure given alongside this function.
35 * @param id the PeerID that is inserted
36 */
37typedef void
38(*RPS_sampler_insert_cb) (void *cls,
39 const struct GNUNET_PeerIdentity *id);
40
41/**
42 * Callback that is called when a new PeerID is removed from a sampler.
43 *
44 * @param cls the closure given alongside this function.
45 * @param id the PeerID that is removed
46 */
47typedef void
48(*RPS_sampler_remove_cb) (void *cls,
49 const struct GNUNET_PeerIdentity *id);
50
51/**
52 * A sampler sampling a stream of PeerIDs.
53 */
54//struct RPS_Sampler;
55
56
57/**
58 * Grow or shrink the size of the sampler.
59 *
60 * @param new_size the new size of the sampler
61 */
62 void
63RPS_sampler_resize (unsigned int new_size);
64
65
66/**
67 * Initialise a tuple of samplers.
68 *
69 * @param init_size the size the sampler is initialised with
70 * @param id with which all newly created sampler elements are initialised
71 * @param ins_cb the callback that will be called on every PeerID that is
72 * newly inserted into a sampler element
73 * @param ins_cls the closure given to #ins_cb
74 * @param rem_cb the callback that will be called on every PeerID that is
75 * removed from a sampler element
76 * @param rem_cls the closure given to #rem_cb
77 */
78 void
79RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
80 RPS_sampler_insert_cb ins_cb, void *ins_cls,
81 RPS_sampler_remove_cb rem_cb, void *rem_cls);
82
83
84/**
85 * A fuction to update every sampler in the given list
86 *
87 * @param id the PeerID that is put in the sampler
88 */
89 void
90RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id);
91
92
93/**
94 * Reinitialise all previously initialised sampler elements with the given value.
95 *
96 * Used to get rid of a PeerID.
97 *
98 * @param id the id of the samplers to update.
99 */
100 void
101RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
102
103
104/**
105 * Get one random peer out of the sampled peers.
106 *
107 * We might want to reinitialise this sampler after giving the
108 * corrsponding peer to the client.
109 *
110 * @return a random PeerID of the PeerIDs previously put into the sampler.
111 */
112 const struct GNUNET_PeerIdentity *
113RPS_sampler_get_rand_peer ();
114
115
116/**
117 * Get n random peers out of the sampled peers.
118 *
119 * We might want to reinitialise this sampler after giving the
120 * corrsponding peer to the client.
121 * Random with or without consumption?
122 *
123 * @return n random PeerIDs of the PeerIDs previously put into the sampler.
124 */
125 const struct GNUNET_PeerIdentity *
126RPS_sampler_get_n_rand_peers (uint64_t n);
127
128
129/**
130 * Counts how many Samplers currently hold a given PeerID.
131 *
132 * @param id the PeerID to count.
133 *
134 * @return the number of occurrences of id.
135 */
136 uint64_t
137RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id);
138
139
140/**
141 * Cleans the samplers.
142 */
143 void
144RPS_sampler_destroy ();
145
146#endif
147/* end of gnunet-service-rps.c */