summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_protocols.h16
-rw-r--r--src/include/gnunet_rps_service.h25
-rw-r--r--src/rps/gnunet-rps.c51
-rw-r--r--src/rps/gnunet-service-rps.c278
-rw-r--r--src/rps/rps.h41
-rw-r--r--src/rps/rps_api.c71
6 files changed, 399 insertions, 83 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 4400db7e1..56e049608 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2993,6 +2993,22 @@ extern "C"
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL 1132
+/**
+ * @brief Request biased input stream
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST 1133
+
+/**
+ * @brief Send peer of biased stream
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY 1134
+
+/**
+ * @brief Cancel getting biased strem
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135
+
+
/**
* Next available: 1200
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h
index b0c8c9867..252188c62 100644
--- a/src/include/gnunet_rps_service.h
+++ b/src/include/gnunet_rps_service.h
@@ -74,6 +74,14 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls,
const struct GNUNET_PeerIdentity *peers);
/**
+ * Callback called when a peer from the biased stream was received
+ *
+ * @param peer The received peer
+ */
+typedef void (* GNUNET_RPS_StreamInputCB) (void *cls,
+ const struct GNUNET_PeerIdentity *peer);
+
+/**
* Connect to the rps service
*
* @param cfg configuration to use
@@ -161,6 +169,23 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
GNUNET_RPS_ViewUpdateCB view_update_cb,
void *cls);
+
+/**
+ * Request biased stream of peers that are being put into the sampler
+ *
+ * @param rps_handle handle to the rps service
+ * @param num_req_peers number of peers we want to receive
+ * (0 for infinite updates)
+ * @param cls a closure that will be given to the callback
+ * @param ready_cb the callback called when the peers are available
+ */
+void
+GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+ uint32_t num_updates,
+ GNUNET_RPS_StreamInputCB stream_input_cb,
+ void *cls);
+
+
/**
* Disconnect from the rps service
*
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index 03b2c8ab7..d2c497fd4 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -49,10 +49,20 @@ static struct GNUNET_PeerIdentity peer_id;
static int view_update;
/**
+ * @brief Do we want to receive updates of the view? (Option --view)
+ */
+static int stream_input;
+
+/**
* @brief Number of updates we want to receive
*/
static uint64_t num_view_updates;
+/**
+ * @brief Number of updates we want to receive
+ */
+static uint64_t num_stream_peers;
+
/**
* Task run when user presses CTRL-C to abort.
@@ -137,6 +147,22 @@ view_update_handle (void *cls,
/**
+ * Callback called on receipt of peer from biased stream
+ *
+ * @param n number of peers
+ * @param recv_peers the received peers
+ */
+static void
+stream_input_handle (void *cls,
+ const struct GNUNET_PeerIdentity *recv_peer)
+{
+ // TODO when source of peer is sent, also print source
+ FPRINTF (stdout, "%s\n",
+ GNUNET_i2s_full (recv_peer));
+}
+
+
+/**
* Main function that will be run by the scheduler.
*
* @param cls closure
@@ -163,7 +189,8 @@ run (void *cls,
}
if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) &&
- (!view_update))
+ (!view_update) &&
+ (!stream_input))
{ /* Request n PeerIDs */
/* If number was specified use it, else request single peer. */
if (NULL == args[0] ||
@@ -189,7 +216,23 @@ run (void *cls,
"Requesting %" PRIu64 " view updates\n", num_view_updates);
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Requesting contiuous view updates\n");
+ "Requesting continuous view updates\n");
+ GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
+ } else if (stream_input)
+ {
+ /* Get updates of view */
+ if (NULL == args[0] ||
+ 0 == sscanf (args[0], "%lu", &num_stream_peers))
+ {
+ num_stream_peers = 0;
+ }
+ GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
+ if (0 != num_stream_peers)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Requesting continuous peers from biased stream\n");
GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
}
else
@@ -223,6 +266,10 @@ main (int argc, char *const *argv)
"view",
gettext_noop ("Get updates of view (0 for infinite updates)"),
&view_update),
+ GNUNET_GETOPT_option_flag ('S',
+ "stream",
+ gettext_noop ("Get peers from biased stream"),
+ &stream_input),
GNUNET_GETOPT_OPTION_END
};
return (GNUNET_OK ==
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 3e30041e8..5b78bb4a8 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -1769,6 +1769,12 @@ struct ClientContext
int64_t view_updates_left;
/**
+ * @brief How many peers from the biased
+ * stream this client expects to receive.
+ */
+ int64_t stream_peers_left;
+
+ /**
* The client handle to send the reply to
*/
struct GNUNET_SERVICE_Client *client;
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
return ret;
}
+
+/**
+ * @brief Send view to client
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (can be empty)
+ * @param view_size the size of the view array (can be 0)
+ */
+void
+send_view (const struct ClientContext *cli_ctx,
+ const struct GNUNET_PeerIdentity *view_array,
+ uint64_t view_size)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
+
+ if (NULL == view_array)
+ {
+ view_size = View_size ();
+ view_array = View_get_as_array();
+ }
+
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ view_size * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
+ out_msg->num_peers = htonl (view_size);
+
+ GNUNET_memcpy (&out_msg[1],
+ view_array,
+ view_size * sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
+/**
+ * @brief Send peer from biased stream to client.
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (can be empty)
+ * @param view_size the size of the view array (can be 0)
+ */
+void
+send_stream_peer (const struct ClientContext *cli_ctx,
+ const struct GNUNET_PeerIdentity *peer)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
+
+ GNUNET_assert (NULL != peer);
+
+ ev = GNUNET_MQ_msg (out_msg,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+
+ GNUNET_memcpy (&out_msg->peer,
+ peer,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
/**
* @brief sends updates to clients that are interested
*/
static void
-clients_notify_view_update (void);
+clients_notify_view_update (void)
+{
+ struct ClientContext *cli_ctx_iter;
+ uint64_t num_peers;
+ const struct GNUNET_PeerIdentity *view_array;
+
+ num_peers = View_size ();
+ view_array = View_get_as_array();
+ /* check size of view is small enough */
+ if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "View is too big to send\n");
+ return;
+ }
+
+ for (cli_ctx_iter = cli_ctx_head;
+ NULL != cli_ctx_iter;
+ cli_ctx_iter = cli_ctx_head->next)
+ {
+ if (1 < cli_ctx_iter->view_updates_left)
+ {
+ /* Client wants to receive limited amount of updates */
+ cli_ctx_iter->view_updates_left -= 1;
+ } else if (1 == cli_ctx_iter->view_updates_left)
+ {
+ /* Last update of view for client */
+ cli_ctx_iter->view_updates_left = -1;
+ } else if (0 > cli_ctx_iter->view_updates_left) {
+ /* Client is not interested in updates */
+ continue;
+ }
+ /* else _updates_left == 0 - infinite amount of updates */
+
+ /* send view */
+ send_view (cli_ctx_iter, view_array, num_peers);
+ }
+}
+
+
+/**
+ * @brief sends updates to clients that are interested
+ */
+static void
+clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
+ //enum StreamPeerSource)
+{
+ struct ClientContext *cli_ctx_iter;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got peer (%s) from biased stream - update all clients\n",
+ GNUNET_i2s (peer));
+
+ /* check size of view is small enough */
+ for (cli_ctx_iter = cli_ctx_head;
+ NULL != cli_ctx_iter;
+ cli_ctx_iter = cli_ctx_head->next)
+ {
+ if (1 < cli_ctx_iter->stream_peers_left)
+ {
+ /* Client wants to receive limited amount of updates */
+ cli_ctx_iter->stream_peers_left -= 1;
+ } else if (1 == cli_ctx_iter->stream_peers_left)
+ {
+ /* Last update of view for client */
+ cli_ctx_iter->stream_peers_left = -1;
+ } else if (0 > cli_ctx_iter->stream_peers_left) {
+ /* Client is not interested in updates */
+ continue;
+ }
+ /* else _updates_left == 0 - infinite amount of updates */
+
+ /* send view */
+ send_stream_peer (cli_ctx_iter, peer);
+ }
+}
/**
* Put random peer from sampler into the view as history update.
@@ -2193,7 +2334,12 @@ hist_update (void *cls,
for (i = 0; i < num_peers; i++)
{
- (void) insert_in_view (&ids[i]);
+ int inserted;
+ inserted = insert_in_view (&ids[i]);
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (&ids[i]);
+ }
to_file (file_name_view_log,
"+%s\t(hist)",
GNUNET_i2s_full (ids));
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls,
const struct GNUNET_PeerIdentity *peer)
{
(void) cls;
- (void) insert_in_view (peer);
+ int inserted;
+
+ inserted = insert_in_view (peer);
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (peer);
+ }
}
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls,
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
-/**
- * @brief Send view to client
- *
- * @param cli_ctx the context of the client
- * @param view_array the peerids of the view as array (can be empty)
- * @param view_size the size of the view array (can be 0)
- */
-void
-send_view (const struct ClientContext *cli_ctx,
- const struct GNUNET_PeerIdentity *view_array,
- uint64_t view_size)
-{
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
-
- if (NULL == view_array)
- {
- view_size = View_size ();
- view_array = View_get_as_array();
- }
-
- ev = GNUNET_MQ_msg_extra (out_msg,
- view_size * sizeof (struct GNUNET_PeerIdentity),
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
- out_msg->num_peers = htonl (view_size);
-
- GNUNET_memcpy (&out_msg[1],
- view_array,
- view_size * sizeof (struct GNUNET_PeerIdentity));
- GNUNET_MQ_send (cli_ctx->mq, ev);
-}
/**
- * @brief sends updates to clients that are interested
+ * Handle RPS request from the client.
+ *
+ * @param cls closure
+ * @param message the actual message
*/
static void
-clients_notify_view_update (void)
+handle_client_view_request (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
{
- struct ClientContext *cli_ctx_iter;
- uint64_t num_peers;
- const struct GNUNET_PeerIdentity *view_array;
+ struct ClientContext *cli_ctx = cls;
+ uint64_t num_updates;
- num_peers = View_size ();
- view_array = View_get_as_array();
- /* check size of view is small enough */
- if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "View is too big to send\n");
- return;
- }
+ num_updates = ntohl (msg->num_updates);
- for (cli_ctx_iter = cli_ctx_head;
- NULL != cli_ctx_iter;
- cli_ctx_iter = cli_ctx_head->next)
- {
- if (1 < cli_ctx_iter->view_updates_left)
- {
- /* Client wants to receive limited amount of updates */
- cli_ctx_iter->view_updates_left -= 1;
- } else if (1 == cli_ctx_iter->view_updates_left)
- {
- /* Last update of view for client */
- cli_ctx_iter->view_updates_left = -1;
- } else if (0 > cli_ctx_iter->view_updates_left) {
- /* Client is not interested in updates */
- continue;
- }
- /* else _updates_left == 0 - infinite amount of updates */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client requested %" PRIu64 " updates of view.\n",
+ num_updates);
- /* send view */
- send_view (cli_ctx_iter, view_array, num_peers);
- }
+ GNUNET_assert (NULL != cli_ctx);
+ cli_ctx->view_updates_left = num_updates;
+ send_view (cli_ctx, NULL, 0);
+ GNUNET_SERVICE_client_continue (cli_ctx->client);
}
/**
- * Handle RPS request from the client.
+ * Handle RPS request for biased stream from the client.
*
* @param cls closure
* @param message the actual message
*/
static void
-handle_client_view_request (void *cls,
- const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
+handle_client_stream_request (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
{
struct ClientContext *cli_ctx = cls;
- uint64_t num_updates;
+ uint64_t num_peers;
- num_updates = ntohl (msg->num_updates);
+ num_peers = ntohl (msg->num_peers);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client requested %" PRIu64 " updates of view.\n",
- num_updates);
+ "Client requested %" PRIu64 " peers from biased stream.\n",
+ num_peers);
GNUNET_assert (NULL != cli_ctx);
- cli_ctx->view_updates_left = num_updates;
- send_view (cli_ctx, NULL, 0);
+ cli_ctx->stream_peers_left = num_peers;
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
@@ -3727,8 +3829,14 @@ do_round (void *cls)
CustomPeerMap_size (push_map));
for (i = 0; i < first_border; i++)
{
- (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
- permut[i]));
+ int inserted;
+ inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
+ permut[i]));
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (
+ CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ }
to_file (file_name_view_log,
"+%s\t(push list)",
GNUNET_i2s_full (&view_array[i]));
@@ -3742,8 +3850,14 @@ do_round (void *cls)
CustomPeerMap_size (pull_map));
for (i = first_border; i < second_border; i++)
{
- (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
+ int inserted;
+ inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
permut[i - first_border]));
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (
+ CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ }
to_file (file_name_view_log,
"+%s\t(pull list)",
GNUNET_i2s_full (&view_array[i]));
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
struct GNUNET_RPS_CS_DEBUG_ViewRequest,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_stream_request,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
+ struct GNUNET_RPS_CS_DEBUG_StreamRequest,
+ NULL),
GNUNET_MQ_handler_end());
/* end of gnunet-service-rps.c */
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 58ba79082..66b2dd962 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -216,6 +216,47 @@ struct GNUNET_RPS_CS_DEBUG_ViewReply
};
/* Followed by num_peers * GNUNET_PeerIdentity */
+/**
+ * Message from client to service indicating that
+ * clients wants to get stream of biased peers
+ */
+struct GNUNET_RPS_CS_DEBUG_StreamRequest
+{
+ /**
+ * Header including size and type in NBO
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of peers
+ * 0 for sending updates until cancellation
+ */
+ uint32_t num_peers GNUNET_PACKED;
+};
+
+/**
+ * Message from service to client containing peer from biased stream
+ */
+struct GNUNET_RPS_CS_DEBUG_StreamReply
+{
+ /**
+ * Header including size and type in NBO
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Identifyer of the message.
+ */
+ uint32_t id GNUNET_PACKED;
+
+ /**
+ * @brief The peer of the biased stream
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ // TODO maybe source of peer (pull/push list, peerinfo, ...)
+};
+
GNUNET_NETWORK_STRUCT_END
/***********************************************************************
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index ac462f3a0..b7644540d 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle
GNUNET_RPS_ViewUpdateCB view_update_cb;
/**
- * @brief Callback called on each update of the view
+ * @brief Closure to each requested update of the view
*/
void *view_update_cls;
+
+ /**
+ * @brief Callback called on each peer of the biased input stream
+ */
+ GNUNET_RPS_StreamInputCB stream_input_cb;
+
+ /**
+ * @brief Closure to each requested peer from the biased stream
+ */
+ void *stream_input_cls;
};
@@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
GNUNET_MQ_send (rps_handle->mq, ev);
}
+
+/**
+ * Request biased stream of peers that are being put into the sampler
+ *
+ * @param rps_handle handle to the rps service
+ * @param num_req_peers number of peers we want to receive
+ * (0 for infinite updates)
+ * @param cls a closure that will be given to the callback
+ * @param ready_cb the callback called when the peers are available
+ */
+void
+GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+ uint32_t num_peers,
+ GNUNET_RPS_StreamInputCB stream_input_cb,
+ void *cls)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client requests %" PRIu32 " biased stream updates\n",
+ num_peers);
+ rps_handle->stream_input_cb = stream_input_cb;
+ rps_handle->stream_input_cls = cls;
+
+ ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
+ msg->num_peers = htonl (num_peers);
+ GNUNET_MQ_send (rps_handle->mq, ev);
+}
+
+
/**
* This function is called, when the service updates the view.
* It verifies that @a msg is well-formed.
@@ -303,6 +344,7 @@ check_view_update (void *cls,
return GNUNET_OK;
}
+
/**
* This function is called, when the service updated its view.
* It calls the callback the caller provided
@@ -329,6 +371,29 @@ handle_view_update (void *cls,
}
+/**
+ * This function is called, when the service sends another peer from the biased
+ * stream.
+ * It calls the callback the caller provided
+ * and disconnects afterwards.
+ *
+ * @param msg the message
+ */
+static void
+handle_stream_input (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+ struct GNUNET_RPS_Handle *h = cls;
+
+ /* Give the peers back */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "New peer of biased input stream\n");
+
+ GNUNET_assert (NULL != h);
+ GNUNET_assert (NULL != h->stream_input_cb);
+ h->stream_input_cb (h->stream_input_cb, &msg->peer);
+}
+
/**
* Reconnect to the service
@@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
struct GNUNET_RPS_CS_DEBUG_ViewReply,
h),
+ GNUNET_MQ_hd_fixed_size (stream_input,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+ struct GNUNET_RPS_CS_DEBUG_StreamReply,
+ h),
GNUNET_MQ_handler_end ()
};