diff options
author | Julius Bünger <buenger@mytum.de> | 2015-01-22 00:18:44 +0000 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2015-01-22 00:18:44 +0000 |
commit | 5883a1ad9483eba3871968d9b8c5dfd9c3db12c1 (patch) | |
tree | 8a4861b1f8a93e52f3cf60261e9f8e6aca7b2586 /src | |
parent | ebbd29b689e43fe5c4cc83074fb366b71fdcff85 (diff) | |
download | gnunet-5883a1ad9483eba3871968d9b8c5dfd9c3db12c1.tar.gz gnunet-5883a1ad9483eba3871968d9b8c5dfd9c3db12c1.zip |
restructured service and sampler
Diffstat (limited to 'src')
-rw-r--r-- | src/rps/gnunet-service-rps.c | 340 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 171 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.h | 17 |
3 files changed, 317 insertions, 211 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 6caa77c40..9a7519728 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -70,22 +70,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; | |||
70 | */ | 70 | */ |
71 | static struct GNUNET_PeerIdentity *own_identity; | 71 | static struct GNUNET_PeerIdentity *own_identity; |
72 | 72 | ||
73 | /** | ||
74 | * Closure to the callback cadet calls on each peer it passes to us | ||
75 | */ | ||
76 | struct init_peer_cls | ||
77 | { | ||
78 | /** | ||
79 | * The server handle to later listen to client requests | ||
80 | */ | ||
81 | struct GNUNET_SERVER_Handle *server; | ||
82 | |||
83 | /** | ||
84 | * Counts how many peers cadet already passed to us | ||
85 | */ | ||
86 | uint32_t i; | ||
87 | }; | ||
88 | |||
89 | 73 | ||
90 | struct GNUNET_PeerIdentity * | 74 | struct GNUNET_PeerIdentity * |
91 | get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size); | 75 | get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size); |
@@ -122,6 +106,29 @@ enum PeerFlags | |||
122 | LIVING = 0x10 | 106 | LIVING = 0x10 |
123 | }; | 107 | }; |
124 | 108 | ||
109 | |||
110 | /** | ||
111 | * Functions of this type can be used to be stored at a peer for later execution. | ||
112 | */ | ||
113 | typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer); | ||
114 | |||
115 | /** | ||
116 | * Outstanding operation on peer consisting of callback and closure | ||
117 | */ | ||
118 | struct PeerOutstandingOp | ||
119 | { | ||
120 | /** | ||
121 | * Callback | ||
122 | */ | ||
123 | PeerOp op; | ||
124 | |||
125 | /** | ||
126 | * Closure | ||
127 | */ | ||
128 | void *op_cls; | ||
129 | }; | ||
130 | |||
131 | |||
125 | /** | 132 | /** |
126 | * Struct used to keep track of other peer's status | 133 | * Struct used to keep track of other peer's status |
127 | * | 134 | * |
@@ -150,6 +157,17 @@ struct PeerContext | |||
150 | struct GNUNET_CADET_Channel *recv_channel; // unneeded? | 157 | struct GNUNET_CADET_Channel *recv_channel; // unneeded? |
151 | 158 | ||
152 | /** | 159 | /** |
160 | * Array of outstanding operations on this peer. | ||
161 | */ | ||
162 | struct PeerOutstandingOp *outstanding_ops; | ||
163 | |||
164 | /** | ||
165 | * Number of outstanding operations. | ||
166 | */ | ||
167 | unsigned int num_outstanding_ops; | ||
168 | //size_t num_outstanding_ops; | ||
169 | |||
170 | /** | ||
153 | * This is pobably followed by 'statistical' data (when we first saw | 171 | * This is pobably followed by 'statistical' data (when we first saw |
154 | * him, how did we get his ID, how many pushes (in a timeinterval), | 172 | * him, how did we get his ID, how many pushes (in a timeinterval), |
155 | * ...) | 173 | * ...) |
@@ -310,6 +328,12 @@ static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE]; | |||
310 | static struct GNUNET_TIME_Relative request_rate; | 328 | static struct GNUNET_TIME_Relative request_rate; |
311 | 329 | ||
312 | 330 | ||
331 | /** | ||
332 | * Number of history update tasks. | ||
333 | */ | ||
334 | uint32_t num_hist_update_tasks; | ||
335 | |||
336 | |||
313 | /*********************************************************************** | 337 | /*********************************************************************** |
314 | * /Globals | 338 | * /Globals |
315 | ***********************************************************************/ | 339 | ***********************************************************************/ |
@@ -398,6 +422,8 @@ get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, | |||
398 | ctx->mq = NULL; | 422 | ctx->mq = NULL; |
399 | ctx->send_channel = NULL; | 423 | ctx->send_channel = NULL; |
400 | ctx->recv_channel = NULL; | 424 | ctx->recv_channel = NULL; |
425 | ctx->outstanding_ops = NULL; | ||
426 | ctx->num_outstanding_ops = 0; | ||
401 | (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, | 427 | (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, |
402 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | 428 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); |
403 | } | 429 | } |
@@ -406,6 +432,22 @@ get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, | |||
406 | 432 | ||
407 | 433 | ||
408 | /** | 434 | /** |
435 | * Put random peer from sampler into the gossip list as history update. | ||
436 | */ | ||
437 | void | ||
438 | hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers) | ||
439 | { | ||
440 | GNUNET_assert (1 == num_peers); | ||
441 | |||
442 | if (gossip_list_size < sampler_size_est_need) | ||
443 | GNUNET_array_append (gossip_list, gossip_list_size, *ids); | ||
444 | |||
445 | if (0 < num_hist_update_tasks) | ||
446 | num_hist_update_tasks--; | ||
447 | } | ||
448 | |||
449 | |||
450 | /** | ||
409 | * Callback that is called when a channel was effectively established. | 451 | * Callback that is called when a channel was effectively established. |
410 | * This is given to ntfy_tmt_rdy and called when the channel was | 452 | * This is given to ntfy_tmt_rdy and called when the channel was |
411 | * successfully established. | 453 | * successfully established. |
@@ -422,6 +464,15 @@ peer_is_live (void *cls, size_t size, void *buf) | |||
422 | 464 | ||
423 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer)); | 465 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer)); |
424 | 466 | ||
467 | if (0 != peer_ctx->num_outstanding_ops) | ||
468 | { /* Call outstanding operations */ | ||
469 | unsigned int i; | ||
470 | |||
471 | for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) | ||
472 | peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer); | ||
473 | GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0); | ||
474 | } | ||
475 | |||
425 | GNUNET_free (peer); | 476 | GNUNET_free (peer); |
426 | 477 | ||
427 | buf = NULL; | 478 | buf = NULL; |
@@ -437,7 +488,7 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, | |||
437 | const struct GNUNET_PeerIdentity *peer) | 488 | const struct GNUNET_PeerIdentity *peer) |
438 | { | 489 | { |
439 | struct PeerContext *ctx; | 490 | struct PeerContext *ctx; |
440 | //struct GNUNET_PeerIdentity *tmp_peer; | 491 | struct GNUNET_PeerIdentity *tmp_peer; |
441 | 492 | ||
442 | ctx = get_peer_ctx (peer_map, peer); | 493 | ctx = get_peer_ctx (peer_map, peer); |
443 | if (NULL == ctx->send_channel) | 494 | if (NULL == ctx->send_channel) |
@@ -446,11 +497,14 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, | |||
446 | GNUNET_RPS_CADET_PORT, | 497 | GNUNET_RPS_CADET_PORT, |
447 | GNUNET_CADET_OPTION_RELIABLE); | 498 | GNUNET_CADET_OPTION_RELIABLE); |
448 | 499 | ||
449 | //tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity); | 500 | if (NULL == ctx->recv_channel) |
450 | //*tmp_peer = *peer; | 501 | { |
451 | //(void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO, | 502 | tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity); |
452 | // GNUNET_TIME_UNIT_FOREVER_REL, | 503 | *tmp_peer = *peer; |
453 | // 0, peer_is_live, tmp_peer); | 504 | (void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO, |
505 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
506 | 0, peer_is_live, tmp_peer); | ||
507 | } | ||
454 | 508 | ||
455 | // do I have to explicitly put it in the peer_map? | 509 | // do I have to explicitly put it in the peer_map? |
456 | (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, | 510 | (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, |
@@ -509,16 +563,91 @@ T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size) | |||
509 | struct GNUNET_TIME_Relative | 563 | struct GNUNET_TIME_Relative |
510 | T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size) | 564 | T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size) |
511 | { | 565 | { |
512 | return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); // FIXME find a way to devide that by arr_size | 566 | return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); |
567 | } | ||
568 | |||
569 | |||
570 | /** | ||
571 | * Insert PeerID in #pull_list | ||
572 | * | ||
573 | * Called once we know a peer is live. | ||
574 | */ | ||
575 | void | ||
576 | insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
577 | { | ||
578 | if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) | ||
579 | GNUNET_array_append (pull_list, pull_list_size, *peer); | ||
580 | } | ||
581 | |||
582 | /** | ||
583 | * Check whether #insert_in_pull_list was already scheduled | ||
584 | */ | ||
585 | int | ||
586 | insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx) | ||
587 | { | ||
588 | unsigned int i; | ||
589 | |||
590 | for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) | ||
591 | if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op) | ||
592 | return GNUNET_YES; | ||
593 | return GNUNET_NO; | ||
594 | } | ||
595 | |||
596 | |||
597 | /** | ||
598 | * Insert PeerID in #gossip_list | ||
599 | * | ||
600 | * Called once we know a peer is live. | ||
601 | */ | ||
602 | void | ||
603 | insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
604 | { | ||
605 | if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer)) | ||
606 | GNUNET_array_append (gossip_list, gossip_list_size, *peer); | ||
607 | } | ||
608 | |||
609 | /** | ||
610 | * Check whether #insert_in_pull_list was already scheduled | ||
611 | */ | ||
612 | int | ||
613 | insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx) | ||
614 | { | ||
615 | unsigned int i; | ||
616 | |||
617 | for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) | ||
618 | if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op) | ||
619 | return GNUNET_YES; | ||
620 | return GNUNET_NO; | ||
621 | } | ||
622 | |||
623 | |||
624 | /** | ||
625 | * Update sampler with given PeerID. | ||
626 | */ | ||
627 | void | ||
628 | insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
629 | { | ||
630 | RPS_sampler_update_list (peer); | ||
631 | } | ||
632 | |||
633 | /** | ||
634 | * Check whether #insert_in_sampler was already scheduled | ||
635 | */ | ||
636 | int | ||
637 | insert_in_sampler_scheduled (const struct PeerContext *peer_ctx) | ||
638 | { | ||
639 | unsigned int i; | ||
640 | |||
641 | for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) | ||
642 | if (insert_in_sampler== peer_ctx->outstanding_ops[i].op) | ||
643 | return GNUNET_YES; | ||
644 | return GNUNET_NO; | ||
513 | } | 645 | } |
514 | 646 | ||
515 | 647 | ||
516 | /*********************************************************************** | ||
517 | * /Util functions | ||
518 | ***********************************************************************/ | ||
519 | 648 | ||
520 | /** | 649 | /** |
521 | * Wrapper around _sampler_resize() | 650 | * Wrapper around #RPS_sampler_resize() |
522 | */ | 651 | */ |
523 | void | 652 | void |
524 | resize_wrapper () | 653 | resize_wrapper () |
@@ -544,6 +673,10 @@ resize_wrapper () | |||
544 | } | 673 | } |
545 | 674 | ||
546 | 675 | ||
676 | /*********************************************************************** | ||
677 | * /Util functions | ||
678 | ***********************************************************************/ | ||
679 | |||
547 | /** | 680 | /** |
548 | * Function called by NSE. | 681 | * Function called by NSE. |
549 | * | 682 | * |
@@ -660,7 +793,7 @@ handle_client_request (void *cls, | |||
660 | 793 | ||
661 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers); | 794 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers); |
662 | 795 | ||
663 | RPS_sampler_get_n_rand_peers (client_respond, client, num_peers); | 796 | RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES); |
664 | 797 | ||
665 | GNUNET_SERVER_receive_done (client, | 798 | GNUNET_SERVER_receive_done (client, |
666 | GNUNET_OK); | 799 | GNUNET_OK); |
@@ -806,6 +939,8 @@ handle_peer_pull_reply (void *cls, | |||
806 | 939 | ||
807 | struct GNUNET_RPS_P2P_PullReplyMessage *in_msg; | 940 | struct GNUNET_RPS_P2P_PullReplyMessage *in_msg; |
808 | struct GNUNET_PeerIdentity *peers; | 941 | struct GNUNET_PeerIdentity *peers; |
942 | struct PeerContext *peer_ctx; | ||
943 | struct PeerOutstandingOp out_op; | ||
809 | uint32_t i; | 944 | uint32_t i; |
810 | 945 | ||
811 | if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size)) | 946 | if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size)) |
@@ -828,8 +963,19 @@ handle_peer_pull_reply (void *cls, | |||
828 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 963 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; |
829 | for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ ) | 964 | for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ ) |
830 | { | 965 | { |
831 | if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i])) | 966 | peer_ctx = get_peer_ctx (peer_map, &peers[i]); |
832 | GNUNET_array_append (pull_list, pull_list_size, peers[i]); | 967 | |
968 | if ((0 != (peer_ctx->peer_flags && LIVING)) || | ||
969 | NULL != peer_ctx->recv_channel) | ||
970 | { | ||
971 | if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i])) | ||
972 | GNUNET_array_append (pull_list, pull_list_size, peers[i]); | ||
973 | } | ||
974 | else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx)) | ||
975 | { | ||
976 | out_op.op = insert_in_pull_list; | ||
977 | GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op); | ||
978 | } | ||
833 | } | 979 | } |
834 | 980 | ||
835 | // TODO check that id is valid - whether it is reachable | 981 | // TODO check that id is valid - whether it is reachable |
@@ -865,44 +1011,50 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
865 | 1011 | ||
866 | /* Send PUSHes */ | 1012 | /* Send PUSHes */ |
867 | //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size); | 1013 | //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size); |
868 | n_peers = round (alpha * gossip_list_size); | 1014 | if (0 != gossip_list_size) |
869 | if (0 == n_peers) | ||
870 | n_peers = 1; | ||
871 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n", | ||
872 | n_peers, alpha, gossip_list_size); | ||
873 | for ( i = 0 ; i < n_peers ; i++ ) | ||
874 | { | 1015 | { |
875 | peer = get_rand_peer (gossip_list, gossip_list_size); | 1016 | n_peers = round (alpha * gossip_list_size); |
876 | if (own_identity != peer) | 1017 | if (0 == n_peers) |
877 | { // FIXME if this fails schedule/loop this for later | 1018 | n_peers = 1; |
878 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer)); | 1019 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n", |
879 | 1020 | n_peers, alpha, gossip_list_size); | |
880 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); | 1021 | for ( i = 0 ; i < n_peers ; i++ ) |
881 | // FIXME sometimes it returns a pointer to a freed mq | 1022 | { |
882 | mq = get_mq (peer_map, peer); | 1023 | peer = get_rand_peer (gossip_list, gossip_list_size); |
883 | GNUNET_MQ_send (mq, ev); | 1024 | if (own_identity != peer) |
1025 | { // FIXME if this fails schedule/loop this for later | ||
1026 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer)); | ||
1027 | |||
1028 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); | ||
1029 | // FIXME sometimes it returns a pointer to a freed mq | ||
1030 | mq = get_mq (peer_map, peer); | ||
1031 | GNUNET_MQ_send (mq, ev); | ||
1032 | } | ||
884 | } | 1033 | } |
885 | } | 1034 | } |
886 | 1035 | ||
887 | 1036 | ||
888 | /* Send PULL requests */ | 1037 | /* Send PULL requests */ |
889 | //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size); | 1038 | //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size); |
890 | n_peers = round (beta * gossip_list_size); | 1039 | if (0 != gossip_list_size) |
891 | if (0 == n_peers) | ||
892 | n_peers = 1; | ||
893 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n", | ||
894 | n_peers, beta, gossip_list_size); | ||
895 | for ( i = 0 ; i < n_peers ; i++ ) | ||
896 | { | 1040 | { |
897 | peer = get_rand_peer (gossip_list, gossip_list_size); | 1041 | n_peers = round (beta * gossip_list_size); |
898 | if (own_identity != peer) | 1042 | if (0 == n_peers) |
899 | { // FIXME if this fails schedule/loop this for later | 1043 | n_peers = 1; |
900 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer)); | 1044 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n", |
901 | 1045 | n_peers, beta, gossip_list_size); | |
902 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); | 1046 | for ( i = 0 ; i < n_peers ; i++ ) |
903 | //pull_msg = NULL; | 1047 | { |
904 | mq = get_mq (peer_map, peer); | 1048 | peer = get_rand_peer (gossip_list, gossip_list_size); |
905 | GNUNET_MQ_send (mq, ev); | 1049 | if (own_identity != peer) |
1050 | { // FIXME if this fails schedule/loop this for later | ||
1051 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer)); | ||
1052 | |||
1053 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); | ||
1054 | //pull_msg = NULL; | ||
1055 | mq = get_mq (peer_map, peer); | ||
1056 | GNUNET_MQ_send (mq, ev); | ||
1057 | } | ||
906 | } | 1058 | } |
907 | } | 1059 | } |
908 | 1060 | ||
@@ -914,14 +1066,16 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
914 | push_list_size != 0 && | 1066 | push_list_size != 0 && |
915 | pull_list_size != 0 ) | 1067 | pull_list_size != 0 ) |
916 | { | 1068 | { |
917 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n"); | 1069 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n"); |
918 | 1070 | ||
919 | uint32_t first_border; | 1071 | uint32_t first_border; |
920 | uint32_t second_border; | 1072 | uint32_t second_border; |
921 | 1073 | ||
922 | GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need); | 1074 | first_border = round (alpha * sampler_size_est_need); |
1075 | second_border = first_border + round (beta * sampler_size_est_need); | ||
1076 | |||
1077 | GNUNET_array_grow (gossip_list, gossip_list_size, second_border); | ||
923 | 1078 | ||
924 | first_border = round (alpha * gossip_list_size); | ||
925 | for ( i = 0 ; i < first_border ; i++ ) | 1079 | for ( i = 0 ; i < first_border ; i++ ) |
926 | { // TODO use RPS_sampler_get_n_rand_peers | 1080 | { // TODO use RPS_sampler_get_n_rand_peers |
927 | /* Update gossip list with peers received through PUSHes */ | 1081 | /* Update gossip list with peers received through PUSHes */ |
@@ -931,7 +1085,6 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
931 | // TODO change the peer_flags accordingly | 1085 | // TODO change the peer_flags accordingly |
932 | } | 1086 | } |
933 | 1087 | ||
934 | second_border = first_border + round (beta * gossip_list_size); | ||
935 | for ( i = first_border ; i < second_border ; i++ ) | 1088 | for ( i = first_border ; i < second_border ; i++ ) |
936 | { | 1089 | { |
937 | /* Update gossip list with peers received through PULLs */ | 1090 | /* Update gossip list with peers received through PULLs */ |
@@ -944,8 +1097,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
944 | for ( i = second_border ; i < gossip_list_size ; i++ ) | 1097 | for ( i = second_border ; i < gossip_list_size ; i++ ) |
945 | { | 1098 | { |
946 | /* Update gossip list with peers from history */ | 1099 | /* Update gossip list with peers from history */ |
947 | peer = RPS_sampler_get_n_rand_peers_ (1); | 1100 | RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO); |
948 | gossip_list[i] = *peer; | 1101 | num_hist_update_tasks++; |
949 | // TODO change the peer_flags accordingly | 1102 | // TODO change the peer_flags accordingly |
950 | } | 1103 | } |
951 | 1104 | ||
@@ -1058,37 +1211,38 @@ init_peer_cb (void *cls, | |||
1058 | unsigned int best_path) // "How long is the best path? | 1211 | unsigned int best_path) // "How long is the best path? |
1059 | // (0 = unknown, 1 = ourselves, 2 = neighbor)" | 1212 | // (0 = unknown, 1 = ourselves, 2 = neighbor)" |
1060 | { | 1213 | { |
1061 | struct init_peer_cls *ipc; | 1214 | struct GNUNET_SERVER_Handle *server; |
1215 | struct PeerOutstandingOp out_op; | ||
1216 | struct PeerContext *peer_ctx; | ||
1062 | 1217 | ||
1063 | ipc = (struct init_peer_cls *) cls; | 1218 | server = (struct GNUNET_SERVER_Handle *) cls; |
1064 | if ( NULL != peer ) | 1219 | if ( NULL != peer ) |
1065 | { | 1220 | { |
1066 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1221 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1067 | "Got %" PRIX32 ". peer %s (at %p) from CADET (gossip_list_size: %u)\n", | 1222 | "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n", |
1068 | ipc->i, GNUNET_i2s (peer), peer, gossip_list_size); | 1223 | GNUNET_i2s (peer), peer, gossip_list_size); |
1069 | RPS_sampler_update_list (peer); | ||
1070 | (void) get_peer_ctx (peer_map, peer); // unneeded? -> insertCB | ||
1071 | 1224 | ||
1072 | if (ipc->i < gossip_list_size) | 1225 | // maybe create a function for that |
1226 | peer_ctx = get_peer_ctx (peer_map, peer); | ||
1227 | if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx)) | ||
1073 | { | 1228 | { |
1074 | gossip_list[ipc->i] = *peer; // FIXME sometimes we're writing to invalid space here | 1229 | out_op.op = insert_in_sampler; |
1075 | // not sure whether fixed | 1230 | GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op); |
1076 | ipc->i++; | ||
1077 | } | 1231 | } |
1078 | 1232 | ||
1079 | // send push/pull to each of those peers? | 1233 | if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx)) |
1080 | } | ||
1081 | else | ||
1082 | { | ||
1083 | if (ipc->i < gossip_list_size) | ||
1084 | { | 1234 | { |
1085 | memcpy (&gossip_list[ipc->i], | 1235 | out_op.op = insert_in_gossip_list; |
1086 | RPS_sampler_get_n_rand_peers_ (1), | 1236 | GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op); |
1087 | (gossip_list_size - ipc->i) * sizeof (struct GNUNET_PeerIdentity)); | ||
1088 | } | 1237 | } |
1089 | rps_start (ipc->server); | 1238 | |
1090 | GNUNET_free (ipc); | 1239 | /* Issue livelyness test on peer */ |
1240 | (void) get_channel (peer_map, peer); | ||
1241 | |||
1242 | // send push/pull to each of those peers? | ||
1091 | } | 1243 | } |
1244 | else | ||
1245 | rps_start (server); | ||
1092 | } | 1246 | } |
1093 | 1247 | ||
1094 | 1248 | ||
@@ -1236,6 +1390,7 @@ cleanup_channel (void *cls, | |||
1236 | (void) peer_remove_cb (peer, peer, peer_ctx); | 1390 | (void) peer_remove_cb (peer, peer, peer_ctx); |
1237 | } | 1391 | } |
1238 | 1392 | ||
1393 | |||
1239 | /** | 1394 | /** |
1240 | * Actually start the service. | 1395 | * Actually start the service. |
1241 | */ | 1396 | */ |
@@ -1256,6 +1411,8 @@ rps_start (struct GNUNET_SERVER_Handle *server) | |||
1256 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n"); | 1411 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n"); |
1257 | 1412 | ||
1258 | 1413 | ||
1414 | num_hist_update_tasks = 0; | ||
1415 | |||
1259 | do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); | 1416 | do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); |
1260 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); | 1417 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); |
1261 | 1418 | ||
@@ -1283,7 +1440,6 @@ run (void *cls, | |||
1283 | 1440 | ||
1284 | LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n"); | 1441 | LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n"); |
1285 | 1442 | ||
1286 | struct init_peer_cls *ipc; | ||
1287 | 1443 | ||
1288 | cfg = c; | 1444 | cfg = c; |
1289 | 1445 | ||
@@ -1316,16 +1472,13 @@ run (void *cls, | |||
1316 | } | 1472 | } |
1317 | LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need); | 1473 | LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need); |
1318 | 1474 | ||
1319 | //gossip_list_size = sampler_size; // TODO rename sampler_size | ||
1320 | 1475 | ||
1321 | gossip_list = NULL; | 1476 | gossip_list = NULL; |
1322 | GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need); | ||
1323 | 1477 | ||
1324 | 1478 | ||
1325 | /* connect to NSE */ | 1479 | /* connect to NSE */ |
1326 | nse = GNUNET_NSE_connect (cfg, nse_callback, NULL); | 1480 | nse = GNUNET_NSE_connect (cfg, nse_callback, NULL); |
1327 | // TODO check whether that was successful | 1481 | // TODO check whether that was successful |
1328 | // TODO disconnect on shutdown | ||
1329 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n"); | 1482 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n"); |
1330 | 1483 | ||
1331 | 1484 | ||
@@ -1383,7 +1536,7 @@ run (void *cls, | |||
1383 | half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5); | 1536 | half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5); |
1384 | max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); | 1537 | max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); |
1385 | 1538 | ||
1386 | RPS_sampler_init (sampler_size_est_need, own_identity, max_round_interval, | 1539 | RPS_sampler_init (sampler_size_est_need, max_round_interval, |
1387 | insertCB, NULL, removeCB, NULL); | 1540 | insertCB, NULL, removeCB, NULL); |
1388 | sampler_size = sampler_size_est_need; | 1541 | sampler_size = sampler_size_est_need; |
1389 | 1542 | ||
@@ -1394,11 +1547,8 @@ run (void *cls, | |||
1394 | pull_list_size = 0; | 1547 | pull_list_size = 0; |
1395 | 1548 | ||
1396 | 1549 | ||
1397 | ipc = GNUNET_new (struct init_peer_cls); | ||
1398 | ipc->server = server; | ||
1399 | ipc->i = 0; | ||
1400 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); | 1550 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); |
1401 | GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc); | 1551 | GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, server); |
1402 | 1552 | ||
1403 | // TODO send push/pull to each of those peers? | 1553 | // TODO send push/pull to each of those peers? |
1404 | } | 1554 | } |
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 85d8d532b..b2ee5fb21 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c | |||
@@ -151,7 +151,7 @@ struct RPS_Sampler | |||
151 | /** | 151 | /** |
152 | * Closure to _get_n_rand_peers_ready_cb() | 152 | * Closure to _get_n_rand_peers_ready_cb() |
153 | */ | 153 | */ |
154 | struct RPS_GetNRandPeersReadyCls | 154 | struct NRandPeersReadyCls |
155 | { | 155 | { |
156 | /** | 156 | /** |
157 | * Number of peers we are waiting for. | 157 | * Number of peers we are waiting for. |
@@ -255,15 +255,15 @@ static uint32_t client_get_index; | |||
255 | * give those back. | 255 | * give those back. |
256 | */ | 256 | */ |
257 | void | 257 | void |
258 | RPS_sampler_get_n_rand_peers_ready_cb (void *cls, | 258 | check_n_peers_ready (void *cls, |
259 | const struct GNUNET_PeerIdentity *id) | 259 | const struct GNUNET_PeerIdentity *id) |
260 | { | 260 | { |
261 | struct RPS_GetNRandPeersReadyCls *n_peers_cls; | 261 | struct NRandPeersReadyCls *n_peers_cls; |
262 | 262 | ||
263 | n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls; | 263 | n_peers_cls = (struct NRandPeersReadyCls *) cls; |
264 | 264 | ||
265 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 265 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
266 | "SAMPLER: Got %" PRIX32 "th of %" PRIX32 " peers\n", | 266 | "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n", |
267 | n_peers_cls->cur_num_peers, n_peers_cls->num_peers); | 267 | n_peers_cls->cur_num_peers, n_peers_cls->num_peers); |
268 | 268 | ||
269 | if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers) | 269 | if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers) |
@@ -297,12 +297,6 @@ RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el) | |||
297 | 297 | ||
298 | sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS; | 298 | sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS; |
299 | 299 | ||
300 | /* We might want to keep the previous peer */ | ||
301 | |||
302 | //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id, | ||
303 | // sizeof(struct GNUNET_PeerIdentity), | ||
304 | // &sampler_el->peer_id_hash); | ||
305 | |||
306 | sampler_el->birth = GNUNET_TIME_absolute_get (); | 300 | sampler_el->birth = GNUNET_TIME_absolute_get (); |
307 | sampler_el->num_peers = 0; | 301 | sampler_el->num_peers = 0; |
308 | sampler_el->num_change = 0; | 302 | sampler_el->num_change = 0; |
@@ -479,7 +473,6 @@ RPS_sampler_resize (unsigned int new_size) | |||
479 | * Initialise a tuple of sampler elements. | 473 | * Initialise a tuple of sampler elements. |
480 | * | 474 | * |
481 | * @param init_size the size the sampler is initialised with | 475 | * @param init_size the size the sampler is initialised with |
482 | * @param id with which all newly created sampler elements are initialised | ||
483 | * @param ins_cb the callback that will be called on every PeerID that is | 476 | * @param ins_cb the callback that will be called on every PeerID that is |
484 | * newly inserted into a sampler element | 477 | * newly inserted into a sampler element |
485 | * @param ins_cls the closure given to #ins_cb | 478 | * @param ins_cls the closure given to #ins_cb |
@@ -489,7 +482,6 @@ RPS_sampler_resize (unsigned int new_size) | |||
489 | */ | 482 | */ |
490 | void | 483 | void |
491 | RPS_sampler_init (size_t init_size, | 484 | RPS_sampler_init (size_t init_size, |
492 | const struct GNUNET_PeerIdentity *id, | ||
493 | struct GNUNET_TIME_Relative max_round_interval, | 485 | struct GNUNET_TIME_Relative max_round_interval, |
494 | RPS_sampler_insert_cb ins_cb, void *ins_cls, | 486 | RPS_sampler_insert_cb ins_cb, void *ins_cls, |
495 | RPS_sampler_remove_cb rem_cb, void *rem_cls) | 487 | RPS_sampler_remove_cb rem_cb, void *rem_cls) |
@@ -513,7 +505,6 @@ RPS_sampler_init (size_t init_size, | |||
513 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); | 505 | //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); |
514 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); | 506 | //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); |
515 | RPS_sampler_resize (init_size); | 507 | RPS_sampler_resize (init_size); |
516 | RPS_sampler_update_list (id); // no super nice desing but ok for the moment | ||
517 | 508 | ||
518 | client_get_index = 0; | 509 | client_get_index = 0; |
519 | 510 | ||
@@ -568,13 +559,14 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) | |||
568 | * corrsponding peer to the client. | 559 | * corrsponding peer to the client. |
569 | * Only used internally | 560 | * Only used internally |
570 | */ | 561 | */ |
571 | const struct GNUNET_PeerIdentity * | 562 | void |
572 | RPS_sampler_get_rand_peer_ () | 563 | RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
573 | { | 564 | { |
565 | struct GetPeerCls *gpc; | ||
574 | uint32_t r_index; | 566 | uint32_t r_index; |
575 | const struct GNUNET_PeerIdentity *peer; // do we have to malloc that? | 567 | struct GNUNET_HashCode *hash; |
576 | 568 | ||
577 | // TODO implement extra logic | 569 | gpc = (struct GetPeerCls *) cls; |
578 | 570 | ||
579 | /**; | 571 | /**; |
580 | * Choose the r_index of the peer we want to return | 572 | * Choose the r_index of the peer we want to return |
@@ -583,50 +575,25 @@ RPS_sampler_get_rand_peer_ () | |||
583 | r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, | 575 | r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, |
584 | sampler->sampler_size); | 576 | sampler->sampler_size); |
585 | 577 | ||
586 | //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty ) | 578 | if ( EMPTY == sampler->sampler_elements[r_index]->is_empty ) |
587 | // // TODO schedule for later | ||
588 | // peer = NULL; | ||
589 | //else | ||
590 | peer = &(sampler->sampler_elements[r_index]->peer_id); | ||
591 | //sampler->sampler_elements[r_index]->last_client_request = GNUNET_TIME_absolute_get(); | ||
592 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer)); | ||
593 | |||
594 | return peer; | ||
595 | } | ||
596 | |||
597 | |||
598 | /** | ||
599 | * Get n random peers out of the sampled peers. | ||
600 | * | ||
601 | * We might want to reinitialise this sampler after giving the | ||
602 | * corrsponding peer to the client. | ||
603 | * Random with or without consumption? | ||
604 | * Only used internally | ||
605 | */ | ||
606 | const struct GNUNET_PeerIdentity * | ||
607 | RPS_sampler_get_n_rand_peers_ (uint32_t n) | ||
608 | { | ||
609 | if ( 0 == sampler->sampler_size ) | ||
610 | { | 579 | { |
611 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 580 | gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply( |
612 | "Sgrp: List empty - Returning NULL\n"); | 581 | GNUNET_TIME_UNIT_SECONDS, |
613 | return NULL; | 582 | .1), |
583 | &RPS_sampler_get_rand_peer_, | ||
584 | cls); | ||
585 | return; | ||
614 | } | 586 | } |
615 | else | ||
616 | { | ||
617 | // TODO check if we have too much (distinct) sampled peers | ||
618 | // If we are not ready yet maybe schedule for later | ||
619 | struct GNUNET_PeerIdentity *peers; | ||
620 | uint32_t i; | ||
621 | 587 | ||
622 | peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity)); | 588 | *gpc->id = sampler->sampler_elements[r_index]->peer_id; |
623 | 589 | ||
624 | for ( i = 0 ; i < n ; i++ ) { | 590 | hash = GNUNET_new (struct GNUNET_HashCode); |
625 | //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements); | 591 | GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); |
626 | memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct GNUNET_PeerIdentity)); | 592 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task)) |
627 | } | 593 | LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n"); |
628 | return peers; | 594 | GNUNET_free (gpc->get_peer_task); |
629 | } | 595 | |
596 | gpc->cb (gpc->cb_cls, gpc->id); | ||
630 | } | 597 | } |
631 | 598 | ||
632 | 599 | ||
@@ -639,28 +606,28 @@ RPS_sampler_get_n_rand_peers_ (uint32_t n) | |||
639 | * @return a random PeerID of the PeerIDs previously put into the sampler. | 606 | * @return a random PeerID of the PeerIDs previously put into the sampler. |
640 | */ | 607 | */ |
641 | void | 608 | void |
642 | //RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb, | ||
643 | // void *cls, struct GNUNET_PeerIdentity *id) | ||
644 | RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 609 | RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
645 | { | 610 | { |
646 | struct GetPeerCls *gpc; | 611 | struct GetPeerCls *gpc; |
612 | struct GNUNET_PeerIdentity tmp_id; | ||
647 | struct RPS_SamplerElement *s_elem; | 613 | struct RPS_SamplerElement *s_elem; |
648 | struct GNUNET_TIME_Relative last_request_diff; | 614 | struct GNUNET_TIME_Relative last_request_diff; |
649 | struct GNUNET_HashCode *hash; | 615 | struct GNUNET_HashCode *hash; |
650 | uint32_t tmp_client_get_index; | 616 | uint32_t tmp_client_get_index; |
651 | //struct GNUNET_TIME_Relative inv_last_request_diff; | ||
652 | 617 | ||
653 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n"); | 618 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n"); |
654 | 619 | ||
655 | gpc = (struct GetPeerCls *) cls; | 620 | gpc = (struct GetPeerCls *) cls; |
656 | hash = GNUNET_new (struct GNUNET_HashCode); | 621 | hash = GNUNET_new (struct GNUNET_HashCode); |
622 | |||
623 | /* Store the next #client_get_index to check whether we cycled over the whole list */ | ||
657 | if (0 < client_get_index) | 624 | if (0 < client_get_index) |
658 | tmp_client_get_index = client_get_index - 1; | 625 | tmp_client_get_index = client_get_index - 1; |
659 | else | 626 | else |
660 | tmp_client_get_index = sampler->sampler_size - 1; | 627 | tmp_client_get_index = sampler->sampler_size - 1; |
661 | 628 | ||
662 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 629 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
663 | "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ".\n", | 630 | "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n", |
664 | tmp_client_get_index, sampler->sampler_size); | 631 | tmp_client_get_index, sampler->sampler_size); |
665 | 632 | ||
666 | do | 633 | do |
@@ -674,17 +641,22 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
674 | return; | 641 | return; |
675 | } | 642 | } |
676 | 643 | ||
677 | *gpc->id = sampler->sampler_elements[client_get_index]->peer_id; | 644 | tmp_id = sampler->sampler_elements[client_get_index]->peer_id; |
678 | |||
679 | RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); | 645 | RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); |
646 | RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id, | ||
647 | NULL, NULL, NULL, NULL); | ||
648 | |||
649 | /* Cycle the #client_get_index one step further */ | ||
680 | if ( client_get_index == sampler->sampler_size - 1 ) | 650 | if ( client_get_index == sampler->sampler_size - 1 ) |
681 | client_get_index = 0; | 651 | client_get_index = 0; |
682 | else | 652 | else |
683 | client_get_index++; | 653 | client_get_index++; |
654 | |||
684 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index); | 655 | LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index); |
685 | } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); | 656 | } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); |
686 | 657 | ||
687 | s_elem = sampler->sampler_elements[client_get_index]; | 658 | s_elem = sampler->sampler_elements[client_get_index]; |
659 | *gpc->id = s_elem->peer_id; | ||
688 | 660 | ||
689 | /* Check whether we may use this sampler to give it back to the client */ | 661 | /* Check whether we may use this sampler to give it back to the client */ |
690 | if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) | 662 | if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) |
@@ -729,54 +701,49 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext | |||
729 | * | 701 | * |
730 | * @param cb callback that will be called once the ids are ready. | 702 | * @param cb callback that will be called once the ids are ready. |
731 | * @param cls closure given to @a cb | 703 | * @param cls closure given to @a cb |
704 | * @param for_client #GNUNET_YES if result is used for client, | ||
705 | * #GNUNET_NO if used internally | ||
732 | * @param num_peers the number of peers requested | 706 | * @param num_peers the number of peers requested |
733 | */ | 707 | */ |
734 | void | 708 | void |
735 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, | 709 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, |
736 | void *cls, uint32_t num_peers) | 710 | void *cls, uint32_t num_peers, int for_client) |
737 | { | 711 | { |
738 | if ( 0 == sampler->sampler_size ) | 712 | GNUNET_assert (0 != sampler->sampler_size); |
739 | { | ||
740 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
741 | "Sgrp: List empty - Returning NULL\n"); | ||
742 | cb (cls, NULL, 0); | ||
743 | } | ||
744 | else | ||
745 | { | ||
746 | // TODO check if we have too much (distinct) sampled peers | ||
747 | // If we are not ready yet maybe schedule for later | ||
748 | uint32_t i; | ||
749 | struct RPS_GetNRandPeersReadyCls *cb_cls; | ||
750 | struct GetPeerCls *gpc; | ||
751 | struct GNUNET_HashCode *hash; | ||
752 | |||
753 | hash = GNUNET_new (struct GNUNET_HashCode); | ||
754 | 713 | ||
755 | cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls); | 714 | // TODO check if we have too much (distinct) sampled peers |
756 | cb_cls->num_peers = num_peers; | 715 | uint32_t i; |
757 | cb_cls->cur_num_peers = 0; | 716 | struct NRandPeersReadyCls *cb_cls; |
758 | cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); | 717 | struct GetPeerCls *gpc; |
759 | cb_cls->callback = cb; | 718 | struct GNUNET_HashCode *hash; |
760 | cb_cls->cls = cls; | ||
761 | 719 | ||
762 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 720 | hash = GNUNET_new (struct GNUNET_HashCode); |
763 | "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers); | ||
764 | 721 | ||
765 | for ( i = 0 ; i < num_peers ; i++ ) | 722 | cb_cls = GNUNET_new (struct NRandPeersReadyCls); |
766 | { | 723 | cb_cls->num_peers = num_peers; |
767 | gpc = GNUNET_new (struct GetPeerCls); | 724 | cb_cls->cur_num_peers = 0; |
768 | gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb; | 725 | cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); |
769 | gpc->cb_cls = cb_cls; | 726 | cb_cls->callback = cb; |
770 | gpc->id = &cb_cls->ids[i]; | 727 | cb_cls->cls = cls; |
728 | |||
729 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
730 | "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers); | ||
771 | 731 | ||
772 | // maybe add a little delay | 732 | for ( i = 0 ; i < num_peers ; i++ ) |
733 | { | ||
734 | gpc = GNUNET_new (struct GetPeerCls); | ||
735 | gpc->cb = check_n_peers_ready; | ||
736 | gpc->cb_cls = cb_cls; | ||
737 | gpc->id = &cb_cls->ids[i]; | ||
738 | |||
739 | // maybe add a little delay | ||
740 | if (GNUNET_YES == for_client) | ||
773 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc); | 741 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc); |
774 | GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); | 742 | else if (GNUNET_NO == for_client) |
775 | (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task, | 743 | gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc); |
776 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 744 | GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); |
777 | //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb, | 745 | (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task, |
778 | // cb_cls, &peers[i]); | 746 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
779 | } | ||
780 | } | 747 | } |
781 | } | 748 | } |
782 | 749 | ||
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h index 451d3cdb0..a7021b4fe 100644 --- a/src/rps/gnunet-service-rps_sampler.h +++ b/src/rps/gnunet-service-rps_sampler.h | |||
@@ -89,7 +89,6 @@ RPS_sampler_resize (unsigned int new_size); | |||
89 | */ | 89 | */ |
90 | void | 90 | void |
91 | RPS_sampler_init (size_t init_size, | 91 | RPS_sampler_init (size_t init_size, |
92 | const struct GNUNET_PeerIdentity *id, | ||
93 | struct GNUNET_TIME_Relative max_round_interval, | 92 | struct GNUNET_TIME_Relative max_round_interval, |
94 | RPS_sampler_insert_cb ins_cb, void *ins_cls, | 93 | RPS_sampler_insert_cb ins_cb, void *ins_cls, |
95 | RPS_sampler_remove_cb rem_cb, void *rem_cls); | 94 | RPS_sampler_remove_cb rem_cb, void *rem_cls); |
@@ -121,26 +120,16 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id); | |||
121 | * We might want to reinitialise this sampler after giving the | 120 | * We might want to reinitialise this sampler after giving the |
122 | * corrsponding peer to the client. | 121 | * corrsponding peer to the client. |
123 | * Random with or without consumption? | 122 | * Random with or without consumption? |
124 | * Only used internally | ||
125 | */ | ||
126 | const struct GNUNET_PeerIdentity * | ||
127 | RPS_sampler_get_n_rand_peers_ (uint32_t n); | ||
128 | |||
129 | |||
130 | /** | ||
131 | * Get n random peers out of the sampled peers. | ||
132 | * | ||
133 | * We might want to reinitialise this sampler after giving the | ||
134 | * corrsponding peer to the client. | ||
135 | * Random with or without consumption? | ||
136 | * | 123 | * |
137 | * @param cb callback that will be called once the ids are ready. | 124 | * @param cb callback that will be called once the ids are ready. |
138 | * @param cls closure given to @a cb | 125 | * @param cls closure given to @a cb |
126 | * @param for_client #GNUNET_YES if result is used for client, | ||
127 | * #GNUNET_NO if used internally | ||
139 | * @param num_peers the number of peers requested | 128 | * @param num_peers the number of peers requested |
140 | */ | 129 | */ |
141 | void | 130 | void |
142 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, | 131 | RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, |
143 | void *cls, uint32_t num_peers); | 132 | void *cls, uint32_t num_peers, int for_client); |
144 | 133 | ||
145 | 134 | ||
146 | /** | 135 | /** |