From 3afe1a3435697b01fee557420a701fba1821dbe5 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Mon, 8 Apr 2019 00:55:35 +0200 Subject: RPS: Retrieve more info from sampler for profiling --- src/rps/rps_api.c | 193 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 186 insertions(+), 7 deletions(-) (limited to 'src/rps/rps_api.c') diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 7a3adfa94..83dff27e8 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -127,6 +127,16 @@ struct GNUNET_RPS_Handle */ struct GNUNET_RPS_Request_Handle *rh_tail; + /** + * @brief Pointer to the head element in DLL of single request handles + */ + struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head; + + /** + * @brief Pointer to the tail element in DLL of single request handles + */ + struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail; + /** * @brief The desired probability with which we want to have observed all * peers. @@ -196,6 +206,54 @@ struct GNUNET_RPS_Request_Handle }; +/** + * Handler for a single request from a client. + */ +struct GNUNET_RPS_Request_Handle_Single_Info +{ + /** + * The client issuing the request. + */ + struct GNUNET_RPS_Handle *rps_handle; + + /** + * @brief The Sampler for the client request + */ + struct RPS_Sampler *sampler; + + /** + * @brief Request handle of the request to the sampler - needed to cancel the request + */ + struct RPS_SamplerRequestHandleSingleInfo *sampler_rh; + + /** + * @brief Request handle of the request of the biased stream of peers - + * needed to cancel the request + */ + struct GNUNET_RPS_StreamRequestHandle *srh; + + /** + * The callback to be called when we receive an answer. + */ + GNUNET_RPS_NotifyReadySingleInfoCB ready_cb; + + /** + * The closure for the callback. + */ + void *ready_cb_cls; + + /** + * @brief Pointer to next element in DLL + */ + struct GNUNET_RPS_Request_Handle_Single_Info *next; + + /** + * @brief Pointer to previous element in DLL + */ + struct GNUNET_RPS_Request_Handle_Single_Info *prev; +}; + + /** * Struct used to pack the callback, its closure (provided by the caller) * and the connection handler to the service to pass it to a callback function. @@ -308,6 +366,36 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, } +/** + * @brief Called once the sampler has collected the requested peer. + * + * Calls the callback provided by the client with the corresponding cls. + * + * @param peers The array of @a num_peers that has been returned. + * @param num_peers The number of peers that have been returned + * @param cls The #GNUNET_RPS_Request_Handle + * @param probability Probability with which all IDs have been observed + * @param num_observed Number of observed IDs + */ +static void +peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers, + void *cls, + double probability, + uint32_t num_observed) +{ + struct GNUNET_RPS_Request_Handle *rh = cls; + (void) probability; + (void) num_observed; + uint32_t num_peers = 1; + + rh->sampler_rh = NULL; + rh->ready_cb (rh->ready_cb_cls, + num_peers, + peers); + GNUNET_RPS_request_cancel (rh); +} + + /** * @brief Callback to collect the peers from the biased stream and put those * into the sampler. @@ -632,15 +720,15 @@ mq_error_handler (void *cls, */ static void hash_from_share_val (const char *share_val, - struct GNUNET_HashCode *hash) + struct GNUNET_HashCode *hash) { GNUNET_CRYPTO_kdf (hash, - sizeof (struct GNUNET_HashCode), - "rps", - strlen ("rps"), - share_val, - strlen (share_val), - NULL, 0); + sizeof (struct GNUNET_HashCode), + "rps", + strlen ("rps"), + share_val, + strlen (share_val), + NULL, 0); } @@ -672,6 +760,13 @@ nse_cb (void *cls, RPS_sampler_update_with_nw_size (rh_iter->sampler, GNUNET_NSE_log_estimate_to_n (logestimate)); } + for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; + NULL != rhs_iter && NULL != rhs_iter->next; + rhs_iter = rhs_iter->next) + { + RPS_sampler_update_with_nw_size (rhs_iter->sampler, + GNUNET_NSE_log_estimate_to_n (logestimate)); + } } @@ -856,6 +951,48 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, } +/** + * Request one random peer, getting additional information. + * + * @param rps_handle handle to the rps service + * @param ready_cb the callback called when the peers are available + * @param cls closure given to the callback + * @return a handle to cancel this request + */ +struct GNUNET_RPS_Request_Handle_Single_Info * +GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle, + GNUNET_RPS_NotifyReadySingleInfoCB ready_cb, + void *cls) +{ + struct GNUNET_RPS_Request_Handle_Single_Info *rhs; + uint32_t num_req_peers = 1; + + LOG (GNUNET_ERROR_TYPE_INFO, + "Client requested peer with additional info\n"); + rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info); + rhs->rps_handle = rps_handle; + rhs->sampler = RPS_sampler_mod_init (num_req_peers, + GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff + RPS_sampler_set_desired_probability (rhs->sampler, + rps_handle->desired_probability); + RPS_sampler_set_deficiency_factor (rhs->sampler, + rps_handle->deficiency_factor); + rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler, + peer_info_ready_cb, + rhs); + rhs->srh = GNUNET_RPS_stream_request (rps_handle, + collect_peers_cb, + rhs); /* cls */ + rhs->ready_cb = ready_cb; + rhs->ready_cb_cls = cls; + GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head, + rps_handle->rhs_tail, + rhs); + + return rhs; +} + + /** * Seed rps service with peerIDs. * @@ -1046,6 +1183,37 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) } +/** + * Cancle an issued single info request. + * + * @param rhs request handle of request to cancle + */ +void +GNUNET_RPS_request_single_info_cancel ( + struct GNUNET_RPS_Request_Handle_Single_Info *rhs) +{ + struct GNUNET_RPS_Handle *h; + + h = rhs->rps_handle; + GNUNET_assert (NULL != rhs); + GNUNET_assert (NULL != rhs->srh); + GNUNET_assert (h == rhs->srh->rps_handle); + GNUNET_RPS_stream_cancel (rhs->srh); + rhs->srh = NULL; + if (NULL == h->stream_requests_head) cancel_stream(h); + if (NULL != rhs->sampler_rh) + { + RPS_sampler_request_single_info_cancel (rhs->sampler_rh); + } + RPS_sampler_destroy (rhs->sampler); + rhs->sampler = NULL; + GNUNET_CONTAINER_DLL_remove (h->rhs_head, + h->rhs_tail, + rhs); + GNUNET_free (rhs); +} + + /** * Disconnect from the rps service * @@ -1079,6 +1247,17 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) GNUNET_RPS_request_cancel (rh_iter); } } + if (NULL != h->rhs_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Not all requests were cancelled!\n"); + for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; + h->rhs_head != NULL; + rhs_iter = h->rhs_head) + { + GNUNET_RPS_request_single_info_cancel (rhs_iter); + } + } if (NULL != srh_callback_peers) { GNUNET_free (srh_callback_peers); -- cgit v1.2.3