diff options
author | Julius Bünger <buenger@mytum.de> | 2019-04-04 13:41:25 +0200 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2019-04-04 13:42:57 +0200 |
commit | 8c3d9fc59cd5617c4f5b7ea621971bdff25f5353 (patch) | |
tree | 910d8bdd653c13d47d943c69b405e2818aac2d14 /src/rps/rps_api.c | |
parent | a6f561efa7359dae5af8bfd3763e4f16168030ab (diff) | |
download | gnunet-8c3d9fc59cd5617c4f5b7ea621971bdff25f5353.tar.gz gnunet-8c3d9fc59cd5617c4f5b7ea621971bdff25f5353.zip |
RPS: Return peers to client after many observed ids
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r-- | src/rps/rps_api.c | 149 |
1 files changed, 142 insertions, 7 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index d0b241a2b..7a3adfa94 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -29,6 +29,8 @@ | |||
29 | #include "gnunet_rps_service.h" | 29 | #include "gnunet_rps_service.h" |
30 | #include "rps-sampler_client.h" | 30 | #include "rps-sampler_client.h" |
31 | 31 | ||
32 | #include "gnunet_nse_service.h" | ||
33 | |||
32 | #include <inttypes.h> | 34 | #include <inttypes.h> |
33 | 35 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) | 36 | #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) |
@@ -109,6 +111,35 @@ struct GNUNET_RPS_Handle | |||
109 | * @brief Tail of the DLL of stream requests | 111 | * @brief Tail of the DLL of stream requests |
110 | */ | 112 | */ |
111 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; | 113 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; |
114 | |||
115 | /** | ||
116 | * @brief Handle to nse service | ||
117 | */ | ||
118 | struct GNUNET_NSE_Handle *nse; | ||
119 | |||
120 | /** | ||
121 | * @brief Pointer to the head element in DLL of request handles | ||
122 | */ | ||
123 | struct GNUNET_RPS_Request_Handle *rh_head; | ||
124 | |||
125 | /** | ||
126 | * @brief Pointer to the tail element in DLL of request handles | ||
127 | */ | ||
128 | struct GNUNET_RPS_Request_Handle *rh_tail; | ||
129 | |||
130 | /** | ||
131 | * @brief The desired probability with which we want to have observed all | ||
132 | * peers. | ||
133 | */ | ||
134 | float desired_probability; | ||
135 | |||
136 | /** | ||
137 | * @brief A factor that catches the 'bias' of a random stream of peer ids. | ||
138 | * | ||
139 | * As introduced by Brahms: Factor between the number of unique ids in a | ||
140 | * truly random stream and number of unique ids in the gossip stream. | ||
141 | */ | ||
142 | float deficiency_factor; | ||
112 | }; | 143 | }; |
113 | 144 | ||
114 | 145 | ||
@@ -152,6 +183,16 @@ struct GNUNET_RPS_Request_Handle | |||
152 | * The closure for the callback. | 183 | * The closure for the callback. |
153 | */ | 184 | */ |
154 | void *ready_cb_cls; | 185 | void *ready_cb_cls; |
186 | |||
187 | /** | ||
188 | * @brief Pointer to next element in DLL | ||
189 | */ | ||
190 | struct GNUNET_RPS_Request_Handle *next; | ||
191 | |||
192 | /** | ||
193 | * @brief Pointer to previous element in DLL | ||
194 | */ | ||
195 | struct GNUNET_RPS_Request_Handle *prev; | ||
155 | }; | 196 | }; |
156 | 197 | ||
157 | 198 | ||
@@ -263,10 +304,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, | |||
263 | rh->ready_cb (rh->ready_cb_cls, | 304 | rh->ready_cb (rh->ready_cb_cls, |
264 | num_peers, | 305 | num_peers, |
265 | peers); | 306 | peers); |
266 | GNUNET_RPS_stream_cancel (rh->srh); | 307 | GNUNET_RPS_request_cancel (rh); |
267 | rh->srh = NULL; | ||
268 | RPS_sampler_destroy (rh->sampler); | ||
269 | rh->sampler = NULL; | ||
270 | } | 308 | } |
271 | 309 | ||
272 | 310 | ||
@@ -607,6 +645,37 @@ hash_from_share_val (const char *share_val, | |||
607 | 645 | ||
608 | 646 | ||
609 | /** | 647 | /** |
648 | * @brief Callback for network size estimate - called with new estimates about | ||
649 | * the network size, updates all samplers with the new estimate | ||
650 | * | ||
651 | * Implements #GNUNET_NSE_Callback | ||
652 | * | ||
653 | * @param cls the rps handle | ||
654 | * @param timestamp unused | ||
655 | * @param logestimate the estimate | ||
656 | * @param std_dev the standard distribution | ||
657 | */ | ||
658 | static void | ||
659 | nse_cb (void *cls, | ||
660 | struct GNUNET_TIME_Absolute timestamp, | ||
661 | double logestimate, | ||
662 | double std_dev) | ||
663 | { | ||
664 | struct GNUNET_RPS_Handle *h = cls; | ||
665 | (void) timestamp; | ||
666 | (void) std_dev; | ||
667 | |||
668 | for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; | ||
669 | NULL != rh_iter && NULL != rh_iter->next; | ||
670 | rh_iter = rh_iter->next) | ||
671 | { | ||
672 | RPS_sampler_update_with_nw_size (rh_iter->sampler, | ||
673 | GNUNET_NSE_log_estimate_to_n (logestimate)); | ||
674 | } | ||
675 | } | ||
676 | |||
677 | |||
678 | /** | ||
610 | * Reconnect to the service | 679 | * Reconnect to the service |
611 | */ | 680 | */ |
612 | static void | 681 | static void |
@@ -631,6 +700,9 @@ reconnect (struct GNUNET_RPS_Handle *h) | |||
631 | mq_handlers, | 700 | mq_handlers, |
632 | &mq_error_handler, | 701 | &mq_error_handler, |
633 | h); | 702 | h); |
703 | if (NULL != h->nse) | ||
704 | GNUNET_NSE_disconnect (h->nse); | ||
705 | h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h); | ||
634 | } | 706 | } |
635 | 707 | ||
636 | 708 | ||
@@ -638,7 +710,7 @@ reconnect (struct GNUNET_RPS_Handle *h) | |||
638 | * Connect to the rps service | 710 | * Connect to the rps service |
639 | * | 711 | * |
640 | * @param cfg configuration to use | 712 | * @param cfg configuration to use |
641 | * @return a handle to the service | 713 | * @return a handle to the service, NULL on error |
642 | */ | 714 | */ |
643 | struct GNUNET_RPS_Handle * | 715 | struct GNUNET_RPS_Handle * |
644 | GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | 716 | GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) |
@@ -647,6 +719,44 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
647 | 719 | ||
648 | h = GNUNET_new (struct GNUNET_RPS_Handle); | 720 | h = GNUNET_new (struct GNUNET_RPS_Handle); |
649 | h->cfg = cfg; | 721 | h->cfg = cfg; |
722 | if (GNUNET_OK != | ||
723 | GNUNET_CONFIGURATION_get_value_float (cfg, | ||
724 | "RPS", | ||
725 | "DESIRED_PROBABILITY", | ||
726 | &h->desired_probability)) | ||
727 | { | ||
728 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, | ||
729 | "RPS", "DESIRED_PROBABILITY"); | ||
730 | GNUNET_free (h); | ||
731 | return NULL; | ||
732 | } | ||
733 | if (0 > h->desired_probability || | ||
734 | 1 < h->desired_probability) | ||
735 | { | ||
736 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
737 | "The desired probability must be in the interval [0;1]\n"); | ||
738 | GNUNET_free (h); | ||
739 | return NULL; | ||
740 | } | ||
741 | if (GNUNET_OK != | ||
742 | GNUNET_CONFIGURATION_get_value_float (cfg, | ||
743 | "RPS", | ||
744 | "DEFICIENCY_FACTOR", | ||
745 | &h->deficiency_factor)) | ||
746 | { | ||
747 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, | ||
748 | "RPS", "DEFICIENCY_FACTOR"); | ||
749 | GNUNET_free (h); | ||
750 | return NULL; | ||
751 | } | ||
752 | if (0 > h->desired_probability || | ||
753 | 1 < h->desired_probability) | ||
754 | { | ||
755 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
756 | "The deficiency factor must be in the interval [0;1]\n"); | ||
757 | GNUNET_free (h); | ||
758 | return NULL; | ||
759 | } | ||
650 | reconnect (h); | 760 | reconnect (h); |
651 | if (NULL == h->mq) | 761 | if (NULL == h->mq) |
652 | { | 762 | { |
@@ -725,6 +835,10 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | |||
725 | rh->num_requests = num_req_peers; | 835 | rh->num_requests = num_req_peers; |
726 | rh->sampler = RPS_sampler_mod_init (num_req_peers, | 836 | rh->sampler = RPS_sampler_mod_init (num_req_peers, |
727 | GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff | 837 | GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff |
838 | RPS_sampler_set_desired_probability (rh->sampler, | ||
839 | rps_handle->desired_probability); | ||
840 | RPS_sampler_set_deficiency_factor (rh->sampler, | ||
841 | rps_handle->deficiency_factor); | ||
728 | rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, | 842 | rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, |
729 | num_req_peers, | 843 | num_req_peers, |
730 | peers_ready_cb, | 844 | peers_ready_cb, |
@@ -734,6 +848,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | |||
734 | rh); /* cls */ | 848 | rh); /* cls */ |
735 | rh->ready_cb = ready_cb; | 849 | rh->ready_cb = ready_cb; |
736 | rh->ready_cb_cls = cls; | 850 | rh->ready_cb_cls = cls; |
851 | GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head, | ||
852 | rps_handle->rh_tail, | ||
853 | rh); | ||
737 | 854 | ||
738 | return rh; | 855 | return rh; |
739 | } | 856 | } |
@@ -911,6 +1028,7 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | |||
911 | 1028 | ||
912 | h = rh->rps_handle; | 1029 | h = rh->rps_handle; |
913 | GNUNET_assert (NULL != rh); | 1030 | GNUNET_assert (NULL != rh); |
1031 | GNUNET_assert (NULL != rh->srh); | ||
914 | GNUNET_assert (h == rh->srh->rps_handle); | 1032 | GNUNET_assert (h == rh->srh->rps_handle); |
915 | GNUNET_RPS_stream_cancel (rh->srh); | 1033 | GNUNET_RPS_stream_cancel (rh->srh); |
916 | rh->srh = NULL; | 1034 | rh->srh = NULL; |
@@ -920,6 +1038,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | |||
920 | RPS_sampler_request_cancel (rh->sampler_rh); | 1038 | RPS_sampler_request_cancel (rh->sampler_rh); |
921 | } | 1039 | } |
922 | RPS_sampler_destroy (rh->sampler); | 1040 | RPS_sampler_destroy (rh->sampler); |
1041 | rh->sampler = NULL; | ||
1042 | GNUNET_CONTAINER_DLL_remove (h->rh_head, | ||
1043 | h->rh_tail, | ||
1044 | rh); | ||
923 | GNUNET_free (rh); | 1045 | GNUNET_free (rh); |
924 | } | 1046 | } |
925 | 1047 | ||
@@ -939,13 +1061,24 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) | |||
939 | LOG (GNUNET_ERROR_TYPE_WARNING, | 1061 | LOG (GNUNET_ERROR_TYPE_WARNING, |
940 | "Still waiting for replies\n"); | 1062 | "Still waiting for replies\n"); |
941 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; | 1063 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; |
942 | NULL != srh_iter; | 1064 | NULL != srh_iter; |
943 | srh_iter = srh_next) | 1065 | srh_iter = srh_next) |
944 | { | 1066 | { |
945 | srh_next = srh_iter->next; | 1067 | srh_next = srh_iter->next; |
946 | GNUNET_RPS_stream_cancel (srh_iter); | 1068 | GNUNET_RPS_stream_cancel (srh_iter); |
947 | } | 1069 | } |
948 | } | 1070 | } |
1071 | if (NULL != h->rh_head) | ||
1072 | { | ||
1073 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1074 | "Not all requests were cancelled!\n"); | ||
1075 | for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; | ||
1076 | h->rh_head != NULL; | ||
1077 | rh_iter = h->rh_head) | ||
1078 | { | ||
1079 | GNUNET_RPS_request_cancel (rh_iter); | ||
1080 | } | ||
1081 | } | ||
949 | if (NULL != srh_callback_peers) | 1082 | if (NULL != srh_callback_peers) |
950 | { | 1083 | { |
951 | GNUNET_free (srh_callback_peers); | 1084 | GNUNET_free (srh_callback_peers); |
@@ -957,6 +1090,8 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) | |||
957 | "Still waiting for view updates\n"); | 1090 | "Still waiting for view updates\n"); |
958 | GNUNET_RPS_view_request_cancel (h); | 1091 | GNUNET_RPS_view_request_cancel (h); |
959 | } | 1092 | } |
1093 | if (NULL != h->nse) | ||
1094 | GNUNET_NSE_disconnect (h->nse); | ||
960 | GNUNET_MQ_destroy (h->mq); | 1095 | GNUNET_MQ_destroy (h->mq); |
961 | GNUNET_free (h); | 1096 | GNUNET_free (h); |
962 | } | 1097 | } |