diff options
author | Bart Polot <bart@net.in.tum.de> | 2015-01-28 18:05:20 +0000 |
---|---|---|
committer | Bart Polot <bart@net.in.tum.de> | 2015-01-28 18:05:20 +0000 |
commit | 4df042b9f44904c5d8608658125f361c7cffe0ef (patch) | |
tree | b13d45238b7dd2d856f44bce665de30d59fc98e4 /src/rps/gnunet-service-rps_sampler.c | |
parent | efd795756b2106dd595a7124cb505e2c531c1380 (diff) | |
download | gnunet-4df042b9f44904c5d8608658125f361c7cffe0ef.tar.gz gnunet-4df042b9f44904c5d8608658125f361c7cffe0ef.zip |
- fix memory management of sampler tasks
Diffstat (limited to 'src/rps/gnunet-service-rps_sampler.c')
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 191 |
1 files changed, 87 insertions, 104 deletions
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 419110440..92826d875 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c | |||
@@ -32,7 +32,7 @@ | |||
32 | #include <math.h> | 32 | #include <math.h> |
33 | #include <inttypes.h> | 33 | #include <inttypes.h> |
34 | 34 | ||
35 | #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) | 35 | #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__) |
36 | 36 | ||
37 | // multiple 'clients'? | 37 | // multiple 'clients'? |
38 | 38 | ||
@@ -186,7 +186,7 @@ struct NRandPeersReadyCls | |||
186 | * @param id the PeerID that was returned | 186 | * @param id the PeerID that was returned |
187 | */ | 187 | */ |
188 | typedef void | 188 | typedef void |
189 | (*RPS_sampler_rand_peer_ready_cb) (void *cls, | 189 | (*RPS_sampler_rand_peer_ready_cont) (void *cls, |
190 | const struct GNUNET_PeerIdentity *id); | 190 | const struct GNUNET_PeerIdentity *id); |
191 | 191 | ||
192 | /** | 192 | /** |
@@ -194,6 +194,10 @@ typedef void | |||
194 | */ | 194 | */ |
195 | struct GetPeerCls | 195 | struct GetPeerCls |
196 | { | 196 | { |
197 | /** DLL */ | ||
198 | struct GetPeerCls *next; | ||
199 | struct GetPeerCls *prev; | ||
200 | |||
197 | /** | 201 | /** |
198 | * The task for this function. | 202 | * The task for this function. |
199 | */ | 203 | */ |
@@ -202,12 +206,12 @@ struct GetPeerCls | |||
202 | /** | 206 | /** |
203 | * The callback | 207 | * The callback |
204 | */ | 208 | */ |
205 | RPS_sampler_rand_peer_ready_cb cb; | 209 | RPS_sampler_rand_peer_ready_cont cont; |
206 | 210 | ||
207 | /** | 211 | /** |
208 | * The closure to the callback | 212 | * The closure to the callback |
209 | */ | 213 | */ |
210 | void *cb_cls; | 214 | void *cont_cls; |
211 | 215 | ||
212 | /** | 216 | /** |
213 | * The address of the id to be stored at | 217 | * The address of the id to be stored at |
@@ -215,11 +219,6 @@ struct GetPeerCls | |||
215 | struct GNUNET_PeerIdentity *id; | 219 | struct GNUNET_PeerIdentity *id; |
216 | }; | 220 | }; |
217 | 221 | ||
218 | /** | ||
219 | * Multihashmap that keeps track of all get_peer_tasks that are still scheduled. | ||
220 | */ | ||
221 | struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks; | ||
222 | |||
223 | 222 | ||
224 | /** | 223 | /** |
225 | * Global sampler variable. | 224 | * Global sampler variable. |
@@ -248,6 +247,11 @@ static size_t max_size; | |||
248 | static uint32_t client_get_index; | 247 | static uint32_t client_get_index; |
249 | 248 | ||
250 | 249 | ||
250 | /** FIXME document */ | ||
251 | struct GetPeerCls *gpc_head; | ||
252 | struct GetPeerCls *gpc_tail; | ||
253 | |||
254 | |||
251 | /** | 255 | /** |
252 | * Callback to _get_rand_peer() used by _get_n_rand_peers(). | 256 | * Callback to _get_rand_peer() used by _get_n_rand_peers(). |
253 | * | 257 | * |
@@ -262,16 +266,17 @@ check_n_peers_ready (void *cls, | |||
262 | 266 | ||
263 | n_peers_cls = (struct NRandPeersReadyCls *) cls; | 267 | n_peers_cls = (struct NRandPeersReadyCls *) cls; |
264 | 268 | ||
269 | n_peers_cls->cur_num_peers++; | ||
265 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 270 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
266 | "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n", | 271 | "Got %" PRIX32 ". of %" PRIX32 " peers\n", |
267 | n_peers_cls->cur_num_peers, n_peers_cls->num_peers); | 272 | n_peers_cls->cur_num_peers, n_peers_cls->num_peers); |
268 | 273 | ||
269 | if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers) | 274 | if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) |
270 | { /* All peers are ready -- return those to the client */ | 275 | { /* All peers are ready -- return those to the client */ |
271 | GNUNET_assert (NULL != n_peers_cls->callback); | 276 | GNUNET_assert (NULL != n_peers_cls->callback); |
272 | 277 | ||
273 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 278 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
274 | "SAMPLER: returning %" PRIX32 " peers to the client\n", | 279 | "returning %" PRIX32 " peers to the client\n", |
275 | n_peers_cls->num_peers); | 280 | n_peers_cls->num_peers); |
276 | n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); | 281 | n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); |
277 | 282 | ||
@@ -319,7 +324,7 @@ RPS_sampler_elem_create (void) | |||
319 | s = GNUNET_new (struct RPS_SamplerElement); | 324 | s = GNUNET_new (struct RPS_SamplerElement); |
320 | 325 | ||
321 | RPS_sampler_elem_reinit (s); | 326 | RPS_sampler_elem_reinit (s); |
322 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n"); | 327 | LOG (GNUNET_ERROR_TYPE_DEBUG, "initialised with empty PeerID\n"); |
323 | 328 | ||
324 | return s; | 329 | return s; |
325 | } | 330 | } |
@@ -339,9 +344,9 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
339 | 344 | ||
340 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) ) | 345 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) ) |
341 | { | 346 | { |
342 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", | 347 | LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n", |
343 | GNUNET_i2s (other)); | 348 | GNUNET_i2s (other)); |
344 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n", | 349 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Have already PeerID %s\n", |
345 | GNUNET_i2s (&(s_elem->peer_id))); | 350 | GNUNET_i2s (&(s_elem->peer_id))); |
346 | } | 351 | } |
347 | else | 352 | else |
@@ -353,7 +358,7 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
353 | 358 | ||
354 | if ( EMPTY == s_elem->is_empty ) | 359 | if ( EMPTY == s_elem->is_empty ) |
355 | { | 360 | { |
356 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n", | 361 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerID %s; Simply accepting (was empty previously).\n", |
357 | GNUNET_i2s(other)); | 362 | GNUNET_i2s(other)); |
358 | s_elem->peer_id = *other; | 363 | s_elem->peer_id = *other; |
359 | s_elem->peer_id_hash = other_hash; | 364 | s_elem->peer_id_hash = other_hash; |
@@ -365,14 +370,14 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
365 | } | 370 | } |
366 | else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) ) | 371 | else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) ) |
367 | { | 372 | { |
368 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", | 373 | LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n", |
369 | GNUNET_i2s (other)); | 374 | GNUNET_i2s (other)); |
370 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n", | 375 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n", |
371 | GNUNET_i2s (&s_elem->peer_id)); | 376 | GNUNET_i2s (&s_elem->peer_id)); |
372 | 377 | ||
373 | if ( NULL != sampler->remove_cb ) | 378 | if ( NULL != sampler->remove_cb ) |
374 | { | 379 | { |
375 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n", | 380 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n", |
376 | GNUNET_i2s (&s_elem->peer_id)); | 381 | GNUNET_i2s (&s_elem->peer_id)); |
377 | sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id); | 382 | sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id); |
378 | } | 383 | } |
@@ -382,7 +387,7 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
382 | 387 | ||
383 | if ( NULL != sampler->insert_cb ) | 388 | if ( NULL != sampler->insert_cb ) |
384 | { | 389 | { |
385 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n", | 390 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n", |
386 | GNUNET_i2s (&s_elem->peer_id)); | 391 | GNUNET_i2s (&s_elem->peer_id)); |
387 | sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id); | 392 | sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id); |
388 | } | 393 | } |
@@ -391,9 +396,9 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
391 | } | 396 | } |
392 | else | 397 | else |
393 | { | 398 | { |
394 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", | 399 | LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n", |
395 | GNUNET_i2s(other)); | 400 | GNUNET_i2s(other)); |
396 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n", | 401 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n", |
397 | GNUNET_i2s(&s_elem->peer_id)); | 402 | GNUNET_i2s(&s_elem->peer_id)); |
398 | } | 403 | } |
399 | } | 404 | } |
@@ -436,15 +441,15 @@ sampler_resize (unsigned int new_size) | |||
436 | &sampler->sampler_elements[new_size], | 441 | &sampler->sampler_elements[new_size], |
437 | (old_size - new_size) * sizeof (struct RPS_SamplerElement *)); | 442 | (old_size - new_size) * sizeof (struct RPS_SamplerElement *)); |
438 | 443 | ||
439 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size); | 444 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Shrinking sampler %d -> %d\n", old_size, new_size); |
440 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); | 445 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); |
441 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 446 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
442 | "SAMPLER: sampler->sampler_elements now points to %p\n", | 447 | "sampler->sampler_elements now points to %p\n", |
443 | sampler->sampler_elements); | 448 | sampler->sampler_elements); |
444 | 449 | ||
445 | for (i = 0 ; i < old_size - new_size ; i++) | 450 | for (i = 0 ; i < old_size - new_size ; i++) |
446 | {/* Remove unneeded rest */ | 451 | {/* Remove unneeded rest */ |
447 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX32 ". sampler\n", i); | 452 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i); |
448 | if (NULL != sampler->remove_cb) | 453 | if (NULL != sampler->remove_cb) |
449 | sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id); | 454 | sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id); |
450 | GNUNET_free (rem_list[i]); | 455 | GNUNET_free (rem_list[i]); |
@@ -453,10 +458,10 @@ sampler_resize (unsigned int new_size) | |||
453 | } | 458 | } |
454 | else if (old_size < new_size) | 459 | else if (old_size < new_size) |
455 | { /* Growing */ | 460 | { /* Growing */ |
456 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size); | 461 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Growing sampler %d -> %d\n", old_size, new_size); |
457 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); | 462 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); |
458 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 463 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
459 | "SAMPLER: sampler->sampler_elements now points to %p\n", | 464 | "sampler->sampler_elements now points to %p\n", |
460 | sampler->sampler_elements); | 465 | sampler->sampler_elements); |
461 | 466 | ||
462 | for ( i = old_size ; i < new_size ; i++ ) | 467 | for ( i = old_size ; i < new_size ; i++ ) |
@@ -465,18 +470,18 @@ sampler_resize (unsigned int new_size) | |||
465 | if (NULL != sampler->insert_cb) | 470 | if (NULL != sampler->insert_cb) |
466 | sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id); | 471 | sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id); |
467 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 472 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
468 | "SAMPLER: Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n", | 473 | "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n", |
469 | i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id)); | 474 | i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id)); |
470 | } | 475 | } |
471 | } | 476 | } |
472 | else | 477 | else |
473 | { | 478 | { |
474 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n"); | 479 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); |
475 | return; | 480 | return; |
476 | } | 481 | } |
477 | 482 | ||
478 | GNUNET_assert (sampler->sampler_size == new_size); | 483 | GNUNET_assert (sampler->sampler_size == new_size); |
479 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove | 484 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n"); // remove |
480 | } | 485 | } |
481 | 486 | ||
482 | 487 | ||
@@ -498,8 +503,8 @@ RPS_sampler_resize (unsigned int new_size) | |||
498 | * | 503 | * |
499 | * @param new_size the new size of the sampler | 504 | * @param new_size the new size of the sampler |
500 | */ | 505 | */ |
501 | void | 506 | static void |
502 | RPS_sampler_empty () | 507 | sampler_empty () |
503 | { | 508 | { |
504 | sampler_resize (0); | 509 | sampler_resize (0); |
505 | } | 510 | } |
@@ -537,7 +542,6 @@ RPS_sampler_init (size_t init_size, | |||
537 | sampler->insert_cls = ins_cls; | 542 | sampler->insert_cls = ins_cls; |
538 | sampler->remove_cb = rem_cb; | 543 | sampler->remove_cb = rem_cb; |
539 | sampler->remove_cls = rem_cls; | 544 | sampler->remove_cls = rem_cls; |
540 | get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); | ||
541 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); | 545 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); |
542 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); | 546 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); |
543 | RPS_sampler_resize (init_size); | 547 | RPS_sampler_resize (init_size); |
@@ -581,7 +585,7 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) | |||
581 | { | 585 | { |
582 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) ) | 586 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) ) |
583 | { | 587 | { |
584 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n"); | 588 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n"); |
585 | RPS_sampler_elem_reinit (sampler->sampler_elements[i]); | 589 | RPS_sampler_elem_reinit (sampler->sampler_elements[i]); |
586 | } | 590 | } |
587 | } | 591 | } |
@@ -595,14 +599,16 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) | |||
595 | * corrsponding peer to the client. | 599 | * corrsponding peer to the client. |
596 | * Only used internally | 600 | * Only used internally |
597 | */ | 601 | */ |
598 | void | 602 | static void |
599 | RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 603 | sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
600 | { | 604 | { |
601 | struct GetPeerCls *gpc; | 605 | struct GetPeerCls *gpc = (struct GetPeerCls *) cls; |
602 | uint32_t r_index; | 606 | uint32_t r_index; |
603 | struct GNUNET_HashCode *hash; | ||
604 | 607 | ||
605 | gpc = (struct GetPeerCls *) cls; | 608 | gpc->get_peer_task = NULL; |
609 | GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc); | ||
610 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
611 | return; | ||
606 | 612 | ||
607 | /**; | 613 | /**; |
608 | * Choose the r_index of the peer we want to return | 614 | * Choose the r_index of the peer we want to return |
@@ -616,20 +622,15 @@ RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
616 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply( | 622 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply( |
617 | GNUNET_TIME_UNIT_SECONDS, | 623 | GNUNET_TIME_UNIT_SECONDS, |
618 | .1), | 624 | .1), |
619 | &RPS_sampler_get_rand_peer_, | 625 | &sampler_get_rand_peer2, |
620 | cls); | 626 | cls); |
621 | return; | 627 | return; |
622 | } | 628 | } |
623 | 629 | ||
624 | *gpc->id = sampler->sampler_elements[r_index]->peer_id; | 630 | *gpc->id = sampler->sampler_elements[r_index]->peer_id; |
625 | 631 | ||
626 | hash = GNUNET_new (struct GNUNET_HashCode); | 632 | gpc->cont (gpc->cont_cls, gpc->id); |
627 | GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); | 633 | GNUNET_free (gpc); |
628 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task)) | ||
629 | LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n"); | ||
630 | GNUNET_free (gpc->get_peer_task); | ||
631 | |||
632 | gpc->cb (gpc->cb_cls, gpc->id); | ||
633 | } | 634 | } |
634 | 635 | ||
635 | 636 | ||
@@ -641,20 +642,22 @@ RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
641 | * | 642 | * |
642 | * @return a random PeerID of the PeerIDs previously put into the sampler. | 643 | * @return a random PeerID of the PeerIDs previously put into the sampler. |
643 | */ | 644 | */ |
644 | void | 645 | static void |
645 | RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 646 | sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
646 | { | 647 | { |
647 | struct GetPeerCls *gpc; | 648 | struct GetPeerCls *gpc = (struct GetPeerCls *) cls; |
648 | struct GNUNET_PeerIdentity tmp_id; | 649 | struct GNUNET_PeerIdentity tmp_id; |
649 | struct RPS_SamplerElement *s_elem; | 650 | struct RPS_SamplerElement *s_elem; |
650 | struct GNUNET_TIME_Relative last_request_diff; | 651 | struct GNUNET_TIME_Relative last_request_diff; |
651 | struct GNUNET_HashCode *hash; | ||
652 | uint32_t tmp_client_get_index; | 652 | uint32_t tmp_client_get_index; |
653 | 653 | ||
654 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n"); | 654 | gpc->get_peer_task = NULL; |
655 | GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc); | ||
656 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
657 | return; | ||
658 | |||
659 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); | ||
655 | 660 | ||
656 | gpc = (struct GetPeerCls *) cls; | ||
657 | hash = GNUNET_new (struct GNUNET_HashCode); | ||
658 | 661 | ||
659 | /* Store the next #client_get_index to check whether we cycled over the whole list */ | 662 | /* Store the next #client_get_index to check whether we cycled over the whole list */ |
660 | if (0 < client_get_index) | 663 | if (0 < client_get_index) |
@@ -663,16 +666,17 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
663 | tmp_client_get_index = sampler->sampler_size - 1; | 666 | tmp_client_get_index = sampler->sampler_size - 1; |
664 | 667 | ||
665 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 668 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
666 | "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n", | 669 | "scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n", |
667 | tmp_client_get_index, sampler->sampler_size); | 670 | tmp_client_get_index, sampler->sampler_size); |
668 | 671 | ||
669 | do | 672 | do |
670 | { /* Get first non empty sampler */ | 673 | { /* Get first non empty sampler */ |
671 | if (tmp_client_get_index == client_get_index) | 674 | if (tmp_client_get_index == client_get_index) |
672 | { | 675 | { |
673 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: reached tmp_index %" PRIX32 ".\n", client_get_index); | 676 | LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n", client_get_index); |
677 | GNUNET_assert (NULL == gpc->get_peer_task); | ||
674 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, | 678 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, |
675 | &RPS_sampler_get_rand_peer, | 679 | &sampler_get_rand_peer, |
676 | cls); | 680 | cls); |
677 | return; | 681 | return; |
678 | } | 682 | } |
@@ -688,7 +692,7 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
688 | else | 692 | else |
689 | client_get_index++; | 693 | client_get_index++; |
690 | 694 | ||
691 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index); | 695 | LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n", client_get_index); |
692 | } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); | 696 | } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); |
693 | 697 | ||
694 | s_elem = sampler->sampler_elements[client_get_index]; | 698 | s_elem = sampler->sampler_elements[client_get_index]; |
@@ -703,28 +707,25 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
703 | if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) | 707 | if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) |
704 | { | 708 | { |
705 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 709 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
706 | "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); | 710 | "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); |
707 | ///* How many time remains untile the next round has started? */ | 711 | ///* How many time remains untile the next round has started? */ |
708 | //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff, | 712 | //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff, |
709 | // sampler->max_round_interval); | 713 | // sampler->max_round_interval); |
710 | // add a little delay | 714 | // add a little delay |
711 | /* Schedule it one round later */ | 715 | /* Schedule it one round later */ |
716 | GNUNET_assert (NULL == gpc->get_peer_task); | ||
712 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, | 717 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, |
713 | &RPS_sampler_get_rand_peer, | 718 | &sampler_get_rand_peer, |
714 | cls); | 719 | cls); |
715 | return; | 720 | return; |
716 | } | 721 | } |
717 | // TODO add other reasons to wait here | 722 | // TODO add other reasons to wait here |
718 | } | 723 | } |
719 | 724 | ||
720 | GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); | ||
721 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task)) | ||
722 | LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n"); | ||
723 | GNUNET_free (gpc->get_peer_task); | ||
724 | |||
725 | s_elem->last_client_request = GNUNET_TIME_absolute_get (); | 725 | s_elem->last_client_request = GNUNET_TIME_absolute_get (); |
726 | 726 | ||
727 | gpc->cb (gpc->cb_cls, gpc->id); | 727 | gpc->cont (gpc->cont_cls, gpc->id); |
728 | GNUNET_free (gpc); | ||
728 | } | 729 | } |
729 | 730 | ||
730 | 731 | ||
@@ -743,19 +744,14 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
743 | */ | 744 | */ |
744 | void | 745 | void |
745 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, | 746 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, |
746 | void *cls, uint32_t num_peers, int for_client) | 747 | void *cls, uint32_t num_peers, int for_client) |
747 | { | 748 | { |
748 | GNUNET_assert (GNUNET_YES == for_client || | ||
749 | GNUNET_NO == for_client); | ||
750 | GNUNET_assert (0 != sampler->sampler_size); | 749 | GNUNET_assert (0 != sampler->sampler_size); |
751 | 750 | ||
752 | // TODO check if we have too much (distinct) sampled peers | 751 | // TODO check if we have too much (distinct) sampled peers |
753 | uint32_t i; | 752 | uint32_t i; |
754 | struct NRandPeersReadyCls *cb_cls; | 753 | struct NRandPeersReadyCls *cb_cls; |
755 | struct GetPeerCls *gpc; | 754 | struct GetPeerCls *gpc; |
756 | struct GNUNET_HashCode *hash; | ||
757 | |||
758 | hash = GNUNET_new (struct GNUNET_HashCode); | ||
759 | 755 | ||
760 | cb_cls = GNUNET_new (struct NRandPeersReadyCls); | 756 | cb_cls = GNUNET_new (struct NRandPeersReadyCls); |
761 | cb_cls->num_peers = num_peers; | 757 | cb_cls->num_peers = num_peers; |
@@ -765,23 +761,24 @@ RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, | |||
765 | cb_cls->cls = cls; | 761 | cb_cls->cls = cls; |
766 | 762 | ||
767 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 763 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
768 | "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers); | 764 | "Scheduling requests for %" PRIX32 " peers\n", num_peers); |
769 | 765 | ||
770 | for ( i = 0 ; i < num_peers ; i++ ) | 766 | for ( i = 0 ; i < num_peers ; i++ ) |
771 | { | 767 | { |
772 | gpc = GNUNET_new (struct GetPeerCls); | 768 | gpc = GNUNET_new (struct GetPeerCls); |
773 | gpc->cb = check_n_peers_ready; | 769 | gpc->cont = check_n_peers_ready; |
774 | gpc->cb_cls = cb_cls; | 770 | gpc->cont_cls = cb_cls; |
775 | gpc->id = &cb_cls->ids[i]; | 771 | gpc->id = &cb_cls->ids[i]; |
776 | 772 | ||
777 | // maybe add a little delay | 773 | // maybe add a little delay |
778 | if (GNUNET_YES == for_client) | 774 | if (GNUNET_YES == for_client) |
779 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc); | 775 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer, gpc); |
780 | else if (GNUNET_NO == for_client) | 776 | else if (GNUNET_NO == for_client) |
781 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc); | 777 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer2, gpc); |
782 | GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); | 778 | else |
783 | (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task, | 779 | GNUNET_abort (); |
784 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 780 | |
781 | GNUNET_CONTAINER_DLL_insert (gpc_head, gpc_tail, gpc); | ||
785 | } | 782 | } |
786 | } | 783 | } |
787 | 784 | ||
@@ -811,35 +808,21 @@ RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id) | |||
811 | 808 | ||
812 | 809 | ||
813 | /** | 810 | /** |
814 | * Callback to iterate over the hashmap to cancle the get_peer_tasks. | ||
815 | */ | ||
816 | int | ||
817 | clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
818 | { | ||
819 | struct GNUNET_SCHEDULER_Task *task; | ||
820 | |||
821 | task = (struct GNUNET_SCHEDULER_Task *) value; | ||
822 | GNUNET_SCHEDULER_cancel (task); | ||
823 | |||
824 | GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value); | ||
825 | |||
826 | return GNUNET_YES; | ||
827 | } | ||
828 | |||
829 | |||
830 | /** | ||
831 | * Cleans the sampler. | 811 | * Cleans the sampler. |
832 | */ | 812 | */ |
833 | void | 813 | void |
834 | RPS_sampler_destroy () | 814 | RPS_sampler_destroy () |
835 | { | 815 | { |
836 | if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks, | 816 | struct GetPeerCls *i; |
837 | clear_get_peer_tasks, | 817 | |
838 | NULL)) | 818 | for (i = gpc_head; NULL != i; i = gpc_head) |
839 | LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n"); | 819 | { |
840 | GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks); | 820 | GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, i); |
841 | RPS_sampler_resize (0); | 821 | GNUNET_SCHEDULER_cancel (i->get_peer_task); |
842 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0); | 822 | GNUNET_free (i); |
823 | } | ||
824 | |||
825 | sampler_empty (); | ||
843 | } | 826 | } |
844 | 827 | ||
845 | /* end of gnunet-service-rps.c */ | 828 | /* end of gnunet-service-rps.c */ |