diff options
-rw-r--r-- | src/include/gnunet_protocols.h | 16 | ||||
-rw-r--r-- | src/include/gnunet_rps_service.h | 25 | ||||
-rw-r--r-- | src/rps/gnunet-rps.c | 51 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps.c | 278 | ||||
-rw-r--r-- | src/rps/rps.h | 41 | ||||
-rw-r--r-- | src/rps/rps_api.c | 71 |
6 files changed, 399 insertions, 83 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 4400db7e1..56e049608 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -2993,6 +2993,22 @@ extern "C" | |||
2993 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL 1132 | 2993 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL 1132 |
2994 | 2994 | ||
2995 | 2995 | ||
2996 | /** | ||
2997 | * @brief Request biased input stream | ||
2998 | */ | ||
2999 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST 1133 | ||
3000 | |||
3001 | /** | ||
3002 | * @brief Send peer of biased stream | ||
3003 | */ | ||
3004 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY 1134 | ||
3005 | |||
3006 | /** | ||
3007 | * @brief Cancel getting biased strem | ||
3008 | */ | ||
3009 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135 | ||
3010 | |||
3011 | |||
2996 | 3012 | ||
2997 | /** | 3013 | /** |
2998 | * Next available: 1200 | 3014 | * Next available: 1200 |
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index b0c8c9867..252188c62 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h | |||
@@ -74,6 +74,14 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls, | |||
74 | const struct GNUNET_PeerIdentity *peers); | 74 | const struct GNUNET_PeerIdentity *peers); |
75 | 75 | ||
76 | /** | 76 | /** |
77 | * Callback called when a peer from the biased stream was received | ||
78 | * | ||
79 | * @param peer The received peer | ||
80 | */ | ||
81 | typedef void (* GNUNET_RPS_StreamInputCB) (void *cls, | ||
82 | const struct GNUNET_PeerIdentity *peer); | ||
83 | |||
84 | /** | ||
77 | * Connect to the rps service | 85 | * Connect to the rps service |
78 | * | 86 | * |
79 | * @param cfg configuration to use | 87 | * @param cfg configuration to use |
@@ -161,6 +169,23 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, | |||
161 | GNUNET_RPS_ViewUpdateCB view_update_cb, | 169 | GNUNET_RPS_ViewUpdateCB view_update_cb, |
162 | void *cls); | 170 | void *cls); |
163 | 171 | ||
172 | |||
173 | /** | ||
174 | * Request biased stream of peers that are being put into the sampler | ||
175 | * | ||
176 | * @param rps_handle handle to the rps service | ||
177 | * @param num_req_peers number of peers we want to receive | ||
178 | * (0 for infinite updates) | ||
179 | * @param cls a closure that will be given to the callback | ||
180 | * @param ready_cb the callback called when the peers are available | ||
181 | */ | ||
182 | void | ||
183 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | ||
184 | uint32_t num_updates, | ||
185 | GNUNET_RPS_StreamInputCB stream_input_cb, | ||
186 | void *cls); | ||
187 | |||
188 | |||
164 | /** | 189 | /** |
165 | * Disconnect from the rps service | 190 | * Disconnect from the rps service |
166 | * | 191 | * |
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c index 03b2c8ab7..d2c497fd4 100644 --- a/src/rps/gnunet-rps.c +++ b/src/rps/gnunet-rps.c | |||
@@ -49,10 +49,20 @@ static struct GNUNET_PeerIdentity peer_id; | |||
49 | static int view_update; | 49 | static int view_update; |
50 | 50 | ||
51 | /** | 51 | /** |
52 | * @brief Do we want to receive updates of the view? (Option --view) | ||
53 | */ | ||
54 | static int stream_input; | ||
55 | |||
56 | /** | ||
52 | * @brief Number of updates we want to receive | 57 | * @brief Number of updates we want to receive |
53 | */ | 58 | */ |
54 | static uint64_t num_view_updates; | 59 | static uint64_t num_view_updates; |
55 | 60 | ||
61 | /** | ||
62 | * @brief Number of updates we want to receive | ||
63 | */ | ||
64 | static uint64_t num_stream_peers; | ||
65 | |||
56 | 66 | ||
57 | /** | 67 | /** |
58 | * Task run when user presses CTRL-C to abort. | 68 | * Task run when user presses CTRL-C to abort. |
@@ -137,6 +147,22 @@ view_update_handle (void *cls, | |||
137 | 147 | ||
138 | 148 | ||
139 | /** | 149 | /** |
150 | * Callback called on receipt of peer from biased stream | ||
151 | * | ||
152 | * @param n number of peers | ||
153 | * @param recv_peers the received peers | ||
154 | */ | ||
155 | static void | ||
156 | stream_input_handle (void *cls, | ||
157 | const struct GNUNET_PeerIdentity *recv_peer) | ||
158 | { | ||
159 | // TODO when source of peer is sent, also print source | ||
160 | FPRINTF (stdout, "%s\n", | ||
161 | GNUNET_i2s_full (recv_peer)); | ||
162 | } | ||
163 | |||
164 | |||
165 | /** | ||
140 | * Main function that will be run by the scheduler. | 166 | * Main function that will be run by the scheduler. |
141 | * | 167 | * |
142 | * @param cls closure | 168 | * @param cls closure |
@@ -163,7 +189,8 @@ run (void *cls, | |||
163 | } | 189 | } |
164 | 190 | ||
165 | if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) && | 191 | if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) && |
166 | (!view_update)) | 192 | (!view_update) && |
193 | (!stream_input)) | ||
167 | { /* Request n PeerIDs */ | 194 | { /* Request n PeerIDs */ |
168 | /* If number was specified use it, else request single peer. */ | 195 | /* If number was specified use it, else request single peer. */ |
169 | if (NULL == args[0] || | 196 | if (NULL == args[0] || |
@@ -189,7 +216,23 @@ run (void *cls, | |||
189 | "Requesting %" PRIu64 " view updates\n", num_view_updates); | 216 | "Requesting %" PRIu64 " view updates\n", num_view_updates); |
190 | else | 217 | else |
191 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 218 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
192 | "Requesting contiuous view updates\n"); | 219 | "Requesting continuous view updates\n"); |
220 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); | ||
221 | } else if (stream_input) | ||
222 | { | ||
223 | /* Get updates of view */ | ||
224 | if (NULL == args[0] || | ||
225 | 0 == sscanf (args[0], "%lu", &num_stream_peers)) | ||
226 | { | ||
227 | num_stream_peers = 0; | ||
228 | } | ||
229 | GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL); | ||
230 | if (0 != num_stream_peers) | ||
231 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
232 | "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers); | ||
233 | else | ||
234 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
235 | "Requesting continuous peers from biased stream\n"); | ||
193 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); | 236 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); |
194 | } | 237 | } |
195 | else | 238 | else |
@@ -223,6 +266,10 @@ main (int argc, char *const *argv) | |||
223 | "view", | 266 | "view", |
224 | gettext_noop ("Get updates of view (0 for infinite updates)"), | 267 | gettext_noop ("Get updates of view (0 for infinite updates)"), |
225 | &view_update), | 268 | &view_update), |
269 | GNUNET_GETOPT_option_flag ('S', | ||
270 | "stream", | ||
271 | gettext_noop ("Get peers from biased stream"), | ||
272 | &stream_input), | ||
226 | GNUNET_GETOPT_OPTION_END | 273 | GNUNET_GETOPT_OPTION_END |
227 | }; | 274 | }; |
228 | return (GNUNET_OK == | 275 | return (GNUNET_OK == |
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 3e30041e8..5b78bb4a8 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -1769,6 +1769,12 @@ struct ClientContext | |||
1769 | int64_t view_updates_left; | 1769 | int64_t view_updates_left; |
1770 | 1770 | ||
1771 | /** | 1771 | /** |
1772 | * @brief How many peers from the biased | ||
1773 | * stream this client expects to receive. | ||
1774 | */ | ||
1775 | int64_t stream_peers_left; | ||
1776 | |||
1777 | /** | ||
1772 | * The client handle to send the reply to | 1778 | * The client handle to send the reply to |
1773 | */ | 1779 | */ |
1774 | struct GNUNET_SERVICE_Client *client; | 1780 | struct GNUNET_SERVICE_Client *client; |
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer) | |||
2174 | return ret; | 2180 | return ret; |
2175 | } | 2181 | } |
2176 | 2182 | ||
2183 | |||
2184 | /** | ||
2185 | * @brief Send view to client | ||
2186 | * | ||
2187 | * @param cli_ctx the context of the client | ||
2188 | * @param view_array the peerids of the view as array (can be empty) | ||
2189 | * @param view_size the size of the view array (can be 0) | ||
2190 | */ | ||
2191 | void | ||
2192 | send_view (const struct ClientContext *cli_ctx, | ||
2193 | const struct GNUNET_PeerIdentity *view_array, | ||
2194 | uint64_t view_size) | ||
2195 | { | ||
2196 | struct GNUNET_MQ_Envelope *ev; | ||
2197 | struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; | ||
2198 | |||
2199 | if (NULL == view_array) | ||
2200 | { | ||
2201 | view_size = View_size (); | ||
2202 | view_array = View_get_as_array(); | ||
2203 | } | ||
2204 | |||
2205 | ev = GNUNET_MQ_msg_extra (out_msg, | ||
2206 | view_size * sizeof (struct GNUNET_PeerIdentity), | ||
2207 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); | ||
2208 | out_msg->num_peers = htonl (view_size); | ||
2209 | |||
2210 | GNUNET_memcpy (&out_msg[1], | ||
2211 | view_array, | ||
2212 | view_size * sizeof (struct GNUNET_PeerIdentity)); | ||
2213 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2214 | } | ||
2215 | |||
2216 | |||
2217 | /** | ||
2218 | * @brief Send peer from biased stream to client. | ||
2219 | * | ||
2220 | * @param cli_ctx the context of the client | ||
2221 | * @param view_array the peerids of the view as array (can be empty) | ||
2222 | * @param view_size the size of the view array (can be 0) | ||
2223 | */ | ||
2224 | void | ||
2225 | send_stream_peer (const struct ClientContext *cli_ctx, | ||
2226 | const struct GNUNET_PeerIdentity *peer) | ||
2227 | { | ||
2228 | struct GNUNET_MQ_Envelope *ev; | ||
2229 | struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; | ||
2230 | |||
2231 | GNUNET_assert (NULL != peer); | ||
2232 | |||
2233 | ev = GNUNET_MQ_msg (out_msg, | ||
2234 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); | ||
2235 | |||
2236 | GNUNET_memcpy (&out_msg->peer, | ||
2237 | peer, | ||
2238 | sizeof (struct GNUNET_PeerIdentity)); | ||
2239 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2240 | } | ||
2241 | |||
2242 | |||
2177 | /** | 2243 | /** |
2178 | * @brief sends updates to clients that are interested | 2244 | * @brief sends updates to clients that are interested |
2179 | */ | 2245 | */ |
2180 | static void | 2246 | static void |
2181 | clients_notify_view_update (void); | 2247 | clients_notify_view_update (void) |
2248 | { | ||
2249 | struct ClientContext *cli_ctx_iter; | ||
2250 | uint64_t num_peers; | ||
2251 | const struct GNUNET_PeerIdentity *view_array; | ||
2252 | |||
2253 | num_peers = View_size (); | ||
2254 | view_array = View_get_as_array(); | ||
2255 | /* check size of view is small enough */ | ||
2256 | if (GNUNET_MAX_MESSAGE_SIZE < num_peers) | ||
2257 | { | ||
2258 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2259 | "View is too big to send\n"); | ||
2260 | return; | ||
2261 | } | ||
2262 | |||
2263 | for (cli_ctx_iter = cli_ctx_head; | ||
2264 | NULL != cli_ctx_iter; | ||
2265 | cli_ctx_iter = cli_ctx_head->next) | ||
2266 | { | ||
2267 | if (1 < cli_ctx_iter->view_updates_left) | ||
2268 | { | ||
2269 | /* Client wants to receive limited amount of updates */ | ||
2270 | cli_ctx_iter->view_updates_left -= 1; | ||
2271 | } else if (1 == cli_ctx_iter->view_updates_left) | ||
2272 | { | ||
2273 | /* Last update of view for client */ | ||
2274 | cli_ctx_iter->view_updates_left = -1; | ||
2275 | } else if (0 > cli_ctx_iter->view_updates_left) { | ||
2276 | /* Client is not interested in updates */ | ||
2277 | continue; | ||
2278 | } | ||
2279 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2280 | |||
2281 | /* send view */ | ||
2282 | send_view (cli_ctx_iter, view_array, num_peers); | ||
2283 | } | ||
2284 | } | ||
2285 | |||
2286 | |||
2287 | /** | ||
2288 | * @brief sends updates to clients that are interested | ||
2289 | */ | ||
2290 | static void | ||
2291 | clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) | ||
2292 | //enum StreamPeerSource) | ||
2293 | { | ||
2294 | struct ClientContext *cli_ctx_iter; | ||
2295 | |||
2296 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2297 | "Got peer (%s) from biased stream - update all clients\n", | ||
2298 | GNUNET_i2s (peer)); | ||
2299 | |||
2300 | /* check size of view is small enough */ | ||
2301 | for (cli_ctx_iter = cli_ctx_head; | ||
2302 | NULL != cli_ctx_iter; | ||
2303 | cli_ctx_iter = cli_ctx_head->next) | ||
2304 | { | ||
2305 | if (1 < cli_ctx_iter->stream_peers_left) | ||
2306 | { | ||
2307 | /* Client wants to receive limited amount of updates */ | ||
2308 | cli_ctx_iter->stream_peers_left -= 1; | ||
2309 | } else if (1 == cli_ctx_iter->stream_peers_left) | ||
2310 | { | ||
2311 | /* Last update of view for client */ | ||
2312 | cli_ctx_iter->stream_peers_left = -1; | ||
2313 | } else if (0 > cli_ctx_iter->stream_peers_left) { | ||
2314 | /* Client is not interested in updates */ | ||
2315 | continue; | ||
2316 | } | ||
2317 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2318 | |||
2319 | /* send view */ | ||
2320 | send_stream_peer (cli_ctx_iter, peer); | ||
2321 | } | ||
2322 | } | ||
2182 | 2323 | ||
2183 | /** | 2324 | /** |
2184 | * Put random peer from sampler into the view as history update. | 2325 | * Put random peer from sampler into the view as history update. |
@@ -2193,7 +2334,12 @@ hist_update (void *cls, | |||
2193 | 2334 | ||
2194 | for (i = 0; i < num_peers; i++) | 2335 | for (i = 0; i < num_peers; i++) |
2195 | { | 2336 | { |
2196 | (void) insert_in_view (&ids[i]); | 2337 | int inserted; |
2338 | inserted = insert_in_view (&ids[i]); | ||
2339 | if (GNUNET_OK == inserted) | ||
2340 | { | ||
2341 | clients_notify_stream_peer (&ids[i]); | ||
2342 | } | ||
2197 | to_file (file_name_view_log, | 2343 | to_file (file_name_view_log, |
2198 | "+%s\t(hist)", | 2344 | "+%s\t(hist)", |
2199 | GNUNET_i2s_full (ids)); | 2345 | GNUNET_i2s_full (ids)); |
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls, | |||
2398 | const struct GNUNET_PeerIdentity *peer) | 2544 | const struct GNUNET_PeerIdentity *peer) |
2399 | { | 2545 | { |
2400 | (void) cls; | 2546 | (void) cls; |
2401 | (void) insert_in_view (peer); | 2547 | int inserted; |
2548 | |||
2549 | inserted = insert_in_view (peer); | ||
2550 | if (GNUNET_OK == inserted) | ||
2551 | { | ||
2552 | clients_notify_stream_peer (peer); | ||
2553 | } | ||
2402 | } | 2554 | } |
2403 | 2555 | ||
2404 | 2556 | ||
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls, | |||
2860 | GNUNET_SERVICE_client_continue (cli_ctx->client); | 3012 | GNUNET_SERVICE_client_continue (cli_ctx->client); |
2861 | } | 3013 | } |
2862 | 3014 | ||
2863 | /** | ||
2864 | * @brief Send view to client | ||
2865 | * | ||
2866 | * @param cli_ctx the context of the client | ||
2867 | * @param view_array the peerids of the view as array (can be empty) | ||
2868 | * @param view_size the size of the view array (can be 0) | ||
2869 | */ | ||
2870 | void | ||
2871 | send_view (const struct ClientContext *cli_ctx, | ||
2872 | const struct GNUNET_PeerIdentity *view_array, | ||
2873 | uint64_t view_size) | ||
2874 | { | ||
2875 | struct GNUNET_MQ_Envelope *ev; | ||
2876 | struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; | ||
2877 | |||
2878 | if (NULL == view_array) | ||
2879 | { | ||
2880 | view_size = View_size (); | ||
2881 | view_array = View_get_as_array(); | ||
2882 | } | ||
2883 | |||
2884 | ev = GNUNET_MQ_msg_extra (out_msg, | ||
2885 | view_size * sizeof (struct GNUNET_PeerIdentity), | ||
2886 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); | ||
2887 | out_msg->num_peers = htonl (view_size); | ||
2888 | |||
2889 | GNUNET_memcpy (&out_msg[1], | ||
2890 | view_array, | ||
2891 | view_size * sizeof (struct GNUNET_PeerIdentity)); | ||
2892 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2893 | } | ||
2894 | 3015 | ||
2895 | /** | 3016 | /** |
2896 | * @brief sends updates to clients that are interested | 3017 | * Handle RPS request from the client. |
3018 | * | ||
3019 | * @param cls closure | ||
3020 | * @param message the actual message | ||
2897 | */ | 3021 | */ |
2898 | static void | 3022 | static void |
2899 | clients_notify_view_update (void) | 3023 | handle_client_view_request (void *cls, |
3024 | const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) | ||
2900 | { | 3025 | { |
2901 | struct ClientContext *cli_ctx_iter; | 3026 | struct ClientContext *cli_ctx = cls; |
2902 | uint64_t num_peers; | 3027 | uint64_t num_updates; |
2903 | const struct GNUNET_PeerIdentity *view_array; | ||
2904 | 3028 | ||
2905 | num_peers = View_size (); | 3029 | num_updates = ntohl (msg->num_updates); |
2906 | view_array = View_get_as_array(); | ||
2907 | /* check size of view is small enough */ | ||
2908 | if (GNUNET_MAX_MESSAGE_SIZE < num_peers) | ||
2909 | { | ||
2910 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2911 | "View is too big to send\n"); | ||
2912 | return; | ||
2913 | } | ||
2914 | 3030 | ||
2915 | for (cli_ctx_iter = cli_ctx_head; | 3031 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2916 | NULL != cli_ctx_iter; | 3032 | "Client requested %" PRIu64 " updates of view.\n", |
2917 | cli_ctx_iter = cli_ctx_head->next) | 3033 | num_updates); |
2918 | { | ||
2919 | if (1 < cli_ctx_iter->view_updates_left) | ||
2920 | { | ||
2921 | /* Client wants to receive limited amount of updates */ | ||
2922 | cli_ctx_iter->view_updates_left -= 1; | ||
2923 | } else if (1 == cli_ctx_iter->view_updates_left) | ||
2924 | { | ||
2925 | /* Last update of view for client */ | ||
2926 | cli_ctx_iter->view_updates_left = -1; | ||
2927 | } else if (0 > cli_ctx_iter->view_updates_left) { | ||
2928 | /* Client is not interested in updates */ | ||
2929 | continue; | ||
2930 | } | ||
2931 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2932 | 3034 | ||
2933 | /* send view */ | 3035 | GNUNET_assert (NULL != cli_ctx); |
2934 | send_view (cli_ctx_iter, view_array, num_peers); | 3036 | cli_ctx->view_updates_left = num_updates; |
2935 | } | 3037 | send_view (cli_ctx, NULL, 0); |
3038 | GNUNET_SERVICE_client_continue (cli_ctx->client); | ||
2936 | } | 3039 | } |
2937 | 3040 | ||
2938 | 3041 | ||
2939 | /** | 3042 | /** |
2940 | * Handle RPS request from the client. | 3043 | * Handle RPS request for biased stream from the client. |
2941 | * | 3044 | * |
2942 | * @param cls closure | 3045 | * @param cls closure |
2943 | * @param message the actual message | 3046 | * @param message the actual message |
2944 | */ | 3047 | */ |
2945 | static void | 3048 | static void |
2946 | handle_client_view_request (void *cls, | 3049 | handle_client_stream_request (void *cls, |
2947 | const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) | 3050 | const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) |
2948 | { | 3051 | { |
2949 | struct ClientContext *cli_ctx = cls; | 3052 | struct ClientContext *cli_ctx = cls; |
2950 | uint64_t num_updates; | 3053 | uint64_t num_peers; |
2951 | 3054 | ||
2952 | num_updates = ntohl (msg->num_updates); | 3055 | num_peers = ntohl (msg->num_peers); |
2953 | 3056 | ||
2954 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3057 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2955 | "Client requested %" PRIu64 " updates of view.\n", | 3058 | "Client requested %" PRIu64 " peers from biased stream.\n", |
2956 | num_updates); | 3059 | num_peers); |
2957 | 3060 | ||
2958 | GNUNET_assert (NULL != cli_ctx); | 3061 | GNUNET_assert (NULL != cli_ctx); |
2959 | cli_ctx->view_updates_left = num_updates; | 3062 | cli_ctx->stream_peers_left = num_peers; |
2960 | send_view (cli_ctx, NULL, 0); | ||
2961 | GNUNET_SERVICE_client_continue (cli_ctx->client); | 3063 | GNUNET_SERVICE_client_continue (cli_ctx->client); |
2962 | } | 3064 | } |
2963 | 3065 | ||
@@ -3727,8 +3829,14 @@ do_round (void *cls) | |||
3727 | CustomPeerMap_size (push_map)); | 3829 | CustomPeerMap_size (push_map)); |
3728 | for (i = 0; i < first_border; i++) | 3830 | for (i = 0; i < first_border; i++) |
3729 | { | 3831 | { |
3730 | (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map, | 3832 | int inserted; |
3731 | permut[i])); | 3833 | inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map, |
3834 | permut[i])); | ||
3835 | if (GNUNET_OK == inserted) | ||
3836 | { | ||
3837 | clients_notify_stream_peer ( | ||
3838 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | ||
3839 | } | ||
3732 | to_file (file_name_view_log, | 3840 | to_file (file_name_view_log, |
3733 | "+%s\t(push list)", | 3841 | "+%s\t(push list)", |
3734 | GNUNET_i2s_full (&view_array[i])); | 3842 | GNUNET_i2s_full (&view_array[i])); |
@@ -3742,8 +3850,14 @@ do_round (void *cls) | |||
3742 | CustomPeerMap_size (pull_map)); | 3850 | CustomPeerMap_size (pull_map)); |
3743 | for (i = first_border; i < second_border; i++) | 3851 | for (i = first_border; i < second_border; i++) |
3744 | { | 3852 | { |
3745 | (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, | 3853 | int inserted; |
3854 | inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, | ||
3746 | permut[i - first_border])); | 3855 | permut[i - first_border])); |
3856 | if (GNUNET_OK == inserted) | ||
3857 | { | ||
3858 | clients_notify_stream_peer ( | ||
3859 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | ||
3860 | } | ||
3747 | to_file (file_name_view_log, | 3861 | to_file (file_name_view_log, |
3748 | "+%s\t(pull list)", | 3862 | "+%s\t(pull list)", |
3749 | GNUNET_i2s_full (&view_array[i])); | 3863 | GNUNET_i2s_full (&view_array[i])); |
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN | |||
4296 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, | 4410 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, |
4297 | struct GNUNET_RPS_CS_DEBUG_ViewRequest, | 4411 | struct GNUNET_RPS_CS_DEBUG_ViewRequest, |
4298 | NULL), | 4412 | NULL), |
4413 | GNUNET_MQ_hd_fixed_size (client_stream_request, | ||
4414 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST, | ||
4415 | struct GNUNET_RPS_CS_DEBUG_StreamRequest, | ||
4416 | NULL), | ||
4299 | GNUNET_MQ_handler_end()); | 4417 | GNUNET_MQ_handler_end()); |
4300 | 4418 | ||
4301 | /* end of gnunet-service-rps.c */ | 4419 | /* end of gnunet-service-rps.c */ |
diff --git a/src/rps/rps.h b/src/rps/rps.h index 58ba79082..66b2dd962 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h | |||
@@ -216,6 +216,47 @@ struct GNUNET_RPS_CS_DEBUG_ViewReply | |||
216 | }; | 216 | }; |
217 | /* Followed by num_peers * GNUNET_PeerIdentity */ | 217 | /* Followed by num_peers * GNUNET_PeerIdentity */ |
218 | 218 | ||
219 | /** | ||
220 | * Message from client to service indicating that | ||
221 | * clients wants to get stream of biased peers | ||
222 | */ | ||
223 | struct GNUNET_RPS_CS_DEBUG_StreamRequest | ||
224 | { | ||
225 | /** | ||
226 | * Header including size and type in NBO | ||
227 | */ | ||
228 | struct GNUNET_MessageHeader header; | ||
229 | |||
230 | /** | ||
231 | * Number of peers | ||
232 | * 0 for sending updates until cancellation | ||
233 | */ | ||
234 | uint32_t num_peers GNUNET_PACKED; | ||
235 | }; | ||
236 | |||
237 | /** | ||
238 | * Message from service to client containing peer from biased stream | ||
239 | */ | ||
240 | struct GNUNET_RPS_CS_DEBUG_StreamReply | ||
241 | { | ||
242 | /** | ||
243 | * Header including size and type in NBO | ||
244 | */ | ||
245 | struct GNUNET_MessageHeader header; | ||
246 | |||
247 | /** | ||
248 | * Identifyer of the message. | ||
249 | */ | ||
250 | uint32_t id GNUNET_PACKED; | ||
251 | |||
252 | /** | ||
253 | * @brief The peer of the biased stream | ||
254 | */ | ||
255 | struct GNUNET_PeerIdentity peer; | ||
256 | |||
257 | // TODO maybe source of peer (pull/push list, peerinfo, ...) | ||
258 | }; | ||
259 | |||
219 | GNUNET_NETWORK_STRUCT_END | 260 | GNUNET_NETWORK_STRUCT_END |
220 | 261 | ||
221 | /*********************************************************************** | 262 | /*********************************************************************** |
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index ac462f3a0..b7644540d 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle | |||
61 | GNUNET_RPS_ViewUpdateCB view_update_cb; | 61 | GNUNET_RPS_ViewUpdateCB view_update_cb; |
62 | 62 | ||
63 | /** | 63 | /** |
64 | * @brief Callback called on each update of the view | 64 | * @brief Closure to each requested update of the view |
65 | */ | 65 | */ |
66 | void *view_update_cls; | 66 | void *view_update_cls; |
67 | |||
68 | /** | ||
69 | * @brief Callback called on each peer of the biased input stream | ||
70 | */ | ||
71 | GNUNET_RPS_StreamInputCB stream_input_cb; | ||
72 | |||
73 | /** | ||
74 | * @brief Closure to each requested peer from the biased stream | ||
75 | */ | ||
76 | void *stream_input_cls; | ||
67 | }; | 77 | }; |
68 | 78 | ||
69 | 79 | ||
@@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, | |||
277 | GNUNET_MQ_send (rps_handle->mq, ev); | 287 | GNUNET_MQ_send (rps_handle->mq, ev); |
278 | } | 288 | } |
279 | 289 | ||
290 | |||
291 | /** | ||
292 | * Request biased stream of peers that are being put into the sampler | ||
293 | * | ||
294 | * @param rps_handle handle to the rps service | ||
295 | * @param num_req_peers number of peers we want to receive | ||
296 | * (0 for infinite updates) | ||
297 | * @param cls a closure that will be given to the callback | ||
298 | * @param ready_cb the callback called when the peers are available | ||
299 | */ | ||
300 | void | ||
301 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | ||
302 | uint32_t num_peers, | ||
303 | GNUNET_RPS_StreamInputCB stream_input_cb, | ||
304 | void *cls) | ||
305 | { | ||
306 | struct GNUNET_MQ_Envelope *ev; | ||
307 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; | ||
308 | |||
309 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
310 | "Client requests %" PRIu32 " biased stream updates\n", | ||
311 | num_peers); | ||
312 | rps_handle->stream_input_cb = stream_input_cb; | ||
313 | rps_handle->stream_input_cls = cls; | ||
314 | |||
315 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); | ||
316 | msg->num_peers = htonl (num_peers); | ||
317 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
318 | } | ||
319 | |||
320 | |||
280 | /** | 321 | /** |
281 | * This function is called, when the service updates the view. | 322 | * This function is called, when the service updates the view. |
282 | * It verifies that @a msg is well-formed. | 323 | * It verifies that @a msg is well-formed. |
@@ -303,6 +344,7 @@ check_view_update (void *cls, | |||
303 | return GNUNET_OK; | 344 | return GNUNET_OK; |
304 | } | 345 | } |
305 | 346 | ||
347 | |||
306 | /** | 348 | /** |
307 | * This function is called, when the service updated its view. | 349 | * This function is called, when the service updated its view. |
308 | * It calls the callback the caller provided | 350 | * It calls the callback the caller provided |
@@ -329,6 +371,29 @@ handle_view_update (void *cls, | |||
329 | } | 371 | } |
330 | 372 | ||
331 | 373 | ||
374 | /** | ||
375 | * This function is called, when the service sends another peer from the biased | ||
376 | * stream. | ||
377 | * It calls the callback the caller provided | ||
378 | * and disconnects afterwards. | ||
379 | * | ||
380 | * @param msg the message | ||
381 | */ | ||
382 | static void | ||
383 | handle_stream_input (void *cls, | ||
384 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) | ||
385 | { | ||
386 | struct GNUNET_RPS_Handle *h = cls; | ||
387 | |||
388 | /* Give the peers back */ | ||
389 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
390 | "New peer of biased input stream\n"); | ||
391 | |||
392 | GNUNET_assert (NULL != h); | ||
393 | GNUNET_assert (NULL != h->stream_input_cb); | ||
394 | h->stream_input_cb (h->stream_input_cb, &msg->peer); | ||
395 | } | ||
396 | |||
332 | 397 | ||
333 | /** | 398 | /** |
334 | * Reconnect to the service | 399 | * Reconnect to the service |
@@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h) | |||
379 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, | 444 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, |
380 | struct GNUNET_RPS_CS_DEBUG_ViewReply, | 445 | struct GNUNET_RPS_CS_DEBUG_ViewReply, |
381 | h), | 446 | h), |
447 | GNUNET_MQ_hd_fixed_size (stream_input, | ||
448 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, | ||
449 | struct GNUNET_RPS_CS_DEBUG_StreamReply, | ||
450 | h), | ||
382 | GNUNET_MQ_handler_end () | 451 | GNUNET_MQ_handler_end () |
383 | }; | 452 | }; |
384 | 453 | ||