aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h16
-rw-r--r--src/include/gnunet_rps_service.h25
-rw-r--r--src/rps/gnunet-rps.c51
-rw-r--r--src/rps/gnunet-service-rps.c278
-rw-r--r--src/rps/rps.h41
-rw-r--r--src/rps/rps_api.c71
6 files changed, 399 insertions, 83 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 4400db7e1..56e049608 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2993,6 +2993,22 @@ extern "C"
2993#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL 1132 2993#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL 1132
2994 2994
2995 2995
2996/**
2997 * @brief Request biased input stream
2998 */
2999#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST 1133
3000
3001/**
3002 * @brief Send peer of biased stream
3003 */
3004#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY 1134
3005
3006/**
3007 * @brief Cancel getting biased strem
3008 */
3009#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135
3010
3011
2996 3012
2997/** 3013/**
2998 * Next available: 1200 3014 * Next available: 1200
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h
index b0c8c9867..252188c62 100644
--- a/src/include/gnunet_rps_service.h
+++ b/src/include/gnunet_rps_service.h
@@ -74,6 +74,14 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls,
74 const struct GNUNET_PeerIdentity *peers); 74 const struct GNUNET_PeerIdentity *peers);
75 75
76/** 76/**
77 * Callback called when a peer from the biased stream was received
78 *
79 * @param peer The received peer
80 */
81typedef void (* GNUNET_RPS_StreamInputCB) (void *cls,
82 const struct GNUNET_PeerIdentity *peer);
83
84/**
77 * Connect to the rps service 85 * Connect to the rps service
78 * 86 *
79 * @param cfg configuration to use 87 * @param cfg configuration to use
@@ -161,6 +169,23 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
161 GNUNET_RPS_ViewUpdateCB view_update_cb, 169 GNUNET_RPS_ViewUpdateCB view_update_cb,
162 void *cls); 170 void *cls);
163 171
172
173/**
174 * Request biased stream of peers that are being put into the sampler
175 *
176 * @param rps_handle handle to the rps service
177 * @param num_req_peers number of peers we want to receive
178 * (0 for infinite updates)
179 * @param cls a closure that will be given to the callback
180 * @param ready_cb the callback called when the peers are available
181 */
182void
183GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
184 uint32_t num_updates,
185 GNUNET_RPS_StreamInputCB stream_input_cb,
186 void *cls);
187
188
164/** 189/**
165 * Disconnect from the rps service 190 * Disconnect from the rps service
166 * 191 *
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index 03b2c8ab7..d2c497fd4 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -49,10 +49,20 @@ static struct GNUNET_PeerIdentity peer_id;
49static int view_update; 49static int view_update;
50 50
51/** 51/**
52 * @brief Do we want to receive updates of the view? (Option --view)
53 */
54static int stream_input;
55
56/**
52 * @brief Number of updates we want to receive 57 * @brief Number of updates we want to receive
53 */ 58 */
54static uint64_t num_view_updates; 59static uint64_t num_view_updates;
55 60
61/**
62 * @brief Number of updates we want to receive
63 */
64static uint64_t num_stream_peers;
65
56 66
57/** 67/**
58 * Task run when user presses CTRL-C to abort. 68 * Task run when user presses CTRL-C to abort.
@@ -137,6 +147,22 @@ view_update_handle (void *cls,
137 147
138 148
139/** 149/**
150 * Callback called on receipt of peer from biased stream
151 *
152 * @param n number of peers
153 * @param recv_peers the received peers
154 */
155static void
156stream_input_handle (void *cls,
157 const struct GNUNET_PeerIdentity *recv_peer)
158{
159 // TODO when source of peer is sent, also print source
160 FPRINTF (stdout, "%s\n",
161 GNUNET_i2s_full (recv_peer));
162}
163
164
165/**
140 * Main function that will be run by the scheduler. 166 * Main function that will be run by the scheduler.
141 * 167 *
142 * @param cls closure 168 * @param cls closure
@@ -163,7 +189,8 @@ run (void *cls,
163 } 189 }
164 190
165 if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) && 191 if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) &&
166 (!view_update)) 192 (!view_update) &&
193 (!stream_input))
167 { /* Request n PeerIDs */ 194 { /* Request n PeerIDs */
168 /* If number was specified use it, else request single peer. */ 195 /* If number was specified use it, else request single peer. */
169 if (NULL == args[0] || 196 if (NULL == args[0] ||
@@ -189,7 +216,23 @@ run (void *cls,
189 "Requesting %" PRIu64 " view updates\n", num_view_updates); 216 "Requesting %" PRIu64 " view updates\n", num_view_updates);
190 else 217 else
191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
192 "Requesting contiuous view updates\n"); 219 "Requesting continuous view updates\n");
220 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
221 } else if (stream_input)
222 {
223 /* Get updates of view */
224 if (NULL == args[0] ||
225 0 == sscanf (args[0], "%lu", &num_stream_peers))
226 {
227 num_stream_peers = 0;
228 }
229 GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
230 if (0 != num_stream_peers)
231 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
232 "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
233 else
234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235 "Requesting continuous peers from biased stream\n");
193 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); 236 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
194 } 237 }
195 else 238 else
@@ -223,6 +266,10 @@ main (int argc, char *const *argv)
223 "view", 266 "view",
224 gettext_noop ("Get updates of view (0 for infinite updates)"), 267 gettext_noop ("Get updates of view (0 for infinite updates)"),
225 &view_update), 268 &view_update),
269 GNUNET_GETOPT_option_flag ('S',
270 "stream",
271 gettext_noop ("Get peers from biased stream"),
272 &stream_input),
226 GNUNET_GETOPT_OPTION_END 273 GNUNET_GETOPT_OPTION_END
227 }; 274 };
228 return (GNUNET_OK == 275 return (GNUNET_OK ==
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 3e30041e8..5b78bb4a8 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -1769,6 +1769,12 @@ struct ClientContext
1769 int64_t view_updates_left; 1769 int64_t view_updates_left;
1770 1770
1771 /** 1771 /**
1772 * @brief How many peers from the biased
1773 * stream this client expects to receive.
1774 */
1775 int64_t stream_peers_left;
1776
1777 /**
1772 * The client handle to send the reply to 1778 * The client handle to send the reply to
1773 */ 1779 */
1774 struct GNUNET_SERVICE_Client *client; 1780 struct GNUNET_SERVICE_Client *client;
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
2174 return ret; 2180 return ret;
2175} 2181}
2176 2182
2183
2184/**
2185 * @brief Send view to client
2186 *
2187 * @param cli_ctx the context of the client
2188 * @param view_array the peerids of the view as array (can be empty)
2189 * @param view_size the size of the view array (can be 0)
2190 */
2191void
2192send_view (const struct ClientContext *cli_ctx,
2193 const struct GNUNET_PeerIdentity *view_array,
2194 uint64_t view_size)
2195{
2196 struct GNUNET_MQ_Envelope *ev;
2197 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2198
2199 if (NULL == view_array)
2200 {
2201 view_size = View_size ();
2202 view_array = View_get_as_array();
2203 }
2204
2205 ev = GNUNET_MQ_msg_extra (out_msg,
2206 view_size * sizeof (struct GNUNET_PeerIdentity),
2207 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2208 out_msg->num_peers = htonl (view_size);
2209
2210 GNUNET_memcpy (&out_msg[1],
2211 view_array,
2212 view_size * sizeof (struct GNUNET_PeerIdentity));
2213 GNUNET_MQ_send (cli_ctx->mq, ev);
2214}
2215
2216
2217/**
2218 * @brief Send peer from biased stream to client.
2219 *
2220 * @param cli_ctx the context of the client
2221 * @param view_array the peerids of the view as array (can be empty)
2222 * @param view_size the size of the view array (can be 0)
2223 */
2224void
2225send_stream_peer (const struct ClientContext *cli_ctx,
2226 const struct GNUNET_PeerIdentity *peer)
2227{
2228 struct GNUNET_MQ_Envelope *ev;
2229 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2230
2231 GNUNET_assert (NULL != peer);
2232
2233 ev = GNUNET_MQ_msg (out_msg,
2234 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2235
2236 GNUNET_memcpy (&out_msg->peer,
2237 peer,
2238 sizeof (struct GNUNET_PeerIdentity));
2239 GNUNET_MQ_send (cli_ctx->mq, ev);
2240}
2241
2242
2177/** 2243/**
2178 * @brief sends updates to clients that are interested 2244 * @brief sends updates to clients that are interested
2179 */ 2245 */
2180static void 2246static void
2181clients_notify_view_update (void); 2247clients_notify_view_update (void)
2248{
2249 struct ClientContext *cli_ctx_iter;
2250 uint64_t num_peers;
2251 const struct GNUNET_PeerIdentity *view_array;
2252
2253 num_peers = View_size ();
2254 view_array = View_get_as_array();
2255 /* check size of view is small enough */
2256 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2257 {
2258 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2259 "View is too big to send\n");
2260 return;
2261 }
2262
2263 for (cli_ctx_iter = cli_ctx_head;
2264 NULL != cli_ctx_iter;
2265 cli_ctx_iter = cli_ctx_head->next)
2266 {
2267 if (1 < cli_ctx_iter->view_updates_left)
2268 {
2269 /* Client wants to receive limited amount of updates */
2270 cli_ctx_iter->view_updates_left -= 1;
2271 } else if (1 == cli_ctx_iter->view_updates_left)
2272 {
2273 /* Last update of view for client */
2274 cli_ctx_iter->view_updates_left = -1;
2275 } else if (0 > cli_ctx_iter->view_updates_left) {
2276 /* Client is not interested in updates */
2277 continue;
2278 }
2279 /* else _updates_left == 0 - infinite amount of updates */
2280
2281 /* send view */
2282 send_view (cli_ctx_iter, view_array, num_peers);
2283 }
2284}
2285
2286
2287/**
2288 * @brief sends updates to clients that are interested
2289 */
2290static void
2291clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
2292 //enum StreamPeerSource)
2293{
2294 struct ClientContext *cli_ctx_iter;
2295
2296 LOG (GNUNET_ERROR_TYPE_DEBUG,
2297 "Got peer (%s) from biased stream - update all clients\n",
2298 GNUNET_i2s (peer));
2299
2300 /* check size of view is small enough */
2301 for (cli_ctx_iter = cli_ctx_head;
2302 NULL != cli_ctx_iter;
2303 cli_ctx_iter = cli_ctx_head->next)
2304 {
2305 if (1 < cli_ctx_iter->stream_peers_left)
2306 {
2307 /* Client wants to receive limited amount of updates */
2308 cli_ctx_iter->stream_peers_left -= 1;
2309 } else if (1 == cli_ctx_iter->stream_peers_left)
2310 {
2311 /* Last update of view for client */
2312 cli_ctx_iter->stream_peers_left = -1;
2313 } else if (0 > cli_ctx_iter->stream_peers_left) {
2314 /* Client is not interested in updates */
2315 continue;
2316 }
2317 /* else _updates_left == 0 - infinite amount of updates */
2318
2319 /* send view */
2320 send_stream_peer (cli_ctx_iter, peer);
2321 }
2322}
2182 2323
2183/** 2324/**
2184 * Put random peer from sampler into the view as history update. 2325 * Put random peer from sampler into the view as history update.
@@ -2193,7 +2334,12 @@ hist_update (void *cls,
2193 2334
2194 for (i = 0; i < num_peers; i++) 2335 for (i = 0; i < num_peers; i++)
2195 { 2336 {
2196 (void) insert_in_view (&ids[i]); 2337 int inserted;
2338 inserted = insert_in_view (&ids[i]);
2339 if (GNUNET_OK == inserted)
2340 {
2341 clients_notify_stream_peer (&ids[i]);
2342 }
2197 to_file (file_name_view_log, 2343 to_file (file_name_view_log,
2198 "+%s\t(hist)", 2344 "+%s\t(hist)",
2199 GNUNET_i2s_full (ids)); 2345 GNUNET_i2s_full (ids));
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls,
2398 const struct GNUNET_PeerIdentity *peer) 2544 const struct GNUNET_PeerIdentity *peer)
2399{ 2545{
2400 (void) cls; 2546 (void) cls;
2401 (void) insert_in_view (peer); 2547 int inserted;
2548
2549 inserted = insert_in_view (peer);
2550 if (GNUNET_OK == inserted)
2551 {
2552 clients_notify_stream_peer (peer);
2553 }
2402} 2554}
2403 2555
2404 2556
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls,
2860 GNUNET_SERVICE_client_continue (cli_ctx->client); 3012 GNUNET_SERVICE_client_continue (cli_ctx->client);
2861} 3013}
2862 3014
2863/**
2864 * @brief Send view to client
2865 *
2866 * @param cli_ctx the context of the client
2867 * @param view_array the peerids of the view as array (can be empty)
2868 * @param view_size the size of the view array (can be 0)
2869 */
2870void
2871send_view (const struct ClientContext *cli_ctx,
2872 const struct GNUNET_PeerIdentity *view_array,
2873 uint64_t view_size)
2874{
2875 struct GNUNET_MQ_Envelope *ev;
2876 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2877
2878 if (NULL == view_array)
2879 {
2880 view_size = View_size ();
2881 view_array = View_get_as_array();
2882 }
2883
2884 ev = GNUNET_MQ_msg_extra (out_msg,
2885 view_size * sizeof (struct GNUNET_PeerIdentity),
2886 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2887 out_msg->num_peers = htonl (view_size);
2888
2889 GNUNET_memcpy (&out_msg[1],
2890 view_array,
2891 view_size * sizeof (struct GNUNET_PeerIdentity));
2892 GNUNET_MQ_send (cli_ctx->mq, ev);
2893}
2894 3015
2895/** 3016/**
2896 * @brief sends updates to clients that are interested 3017 * Handle RPS request from the client.
3018 *
3019 * @param cls closure
3020 * @param message the actual message
2897 */ 3021 */
2898static void 3022static void
2899clients_notify_view_update (void) 3023handle_client_view_request (void *cls,
3024 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2900{ 3025{
2901 struct ClientContext *cli_ctx_iter; 3026 struct ClientContext *cli_ctx = cls;
2902 uint64_t num_peers; 3027 uint64_t num_updates;
2903 const struct GNUNET_PeerIdentity *view_array;
2904 3028
2905 num_peers = View_size (); 3029 num_updates = ntohl (msg->num_updates);
2906 view_array = View_get_as_array();
2907 /* check size of view is small enough */
2908 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2909 {
2910 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2911 "View is too big to send\n");
2912 return;
2913 }
2914 3030
2915 for (cli_ctx_iter = cli_ctx_head; 3031 LOG (GNUNET_ERROR_TYPE_DEBUG,
2916 NULL != cli_ctx_iter; 3032 "Client requested %" PRIu64 " updates of view.\n",
2917 cli_ctx_iter = cli_ctx_head->next) 3033 num_updates);
2918 {
2919 if (1 < cli_ctx_iter->view_updates_left)
2920 {
2921 /* Client wants to receive limited amount of updates */
2922 cli_ctx_iter->view_updates_left -= 1;
2923 } else if (1 == cli_ctx_iter->view_updates_left)
2924 {
2925 /* Last update of view for client */
2926 cli_ctx_iter->view_updates_left = -1;
2927 } else if (0 > cli_ctx_iter->view_updates_left) {
2928 /* Client is not interested in updates */
2929 continue;
2930 }
2931 /* else _updates_left == 0 - infinite amount of updates */
2932 3034
2933 /* send view */ 3035 GNUNET_assert (NULL != cli_ctx);
2934 send_view (cli_ctx_iter, view_array, num_peers); 3036 cli_ctx->view_updates_left = num_updates;
2935 } 3037 send_view (cli_ctx, NULL, 0);
3038 GNUNET_SERVICE_client_continue (cli_ctx->client);
2936} 3039}
2937 3040
2938 3041
2939/** 3042/**
2940 * Handle RPS request from the client. 3043 * Handle RPS request for biased stream from the client.
2941 * 3044 *
2942 * @param cls closure 3045 * @param cls closure
2943 * @param message the actual message 3046 * @param message the actual message
2944 */ 3047 */
2945static void 3048static void
2946handle_client_view_request (void *cls, 3049handle_client_stream_request (void *cls,
2947 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) 3050 const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
2948{ 3051{
2949 struct ClientContext *cli_ctx = cls; 3052 struct ClientContext *cli_ctx = cls;
2950 uint64_t num_updates; 3053 uint64_t num_peers;
2951 3054
2952 num_updates = ntohl (msg->num_updates); 3055 num_peers = ntohl (msg->num_peers);
2953 3056
2954 LOG (GNUNET_ERROR_TYPE_DEBUG, 3057 LOG (GNUNET_ERROR_TYPE_DEBUG,
2955 "Client requested %" PRIu64 " updates of view.\n", 3058 "Client requested %" PRIu64 " peers from biased stream.\n",
2956 num_updates); 3059 num_peers);
2957 3060
2958 GNUNET_assert (NULL != cli_ctx); 3061 GNUNET_assert (NULL != cli_ctx);
2959 cli_ctx->view_updates_left = num_updates; 3062 cli_ctx->stream_peers_left = num_peers;
2960 send_view (cli_ctx, NULL, 0);
2961 GNUNET_SERVICE_client_continue (cli_ctx->client); 3063 GNUNET_SERVICE_client_continue (cli_ctx->client);
2962} 3064}
2963 3065
@@ -3727,8 +3829,14 @@ do_round (void *cls)
3727 CustomPeerMap_size (push_map)); 3829 CustomPeerMap_size (push_map));
3728 for (i = 0; i < first_border; i++) 3830 for (i = 0; i < first_border; i++)
3729 { 3831 {
3730 (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map, 3832 int inserted;
3731 permut[i])); 3833 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3834 permut[i]));
3835 if (GNUNET_OK == inserted)
3836 {
3837 clients_notify_stream_peer (
3838 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3839 }
3732 to_file (file_name_view_log, 3840 to_file (file_name_view_log,
3733 "+%s\t(push list)", 3841 "+%s\t(push list)",
3734 GNUNET_i2s_full (&view_array[i])); 3842 GNUNET_i2s_full (&view_array[i]));
@@ -3742,8 +3850,14 @@ do_round (void *cls)
3742 CustomPeerMap_size (pull_map)); 3850 CustomPeerMap_size (pull_map));
3743 for (i = first_border; i < second_border; i++) 3851 for (i = first_border; i < second_border; i++)
3744 { 3852 {
3745 (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, 3853 int inserted;
3854 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3746 permut[i - first_border])); 3855 permut[i - first_border]));
3856 if (GNUNET_OK == inserted)
3857 {
3858 clients_notify_stream_peer (
3859 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3860 }
3747 to_file (file_name_view_log, 3861 to_file (file_name_view_log,
3748 "+%s\t(pull list)", 3862 "+%s\t(pull list)",
3749 GNUNET_i2s_full (&view_array[i])); 3863 GNUNET_i2s_full (&view_array[i]));
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN
4296 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, 4410 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4297 struct GNUNET_RPS_CS_DEBUG_ViewRequest, 4411 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4298 NULL), 4412 NULL),
4413 GNUNET_MQ_hd_fixed_size (client_stream_request,
4414 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4415 struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4416 NULL),
4299 GNUNET_MQ_handler_end()); 4417 GNUNET_MQ_handler_end());
4300 4418
4301/* end of gnunet-service-rps.c */ 4419/* end of gnunet-service-rps.c */
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 58ba79082..66b2dd962 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -216,6 +216,47 @@ struct GNUNET_RPS_CS_DEBUG_ViewReply
216}; 216};
217 /* Followed by num_peers * GNUNET_PeerIdentity */ 217 /* Followed by num_peers * GNUNET_PeerIdentity */
218 218
219/**
220 * Message from client to service indicating that
221 * clients wants to get stream of biased peers
222 */
223struct GNUNET_RPS_CS_DEBUG_StreamRequest
224{
225 /**
226 * Header including size and type in NBO
227 */
228 struct GNUNET_MessageHeader header;
229
230 /**
231 * Number of peers
232 * 0 for sending updates until cancellation
233 */
234 uint32_t num_peers GNUNET_PACKED;
235};
236
237/**
238 * Message from service to client containing peer from biased stream
239 */
240struct GNUNET_RPS_CS_DEBUG_StreamReply
241{
242 /**
243 * Header including size and type in NBO
244 */
245 struct GNUNET_MessageHeader header;
246
247 /**
248 * Identifyer of the message.
249 */
250 uint32_t id GNUNET_PACKED;
251
252 /**
253 * @brief The peer of the biased stream
254 */
255 struct GNUNET_PeerIdentity peer;
256
257 // TODO maybe source of peer (pull/push list, peerinfo, ...)
258};
259
219GNUNET_NETWORK_STRUCT_END 260GNUNET_NETWORK_STRUCT_END
220 261
221/*********************************************************************** 262/***********************************************************************
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index ac462f3a0..b7644540d 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle
61 GNUNET_RPS_ViewUpdateCB view_update_cb; 61 GNUNET_RPS_ViewUpdateCB view_update_cb;
62 62
63 /** 63 /**
64 * @brief Callback called on each update of the view 64 * @brief Closure to each requested update of the view
65 */ 65 */
66 void *view_update_cls; 66 void *view_update_cls;
67
68 /**
69 * @brief Callback called on each peer of the biased input stream
70 */
71 GNUNET_RPS_StreamInputCB stream_input_cb;
72
73 /**
74 * @brief Closure to each requested peer from the biased stream
75 */
76 void *stream_input_cls;
67}; 77};
68 78
69 79
@@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
277 GNUNET_MQ_send (rps_handle->mq, ev); 287 GNUNET_MQ_send (rps_handle->mq, ev);
278} 288}
279 289
290
291/**
292 * Request biased stream of peers that are being put into the sampler
293 *
294 * @param rps_handle handle to the rps service
295 * @param num_req_peers number of peers we want to receive
296 * (0 for infinite updates)
297 * @param cls a closure that will be given to the callback
298 * @param ready_cb the callback called when the peers are available
299 */
300void
301GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
302 uint32_t num_peers,
303 GNUNET_RPS_StreamInputCB stream_input_cb,
304 void *cls)
305{
306 struct GNUNET_MQ_Envelope *ev;
307 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
308
309 LOG (GNUNET_ERROR_TYPE_DEBUG,
310 "Client requests %" PRIu32 " biased stream updates\n",
311 num_peers);
312 rps_handle->stream_input_cb = stream_input_cb;
313 rps_handle->stream_input_cls = cls;
314
315 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
316 msg->num_peers = htonl (num_peers);
317 GNUNET_MQ_send (rps_handle->mq, ev);
318}
319
320
280/** 321/**
281 * This function is called, when the service updates the view. 322 * This function is called, when the service updates the view.
282 * It verifies that @a msg is well-formed. 323 * It verifies that @a msg is well-formed.
@@ -303,6 +344,7 @@ check_view_update (void *cls,
303 return GNUNET_OK; 344 return GNUNET_OK;
304} 345}
305 346
347
306/** 348/**
307 * This function is called, when the service updated its view. 349 * This function is called, when the service updated its view.
308 * It calls the callback the caller provided 350 * It calls the callback the caller provided
@@ -329,6 +371,29 @@ handle_view_update (void *cls,
329} 371}
330 372
331 373
374/**
375 * This function is called, when the service sends another peer from the biased
376 * stream.
377 * It calls the callback the caller provided
378 * and disconnects afterwards.
379 *
380 * @param msg the message
381 */
382static void
383handle_stream_input (void *cls,
384 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
385{
386 struct GNUNET_RPS_Handle *h = cls;
387
388 /* Give the peers back */
389 LOG (GNUNET_ERROR_TYPE_DEBUG,
390 "New peer of biased input stream\n");
391
392 GNUNET_assert (NULL != h);
393 GNUNET_assert (NULL != h->stream_input_cb);
394 h->stream_input_cb (h->stream_input_cb, &msg->peer);
395}
396
332 397
333/** 398/**
334 * Reconnect to the service 399 * Reconnect to the service
@@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
379 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 444 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
380 struct GNUNET_RPS_CS_DEBUG_ViewReply, 445 struct GNUNET_RPS_CS_DEBUG_ViewReply,
381 h), 446 h),
447 GNUNET_MQ_hd_fixed_size (stream_input,
448 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
449 struct GNUNET_RPS_CS_DEBUG_StreamReply,
450 h),
382 GNUNET_MQ_handler_end () 451 GNUNET_MQ_handler_end ()
383 }; 452 };
384 453