From 67f690896f5bbbcd40efaec79a7c36d2e339990b Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Thu, 20 Sep 2018 01:15:53 +0200 Subject: Start changing architecture of rps service/client --- src/rps/rps_api.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 122 insertions(+), 10 deletions(-) (limited to 'src/rps/rps_api.c') diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 7afe21d3a..a558c8a35 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -25,6 +25,7 @@ #include "gnunet_util_lib.h" #include "rps.h" #include "gnunet_rps_service.h" +#include "gnunet-service-rps_sampler.h" #include @@ -523,6 +524,39 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) } +/** + * @brief Create new request handle + * + * @param rps_handle Handle to the service + * @param num_requests Number of requests + * @param ready_cb Callback + * @param cls Closure + * + * @return The newly created request handle + */ +static struct GNUNET_RPS_Request_Handle * +new_request_handle (struct GNUNET_RPS_Handle *rps_handle, + uint64_t num_requests, + struct RPS_Sampler *sampler, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) +{ + struct GNUNET_RPS_Request_Handle *rh; + + rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); + rh->rps_handle = rps_handle; + rh->id = rps_handle->current_request_id++; + rh->num_requests = num_requests; + rh->sampler = sampler; + rh->ready_cb = ready_cb; + rh->ready_cb_cls = cls; + GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + return rh; +} + + /** * Request n random peers. * @@ -533,33 +567,111 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) * @return a handle to cancel this request */ struct GNUNET_RPS_Request_Handle * -GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, +GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle, uint32_t num_req_peers, GNUNET_RPS_NotifyReadyCB ready_cb, void *cls) { struct GNUNET_RPS_Request_Handle *rh; - rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); - rh->rps_handle = rps_handle; - rh->id = rps_handle->current_request_id++; - rh->num_peers = num_req_peers; - rh->ready_cb = ready_cb; - rh->ready_cb_cls = cls; + rh = new_request_handle (rps_handle, + num_req_peers, + NULL, /* no sampler needed */ + ready_cb, + cls); LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting %" PRIu32 " peers with id %" PRIu32 "\n", num_req_peers, rh->id); - GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - send_request (rps_handle, rh->id, num_req_peers); return rh; } +/** + * @brief Callback to collect the peers from the biased stream and put those + * into the sampler. + * + * @param cls The #GNUNET_RPS_Request_Handle + * @param num_peers The number of peer that have been returned + * @param peers The array of @a num_peers that have been returned + */ +void +collect_peers_cb (void *cls, + uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers) +{ + struct GNUNET_RPS_Request_Handle *rh = cls; + + for (uint64_t i = 0; i < num_peers; i++) + { + RPS_sampler_update (rh->sampler, &peers[i]); + } +} + + +/** + * @brief Called once the sampler has collected all requested peers. + * + * Calls the callback provided by the client with the corresponding cls. + * + * @param peers The array of @a num_peers that has been returned. + * @param num_peers The number of peers that have been returned + * @param cls The #GNUNET_RPS_Request_Handle + */ +void +peers_ready_cb (const struct GNUNET_PeerIdentity *peers, + uint32_t num_peers, + void *cls) +{ + struct GNUNET_RPS_Request_Handle *rh = cls; + + rh->ready_cb (rh->ready_cb_cls, + num_peers, + peers); + // TODO cleanup, sampler, rh, cancel stuff + // TODO screw this function. We can give the cb,cls directly to the sampler. +} + +/** + * Request n random peers. + * + * @param rps_handle handle to the rps service + * @param num_req_peers number of peers we want to receive + * @param ready_cb the callback called when the peers are available + * @param cls closure given to the callback + * @return a handle to cancel this request + */ +struct GNUNET_RPS_Request_Handle * +GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, + uint32_t num_req_peers, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) +{ + struct GNUNET_RPS_Request_Handle *rh; + + rh = new_request_handle (rps_handle, + num_req_peers, + RPS_sampler_mod_init (num_req_peers, + GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff + ready_cb, + cls); + RPS_sampler_get_n_rand_peers (rh->sampler, + num_req_peers, + peers_ready_cb, + rh); + + GNUNET_RPS_stream_request (rps_handle, + 0, /* infinite updates */ + collect_peers_cb, + rh); /* cls */ + + return rh; +} + + /** * Seed rps service with peerIDs. * -- cgit v1.2.3