From e91d46cdbbc97414968fa751fd1f596757c56875 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Fri, 14 Sep 2018 00:38:45 +0200 Subject: Add API call to receive unbiased peer stream for debugging and profiling --- src/rps/rps_api.c | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) (limited to 'src/rps/rps_api.c') diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index ac462f3a0..b7644540d 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle GNUNET_RPS_ViewUpdateCB view_update_cb; /** - * @brief Callback called on each update of the view + * @brief Closure to each requested update of the view */ void *view_update_cls; + + /** + * @brief Callback called on each peer of the biased input stream + */ + GNUNET_RPS_StreamInputCB stream_input_cb; + + /** + * @brief Closure to each requested peer from the biased stream + */ + void *stream_input_cls; }; @@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, GNUNET_MQ_send (rps_handle->mq, ev); } + +/** + * Request biased stream of peers that are being put into the sampler + * + * @param rps_handle handle to the rps service + * @param num_req_peers number of peers we want to receive + * (0 for infinite updates) + * @param cls a closure that will be given to the callback + * @param ready_cb the callback called when the peers are available + */ +void +GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, + uint32_t num_peers, + GNUNET_RPS_StreamInputCB stream_input_cb, + void *cls) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client requests %" PRIu32 " biased stream updates\n", + num_peers); + rps_handle->stream_input_cb = stream_input_cb; + rps_handle->stream_input_cls = cls; + + ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); + msg->num_peers = htonl (num_peers); + GNUNET_MQ_send (rps_handle->mq, ev); +} + + /** * This function is called, when the service updates the view. * It verifies that @a msg is well-formed. @@ -303,6 +344,7 @@ check_view_update (void *cls, return GNUNET_OK; } + /** * This function is called, when the service updated its view. * It calls the callback the caller provided @@ -329,6 +371,29 @@ handle_view_update (void *cls, } +/** + * This function is called, when the service sends another peer from the biased + * stream. + * It calls the callback the caller provided + * and disconnects afterwards. + * + * @param msg the message + */ +static void +handle_stream_input (void *cls, + const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) +{ + struct GNUNET_RPS_Handle *h = cls; + + /* Give the peers back */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "New peer of biased input stream\n"); + + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != h->stream_input_cb); + h->stream_input_cb (h->stream_input_cb, &msg->peer); +} + /** * Reconnect to the service @@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h) GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, struct GNUNET_RPS_CS_DEBUG_ViewReply, h), + GNUNET_MQ_hd_fixed_size (stream_input, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, + struct GNUNET_RPS_CS_DEBUG_StreamReply, + h), GNUNET_MQ_handler_end () }; -- cgit v1.2.3