diff options
author | Julius Bünger <buenger@mytum.de> | 2015-01-18 03:19:02 +0000 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2015-01-18 03:19:02 +0000 |
commit | 42efb9525e1ed4a389b9bbd2c2ef9900e6f7f7a6 (patch) | |
tree | 52ebdf2bf7eca36f93ca825bf566619522bb7332 /src/rps/gnunet-service-rps_sampler.c | |
parent | 71480adbd306495b43d8198ff8c7acbefa632ae4 (diff) | |
download | gnunet-42efb9525e1ed4a389b9bbd2c2ef9900e6f7f7a6.tar.gz gnunet-42efb9525e1ed4a389b9bbd2c2ef9900e6f7f7a6.zip |
schedule some requests for later
Diffstat (limited to 'src/rps/gnunet-service-rps_sampler.c')
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 249 |
1 files changed, 203 insertions, 46 deletions
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index f1f7f041b..0d131e198 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c | |||
@@ -77,15 +77,31 @@ struct RPS_SamplerElement | |||
77 | */ | 77 | */ |
78 | struct GNUNET_HashCode peer_id_hash; | 78 | struct GNUNET_HashCode peer_id_hash; |
79 | 79 | ||
80 | |||
80 | /** | 81 | /** |
81 | * Time of last request. | 82 | * Time of last request. |
82 | */ | 83 | */ |
83 | struct GNUNET_TIME_Absolute last_request; | 84 | struct GNUNET_TIME_Absolute last_client_request; |
84 | 85 | ||
85 | /** | 86 | /** |
86 | * Flag that indicates that we are not holding a valid PeerID right now. | 87 | * Flag that indicates that we are not holding a valid PeerID right now. |
87 | */ | 88 | */ |
88 | enum RPS_SamplerEmpty is_empty; | 89 | enum RPS_SamplerEmpty is_empty; |
90 | |||
91 | /** | ||
92 | * 'Birth' | ||
93 | */ | ||
94 | struct GNUNET_TIME_Absolute birth; | ||
95 | |||
96 | /** | ||
97 | * How many times a PeerID was put in this sampler. | ||
98 | */ | ||
99 | uint32_t num_peers; | ||
100 | |||
101 | /** | ||
102 | * How many times this sampler changed the peer_id. | ||
103 | */ | ||
104 | uint32_t num_change; | ||
89 | }; | 105 | }; |
90 | 106 | ||
91 | /** | 107 | /** |
@@ -112,6 +128,13 @@ struct RPS_Sampler | |||
112 | uint64_t sampler_elem_index; | 128 | uint64_t sampler_elem_index; |
113 | 129 | ||
114 | /** | 130 | /** |
131 | * Max time a round takes | ||
132 | * | ||
133 | * Used in the context of RPS | ||
134 | */ | ||
135 | struct GNUNET_TIME_Relative max_round_interval; | ||
136 | |||
137 | /** | ||
115 | * Callback to be called when a peer gets inserted into a sampler. | 138 | * Callback to be called when a peer gets inserted into a sampler. |
116 | */ | 139 | */ |
117 | RPS_sampler_insert_cb insert_cb; | 140 | RPS_sampler_insert_cb insert_cb; |
@@ -174,6 +197,38 @@ typedef void | |||
174 | const struct GNUNET_PeerIdentity *id); | 197 | const struct GNUNET_PeerIdentity *id); |
175 | 198 | ||
176 | /** | 199 | /** |
200 | * Closure to #RPS_sampler_get_rand_peer() | ||
201 | */ | ||
202 | struct GetPeerCls | ||
203 | { | ||
204 | /** | ||
205 | * The task for this function. | ||
206 | */ | ||
207 | struct GNUNET_SCHEDULER_Task *get_peer_task; | ||
208 | |||
209 | /** | ||
210 | * The callback | ||
211 | */ | ||
212 | RPS_sampler_rand_peer_ready_cb cb; | ||
213 | |||
214 | /** | ||
215 | * The closure to the callback | ||
216 | */ | ||
217 | void *cb_cls; | ||
218 | |||
219 | /** | ||
220 | * The address of the id to be stored at | ||
221 | */ | ||
222 | struct GNUNET_PeerIdentity *id; | ||
223 | }; | ||
224 | |||
225 | /** | ||
226 | * Multihashmap that keeps track of all get_peer_tasks that are still scheduled. | ||
227 | */ | ||
228 | struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks; | ||
229 | |||
230 | |||
231 | /** | ||
177 | * Global sampler variable. | 232 | * Global sampler variable. |
178 | */ | 233 | */ |
179 | struct RPS_Sampler *sampler; | 234 | struct RPS_Sampler *sampler; |
@@ -214,10 +269,17 @@ RPS_sampler_get_n_rand_peers_ready_cb (void *cls, | |||
214 | 269 | ||
215 | n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls; | 270 | n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls; |
216 | 271 | ||
272 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
273 | "SAMPLER: Got %" PRIX64 "th of %" PRIX64 " peers\n", | ||
274 | n_peers_cls->cur_num_peers, n_peers_cls->num_peers); | ||
275 | |||
217 | if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) | 276 | if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) |
218 | { | 277 | { /* All peers are ready -- return those to the client */ |
219 | GNUNET_assert (NULL != n_peers_cls->callback); | 278 | GNUNET_assert (NULL != n_peers_cls->callback); |
220 | 279 | ||
280 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
281 | "SAMPLER: returning %" PRIX64 " peers to the client\n", | ||
282 | n_peers_cls->num_peers); | ||
221 | n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); | 283 | n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); |
222 | 284 | ||
223 | GNUNET_free (n_peers_cls); | 285 | GNUNET_free (n_peers_cls); |
@@ -240,13 +302,17 @@ RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el) | |||
240 | &(sampler_el->auth_key.key), | 302 | &(sampler_el->auth_key.key), |
241 | GNUNET_CRYPTO_HASH_LENGTH); | 303 | GNUNET_CRYPTO_HASH_LENGTH); |
242 | 304 | ||
243 | sampler_el->last_request = GNUNET_TIME_UNIT_FOREVER_ABS; | 305 | sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS; |
244 | 306 | ||
245 | /* We might want to keep the previous peer */ | 307 | /* We might want to keep the previous peer */ |
246 | 308 | ||
247 | //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id, | 309 | //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id, |
248 | // sizeof(struct GNUNET_PeerIdentity), | 310 | // sizeof(struct GNUNET_PeerIdentity), |
249 | // &sampler_el->peer_id_hash); | 311 | // &sampler_el->peer_id_hash); |
312 | |||
313 | sampler_el->birth = GNUNET_TIME_absolute_get (); | ||
314 | sampler_el->num_peers = 0; | ||
315 | sampler_el->num_change = 0; | ||
250 | } | 316 | } |
251 | 317 | ||
252 | 318 | ||
@@ -282,12 +348,14 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
282 | { | 348 | { |
283 | struct GNUNET_HashCode other_hash; | 349 | struct GNUNET_HashCode other_hash; |
284 | 350 | ||
285 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, &(s_elem->peer_id)) ) | 351 | s_elem->num_peers++; |
352 | |||
353 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) ) | ||
286 | { | 354 | { |
287 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", | 355 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", |
288 | GNUNET_i2s(other)); | 356 | GNUNET_i2s (other)); |
289 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n", | 357 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n", |
290 | GNUNET_i2s(&(s_elem->peer_id))); | 358 | GNUNET_i2s (&(s_elem->peer_id))); |
291 | } | 359 | } |
292 | else | 360 | else |
293 | { | 361 | { |
@@ -297,48 +365,48 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_Pe | |||
297 | &other_hash); | 365 | &other_hash); |
298 | 366 | ||
299 | if ( EMPTY == s_elem->is_empty ) | 367 | if ( EMPTY == s_elem->is_empty ) |
300 | { // Or whatever is a valid way to say | 368 | { |
301 | // "we have no PeerID at the moment" | 369 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n", |
302 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n", | ||
303 | GNUNET_i2s(other)); | 370 | GNUNET_i2s(other)); |
304 | s_elem->peer_id = *other; | 371 | s_elem->peer_id = *other; |
305 | //s_elem->peer_id = other; | ||
306 | s_elem->peer_id_hash = other_hash; | 372 | s_elem->peer_id_hash = other_hash; |
373 | |||
307 | if (NULL != sampler->insert_cb) | 374 | if (NULL != sampler->insert_cb) |
308 | { | 375 | sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id)); |
309 | sampler->insert_cb(sampler->insert_cls, &(s_elem->peer_id)); | 376 | |
310 | } | 377 | s_elem->num_change++; |
311 | } | 378 | } |
312 | else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s_elem->peer_id_hash) ) | 379 | else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) ) |
313 | { | 380 | { |
314 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", | 381 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", |
315 | GNUNET_i2s(other)); | 382 | GNUNET_i2s (other)); |
316 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n", | 383 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n", |
317 | GNUNET_i2s(&s_elem->peer_id)); | 384 | GNUNET_i2s (&s_elem->peer_id)); |
318 | 385 | ||
319 | if ( NULL != sampler->remove_cb ) | 386 | if ( NULL != sampler->remove_cb ) |
320 | { | 387 | { |
321 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n", | 388 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n", |
322 | GNUNET_i2s(&s_elem->peer_id)); | 389 | GNUNET_i2s (&s_elem->peer_id)); |
323 | sampler->remove_cb(sampler->remove_cls, &s_elem->peer_id); | 390 | sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id); |
324 | } | 391 | } |
325 | 392 | ||
326 | memcpy(&s_elem->peer_id, other, sizeof(struct GNUNET_PeerIdentity)); | 393 | s_elem->peer_id = *other; |
327 | //s_elem->peer_id = other; | ||
328 | s_elem->peer_id_hash = other_hash; | 394 | s_elem->peer_id_hash = other_hash; |
329 | 395 | ||
330 | if ( NULL != sampler->insert_cb ) | 396 | if ( NULL != sampler->insert_cb ) |
331 | { | 397 | { |
332 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n", | 398 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n", |
333 | GNUNET_i2s(&s_elem->peer_id)); | 399 | GNUNET_i2s (&s_elem->peer_id)); |
334 | sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id); | 400 | sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id); |
335 | } | 401 | } |
402 | |||
403 | s_elem->num_change++; | ||
336 | } | 404 | } |
337 | else | 405 | else |
338 | { | 406 | { |
339 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", | 407 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", |
340 | GNUNET_i2s(other)); | 408 | GNUNET_i2s(other)); |
341 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n", | 409 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n", |
342 | GNUNET_i2s(&s_elem->peer_id)); | 410 | GNUNET_i2s(&s_elem->peer_id)); |
343 | } | 411 | } |
344 | } | 412 | } |
@@ -410,7 +478,7 @@ RPS_sampler_resize (unsigned int new_size) | |||
410 | } | 478 | } |
411 | 479 | ||
412 | GNUNET_assert(sampler->sampler_size == new_size); | 480 | GNUNET_assert(sampler->sampler_size == new_size); |
413 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove | 481 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove |
414 | } | 482 | } |
415 | 483 | ||
416 | 484 | ||
@@ -427,7 +495,9 @@ RPS_sampler_resize (unsigned int new_size) | |||
427 | * @param rem_cls the closure given to #rem_cb | 495 | * @param rem_cls the closure given to #rem_cb |
428 | */ | 496 | */ |
429 | void | 497 | void |
430 | RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id, | 498 | RPS_sampler_init (size_t init_size, |
499 | const struct GNUNET_PeerIdentity *id, | ||
500 | struct GNUNET_TIME_Relative max_round_interval, | ||
431 | RPS_sampler_insert_cb ins_cb, void *ins_cls, | 501 | RPS_sampler_insert_cb ins_cb, void *ins_cls, |
432 | RPS_sampler_remove_cb rem_cb, void *rem_cls) | 502 | RPS_sampler_remove_cb rem_cb, void *rem_cls) |
433 | { | 503 | { |
@@ -441,10 +511,12 @@ RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id, | |||
441 | sampler = GNUNET_new (struct RPS_Sampler); | 511 | sampler = GNUNET_new (struct RPS_Sampler); |
442 | sampler->sampler_size = 0; | 512 | sampler->sampler_size = 0; |
443 | sampler->sampler_elements = NULL; | 513 | sampler->sampler_elements = NULL; |
514 | sampler->max_round_interval = max_round_interval; | ||
444 | sampler->insert_cb = ins_cb; | 515 | sampler->insert_cb = ins_cb; |
445 | sampler->insert_cls = ins_cls; | 516 | sampler->insert_cls = ins_cls; |
446 | sampler->remove_cb = rem_cb; | 517 | sampler->remove_cb = rem_cb; |
447 | sampler->remove_cls = rem_cls; | 518 | sampler->remove_cls = rem_cls; |
519 | get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); | ||
448 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); | 520 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); |
449 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); | 521 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); |
450 | RPS_sampler_resize (init_size); | 522 | RPS_sampler_resize (init_size); |
@@ -489,7 +561,7 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) | |||
489 | { | 561 | { |
490 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) ) | 562 | if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) ) |
491 | { | 563 | { |
492 | LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n"); | 564 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n"); |
493 | RPS_sampler_elem_reinit (sampler->sampler_elements[i]); | 565 | RPS_sampler_elem_reinit (sampler->sampler_elements[i]); |
494 | } | 566 | } |
495 | } | 567 | } |
@@ -523,8 +595,8 @@ RPS_sampler_get_rand_peer_ () | |||
523 | // peer = NULL; | 595 | // peer = NULL; |
524 | //else | 596 | //else |
525 | peer = &(sampler->sampler_elements[r_index]->peer_id); | 597 | peer = &(sampler->sampler_elements[r_index]->peer_id); |
526 | sampler->sampler_elements[r_index]->last_request = GNUNET_TIME_absolute_get(); | 598 | //sampler->sampler_elements[r_index]->last_client_request = GNUNET_TIME_absolute_get(); |
527 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer)); | 599 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer)); |
528 | 600 | ||
529 | return peer; | 601 | return peer; |
530 | } | 602 | } |
@@ -574,22 +646,66 @@ RPS_sampler_get_n_rand_peers_ (uint64_t n) | |||
574 | * @return a random PeerID of the PeerIDs previously put into the sampler. | 646 | * @return a random PeerID of the PeerIDs previously put into the sampler. |
575 | */ | 647 | */ |
576 | void | 648 | void |
577 | RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb, | 649 | //RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb, |
578 | void *cls, struct GNUNET_PeerIdentity *id) | 650 | // void *cls, struct GNUNET_PeerIdentity *id) |
651 | RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
579 | { | 652 | { |
653 | struct GetPeerCls *gpc; | ||
654 | struct RPS_SamplerElement *s_elem; | ||
655 | struct GNUNET_TIME_Relative last_request_diff; | ||
656 | struct GNUNET_HashCode *hash; | ||
657 | //struct GNUNET_TIME_Relative inv_last_request_diff; | ||
658 | |||
659 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n"); | ||
660 | |||
661 | gpc = (struct GetPeerCls *) cls; | ||
662 | hash = GNUNET_new (struct GNUNET_HashCode); | ||
663 | |||
580 | do | 664 | do |
581 | { | 665 | { /* Get first non empty sampler */ |
582 | // TODO check if we can actually return that now - otherwise wait | 666 | // TODO schedule for later if all samplers are empty |
583 | *id = sampler->sampler_elements[client_get_index]->peer_id; | 667 | *gpc->id = sampler->sampler_elements[client_get_index]->peer_id; |
584 | 668 | ||
585 | RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); | 669 | RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); |
586 | if ( client_get_index == sampler->sampler_size ) | 670 | if ( client_get_index == sampler->sampler_size ) |
587 | client_get_index = 0; | 671 | client_get_index = 0; |
588 | else | 672 | else |
589 | client_get_index++; | 673 | client_get_index++; |
590 | } while (NOT_EMPTY == sampler->sampler_elements[client_get_index]->is_empty); | 674 | } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); |
675 | |||
676 | s_elem = sampler->sampler_elements[client_get_index]; | ||
677 | |||
678 | /* Check whether we may use this sampler to give it back to the client */ | ||
679 | if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) | ||
680 | { | ||
681 | last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, | ||
682 | GNUNET_TIME_absolute_get ()); | ||
683 | /* We're not going to give it back now if it was already requested by a client this round */ | ||
684 | if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) | ||
685 | { | ||
686 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
687 | "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); | ||
688 | ///* How many time remains untile the next round has started? */ | ||
689 | //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff, | ||
690 | // sampler->max_round_interval); | ||
691 | // add a little delay | ||
692 | /* Schedule it one round later */ | ||
693 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, | ||
694 | &RPS_sampler_get_rand_peer, | ||
695 | cls); | ||
696 | return; | ||
697 | } | ||
698 | // TODO add other reasons to wait here | ||
699 | } | ||
700 | |||
701 | GNUNET_CRYPTO_hash (gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); | ||
702 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, gpc->get_peer_task)) | ||
703 | LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n"); | ||
704 | GNUNET_free (gpc->get_peer_task); | ||
705 | |||
706 | s_elem->last_client_request = GNUNET_TIME_absolute_get (); | ||
591 | 707 | ||
592 | cb (cls, id); | 708 | gpc->cb (gpc->cb_cls, gpc->id); |
593 | } | 709 | } |
594 | 710 | ||
595 | 711 | ||
@@ -608,11 +724,11 @@ RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb, | |||
608 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, | 724 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, |
609 | void *cls, uint64_t num_peers) | 725 | void *cls, uint64_t num_peers) |
610 | { | 726 | { |
611 | // use _get_rand_peers_ ? | ||
612 | if ( 0 == sampler->sampler_size ) | 727 | if ( 0 == sampler->sampler_size ) |
613 | { | 728 | { |
614 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 729 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
615 | "Sgrp: List empty - Returning NULL\n"); | 730 | "Sgrp: List empty - Returning NULL\n"); |
731 | cb (cls, NULL, 0); | ||
616 | } | 732 | } |
617 | else | 733 | else |
618 | { | 734 | { |
@@ -621,18 +737,37 @@ RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, | |||
621 | struct GNUNET_PeerIdentity *peers; | 737 | struct GNUNET_PeerIdentity *peers; |
622 | uint64_t i; | 738 | uint64_t i; |
623 | struct RPS_GetNRandPeersReadyCls *cb_cls; | 739 | struct RPS_GetNRandPeersReadyCls *cb_cls; |
740 | struct GetPeerCls *gpc; | ||
741 | struct GNUNET_HashCode *hash; | ||
624 | 742 | ||
625 | peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); | 743 | peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); |
744 | hash = GNUNET_new (struct GNUNET_HashCode); | ||
626 | 745 | ||
627 | cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls); | 746 | cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls); |
628 | cb_cls->num_peers = num_peers; | 747 | cb_cls->num_peers = num_peers; |
629 | cb_cls->cur_num_peers = 0; | 748 | cb_cls->cur_num_peers = 0; |
630 | cb_cls->callback = NULL; | 749 | cb_cls->ids = peers; |
631 | cb_cls->cls = NULL; | 750 | cb_cls->callback = cb; |
751 | cb_cls->cls = cls; | ||
752 | |||
753 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
754 | "SAMPLER: Scheduling requests for %" PRIX64 " peers\n", num_peers); | ||
632 | 755 | ||
633 | for ( i = 0 ; i < num_peers ; i++ ) | 756 | for ( i = 0 ; i < num_peers ; i++ ) |
634 | RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb, | 757 | { |
635 | cb_cls, &peers[i]); | 758 | gpc = GNUNET_new (struct GetPeerCls); |
759 | gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb; | ||
760 | gpc->cb_cls = cb_cls; | ||
761 | gpc->id = &peers[i]; | ||
762 | |||
763 | // maybe add a little delay | ||
764 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc); | ||
765 | GNUNET_CRYPTO_hash (gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); | ||
766 | (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, gpc->get_peer_task, | ||
767 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
768 | //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb, | ||
769 | // cb_cls, &peers[i]); | ||
770 | } | ||
636 | } | 771 | } |
637 | } | 772 | } |
638 | 773 | ||
@@ -662,11 +797,33 @@ RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id) | |||
662 | 797 | ||
663 | 798 | ||
664 | /** | 799 | /** |
800 | * Callback to iterate over the hashmap to cancle the get_peer_tasks. | ||
801 | */ | ||
802 | int | ||
803 | clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
804 | { | ||
805 | struct GNUNET_SCHEDULER_Task *task; | ||
806 | |||
807 | task = (struct GNUNET_SCHEDULER_Task *) value; | ||
808 | GNUNET_SCHEDULER_cancel (task); | ||
809 | |||
810 | GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value); | ||
811 | |||
812 | return GNUNET_YES; | ||
813 | } | ||
814 | |||
815 | |||
816 | /** | ||
665 | * Cleans the sampler. | 817 | * Cleans the sampler. |
666 | */ | 818 | */ |
667 | void | 819 | void |
668 | RPS_sampler_destroy () | 820 | RPS_sampler_destroy () |
669 | { | 821 | { |
822 | if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks, | ||
823 | clear_get_peer_tasks, | ||
824 | NULL)) | ||
825 | LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n"); | ||
826 | GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks); | ||
670 | RPS_sampler_resize (0); | 827 | RPS_sampler_resize (0); |
671 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0); | 828 | GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0); |
672 | } | 829 | } |