summaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2018-09-18 13:59:37 +0200
committerJulius Bünger <buenger@mytum.de>2018-09-18 17:20:26 +0200
commitbd6822783a5daa6d03f1af13e0b4f05ba56df42a (patch)
treeefb66dd0dbae17a66783c751d3e16ef226a84363 /src/rps
parent2db3016405377f2d600f6029616eb1b5b391b685 (diff)
Add possibility to send multiple peers to client
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-rps.c32
-rw-r--r--src/rps/gnunet-service-rps.c64
-rw-r--r--src/rps/rps.h6
-rw-r--r--src/rps/rps_api.c43
4 files changed, 107 insertions, 38 deletions
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index d2c497fd4..d0f905f51 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -59,7 +59,7 @@ static int stream_input;
static uint64_t num_view_updates;
/**
- * @brief Number of updates we want to receive
+ * @brief Number of peers we want to receive from stream
*/
static uint64_t num_stream_peers;
@@ -154,11 +154,33 @@ view_update_handle (void *cls,
*/
static void
stream_input_handle (void *cls,
- const struct GNUNET_PeerIdentity *recv_peer)
+ uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *recv_peers)
{
- // TODO when source of peer is sent, also print source
- FPRINTF (stdout, "%s\n",
- GNUNET_i2s_full (recv_peer));
+ uint64_t i;
+ (void) cls;
+
+ if (0 == num_peers)
+ {
+ FPRINTF (stdout, "Empty view\n");
+ }
+ req_handle = NULL;
+ for (i = 0; i < num_peers; i++)
+ {
+ FPRINTF (stdout, "%s\n",
+ GNUNET_i2s_full (&recv_peers[i]));
+
+ if (1 == num_stream_peers)
+ {
+ ret = 0;
+ GNUNET_SCHEDULER_shutdown ();
+ break;
+ }
+ else if (1 < num_stream_peers)
+ {
+ num_stream_peers--;
+ }
+ }
}
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 5b78bb4a8..4da73b09c 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -2208,8 +2208,8 @@ send_view (const struct ClientContext *cli_ctx,
out_msg->num_peers = htonl (view_size);
GNUNET_memcpy (&out_msg[1],
- view_array,
- view_size * sizeof (struct GNUNET_PeerIdentity));
+ view_array,
+ view_size * sizeof (struct GNUNET_PeerIdentity));
GNUNET_MQ_send (cli_ctx->mq, ev);
}
@@ -2217,25 +2217,30 @@ send_view (const struct ClientContext *cli_ctx,
/**
* @brief Send peer from biased stream to client.
*
+ * TODO merge with send_view, parameterise
+ *
* @param cli_ctx the context of the client
* @param view_array the peerids of the view as array (can be empty)
* @param view_size the size of the view array (can be 0)
*/
void
send_stream_peer (const struct ClientContext *cli_ctx,
- const struct GNUNET_PeerIdentity *peer)
+ uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *peers)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
- GNUNET_assert (NULL != peer);
+ GNUNET_assert (NULL != peers);
- ev = GNUNET_MQ_msg (out_msg,
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ num_peers * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+ out_msg->num_peers = htonl (num_peers);
- GNUNET_memcpy (&out_msg->peer,
- peer,
- sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (&out_msg[1],
+ peers,
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
GNUNET_MQ_send (cli_ctx->mq, ev);
}
@@ -2288,36 +2293,45 @@ clients_notify_view_update (void)
* @brief sends updates to clients that are interested
*/
static void
-clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
- //enum StreamPeerSource)
+clients_notify_stream_peer (uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *peers)
+ // TODO enum StreamPeerSource)
{
struct ClientContext *cli_ctx_iter;
+ uint64_t num_peers_send;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got peer (%s) from biased stream - update all clients\n",
- GNUNET_i2s (peer));
+ GNUNET_i2s (peers));
- /* check size of view is small enough */
for (cli_ctx_iter = cli_ctx_head;
NULL != cli_ctx_iter;
cli_ctx_iter = cli_ctx_head->next)
{
- if (1 < cli_ctx_iter->stream_peers_left)
+ if (0 < cli_ctx_iter->stream_peers_left)
{
/* Client wants to receive limited amount of updates */
- cli_ctx_iter->stream_peers_left -= 1;
- } else if (1 == cli_ctx_iter->stream_peers_left)
+ if (num_peers > cli_ctx_iter->stream_peers_left)
+ {
+ num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
+ cli_ctx_iter->stream_peers_left = 0;
+ }
+ else
+ {
+ num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
+ cli_ctx_iter->stream_peers_left -= num_peers_send;
+ }
+ } else if (0 > cli_ctx_iter->stream_peers_left)
{
- /* Last update of view for client */
- cli_ctx_iter->stream_peers_left = -1;
- } else if (0 > cli_ctx_iter->stream_peers_left) {
/* Client is not interested in updates */
continue;
+ } else /* _updates_left == 0 - infinite amount of updates */
+ {
+ num_peers_send = num_peers;
}
- /* else _updates_left == 0 - infinite amount of updates */
/* send view */
- send_stream_peer (cli_ctx_iter, peer);
+ send_stream_peer (cli_ctx_iter, num_peers_send, peers);
}
}
@@ -2338,7 +2352,7 @@ hist_update (void *cls,
inserted = insert_in_view (&ids[i]);
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (&ids[i]);
+ clients_notify_stream_peer (1, &ids[i]);
}
to_file (file_name_view_log,
"+%s\t(hist)",
@@ -2549,7 +2563,7 @@ insert_in_view_op (void *cls,
inserted = insert_in_view (peer);
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (peer);
+ clients_notify_stream_peer (1, peer);
}
}
@@ -3834,7 +3848,7 @@ do_round (void *cls)
permut[i]));
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (
+ clients_notify_stream_peer (1,
CustomPeerMap_get_peer_by_index (push_map, permut[i]));
}
to_file (file_name_view_log,
@@ -3855,7 +3869,7 @@ do_round (void *cls)
permut[i - first_border]));
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (
+ clients_notify_stream_peer (1,
CustomPeerMap_get_peer_by_index (push_map, permut[i]));
}
to_file (file_name_view_log,
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 66b2dd962..26615bfc5 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -250,11 +250,13 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply
uint32_t id GNUNET_PACKED;
/**
- * @brief The peer of the biased stream
+ * Number of peers
*/
- struct GNUNET_PeerIdentity peer;
+ uint64_t num_peers GNUNET_PACKED;
// TODO maybe source of peer (pull/push list, peerinfo, ...)
+
+ /* Followed by num_peers * GNUNET_PeerIdentity */
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index b7644540d..96660ded6 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -377,6 +377,34 @@ handle_view_update (void *cls,
* It calls the callback the caller provided
* and disconnects afterwards.
*
+ * TODO merge with check_view_update
+ *
+ * @param msg the message
+ */
+static int
+check_stream_input (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+ uint16_t msize = ntohs (msg->header.size);
+ uint32_t num_peers = ntohl (msg->num_peers);
+ (void) cls;
+
+ msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
+ if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
+ (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+/**
+ * This function is called, when the service sends another peer from the biased
+ * stream.
+ * It calls the callback the caller provided
+ * and disconnects afterwards.
+ *
* @param msg the message
*/
static void
@@ -384,14 +412,17 @@ handle_stream_input (void *cls,
const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
{
struct GNUNET_RPS_Handle *h = cls;
+ const struct GNUNET_PeerIdentity *peers;
/* Give the peers back */
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "New peer of biased input stream\n");
+ "New peer of %" PRIu64 " biased input stream\n",
+ ntohl (msg->num_peers));
+ peers = (struct GNUNET_PeerIdentity *) &msg[1];
GNUNET_assert (NULL != h);
GNUNET_assert (NULL != h->stream_input_cb);
- h->stream_input_cb (h->stream_input_cb, &msg->peer);
+ h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers);
}
@@ -444,10 +475,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
struct GNUNET_RPS_CS_DEBUG_ViewReply,
h),
- GNUNET_MQ_hd_fixed_size (stream_input,
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
- struct GNUNET_RPS_CS_DEBUG_StreamReply,
- h),
+ GNUNET_MQ_hd_var_size (stream_input,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+ struct GNUNET_RPS_CS_DEBUG_StreamReply,
+ h),
GNUNET_MQ_handler_end ()
};