diff options
Diffstat (limited to 'src/rps/gnunet-service-rps.c')
-rw-r--r-- | src/rps/gnunet-service-rps.c | 278 |
1 files changed, 198 insertions, 80 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 3e30041e8..5b78bb4a8 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -1769,6 +1769,12 @@ struct ClientContext int64_t view_updates_left; /** + * @brief How many peers from the biased + * stream this client expects to receive. + */ + int64_t stream_peers_left; + + /** * The client handle to send the reply to */ struct GNUNET_SERVICE_Client *client; @@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer) return ret; } + +/** + * @brief Send view to client + * + * @param cli_ctx the context of the client + * @param view_array the peerids of the view as array (can be empty) + * @param view_size the size of the view array (can be 0) + */ +void +send_view (const struct ClientContext *cli_ctx, + const struct GNUNET_PeerIdentity *view_array, + uint64_t view_size) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; + + if (NULL == view_array) + { + view_size = View_size (); + view_array = View_get_as_array(); + } + + ev = GNUNET_MQ_msg_extra (out_msg, + view_size * sizeof (struct GNUNET_PeerIdentity), + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); + out_msg->num_peers = htonl (view_size); + + GNUNET_memcpy (&out_msg[1], + view_array, + view_size * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (cli_ctx->mq, ev); +} + + +/** + * @brief Send peer from biased stream to client. + * + * @param cli_ctx the context of the client + * @param view_array the peerids of the view as array (can be empty) + * @param view_size the size of the view array (can be 0) + */ +void +send_stream_peer (const struct ClientContext *cli_ctx, + const struct GNUNET_PeerIdentity *peer) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; + + GNUNET_assert (NULL != peer); + + ev = GNUNET_MQ_msg (out_msg, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); + + GNUNET_memcpy (&out_msg->peer, + peer, + sizeof (struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (cli_ctx->mq, ev); +} + + /** * @brief sends updates to clients that are interested */ static void -clients_notify_view_update (void); +clients_notify_view_update (void) +{ + struct ClientContext *cli_ctx_iter; + uint64_t num_peers; + const struct GNUNET_PeerIdentity *view_array; + + num_peers = View_size (); + view_array = View_get_as_array(); + /* check size of view is small enough */ + if (GNUNET_MAX_MESSAGE_SIZE < num_peers) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "View is too big to send\n"); + return; + } + + for (cli_ctx_iter = cli_ctx_head; + NULL != cli_ctx_iter; + cli_ctx_iter = cli_ctx_head->next) + { + if (1 < cli_ctx_iter->view_updates_left) + { + /* Client wants to receive limited amount of updates */ + cli_ctx_iter->view_updates_left -= 1; + } else if (1 == cli_ctx_iter->view_updates_left) + { + /* Last update of view for client */ + cli_ctx_iter->view_updates_left = -1; + } else if (0 > cli_ctx_iter->view_updates_left) { + /* Client is not interested in updates */ + continue; + } + /* else _updates_left == 0 - infinite amount of updates */ + + /* send view */ + send_view (cli_ctx_iter, view_array, num_peers); + } +} + + +/** + * @brief sends updates to clients that are interested + */ +static void +clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) + //enum StreamPeerSource) +{ + struct ClientContext *cli_ctx_iter; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got peer (%s) from biased stream - update all clients\n", + GNUNET_i2s (peer)); + + /* check size of view is small enough */ + for (cli_ctx_iter = cli_ctx_head; + NULL != cli_ctx_iter; + cli_ctx_iter = cli_ctx_head->next) + { + if (1 < cli_ctx_iter->stream_peers_left) + { + /* Client wants to receive limited amount of updates */ + cli_ctx_iter->stream_peers_left -= 1; + } else if (1 == cli_ctx_iter->stream_peers_left) + { + /* Last update of view for client */ + cli_ctx_iter->stream_peers_left = -1; + } else if (0 > cli_ctx_iter->stream_peers_left) { + /* Client is not interested in updates */ + continue; + } + /* else _updates_left == 0 - infinite amount of updates */ + + /* send view */ + send_stream_peer (cli_ctx_iter, peer); + } +} /** * Put random peer from sampler into the view as history update. @@ -2193,7 +2334,12 @@ hist_update (void *cls, for (i = 0; i < num_peers; i++) { - (void) insert_in_view (&ids[i]); + int inserted; + inserted = insert_in_view (&ids[i]); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer (&ids[i]); + } to_file (file_name_view_log, "+%s\t(hist)", GNUNET_i2s_full (ids)); @@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls, const struct GNUNET_PeerIdentity *peer) { (void) cls; - (void) insert_in_view (peer); + int inserted; + + inserted = insert_in_view (peer); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer (peer); + } } @@ -2860,104 +3012,54 @@ handle_client_seed (void *cls, GNUNET_SERVICE_client_continue (cli_ctx->client); } -/** - * @brief Send view to client - * - * @param cli_ctx the context of the client - * @param view_array the peerids of the view as array (can be empty) - * @param view_size the size of the view array (can be 0) - */ -void -send_view (const struct ClientContext *cli_ctx, - const struct GNUNET_PeerIdentity *view_array, - uint64_t view_size) -{ - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; - - if (NULL == view_array) - { - view_size = View_size (); - view_array = View_get_as_array(); - } - - ev = GNUNET_MQ_msg_extra (out_msg, - view_size * sizeof (struct GNUNET_PeerIdentity), - GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); - out_msg->num_peers = htonl (view_size); - - GNUNET_memcpy (&out_msg[1], - view_array, - view_size * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_MQ_send (cli_ctx->mq, ev); -} /** - * @brief sends updates to clients that are interested + * Handle RPS request from the client. + * + * @param cls closure + * @param message the actual message */ static void -clients_notify_view_update (void) +handle_client_view_request (void *cls, + const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) { - struct ClientContext *cli_ctx_iter; - uint64_t num_peers; - const struct GNUNET_PeerIdentity *view_array; + struct ClientContext *cli_ctx = cls; + uint64_t num_updates; - num_peers = View_size (); - view_array = View_get_as_array(); - /* check size of view is small enough */ - if (GNUNET_MAX_MESSAGE_SIZE < num_peers) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "View is too big to send\n"); - return; - } + num_updates = ntohl (msg->num_updates); - for (cli_ctx_iter = cli_ctx_head; - NULL != cli_ctx_iter; - cli_ctx_iter = cli_ctx_head->next) - { - if (1 < cli_ctx_iter->view_updates_left) - { - /* Client wants to receive limited amount of updates */ - cli_ctx_iter->view_updates_left -= 1; - } else if (1 == cli_ctx_iter->view_updates_left) - { - /* Last update of view for client */ - cli_ctx_iter->view_updates_left = -1; - } else if (0 > cli_ctx_iter->view_updates_left) { - /* Client is not interested in updates */ - continue; - } - /* else _updates_left == 0 - infinite amount of updates */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client requested %" PRIu64 " updates of view.\n", + num_updates); - /* send view */ - send_view (cli_ctx_iter, view_array, num_peers); - } + GNUNET_assert (NULL != cli_ctx); + cli_ctx->view_updates_left = num_updates; + send_view (cli_ctx, NULL, 0); + GNUNET_SERVICE_client_continue (cli_ctx->client); } /** - * Handle RPS request from the client. + * Handle RPS request for biased stream from the client. * * @param cls closure * @param message the actual message */ static void -handle_client_view_request (void *cls, - const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) +handle_client_stream_request (void *cls, + const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) { struct ClientContext *cli_ctx = cls; - uint64_t num_updates; + uint64_t num_peers; - num_updates = ntohl (msg->num_updates); + num_peers = ntohl (msg->num_peers); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client requested %" PRIu64 " updates of view.\n", - num_updates); + "Client requested %" PRIu64 " peers from biased stream.\n", + num_peers); GNUNET_assert (NULL != cli_ctx); - cli_ctx->view_updates_left = num_updates; - send_view (cli_ctx, NULL, 0); + cli_ctx->stream_peers_left = num_peers; GNUNET_SERVICE_client_continue (cli_ctx->client); } @@ -3727,8 +3829,14 @@ do_round (void *cls) CustomPeerMap_size (push_map)); for (i = 0; i < first_border; i++) { - (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map, - permut[i])); + int inserted; + inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map, + permut[i])); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer ( + CustomPeerMap_get_peer_by_index (push_map, permut[i])); + } to_file (file_name_view_log, "+%s\t(push list)", GNUNET_i2s_full (&view_array[i])); @@ -3742,8 +3850,14 @@ do_round (void *cls) CustomPeerMap_size (pull_map)); for (i = first_border; i < second_border; i++) { - (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, + int inserted; + inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, permut[i - first_border])); + if (GNUNET_OK == inserted) + { + clients_notify_stream_peer ( + CustomPeerMap_get_peer_by_index (push_map, permut[i])); + } to_file (file_name_view_log, "+%s\t(pull list)", GNUNET_i2s_full (&view_array[i])); @@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, struct GNUNET_RPS_CS_DEBUG_ViewRequest, NULL), + GNUNET_MQ_hd_fixed_size (client_stream_request, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST, + struct GNUNET_RPS_CS_DEBUG_StreamRequest, + NULL), GNUNET_MQ_handler_end()); /* end of gnunet-service-rps.c */ |