From 17796b61c71778f5faf14038671b2ba7d1aff758 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Wed, 26 Sep 2018 10:40:37 +0200 Subject: Remove leftovers of old architecture --- src/include/gnunet_protocols.h | 18 +-- src/rps/gnunet-service-rps.c | 283 ----------------------------------- src/rps/rps.h | 60 -------- src/rps/rps_api.c | 331 ++++++++++------------------------------- 4 files changed, 82 insertions(+), 610 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 56e049608..7a84bda0c 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2619,31 +2619,17 @@ extern "C" /* Client-Service Messages */ -/** - * RPS CS REQUEST Message for the Client to request (a) random peer(s) - */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 954 - -/** - * RPS CS REPLY Message for the Server to send (a) random peer(s) - */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 955 - -/** - * RPS CS REQUEST CANCEL Message for the Client to cancel a request - */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 956 /** * RPS CS SEED Message for the Client to seed peers into rps */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 957 +#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 954 #ifdef ENABLE_MALICIOUS /** * Turn RPS service malicious */ -#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 958 +#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 955 #endif /* ENABLE_MALICIOUS */ diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 862514264..e3714bdba 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -1804,11 +1804,6 @@ struct ClientContext *cli_ctx_tail; */ static struct RPS_Sampler *prot_sampler; -/** - * Sampler used for the clients. - */ -static struct RPS_Sampler *client_sampler; - /** * Name to log view to */ @@ -1831,12 +1826,6 @@ static uint32_t num_observed_peers; static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers; #endif /* TO_FILE */ -/** - * The size of sampler we need to be able to satisfy the client's need - * of random peers. - */ -static unsigned int sampler_size_client_need; - /** * The size of sampler we need to be able to satisfy the Brahms protocol's * need of random peers. @@ -1923,38 +1912,6 @@ static struct GNUNET_PEERINFO_Handle *peerinfo_handle; */ static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle; -/** - * Request counter. - * - * Counts how many requets clients already issued. - * Only needed in the beginning to check how many of the 64 deltas - * we already have - */ -static unsigned int req_counter; - -/** - * Time of the last request we received. - * - * Used to compute the expected request rate. - */ -static struct GNUNET_TIME_Absolute last_request; - -/** - * Size of #request_deltas. - */ -#define REQUEST_DELTAS_SIZE 64 -static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE; - -/** - * Last 64 deltas between requests - */ -static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE]; - -/** - * The prediction of the rate of requests - */ -static struct GNUNET_TIME_Relative request_rate; - #ifdef ENABLE_MALICIOUS /** @@ -2107,38 +2064,6 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list, } -/** - * Sum all time relatives of an array. - */ -static struct GNUNET_TIME_Relative -T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, - uint32_t arr_size) -{ - struct GNUNET_TIME_Relative sum; - uint32_t i; - - sum = GNUNET_TIME_UNIT_ZERO; - for ( i = 0 ; i < arr_size ; i++ ) - { - sum = GNUNET_TIME_relative_add (sum, rel_array[i]); - } - return sum; -} - - -/** - * Compute the average of given time relatives. - */ -static struct GNUNET_TIME_Relative -T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, - uint32_t arr_size) -{ - return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, - arr_size), - arr_size); -} - - /** * Insert PeerID in #view * @@ -2368,69 +2293,6 @@ resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size) } -/** - * Wrapper around #RPS_sampler_resize() resizing the client sampler - */ -static void -client_resize_wrapper () -{ - uint32_t bigger_size; - - // TODO statistics - - bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need); - - // TODO respect the min, max - resize_wrapper (client_sampler, bigger_size); - LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n", - bigger_size); -} - - -/** - * Estimate request rate - * - * Called every time we receive a request from the client. - */ -static void -est_request_rate() -{ - struct GNUNET_TIME_Relative max_round_duration; - - if (request_deltas_size > req_counter) - req_counter++; - if ( 1 < req_counter) - { - /* Shift last request deltas to the right */ - memmove (&request_deltas[1], - request_deltas, - (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative)); - - /* Add current delta to beginning */ - request_deltas[0] = - GNUNET_TIME_absolute_get_difference (last_request, - GNUNET_TIME_absolute_get ()); - request_rate = T_relative_avg (request_deltas, req_counter); - request_rate = (request_rate.rel_value_us < 1) ? - GNUNET_TIME_relative_get_unit_ () : request_rate; - - /* Compute the duration a round will maximally take */ - max_round_duration = - GNUNET_TIME_relative_add (round_interval, - GNUNET_TIME_relative_divide (round_interval, 2)); - - /* Set the estimated size the sampler has to have to - * satisfy the current client request rate */ - sampler_size_client_need = - max_round_duration.rel_value_us / request_rate.rel_value_us; - - /* Resize the sampler */ - client_resize_wrapper (); - } - last_request = GNUNET_TIME_absolute_get (); -} - - /** * Add all peers in @a peer_array to @a peer_map used as set. * @@ -2565,7 +2427,6 @@ insert_in_sampler (void *cls, "Updating samplers with peer %s from insert_in_sampler()\n", GNUNET_i2s (peer)); RPS_sampler_update (prot_sampler, peer); - RPS_sampler_update (client_sampler, peer); if (0 < RPS_sampler_count_id (prot_sampler, peer)) { /* Make sure we 'know' about this peer */ @@ -2658,7 +2519,6 @@ remove_peer (const struct GNUNET_PeerIdentity *peer) CustomPeerMap_remove_peer (pull_map, peer); CustomPeerMap_remove_peer (push_map, peer); RPS_sampler_reinitialise_by_value (prot_sampler, peer); - RPS_sampler_reinitialise_by_value (client_sampler, peer); destroy_peer (get_peer_ctx (peer)); } @@ -2691,7 +2551,6 @@ clean_peer (const struct GNUNET_PeerIdentity *peer) (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && (0 == RPS_sampler_count_id (prot_sampler, peer)) && - (0 == RPS_sampler_count_id (client_sampler, peer)) && (GNUNET_NO != check_removable (peer)) ) { /* We can safely remove this peer */ LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2813,142 +2672,10 @@ nse_callback (void *cls, /* If the NSE has changed adapt the lists accordingly */ resize_wrapper (prot_sampler, sampler_size_est_need); - client_resize_wrapper (); View_change_len (view_size_est_need); } -/** - * Callback called once the requested PeerIDs are ready. - * - * Sends those to the requesting client. - */ -static void -client_respond (const struct GNUNET_PeerIdentity *peer_ids, - uint32_t num_peers, - void *cls) -{ - struct ReplyCls *reply_cls = cls; - uint32_t i; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_ReplyMessage *out_msg; - uint32_t size_needed; - struct ClientContext *cli_ctx; - - GNUNET_assert (NULL != reply_cls); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "sampler returned %" PRIu32 " peers:\n", - num_peers); - for (i = 0; i < num_peers; i++) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - " %" PRIu32 ": %s\n", - i, - GNUNET_i2s (&peer_ids[i])); - } - - size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) + - num_peers * sizeof (struct GNUNET_PeerIdentity); - - GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed); - - ev = GNUNET_MQ_msg_extra (out_msg, - num_peers * sizeof (struct GNUNET_PeerIdentity), - GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); - out_msg->num_peers = htonl (num_peers); - out_msg->id = htonl (reply_cls->id); - - GNUNET_memcpy (&out_msg[1], - peer_ids, - num_peers * sizeof (struct GNUNET_PeerIdentity)); - - cli_ctx = reply_cls->cli_ctx; - GNUNET_assert (NULL != cli_ctx); - reply_cls->req_handle = NULL; - destroy_reply_cls (reply_cls); - GNUNET_MQ_send (cli_ctx->mq, ev); -} - - -/** - * Handle RPS request from the client. - * - * @param cls closure - * @param message the actual message - */ -static void -handle_client_request (void *cls, - const struct GNUNET_RPS_CS_RequestMessage *msg) -{ - struct ClientContext *cli_ctx = cls; - uint32_t num_peers; - uint32_t size_needed; - struct ReplyCls *reply_cls; - uint32_t i; - - num_peers = ntohl (msg->num_peers); - size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) + - num_peers * sizeof (struct GNUNET_PeerIdentity); - - if (GNUNET_MAX_MESSAGE_SIZE < size_needed) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Message received from client has size larger than expected\n"); - GNUNET_SERVICE_client_drop (cli_ctx->client); - return; - } - - for (i = 0 ; i < num_peers ; i++) - est_request_rate(); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client requested %" PRIu32 " random peer(s).\n", - num_peers); - - reply_cls = GNUNET_new (struct ReplyCls); - reply_cls->id = ntohl (msg->id); - reply_cls->cli_ctx = cli_ctx; - reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler, - num_peers, - client_respond, - reply_cls); - - GNUNET_assert (NULL != cli_ctx); - GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head, - cli_ctx->rep_cls_tail, - reply_cls); - GNUNET_SERVICE_client_continue (cli_ctx->client); -} - - -/** - * @brief Handle a message that requests the cancellation of a request - * - * @param cls unused - * @param message the message containing the id of the request - */ -static void -handle_client_request_cancel (void *cls, - const struct GNUNET_RPS_CS_RequestCancelMessage *msg) -{ - struct ClientContext *cli_ctx = cls; - struct ReplyCls *rep_cls; - - GNUNET_assert (NULL != cli_ctx); - GNUNET_assert (NULL != cli_ctx->rep_cls_head); - rep_cls = cli_ctx->rep_cls_head; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client cancels request with id %" PRIu32 "\n", - ntohl (msg->id)); - while ( (NULL != rep_cls->next) && - (rep_cls->id != ntohl (msg->id)) ) - rep_cls = rep_cls->next; - GNUNET_assert (rep_cls->id == ntohl (msg->id)); - destroy_reply_cls (rep_cls); - GNUNET_SERVICE_client_continue (cli_ctx->client); -} - - /** * @brief This function is called, when the client seeds peers. * It verifies that @a msg is well-formed. @@ -4102,7 +3829,6 @@ shutdown_task (void *cls) GNUNET_NSE_disconnect (nse); RPS_sampler_destroy (prot_sampler); - RPS_sampler_destroy (client_sampler); GNUNET_CADET_close_port (cadet_port); GNUNET_CADET_disconnect (cadet_handle); cadet_handle = NULL; @@ -4345,7 +4071,6 @@ run (void *cls, max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval); - client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval); /* Initialise push and pull maps */ push_map = CustomPeerMap_create (4); @@ -4385,14 +4110,6 @@ GNUNET_SERVICE_MAIN &client_connect_cb, &client_disconnect_cb, NULL, - GNUNET_MQ_hd_fixed_size (client_request, - GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, - struct GNUNET_RPS_CS_RequestMessage, - NULL), - GNUNET_MQ_hd_fixed_size (client_request_cancel, - GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL, - struct GNUNET_RPS_CS_RequestCancelMessage, - NULL), GNUNET_MQ_hd_var_size (client_seed, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, struct GNUNET_RPS_CS_SeedMessage, diff --git a/src/rps/rps.h b/src/rps/rps.h index 9e4487f88..915524f88 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h @@ -59,66 +59,6 @@ struct GNUNET_RPS_P2P_PullReplyMessage * Client-Service Messages ***********************************************************************/ -/** - * Message from client to RPS service to request random peer(s). - */ -struct GNUNET_RPS_CS_RequestMessage -{ - /** - * Header including size and type in NBO - */ - struct GNUNET_MessageHeader header; - - /** - * Identifyer of the message. - */ - uint32_t id GNUNET_PACKED; - - /** - * Number of random peer requested - */ - uint32_t num_peers GNUNET_PACKED; -}; - -/** - * Message from RPS service to client to reply with random peer(s). - */ -struct GNUNET_RPS_CS_ReplyMessage -{ - /** - * Type is #GNUNET_MESSAGE_TYPE_RPS_CS_REPLY. - */ - struct GNUNET_MessageHeader header; - - /** - * Identifyer of the message. - */ - uint32_t id GNUNET_PACKED; - - /** - * Number of random peer replied - */ - uint32_t num_peers GNUNET_PACKED; - - /* Followed by num_peers * GNUNET_PeerIdentity */ -}; - -/** - * Message from client to RPS service to cancel request. - */ -struct GNUNET_RPS_CS_RequestCancelMessage -{ - /** - * Header including size and type in NBO - */ - struct GNUNET_MessageHeader header; - - /** - * Identifyer of the message. - */ - uint32_t id GNUNET_PACKED; -}; - /** * Message from client to service with seed of peers. */ diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index e4f4db506..ee65c2a82 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -83,16 +83,6 @@ struct GNUNET_RPS_Handle */ struct GNUNET_MQ_Handle *mq; - /** - * Array of Request_Handles. - */ - struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers; - - /** - * The id of the last request. - */ - uint32_t current_request_id; - /** * @brief Callback called on each update of the view */ @@ -130,11 +120,6 @@ struct GNUNET_RPS_Request_Handle */ struct GNUNET_RPS_Handle *rps_handle; - /** - * The id of the request. - */ - uint32_t id; - /** * The number of requested peers. */ @@ -145,6 +130,17 @@ struct GNUNET_RPS_Request_Handle */ struct RPS_Sampler *sampler; + /** + * @brief Request handle of the request to the sampler - needed to cancel the request + */ + struct RPS_SamplerRequestHandle *sampler_rh; + + /** + * @brief Request handle of the request of the biased stream of peers - + * needed to cancel the request + */ + struct GNUNET_RPS_StreamRequestHandle *srh; + /** * The callback to be called when we receive an answer. */ @@ -233,160 +229,86 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, /** - * @brief Create new request handle - * - * @param rps_handle Handle to the service - * @param num_requests Number of requests - * @param ready_cb Callback - * @param cls Closure + * @brief Called once the sampler has collected all requested peers. * - * @return The newly created request handle - */ -static struct GNUNET_RPS_Request_Handle * -new_request_handle (struct GNUNET_RPS_Handle *rps_handle, - uint64_t num_requests, - struct RPS_Sampler *sampler, - GNUNET_RPS_NotifyReadyCB ready_cb, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh; - - rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); - rh->rps_handle = rps_handle; - rh->id = rps_handle->current_request_id++; - rh->num_requests = num_requests; - rh->sampler = sampler; - rh->ready_cb = ready_cb; - rh->ready_cb_cls = cls; - GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - - return rh; -} - - -/** - * @brief Send a request to the service. + * Calls the callback provided by the client with the corresponding cls. * - * @param h rps handle - * @param id id of the request - * @param num_req_peers number of peers + * @param peers The array of @a num_peers that has been returned. + * @param num_peers The number of peers that have been returned + * @param cls The #GNUNET_RPS_Request_Handle */ void -send_request (const struct GNUNET_RPS_Handle *h, - uint32_t id, - uint32_t num_req_peers) +peers_ready_cb (const struct GNUNET_PeerIdentity *peers, + uint32_t num_peers, + void *cls) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_RequestMessage *msg; + struct GNUNET_RPS_Request_Handle *rh = cls; - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST); - msg->num_peers = htonl (num_req_peers); - msg->id = htonl (id); - GNUNET_MQ_send (h->mq, ev); + rh->ready_cb (rh->ready_cb_cls, + num_peers, + peers); + // TODO cleanup, sampler, rh, cancel stuff + // TODO screw this function. We can give the cb,cls directly to the sampler. } -/** - * @brief Iterator function over pending requests - * - * Implements #GNUNET_CONTAINER_HashMapIterator32 - * - * @param cls rps handle - * @param key id of the request - * @param value request handle - * - * @return GNUNET_YES to continue iteration - */ -int -resend_requests_iterator (void *cls, uint32_t key, void *value) -{ - const struct GNUNET_RPS_Handle *h = cls; - const struct GNUNET_RPS_Request_Handle *req_handle = value; - (void) key; - - send_request (h, req_handle->id, req_handle->num_requests); - return GNUNET_YES; /* continue iterating */ -} /** - * @brief Resend all pending requests - * - * This is used to resend all pending requests after the client - * reconnected to the service, because the service cancels all - * pending requests after reconnection. + * @brief Callback to collect the peers from the biased stream and put those + * into the sampler. * - * @param h rps handle + * @param cls The #GNUNET_RPS_Request_Handle + * @param num_peers The number of peer that have been returned + * @param peers The array of @a num_peers that have been returned */ void -resend_requests (struct GNUNET_RPS_Handle *h) -{ - GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers, - resend_requests_iterator, - h); -} - - -/** - * This function is called, when the service replies to our request. - * It verifies that @a msg is well-formed. - * - * @param cls the closure - * @param msg the message - * @return #GNUNET_OK if @a msg is well-formed - */ -static int -check_reply (void *cls, - const struct GNUNET_RPS_CS_ReplyMessage *msg) +collect_peers_cb (void *cls, + uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers) { - uint16_t msize = ntohs (msg->header.size); - uint32_t num_peers = ntohl (msg->num_peers); - (void) cls; + struct GNUNET_RPS_Request_Handle *rh = cls; - msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage); - if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || - (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) + for (uint64_t i = 0; i < num_peers; i++) { - GNUNET_break (0); - return GNUNET_SYSERR; + RPS_sampler_update (rh->sampler, &peers[i]); } - return GNUNET_OK; } /** - * This function is called, when the service replies to our request. - * It calls the callback the caller gave us with the provided closure - * and disconnects afterwards. + * @brief Create new request handle * - * @param cls the closure - * @param msg the message + * @param rps_handle Handle to the service + * @param num_requests Number of requests + * @param ready_cb Callback + * @param cls Closure + * + * @return The newly created request handle */ -static void -handle_reply (void *cls, - const struct GNUNET_RPS_CS_ReplyMessage *msg) +static struct GNUNET_RPS_Request_Handle * +new_request_handle (struct GNUNET_RPS_Handle *rps_handle, + uint64_t num_requests, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) { - struct GNUNET_RPS_Handle *h = cls; - struct GNUNET_PeerIdentity *peers; struct GNUNET_RPS_Request_Handle *rh; - uint32_t id; - /* Give the peers back */ - id = ntohl (msg->id); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n", - ntohl (msg->num_peers), - id); + rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); + rh->rps_handle = rps_handle; + rh->num_requests = num_requests; + rh->sampler = RPS_sampler_mod_init (num_requests, + GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff + rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, + num_requests, + peers_ready_cb, + rh); + rh->srh = GNUNET_RPS_stream_request (rps_handle, + 0, /* infinite updates */ + collect_peers_cb, + rh); /* cls */ + rh->ready_cb = ready_cb; + rh->ready_cb_cls = cls; - peers = (struct GNUNET_PeerIdentity *) &msg[1]; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id)); - rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id); - GNUNET_assert (NULL != rh); - GNUNET_assert (rh->num_requests == ntohl (msg->num_peers)); - GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id); - rh->ready_cb (rh->ready_cb_cls, - ntohl (msg->num_peers), - peers); + return rh; } @@ -683,7 +605,6 @@ mq_error_handler (void *cls, reconnect (h); /* Resend all pending request as the service destroyed its knowledge * about them */ - resend_requests (h); } @@ -694,10 +615,6 @@ static void reconnect (struct GNUNET_RPS_Handle *h) { struct GNUNET_MQ_MessageHandler mq_handlers[] = { - GNUNET_MQ_hd_var_size (reply, - GNUNET_MESSAGE_TYPE_RPS_CS_REPLY, - struct GNUNET_RPS_CS_ReplyMessage, - h), GNUNET_MQ_hd_var_size (view_update, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, struct GNUNET_RPS_CS_DEBUG_ViewReply, @@ -731,7 +648,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) struct GNUNET_RPS_Handle *h; h = GNUNET_new (struct GNUNET_RPS_Handle); - h->current_request_id = 0; h->cfg = cfg; reconnect (h); if (NULL == h->mq) @@ -739,89 +655,10 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) GNUNET_free (h); return NULL; } - h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4); return h; } -/** - * Request n random peers. - * - * @param rps_handle handle to the rps service - * @param num_req_peers number of peers we want to receive - * @param ready_cb the callback called when the peers are available - * @param cls closure given to the callback - * @return a handle to cancel this request - */ -struct GNUNET_RPS_Request_Handle * -GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle, - uint32_t num_req_peers, - GNUNET_RPS_NotifyReadyCB ready_cb, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh; - - rh = new_request_handle (rps_handle, - num_req_peers, - NULL, /* no sampler needed */ - ready_cb, - cls); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Requesting %" PRIu32 " peers with id %" PRIu32 "\n", - num_req_peers, - rh->id); - - send_request (rps_handle, rh->id, num_req_peers); - return rh; -} - - -/** - * @brief Callback to collect the peers from the biased stream and put those - * into the sampler. - * - * @param cls The #GNUNET_RPS_Request_Handle - * @param num_peers The number of peer that have been returned - * @param peers The array of @a num_peers that have been returned - */ -void -collect_peers_cb (void *cls, - uint64_t num_peers, - const struct GNUNET_PeerIdentity *peers) -{ - struct GNUNET_RPS_Request_Handle *rh = cls; - - for (uint64_t i = 0; i < num_peers; i++) - { - RPS_sampler_update (rh->sampler, &peers[i]); - } -} - - -/** - * @brief Called once the sampler has collected all requested peers. - * - * Calls the callback provided by the client with the corresponding cls. - * - * @param peers The array of @a num_peers that has been returned. - * @param num_peers The number of peers that have been returned - * @param cls The #GNUNET_RPS_Request_Handle - */ -void -peers_ready_cb (const struct GNUNET_PeerIdentity *peers, - uint32_t num_peers, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh = cls; - - rh->ready_cb (rh->ready_cb_cls, - num_peers, - peers); - // TODO cleanup, sampler, rh, cancel stuff - // TODO screw this function. We can give the cb,cls directly to the sampler. -} - /** * Request n random peers. * @@ -841,19 +678,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, rh = new_request_handle (rps_handle, num_req_peers, - RPS_sampler_mod_init (num_req_peers, - GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff ready_cb, cls); - RPS_sampler_get_n_rand_peers (rh->sampler, - num_req_peers, - peers_ready_cb, - rh); - GNUNET_RPS_stream_request (rps_handle, - 0, /* infinite updates */ - collect_peers_cb, - rh); /* cls */ return rh; } @@ -1022,20 +849,21 @@ void GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) { struct GNUNET_RPS_Handle *h; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_RequestCancelMessage*msg; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Cancelling request with id %" PRIu32 "\n", - rh->id); h = rh->rps_handle; - GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, - rh->id)); - GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id); - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); - msg->id = htonl (rh->id); - GNUNET_MQ_send (rh->rps_handle->mq, ev); + if (NULL != rh->srh) + { + remove_stream_request (rh->srh, + h->stream_requests_head, + h->stream_requests_tail); + } + if (NULL == h->stream_requests_head) cancel_stream(h); + if (NULL != rh->sampler_rh) + { + RPS_sampler_request_cancel (rh->sampler_rh); + } + RPS_sampler_destroy (rh->sampler); + GNUNET_free (rh); } @@ -1048,10 +876,11 @@ void GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) { GNUNET_MQ_destroy (h->mq); - if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers)) + if (NULL != h->stream_requests_head) + { LOG (GNUNET_ERROR_TYPE_WARNING, "Still waiting for requests\n"); - GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers); + } GNUNET_free (h); } -- cgit v1.2.3