summaryrefslogtreecommitdiff
path: root/src/rps/gnunet-service-rps.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/gnunet-service-rps.c')
-rw-r--r--src/rps/gnunet-service-rps.c278
1 files changed, 198 insertions, 80 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 3e30041e8..5b78bb4a8 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -1769,6 +1769,12 @@ struct ClientContext
int64_t view_updates_left;
/**
+ * @brief How many peers from the biased
+ * stream this client expects to receive.
+ */
+ int64_t stream_peers_left;
+
+ /**
* The client handle to send the reply to
*/
struct GNUNET_SERVICE_Client *client;
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
return ret;
}
+
+/**
+ * @brief Send view to client
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (can be empty)
+ * @param view_size the size of the view array (can be 0)
+ */
+void
+send_view (const struct ClientContext *cli_ctx,
+ const struct GNUNET_PeerIdentity *view_array,
+ uint64_t view_size)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
+
+ if (NULL == view_array)
+ {
+ view_size = View_size ();
+ view_array = View_get_as_array();
+ }
+
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ view_size * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
+ out_msg->num_peers = htonl (view_size);
+
+ GNUNET_memcpy (&out_msg[1],
+ view_array,
+ view_size * sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
+/**
+ * @brief Send peer from biased stream to client.
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (can be empty)
+ * @param view_size the size of the view array (can be 0)
+ */
+void
+send_stream_peer (const struct ClientContext *cli_ctx,
+ const struct GNUNET_PeerIdentity *peer)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
+
+ GNUNET_assert (NULL != peer);
+
+ ev = GNUNET_MQ_msg (out_msg,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+
+ GNUNET_memcpy (&out_msg->peer,
+ peer,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
/**
* @brief sends updates to clients that are interested
*/
static void
-clients_notify_view_update (void);
+clients_notify_view_update (void)
+{
+ struct ClientContext *cli_ctx_iter;
+ uint64_t num_peers;
+ const struct GNUNET_PeerIdentity *view_array;
+
+ num_peers = View_size ();
+ view_array = View_get_as_array();
+ /* check size of view is small enough */
+ if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "View is too big to send\n");
+ return;
+ }
+
+ for (cli_ctx_iter = cli_ctx_head;
+ NULL != cli_ctx_iter;
+ cli_ctx_iter = cli_ctx_head->next)
+ {
+ if (1 < cli_ctx_iter->view_updates_left)
+ {
+ /* Client wants to receive limited amount of updates */
+ cli_ctx_iter->view_updates_left -= 1;
+ } else if (1 == cli_ctx_iter->view_updates_left)
+ {
+ /* Last update of view for client */
+ cli_ctx_iter->view_updates_left = -1;
+ } else if (0 > cli_ctx_iter->view_updates_left) {
+ /* Client is not interested in updates */
+ continue;
+ }
+ /* else _updates_left == 0 - infinite amount of updates */
+
+ /* send view */
+ send_view (cli_ctx_iter, view_array, num_peers);
+ }
+}
+
+
+/**
+ * @brief sends updates to clients that are interested
+ */
+static void
+clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
+ //enum StreamPeerSource)
+{
+ struct ClientContext *cli_ctx_iter;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got peer (%s) from biased stream - update all clients\n",
+ GNUNET_i2s (peer));
+
+ /* check size of view is small enough */
+ for (cli_ctx_iter = cli_ctx_head;
+ NULL != cli_ctx_iter;
+ cli_ctx_iter = cli_ctx_head->next)
+ {
+ if (1 < cli_ctx_iter->stream_peers_left)
+ {
+ /* Client wants to receive limited amount of updates */
+ cli_ctx_iter->stream_peers_left -= 1;
+ } else if (1 == cli_ctx_iter->stream_peers_left)
+ {
+ /* Last update of view for client */
+ cli_ctx_iter->stream_peers_left = -1;
+ } else if (0 > cli_ctx_iter->stream_peers_left) {
+ /* Client is not interested in updates */
+ continue;
+ }
+ /* else _updates_left == 0 - infinite amount of updates */
+
+ /* send view */
+ send_stream_peer (cli_ctx_iter, peer);
+ }
+}
/**
* Put random peer from sampler into the view as history update.
@@ -2193,7 +2334,12 @@ hist_update (void *cls,
for (i = 0; i < num_peers; i++)
{
- (void) insert_in_view (&ids[i]);
+ int inserted;
+ inserted = insert_in_view (&ids[i]);
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (&ids[i]);
+ }
to_file (file_name_view_log,
"+%s\t(hist)",
GNUNET_i2s_full (ids));
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls,
const struct GNUNET_PeerIdentity *peer)
{
(void) cls;
- (void) insert_in_view (peer);
+ int inserted;
+
+ inserted = insert_in_view (peer);
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (peer);
+ }
}
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls,
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
-/**
- * @brief Send view to client
- *
- * @param cli_ctx the context of the client
- * @param view_array the peerids of the view as array (can be empty)
- * @param view_size the size of the view array (can be 0)
- */
-void
-send_view (const struct ClientContext *cli_ctx,
- const struct GNUNET_PeerIdentity *view_array,
- uint64_t view_size)
-{
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
-
- if (NULL == view_array)
- {
- view_size = View_size ();
- view_array = View_get_as_array();
- }
-
- ev = GNUNET_MQ_msg_extra (out_msg,
- view_size * sizeof (struct GNUNET_PeerIdentity),
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
- out_msg->num_peers = htonl (view_size);
-
- GNUNET_memcpy (&out_msg[1],
- view_array,
- view_size * sizeof (struct GNUNET_PeerIdentity));
- GNUNET_MQ_send (cli_ctx->mq, ev);
-}
/**
- * @brief sends updates to clients that are interested
+ * Handle RPS request from the client.
+ *
+ * @param cls closure
+ * @param message the actual message
*/
static void
-clients_notify_view_update (void)
+handle_client_view_request (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
{
- struct ClientContext *cli_ctx_iter;
- uint64_t num_peers;
- const struct GNUNET_PeerIdentity *view_array;
+ struct ClientContext *cli_ctx = cls;
+ uint64_t num_updates;
- num_peers = View_size ();
- view_array = View_get_as_array();
- /* check size of view is small enough */
- if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "View is too big to send\n");
- return;
- }
+ num_updates = ntohl (msg->num_updates);
- for (cli_ctx_iter = cli_ctx_head;
- NULL != cli_ctx_iter;
- cli_ctx_iter = cli_ctx_head->next)
- {
- if (1 < cli_ctx_iter->view_updates_left)
- {
- /* Client wants to receive limited amount of updates */
- cli_ctx_iter->view_updates_left -= 1;
- } else if (1 == cli_ctx_iter->view_updates_left)
- {
- /* Last update of view for client */
- cli_ctx_iter->view_updates_left = -1;
- } else if (0 > cli_ctx_iter->view_updates_left) {
- /* Client is not interested in updates */
- continue;
- }
- /* else _updates_left == 0 - infinite amount of updates */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client requested %" PRIu64 " updates of view.\n",
+ num_updates);
- /* send view */
- send_view (cli_ctx_iter, view_array, num_peers);
- }
+ GNUNET_assert (NULL != cli_ctx);
+ cli_ctx->view_updates_left = num_updates;
+ send_view (cli_ctx, NULL, 0);
+ GNUNET_SERVICE_client_continue (cli_ctx->client);
}
/**
- * Handle RPS request from the client.
+ * Handle RPS request for biased stream from the client.
*
* @param cls closure
* @param message the actual message
*/
static void
-handle_client_view_request (void *cls,
- const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
+handle_client_stream_request (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
{
struct ClientContext *cli_ctx = cls;
- uint64_t num_updates;
+ uint64_t num_peers;
- num_updates = ntohl (msg->num_updates);
+ num_peers = ntohl (msg->num_peers);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client requested %" PRIu64 " updates of view.\n",
- num_updates);
+ "Client requested %" PRIu64 " peers from biased stream.\n",
+ num_peers);
GNUNET_assert (NULL != cli_ctx);
- cli_ctx->view_updates_left = num_updates;
- send_view (cli_ctx, NULL, 0);
+ cli_ctx->stream_peers_left = num_peers;
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
@@ -3727,8 +3829,14 @@ do_round (void *cls)
CustomPeerMap_size (push_map));
for (i = 0; i < first_border; i++)
{
- (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
- permut[i]));
+ int inserted;
+ inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
+ permut[i]));
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (
+ CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ }
to_file (file_name_view_log,
"+%s\t(push list)",
GNUNET_i2s_full (&view_array[i]));
@@ -3742,8 +3850,14 @@ do_round (void *cls)
CustomPeerMap_size (pull_map));
for (i = first_border; i < second_border; i++)
{
- (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
+ int inserted;
+ inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
permut[i - first_border]));
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (
+ CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ }
to_file (file_name_view_log,
"+%s\t(pull list)",
GNUNET_i2s_full (&view_array[i]));
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
struct GNUNET_RPS_CS_DEBUG_ViewRequest,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_stream_request,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
+ struct GNUNET_RPS_CS_DEBUG_StreamRequest,
+ NULL),
GNUNET_MQ_handler_end());
/* end of gnunet-service-rps.c */