diff options
Diffstat (limited to 'src/rps/rps-sampler_common.c')
-rw-r--r-- | src/rps/rps-sampler_common.c | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c new file mode 100644 index 000000000..d004c06a5 --- /dev/null +++ b/src/rps/rps-sampler_common.c | |||
@@ -0,0 +1,527 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | */ | ||
18 | |||
19 | /** | ||
20 | * @file rps/rps-sampler_common.c | ||
21 | * @brief Code common to client and service sampler | ||
22 | * @author Julius Bünger | ||
23 | */ | ||
24 | #include "platform.h" | ||
25 | #include "gnunet_util_lib.h" | ||
26 | #include "gnunet_statistics_service.h" | ||
27 | |||
28 | #include "rps-sampler_common.h" | ||
29 | #include "gnunet-service-rps_sampler_elem.h" | ||
30 | |||
31 | #include <math.h> | ||
32 | #include <inttypes.h> | ||
33 | |||
34 | #include "rps-test_util.h" | ||
35 | |||
36 | #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler_common",__VA_ARGS__) | ||
37 | |||
38 | /** | ||
39 | * @brief Context for a callback. Contains callback and closure. | ||
40 | * | ||
41 | * Meant to be an entry in an DLL. | ||
42 | */ | ||
43 | struct SamplerNotifyUpdateCTX | ||
44 | { | ||
45 | /** | ||
46 | * @brief The Callback to call on updates | ||
47 | */ | ||
48 | SamplerNotifyUpdateCB notify_cb; | ||
49 | |||
50 | /** | ||
51 | * @brief The according closure. | ||
52 | */ | ||
53 | void *cls; | ||
54 | |||
55 | /** | ||
56 | * @brief Next element in DLL. | ||
57 | */ | ||
58 | struct SamplerNotifyUpdateCTX *next; | ||
59 | |||
60 | /** | ||
61 | * @brief Previous element in DLL. | ||
62 | */ | ||
63 | struct SamplerNotifyUpdateCTX *prev; | ||
64 | }; | ||
65 | |||
66 | |||
67 | /** | ||
68 | * Closure to _get_n_rand_peers_ready_cb() | ||
69 | */ | ||
70 | struct RPS_SamplerRequestHandle | ||
71 | { | ||
72 | /** | ||
73 | * DLL | ||
74 | */ | ||
75 | struct RPS_SamplerRequestHandle *next; | ||
76 | struct RPS_SamplerRequestHandle *prev; | ||
77 | |||
78 | /** | ||
79 | * Number of peers we are waiting for. | ||
80 | */ | ||
81 | uint32_t num_peers; | ||
82 | |||
83 | /** | ||
84 | * Number of peers we currently have. | ||
85 | */ | ||
86 | uint32_t cur_num_peers; | ||
87 | |||
88 | /** | ||
89 | * Pointer to the array holding the ids. | ||
90 | */ | ||
91 | struct GNUNET_PeerIdentity *ids; | ||
92 | |||
93 | /** | ||
94 | * Head and tail for the DLL to store the tasks for single requests | ||
95 | */ | ||
96 | struct GetPeerCls *gpc_head; | ||
97 | struct GetPeerCls *gpc_tail; | ||
98 | |||
99 | /** | ||
100 | * Sampler. | ||
101 | */ | ||
102 | struct RPS_Sampler *sampler; | ||
103 | |||
104 | /** | ||
105 | * Callback to be called when all ids are available. | ||
106 | */ | ||
107 | RPS_sampler_n_rand_peers_ready_cb callback; | ||
108 | |||
109 | /** | ||
110 | * Closure given to the callback | ||
111 | */ | ||
112 | void *cls; | ||
113 | }; | ||
114 | |||
115 | |||
116 | /** | ||
117 | * @brief Add a callback that will be called when the next peer is inserted | ||
118 | * into the sampler | ||
119 | * | ||
120 | * @param sampler The sampler on which update it will be called | ||
121 | * @param notify_cb The callback | ||
122 | * @param cls Closure given to the callback | ||
123 | * | ||
124 | * @return The context containing callback and closure | ||
125 | */ | ||
126 | struct SamplerNotifyUpdateCTX * | ||
127 | sampler_notify_on_update (struct RPS_Sampler *sampler, | ||
128 | SamplerNotifyUpdateCB notify_cb, | ||
129 | void *cls) | ||
130 | { | ||
131 | struct SamplerNotifyUpdateCTX *notify_ctx; | ||
132 | |||
133 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
134 | "Inserting new context for notification\n"); | ||
135 | notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); | ||
136 | notify_ctx->notify_cb = notify_cb; | ||
137 | notify_ctx->cls = cls; | ||
138 | GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, | ||
139 | sampler->notify_ctx_tail, | ||
140 | notify_ctx); | ||
141 | return notify_ctx; | ||
142 | } | ||
143 | |||
144 | |||
145 | /** | ||
146 | * Get the size of the sampler. | ||
147 | * | ||
148 | * @param sampler the sampler to return the size of. | ||
149 | * @return the size of the sampler | ||
150 | */ | ||
151 | unsigned int | ||
152 | RPS_sampler_get_size (struct RPS_Sampler *sampler) | ||
153 | { | ||
154 | return sampler->sampler_size; | ||
155 | } | ||
156 | |||
157 | |||
158 | /** | ||
159 | * @brief Notify about update of the sampler. | ||
160 | * | ||
161 | * Call the callbacks that are waiting for notification on updates to the | ||
162 | * sampler. | ||
163 | * | ||
164 | * @param sampler The sampler the updates are waiting for | ||
165 | */ | ||
166 | static void | ||
167 | notify_update (struct RPS_Sampler *sampler) | ||
168 | { | ||
169 | struct SamplerNotifyUpdateCTX *tmp_notify_head; | ||
170 | struct SamplerNotifyUpdateCTX *tmp_notify_tail; | ||
171 | |||
172 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
173 | "Calling callbacks waiting for update notification.\n"); | ||
174 | tmp_notify_head = sampler->notify_ctx_head; | ||
175 | tmp_notify_tail = sampler->notify_ctx_tail; | ||
176 | sampler->notify_ctx_head = NULL; | ||
177 | sampler->notify_ctx_tail = NULL; | ||
178 | for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head; | ||
179 | NULL != tmp_notify_head; | ||
180 | notify_iter = tmp_notify_head) | ||
181 | { | ||
182 | GNUNET_assert (NULL != notify_iter->notify_cb); | ||
183 | GNUNET_CONTAINER_DLL_remove (tmp_notify_head, | ||
184 | tmp_notify_tail, | ||
185 | notify_iter); | ||
186 | notify_iter->notify_cb (notify_iter->cls); | ||
187 | GNUNET_free (notify_iter); | ||
188 | } | ||
189 | } | ||
190 | |||
191 | |||
192 | /** | ||
193 | * Update every sampler element of this sampler with given peer | ||
194 | * | ||
195 | * @param sampler the sampler to update. | ||
196 | * @param id the PeerID that is put in the sampler | ||
197 | */ | ||
198 | void | ||
199 | RPS_sampler_update (struct RPS_Sampler *sampler, | ||
200 | const struct GNUNET_PeerIdentity *id) | ||
201 | { | ||
202 | to_file (sampler->file_name, | ||
203 | "Got %s", | ||
204 | GNUNET_i2s_full (id)); | ||
205 | |||
206 | for (uint32_t i = 0; i < sampler->sampler_size; i++) | ||
207 | { | ||
208 | RPS_sampler_elem_next (sampler->sampler_elements[i], | ||
209 | id); | ||
210 | } | ||
211 | notify_update (sampler); | ||
212 | } | ||
213 | |||
214 | |||
215 | /** | ||
216 | * Reinitialise all previously initialised sampler elements with the given value. | ||
217 | * | ||
218 | * Used to get rid of a PeerID. | ||
219 | * | ||
220 | * @param sampler the sampler to reinitialise a sampler element in. | ||
221 | * @param id the id of the sampler elements to update. | ||
222 | */ | ||
223 | void | ||
224 | RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, | ||
225 | const struct GNUNET_PeerIdentity *id) | ||
226 | { | ||
227 | uint32_t i; | ||
228 | |||
229 | for (i = 0; i < sampler->sampler_size; i++) | ||
230 | { | ||
231 | if (0 == GNUNET_CRYPTO_cmp_peer_identity(id, | ||
232 | &(sampler->sampler_elements[i]->peer_id)) ) | ||
233 | { | ||
234 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n"); | ||
235 | to_file (sampler->sampler_elements[i]->file_name, | ||
236 | "--- non-active"); | ||
237 | RPS_sampler_elem_reinit (sampler->sampler_elements[i]); | ||
238 | } | ||
239 | } | ||
240 | } | ||
241 | |||
242 | |||
243 | /** | ||
244 | * Counts how many Samplers currently hold a given PeerID. | ||
245 | * | ||
246 | * @param sampler the sampler to count ids in. | ||
247 | * @param id the PeerID to count. | ||
248 | * | ||
249 | * @return the number of occurrences of id. | ||
250 | */ | ||
251 | uint32_t | ||
252 | RPS_sampler_count_id (struct RPS_Sampler *sampler, | ||
253 | const struct GNUNET_PeerIdentity *id) | ||
254 | { | ||
255 | uint32_t count; | ||
256 | uint32_t i; | ||
257 | |||
258 | count = 0; | ||
259 | for ( i = 0 ; i < sampler->sampler_size ; i++ ) | ||
260 | { | ||
261 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) | ||
262 | && EMPTY != sampler->sampler_elements[i]->is_empty) | ||
263 | count++; | ||
264 | } | ||
265 | return count; | ||
266 | } | ||
267 | |||
268 | |||
269 | /** | ||
270 | * Grow or shrink the size of the sampler. | ||
271 | * | ||
272 | * @param sampler the sampler to resize. | ||
273 | * @param new_size the new size of the sampler | ||
274 | */ | ||
275 | static void | ||
276 | sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) | ||
277 | { | ||
278 | unsigned int old_size; | ||
279 | uint32_t i; | ||
280 | |||
281 | // TODO check min and max size | ||
282 | |||
283 | old_size = sampler->sampler_size; | ||
284 | |||
285 | if (old_size > new_size) | ||
286 | { /* Shrinking */ | ||
287 | |||
288 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
289 | "Shrinking sampler %d -> %d\n", | ||
290 | old_size, | ||
291 | new_size); | ||
292 | |||
293 | to_file (sampler->file_name, | ||
294 | "Shrinking sampler %d -> %d", | ||
295 | old_size, | ||
296 | new_size); | ||
297 | |||
298 | for (i = new_size ; i < old_size ; i++) | ||
299 | { | ||
300 | to_file (sampler->file_name, | ||
301 | "-%" PRIu32 ": %s", | ||
302 | i, | ||
303 | sampler->sampler_elements[i]->file_name); | ||
304 | RPS_sampler_elem_destroy (sampler->sampler_elements[i]); | ||
305 | } | ||
306 | |||
307 | GNUNET_array_grow (sampler->sampler_elements, | ||
308 | sampler->sampler_size, | ||
309 | new_size); | ||
310 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
311 | "sampler->sampler_elements now points to %p\n", | ||
312 | sampler->sampler_elements); | ||
313 | |||
314 | } | ||
315 | else if (old_size < new_size) | ||
316 | { /* Growing */ | ||
317 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
318 | "Growing sampler %d -> %d\n", | ||
319 | old_size, | ||
320 | new_size); | ||
321 | |||
322 | to_file (sampler->file_name, | ||
323 | "Growing sampler %d -> %d", | ||
324 | old_size, | ||
325 | new_size); | ||
326 | |||
327 | GNUNET_array_grow (sampler->sampler_elements, | ||
328 | sampler->sampler_size, | ||
329 | new_size); | ||
330 | |||
331 | for (i = old_size ; i < new_size ; i++) | ||
332 | { /* Add new sampler elements */ | ||
333 | sampler->sampler_elements[i] = RPS_sampler_elem_create (); | ||
334 | |||
335 | to_file (sampler->file_name, | ||
336 | "+%" PRIu32 ": %s", | ||
337 | i, | ||
338 | sampler->sampler_elements[i]->file_name); | ||
339 | } | ||
340 | } | ||
341 | else | ||
342 | { | ||
343 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); | ||
344 | return; | ||
345 | } | ||
346 | |||
347 | GNUNET_assert (sampler->sampler_size == new_size); | ||
348 | } | ||
349 | |||
350 | |||
351 | /** | ||
352 | * Grow or shrink the size of the sampler. | ||
353 | * | ||
354 | * @param sampler the sampler to resize. | ||
355 | * @param new_size the new size of the sampler | ||
356 | */ | ||
357 | void | ||
358 | RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) | ||
359 | { | ||
360 | GNUNET_assert (0 < new_size); | ||
361 | sampler_resize (sampler, new_size); | ||
362 | } | ||
363 | |||
364 | |||
365 | /** | ||
366 | * Empty the sampler. | ||
367 | * | ||
368 | * @param sampler the sampler to empty. | ||
369 | * @param new_size the new size of the sampler | ||
370 | */ | ||
371 | static void | ||
372 | sampler_empty (struct RPS_Sampler *sampler) | ||
373 | { | ||
374 | sampler_resize (sampler, 0); | ||
375 | } | ||
376 | |||
377 | |||
378 | /** | ||
379 | * Callback to _get_rand_peer() used by _get_n_rand_peers(). | ||
380 | * | ||
381 | * Checks whether all n peers are available. If they are, | ||
382 | * give those back. | ||
383 | */ | ||
384 | static void | ||
385 | check_n_peers_ready (void *cls, | ||
386 | const struct GNUNET_PeerIdentity *id) | ||
387 | { | ||
388 | struct RPS_SamplerRequestHandle *req_handle = cls; | ||
389 | (void) id; | ||
390 | |||
391 | req_handle->cur_num_peers++; | ||
392 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
393 | "Got %" PRIX32 ". of %" PRIX32 " peers\n", | ||
394 | req_handle->cur_num_peers, req_handle->num_peers); | ||
395 | |||
396 | if (req_handle->num_peers == req_handle->cur_num_peers) | ||
397 | { /* All peers are ready -- return those to the client */ | ||
398 | GNUNET_assert (NULL != req_handle->callback); | ||
399 | |||
400 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
401 | "returning %" PRIX32 " peers to the client\n", | ||
402 | req_handle->num_peers); | ||
403 | req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls); | ||
404 | |||
405 | RPS_sampler_request_cancel (req_handle); | ||
406 | } | ||
407 | } | ||
408 | |||
409 | |||
410 | /** | ||
411 | * Get n random peers out of the sampled peers. | ||
412 | * | ||
413 | * We might want to reinitialise this sampler after giving the | ||
414 | * corrsponding peer to the client. | ||
415 | * Random with or without consumption? | ||
416 | * | ||
417 | * @param sampler the sampler to get peers from. | ||
418 | * @param cb callback that will be called once the ids are ready. | ||
419 | * @param cls closure given to @a cb | ||
420 | * @param for_client #GNUNET_YES if result is used for client, | ||
421 | * #GNUNET_NO if used internally | ||
422 | * @param num_peers the number of peers requested | ||
423 | */ | ||
424 | struct RPS_SamplerRequestHandle * | ||
425 | RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, | ||
426 | uint32_t num_peers, | ||
427 | RPS_sampler_n_rand_peers_ready_cb cb, | ||
428 | void *cls) | ||
429 | { | ||
430 | uint32_t i; | ||
431 | struct RPS_SamplerRequestHandle *req_handle; | ||
432 | struct GetPeerCls *gpc; | ||
433 | |||
434 | GNUNET_assert (0 != sampler->sampler_size); | ||
435 | if (0 == num_peers) | ||
436 | return NULL; | ||
437 | |||
438 | // TODO check if we have too much (distinct) sampled peers | ||
439 | req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); | ||
440 | req_handle->num_peers = num_peers; | ||
441 | req_handle->cur_num_peers = 0; | ||
442 | req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); | ||
443 | req_handle->sampler = sampler; | ||
444 | req_handle->callback = cb; | ||
445 | req_handle->cls = cls; | ||
446 | GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, | ||
447 | sampler->req_handle_tail, | ||
448 | req_handle); | ||
449 | |||
450 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
451 | "Scheduling requests for %" PRIu32 " peers\n", num_peers); | ||
452 | |||
453 | for (i = 0; i < num_peers; i++) | ||
454 | { | ||
455 | gpc = GNUNET_new (struct GetPeerCls); | ||
456 | gpc->req_handle = req_handle; | ||
457 | gpc->cont = check_n_peers_ready; | ||
458 | gpc->cont_cls = req_handle; | ||
459 | gpc->id = &req_handle->ids[i]; | ||
460 | |||
461 | GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, | ||
462 | req_handle->gpc_tail, | ||
463 | gpc); | ||
464 | // maybe add a little delay | ||
465 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, | ||
466 | gpc); | ||
467 | } | ||
468 | return req_handle; | ||
469 | } | ||
470 | |||
471 | /** | ||
472 | * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. | ||
473 | * | ||
474 | * @param req_handle the handle to the request | ||
475 | */ | ||
476 | void | ||
477 | RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) | ||
478 | { | ||
479 | struct GetPeerCls *i; | ||
480 | |||
481 | while (NULL != (i = req_handle->gpc_head) ) | ||
482 | { | ||
483 | GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, | ||
484 | req_handle->gpc_tail, | ||
485 | i); | ||
486 | if (NULL != i->get_peer_task) | ||
487 | { | ||
488 | GNUNET_SCHEDULER_cancel (i->get_peer_task); | ||
489 | } | ||
490 | if (NULL != i->notify_ctx) | ||
491 | { | ||
492 | GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head, | ||
493 | req_handle->sampler->notify_ctx_tail, | ||
494 | i->notify_ctx); | ||
495 | GNUNET_free (i->notify_ctx); | ||
496 | } | ||
497 | GNUNET_free (i); | ||
498 | } | ||
499 | GNUNET_free (req_handle->ids); | ||
500 | GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, | ||
501 | req_handle->sampler->req_handle_tail, | ||
502 | req_handle); | ||
503 | GNUNET_free (req_handle); | ||
504 | } | ||
505 | |||
506 | |||
507 | /** | ||
508 | * Cleans the sampler. | ||
509 | */ | ||
510 | void | ||
511 | RPS_sampler_destroy (struct RPS_Sampler *sampler) | ||
512 | { | ||
513 | if (NULL != sampler->req_handle_head) | ||
514 | { | ||
515 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
516 | "There are still pending requests. Going to remove them.\n"); | ||
517 | while (NULL != sampler->req_handle_head) | ||
518 | { | ||
519 | RPS_sampler_request_cancel (sampler->req_handle_head); | ||
520 | } | ||
521 | } | ||
522 | sampler_empty (sampler); | ||
523 | GNUNET_free (sampler); | ||
524 | } | ||
525 | |||
526 | |||
527 | /* end of rps-sampler_common.c */ | ||