aboutsummaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c71
1 files changed, 70 insertions, 1 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index ac462f3a0..b7644540d 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle
61 GNUNET_RPS_ViewUpdateCB view_update_cb; 61 GNUNET_RPS_ViewUpdateCB view_update_cb;
62 62
63 /** 63 /**
64 * @brief Callback called on each update of the view 64 * @brief Closure to each requested update of the view
65 */ 65 */
66 void *view_update_cls; 66 void *view_update_cls;
67
68 /**
69 * @brief Callback called on each peer of the biased input stream
70 */
71 GNUNET_RPS_StreamInputCB stream_input_cb;
72
73 /**
74 * @brief Closure to each requested peer from the biased stream
75 */
76 void *stream_input_cls;
67}; 77};
68 78
69 79
@@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
277 GNUNET_MQ_send (rps_handle->mq, ev); 287 GNUNET_MQ_send (rps_handle->mq, ev);
278} 288}
279 289
290
291/**
292 * Request biased stream of peers that are being put into the sampler
293 *
294 * @param rps_handle handle to the rps service
295 * @param num_req_peers number of peers we want to receive
296 * (0 for infinite updates)
297 * @param cls a closure that will be given to the callback
298 * @param ready_cb the callback called when the peers are available
299 */
300void
301GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
302 uint32_t num_peers,
303 GNUNET_RPS_StreamInputCB stream_input_cb,
304 void *cls)
305{
306 struct GNUNET_MQ_Envelope *ev;
307 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
308
309 LOG (GNUNET_ERROR_TYPE_DEBUG,
310 "Client requests %" PRIu32 " biased stream updates\n",
311 num_peers);
312 rps_handle->stream_input_cb = stream_input_cb;
313 rps_handle->stream_input_cls = cls;
314
315 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
316 msg->num_peers = htonl (num_peers);
317 GNUNET_MQ_send (rps_handle->mq, ev);
318}
319
320
280/** 321/**
281 * This function is called, when the service updates the view. 322 * This function is called, when the service updates the view.
282 * It verifies that @a msg is well-formed. 323 * It verifies that @a msg is well-formed.
@@ -303,6 +344,7 @@ check_view_update (void *cls,
303 return GNUNET_OK; 344 return GNUNET_OK;
304} 345}
305 346
347
306/** 348/**
307 * This function is called, when the service updated its view. 349 * This function is called, when the service updated its view.
308 * It calls the callback the caller provided 350 * It calls the callback the caller provided
@@ -329,6 +371,29 @@ handle_view_update (void *cls,
329} 371}
330 372
331 373
374/**
375 * This function is called, when the service sends another peer from the biased
376 * stream.
377 * It calls the callback the caller provided
378 * and disconnects afterwards.
379 *
380 * @param msg the message
381 */
382static void
383handle_stream_input (void *cls,
384 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
385{
386 struct GNUNET_RPS_Handle *h = cls;
387
388 /* Give the peers back */
389 LOG (GNUNET_ERROR_TYPE_DEBUG,
390 "New peer of biased input stream\n");
391
392 GNUNET_assert (NULL != h);
393 GNUNET_assert (NULL != h->stream_input_cb);
394 h->stream_input_cb (h->stream_input_cb, &msg->peer);
395}
396
332 397
333/** 398/**
334 * Reconnect to the service 399 * Reconnect to the service
@@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
379 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 444 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
380 struct GNUNET_RPS_CS_DEBUG_ViewReply, 445 struct GNUNET_RPS_CS_DEBUG_ViewReply,
381 h), 446 h),
447 GNUNET_MQ_hd_fixed_size (stream_input,
448 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
449 struct GNUNET_RPS_CS_DEBUG_StreamReply,
450 h),
382 GNUNET_MQ_handler_end () 451 GNUNET_MQ_handler_end ()
383 }; 452 };
384 453