From e91d46cdbbc97414968fa751fd1f596757c56875 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Fri, 14 Sep 2018 00:38:45 +0200 Subject: Add API call to receive unbiased peer stream for debugging and profiling --- src/include/gnunet_protocols.h | 16 +++ src/include/gnunet_rps_service.h | 25 ++++ src/rps/gnunet-rps.c | 51 ++++++- src/rps/gnunet-service-rps.c | 278 ++++++++++++++++++++++++++++----------- src/rps/rps.h | 41 ++++++ src/rps/rps_api.c | 71 +++++++++- 6 files changed, 399 insertions(+), 83 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 4400db7e1..56e049608 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2993,6 +2993,22 @@ extern "C" #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL 1132 +/** + * @brief Request biased input stream + */ +#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST 1133 + +/** + * @brief Send peer of biased stream + */ +#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY 1134 + +/** + * @brief Cancel getting biased strem + */ +#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135 + + /** * Next available: 1200 diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index b0c8c9867..252188c62 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h @@ -73,6 +73,14 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls, uint64_t num_peers, const struct GNUNET_PeerIdentity *peers); +/** + * Callback called when a peer from the biased stream was received + * + * @param peer The received peer + */ +typedef void (* GNUNET_RPS_StreamInputCB) (void *cls, + const struct GNUNET_PeerIdentity *peer); + /** * Connect to the rps service * @@ -161,6 +169,23 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, GNUNET_RPS_ViewUpdateCB view_update_cb, void *cls); + +/** + * Request biased stream of peers that are being put into the sampler + * + * @param rps_handle handle to the rps service + * @param num_req_peers number of peers we want to receive + * (0 for infinite updates) + * @param cls a closure that will be given to the callback + * @param ready_cb the callback called when the peers are available + */ +void +GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, + uint32_t num_updates, + GNUNET_RPS_StreamInputCB stream_input_cb, + void *cls); + + /** * Disconnect from the rps service * diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c index 03b2c8ab7..d2c497fd4 100644 --- a/src/rps/gnunet-rps.c +++ b/src/rps/gnunet-rps.c @@ -48,11 +48,21 @@ static struct GNUNET_PeerIdentity peer_id; */ static int view_update; +/** + * @brief Do we want to receive updates of the view? (Option --view) + */ +static int stream_input; + /** * @brief Number of updates we want to receive */ static uint64_t num_view_updates; +/** + * @brief Number of updates we want to receive + */ +static uint64_t num_stream_peers; + /** * Task run when user presses CTRL-C to abort. @@ -136,6 +146,22 @@ view_update_handle (void *cls, } +/** + * Callback called on receipt of peer from biased stream + * + * @param n number of peers + * @param recv_peers the received peers + */ +static void +stream_input_handle (void *cls, + const struct GNUNET_PeerIdentity *recv_peer) +{ + // TODO when source of peer is sent, also print source + FPRINTF (stdout, "%s\n", + GNUNET_i2s_full (recv_peer)); +} + + /** * Main function that will be run by the scheduler. * @@ -163,7 +189,8 @@ run (void *cls, } if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) && - (!view_update)) + (!view_update) && + (!stream_input)) { /* Request n PeerIDs */ /* If number was specified use it, else request single peer. */ if (NULL == args[0] || @@ -189,7 +216,23 @@ run (void *cls, "Requesting %" PRIu64 " view updates\n", num_view_updates); else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Requesting contiuous view updates\n"); + "Requesting continuous view updates\n"); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); + } else if (stream_input) + { + /* Get updates of view */ + if (NULL == args[0] || + 0 == sscanf (args[0], "%lu", &num_stream_peers)) + { + num_stream_peers = 0; + } + GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL); + if (0 != num_stream_peers) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Requesting continuous peers from biased stream\n"); GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); } else @@ -223,6 +266,10 @@ main (int argc, char *const *argv) "view", gettext_noop ("Get updates of view (0 for infinite updates)"), &view_update), + GNUNET_GETOPT_option_flag ('S', + "stream", + gettext_noop ("Get peers from biased stream"), + &stream_input), GNUNET_GETOPT_OPTION_END }; return (GNUNET_OK == diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 3e30041e8..5b78bb4a8 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -1768,6 +1768,12 @@ struct ClientContext */ int64_t view_updates_left; + /** + * @brief How many peers from the biased + * stream this client expects to receive. + */ + int64_t stream_peers_left; + /** * The client handle to send the reply to */ @@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer) return ret; } + +/** + * @brief Send view to client + * + * @param cli_ctx the context of the client + * @param view_array the peerids of the view as array (can be empty) + * @param view_size the size of the view array (can be 0) + */ +void +send_view (const struct ClientContext *cli_ctx, + const struct GNUNET_PeerIdentity *view_array, + uint64_t view_size) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; + + if (NULL == view_array) + { + view_size = View_size (); + view_array = View_get_as_array(); + } + + ev = GNUNET_MQ_msg_extra (out_msg, + view_size * sizeof (struct GNUNET_PeerIdentity), + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); + out_msg->num_peers = htonl (view_size); + + GNUNET_memcpy (&out_msg[1], + view_array, + view_size * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (cli_ctx->mq, ev); +} + + +/** + * @brief Send peer from biased stream to client. + * + * @param cli_ctx the context of the client + * @param view_array the peerids of the view as array (can be empty) + * @param view_size the size of the view array (can be 0) + */ +void +send_stream_peer (const struct ClientContext *cli_ctx, + const struct GNUNET_PeerIdentity *peer) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; + + GNUNET_assert (NULL != peer); + + ev = GNUNET_MQ_msg (out_msg, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); + + GNUNET_memcpy (&out_msg->peer, + peer, + sizeof (struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (cli_ctx->mq, ev); +} + + /** * @brief sends updates to clients that are interested */ static void -clients_notify_view_update (void); +clients_notify_view_update (void) +{ + struct ClientContext *cli_ctx_iter; + uint64_t num_peers; + const struct GNUNET_PeerIdentity *view_array; + + num_peers = View_size (); + view_array = View_get_as_array(); + /* check size of view is small enough */ + if (GNUNET_MAX_MESSAGE_SIZE < num_peers) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "View is too big to send\n"); + return; + } + + for (cli_ctx_iter = cli_ctx_head; + NULL != cli_ctx_iter; + cli_ctx_iter = cli_ctx_head->next) + { + if (1 < cli_ctx_iter->view_updates_left) + { + /* Client wants to receive limited amount of updates */ + cli_ctx_iter->view_updates_left -= 1; + } else if (1 == cli_ctx_iter->view_updates_left) + { + /* Last update of view for client */ + cli_ctx_iter->view_updates_left = -1; + } else if (0 > cli_ctx_iter->view_updates_left) { + /* Client is not interested in updates */ + continue; + } + /* else _updates_left == 0 - infinite amount of updates */ + + /* send view */ + send_view (cli_ctx_iter, view_array, num_peers); + } +} + + +/** + * @brief sends updates to clients that are interested + */ +static void +clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) + //enum StreamPeerSource) +{ + struct ClientContext *cli_ctx_iter; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got peer (%s) from biased stream - update all clients\n", + GNUNET_i2s (peer)); + + /* check size of view is small enough */ + for (cli_ctx_iter = cli_ctx_head; + NULL != cli_ctx_iter; + cli_ctx_iter = cli_ctx_head->next) + { + if (1 < cli_ctx_iter->stream_peers_left) + { + /* Client wants to receive limited amount of updates */ + cli_ctx_iter->stream_peers_left -= 1; + } else if (1 == cli_ctx_iter->stream_peers_left) + { + /* Last update of view for client */ + cli_ctx_iter->stream_peers_left = -1; + } else if (0 > cli_ctx_iter->stream_peers_left) { + /* Client is not interested in updates */ + continue; + } + /* else _updates_left == 0 - infinite amount of updates */ + + /* send view */ + send_stream_peer (cli_ctx_iter, peer); + } +} /** * Put random peer from sampler into the view as history update. @@ -2193,7 +2334,12 @@ hist_update (void *cls, for (i = 0; i < num_peers; i++) { - (void) insert_in_view (&ids[i]); + int inserted; + inserted = insert_in_view (&ids[i]); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer (&ids[i]); + } to_file (file_name_view_log, "+%s\t(hist)", GNUNET_i2s_full (ids)); @@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls, const struct GNUNET_PeerIdentity *peer) { (void) cls; - (void) insert_in_view (peer); + int inserted; + + inserted = insert_in_view (peer); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer (peer); + } } @@ -2860,104 +3012,54 @@ handle_client_seed (void *cls, GNUNET_SERVICE_client_continue (cli_ctx->client); } -/** - * @brief Send view to client - * - * @param cli_ctx the context of the client - * @param view_array the peerids of the view as array (can be empty) - * @param view_size the size of the view array (can be 0) - */ -void -send_view (const struct ClientContext *cli_ctx, - const struct GNUNET_PeerIdentity *view_array, - uint64_t view_size) -{ - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; - - if (NULL == view_array) - { - view_size = View_size (); - view_array = View_get_as_array(); - } - - ev = GNUNET_MQ_msg_extra (out_msg, - view_size * sizeof (struct GNUNET_PeerIdentity), - GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); - out_msg->num_peers = htonl (view_size); - - GNUNET_memcpy (&out_msg[1], - view_array, - view_size * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_MQ_send (cli_ctx->mq, ev); -} /** - * @brief sends updates to clients that are interested + * Handle RPS request from the client. + * + * @param cls closure + * @param message the actual message */ static void -clients_notify_view_update (void) +handle_client_view_request (void *cls, + const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) { - struct ClientContext *cli_ctx_iter; - uint64_t num_peers; - const struct GNUNET_PeerIdentity *view_array; + struct ClientContext *cli_ctx = cls; + uint64_t num_updates; - num_peers = View_size (); - view_array = View_get_as_array(); - /* check size of view is small enough */ - if (GNUNET_MAX_MESSAGE_SIZE < num_peers) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "View is too big to send\n"); - return; - } + num_updates = ntohl (msg->num_updates); - for (cli_ctx_iter = cli_ctx_head; - NULL != cli_ctx_iter; - cli_ctx_iter = cli_ctx_head->next) - { - if (1 < cli_ctx_iter->view_updates_left) - { - /* Client wants to receive limited amount of updates */ - cli_ctx_iter->view_updates_left -= 1; - } else if (1 == cli_ctx_iter->view_updates_left) - { - /* Last update of view for client */ - cli_ctx_iter->view_updates_left = -1; - } else if (0 > cli_ctx_iter->view_updates_left) { - /* Client is not interested in updates */ - continue; - } - /* else _updates_left == 0 - infinite amount of updates */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client requested %" PRIu64 " updates of view.\n", + num_updates); - /* send view */ - send_view (cli_ctx_iter, view_array, num_peers); - } + GNUNET_assert (NULL != cli_ctx); + cli_ctx->view_updates_left = num_updates; + send_view (cli_ctx, NULL, 0); + GNUNET_SERVICE_client_continue (cli_ctx->client); } /** - * Handle RPS request from the client. + * Handle RPS request for biased stream from the client. * * @param cls closure * @param message the actual message */ static void -handle_client_view_request (void *cls, - const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) +handle_client_stream_request (void *cls, + const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) { struct ClientContext *cli_ctx = cls; - uint64_t num_updates; + uint64_t num_peers; - num_updates = ntohl (msg->num_updates); + num_peers = ntohl (msg->num_peers); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client requested %" PRIu64 " updates of view.\n", - num_updates); + "Client requested %" PRIu64 " peers from biased stream.\n", + num_peers); GNUNET_assert (NULL != cli_ctx); - cli_ctx->view_updates_left = num_updates; - send_view (cli_ctx, NULL, 0); + cli_ctx->stream_peers_left = num_peers; GNUNET_SERVICE_client_continue (cli_ctx->client); } @@ -3727,8 +3829,14 @@ do_round (void *cls) CustomPeerMap_size (push_map)); for (i = 0; i < first_border; i++) { - (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map, - permut[i])); + int inserted; + inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map, + permut[i])); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer ( + CustomPeerMap_get_peer_by_index (push_map, permut[i])); + } to_file (file_name_view_log, "+%s\t(push list)", GNUNET_i2s_full (&view_array[i])); @@ -3742,8 +3850,14 @@ do_round (void *cls) CustomPeerMap_size (pull_map)); for (i = first_border; i < second_border; i++) { - (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, + int inserted; + inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, permut[i - first_border])); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer ( + CustomPeerMap_get_peer_by_index (push_map, permut[i])); + } to_file (file_name_view_log, "+%s\t(pull list)", GNUNET_i2s_full (&view_array[i])); @@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, struct GNUNET_RPS_CS_DEBUG_ViewRequest, NULL), + GNUNET_MQ_hd_fixed_size (client_stream_request, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST, + struct GNUNET_RPS_CS_DEBUG_StreamRequest, + NULL), GNUNET_MQ_handler_end()); /* end of gnunet-service-rps.c */ diff --git a/src/rps/rps.h b/src/rps/rps.h index 58ba79082..66b2dd962 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h @@ -216,6 +216,47 @@ struct GNUNET_RPS_CS_DEBUG_ViewReply }; /* Followed by num_peers * GNUNET_PeerIdentity */ +/** + * Message from client to service indicating that + * clients wants to get stream of biased peers + */ +struct GNUNET_RPS_CS_DEBUG_StreamRequest +{ + /** + * Header including size and type in NBO + */ + struct GNUNET_MessageHeader header; + + /** + * Number of peers + * 0 for sending updates until cancellation + */ + uint32_t num_peers GNUNET_PACKED; +}; + +/** + * Message from service to client containing peer from biased stream + */ +struct GNUNET_RPS_CS_DEBUG_StreamReply +{ + /** + * Header including size and type in NBO + */ + struct GNUNET_MessageHeader header; + + /** + * Identifyer of the message. + */ + uint32_t id GNUNET_PACKED; + + /** + * @brief The peer of the biased stream + */ + struct GNUNET_PeerIdentity peer; + + // TODO maybe source of peer (pull/push list, peerinfo, ...) +}; + GNUNET_NETWORK_STRUCT_END /*********************************************************************** diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index ac462f3a0..b7644540d 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle GNUNET_RPS_ViewUpdateCB view_update_cb; /** - * @brief Callback called on each update of the view + * @brief Closure to each requested update of the view */ void *view_update_cls; + + /** + * @brief Callback called on each peer of the biased input stream + */ + GNUNET_RPS_StreamInputCB stream_input_cb; + + /** + * @brief Closure to each requested peer from the biased stream + */ + void *stream_input_cls; }; @@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, GNUNET_MQ_send (rps_handle->mq, ev); } + +/** + * Request biased stream of peers that are being put into the sampler + * + * @param rps_handle handle to the rps service + * @param num_req_peers number of peers we want to receive + * (0 for infinite updates) + * @param cls a closure that will be given to the callback + * @param ready_cb the callback called when the peers are available + */ +void +GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, + uint32_t num_peers, + GNUNET_RPS_StreamInputCB stream_input_cb, + void *cls) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client requests %" PRIu32 " biased stream updates\n", + num_peers); + rps_handle->stream_input_cb = stream_input_cb; + rps_handle->stream_input_cls = cls; + + ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); + msg->num_peers = htonl (num_peers); + GNUNET_MQ_send (rps_handle->mq, ev); +} + + /** * This function is called, when the service updates the view. * It verifies that @a msg is well-formed. @@ -303,6 +344,7 @@ check_view_update (void *cls, return GNUNET_OK; } + /** * This function is called, when the service updated its view. * It calls the callback the caller provided @@ -329,6 +371,29 @@ handle_view_update (void *cls, } +/** + * This function is called, when the service sends another peer from the biased + * stream. + * It calls the callback the caller provided + * and disconnects afterwards. + * + * @param msg the message + */ +static void +handle_stream_input (void *cls, + const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) +{ + struct GNUNET_RPS_Handle *h = cls; + + /* Give the peers back */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "New peer of biased input stream\n"); + + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != h->stream_input_cb); + h->stream_input_cb (h->stream_input_cb, &msg->peer); +} + /** * Reconnect to the service @@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h) GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, struct GNUNET_RPS_CS_DEBUG_ViewReply, h), + GNUNET_MQ_hd_fixed_size (stream_input, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, + struct GNUNET_RPS_CS_DEBUG_StreamReply, + h), GNUNET_MQ_handler_end () }; -- cgit v1.2.3