From bd6822783a5daa6d03f1af13e0b4f05ba56df42a Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Tue, 18 Sep 2018 13:59:37 +0200 Subject: Add possibility to send multiple peers to client --- src/include/gnunet_rps_service.h | 1 + src/rps/gnunet-rps.c | 32 ++++++++++++++++---- src/rps/gnunet-service-rps.c | 64 ++++++++++++++++++++++++---------------- src/rps/rps.h | 6 ++-- src/rps/rps_api.c | 43 +++++++++++++++++++++++---- 5 files changed, 108 insertions(+), 38 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index 252188c62..eda012076 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h @@ -79,6 +79,7 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls, * @param peer The received peer */ typedef void (* GNUNET_RPS_StreamInputCB) (void *cls, + uint64_t num_peers, const struct GNUNET_PeerIdentity *peer); /** diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c index d2c497fd4..d0f905f51 100644 --- a/src/rps/gnunet-rps.c +++ b/src/rps/gnunet-rps.c @@ -59,7 +59,7 @@ static int stream_input; static uint64_t num_view_updates; /** - * @brief Number of updates we want to receive + * @brief Number of peers we want to receive from stream */ static uint64_t num_stream_peers; @@ -154,11 +154,33 @@ view_update_handle (void *cls, */ static void stream_input_handle (void *cls, - const struct GNUNET_PeerIdentity *recv_peer) + uint64_t num_peers, + const struct GNUNET_PeerIdentity *recv_peers) { - // TODO when source of peer is sent, also print source - FPRINTF (stdout, "%s\n", - GNUNET_i2s_full (recv_peer)); + uint64_t i; + (void) cls; + + if (0 == num_peers) + { + FPRINTF (stdout, "Empty view\n"); + } + req_handle = NULL; + for (i = 0; i < num_peers; i++) + { + FPRINTF (stdout, "%s\n", + GNUNET_i2s_full (&recv_peers[i])); + + if (1 == num_stream_peers) + { + ret = 0; + GNUNET_SCHEDULER_shutdown (); + break; + } + else if (1 < num_stream_peers) + { + num_stream_peers--; + } + } } diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 5b78bb4a8..4da73b09c 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -2208,8 +2208,8 @@ send_view (const struct ClientContext *cli_ctx, out_msg->num_peers = htonl (view_size); GNUNET_memcpy (&out_msg[1], - view_array, - view_size * sizeof (struct GNUNET_PeerIdentity)); + view_array, + view_size * sizeof (struct GNUNET_PeerIdentity)); GNUNET_MQ_send (cli_ctx->mq, ev); } @@ -2217,25 +2217,30 @@ send_view (const struct ClientContext *cli_ctx, /** * @brief Send peer from biased stream to client. * + * TODO merge with send_view, parameterise + * * @param cli_ctx the context of the client * @param view_array the peerids of the view as array (can be empty) * @param view_size the size of the view array (can be 0) */ void send_stream_peer (const struct ClientContext *cli_ctx, - const struct GNUNET_PeerIdentity *peer) + uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; - GNUNET_assert (NULL != peer); + GNUNET_assert (NULL != peers); - ev = GNUNET_MQ_msg (out_msg, - GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); + ev = GNUNET_MQ_msg_extra (out_msg, + num_peers * sizeof (struct GNUNET_PeerIdentity), + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); + out_msg->num_peers = htonl (num_peers); - GNUNET_memcpy (&out_msg->peer, - peer, - sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (&out_msg[1], + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); GNUNET_MQ_send (cli_ctx->mq, ev); } @@ -2288,36 +2293,45 @@ clients_notify_view_update (void) * @brief sends updates to clients that are interested */ static void -clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) - //enum StreamPeerSource) +clients_notify_stream_peer (uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers) + // TODO enum StreamPeerSource) { struct ClientContext *cli_ctx_iter; + uint64_t num_peers_send; LOG (GNUNET_ERROR_TYPE_DEBUG, "Got peer (%s) from biased stream - update all clients\n", - GNUNET_i2s (peer)); + GNUNET_i2s (peers)); - /* check size of view is small enough */ for (cli_ctx_iter = cli_ctx_head; NULL != cli_ctx_iter; cli_ctx_iter = cli_ctx_head->next) { - if (1 < cli_ctx_iter->stream_peers_left) + if (0 < cli_ctx_iter->stream_peers_left) { /* Client wants to receive limited amount of updates */ - cli_ctx_iter->stream_peers_left -= 1; - } else if (1 == cli_ctx_iter->stream_peers_left) + if (num_peers > cli_ctx_iter->stream_peers_left) + { + num_peers_send = num_peers - cli_ctx_iter->stream_peers_left; + cli_ctx_iter->stream_peers_left = 0; + } + else + { + num_peers_send = cli_ctx_iter->stream_peers_left - num_peers; + cli_ctx_iter->stream_peers_left -= num_peers_send; + } + } else if (0 > cli_ctx_iter->stream_peers_left) { - /* Last update of view for client */ - cli_ctx_iter->stream_peers_left = -1; - } else if (0 > cli_ctx_iter->stream_peers_left) { /* Client is not interested in updates */ continue; + } else /* _updates_left == 0 - infinite amount of updates */ + { + num_peers_send = num_peers; } - /* else _updates_left == 0 - infinite amount of updates */ /* send view */ - send_stream_peer (cli_ctx_iter, peer); + send_stream_peer (cli_ctx_iter, num_peers_send, peers); } } @@ -2338,7 +2352,7 @@ hist_update (void *cls, inserted = insert_in_view (&ids[i]); if (GNUNET_OK == inserted) { - clients_notify_stream_peer (&ids[i]); + clients_notify_stream_peer (1, &ids[i]); } to_file (file_name_view_log, "+%s\t(hist)", @@ -2549,7 +2563,7 @@ insert_in_view_op (void *cls, inserted = insert_in_view (peer); if (GNUNET_OK == inserted) { - clients_notify_stream_peer (peer); + clients_notify_stream_peer (1, peer); } } @@ -3834,7 +3848,7 @@ do_round (void *cls) permut[i])); if (GNUNET_OK == inserted) { - clients_notify_stream_peer ( + clients_notify_stream_peer (1, CustomPeerMap_get_peer_by_index (push_map, permut[i])); } to_file (file_name_view_log, @@ -3855,7 +3869,7 @@ do_round (void *cls) permut[i - first_border])); if (GNUNET_OK == inserted) { - clients_notify_stream_peer ( + clients_notify_stream_peer (1, CustomPeerMap_get_peer_by_index (push_map, permut[i])); } to_file (file_name_view_log, diff --git a/src/rps/rps.h b/src/rps/rps.h index 66b2dd962..26615bfc5 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h @@ -250,11 +250,13 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply uint32_t id GNUNET_PACKED; /** - * @brief The peer of the biased stream + * Number of peers */ - struct GNUNET_PeerIdentity peer; + uint64_t num_peers GNUNET_PACKED; // TODO maybe source of peer (pull/push list, peerinfo, ...) + + /* Followed by num_peers * GNUNET_PeerIdentity */ }; GNUNET_NETWORK_STRUCT_END diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index b7644540d..96660ded6 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -371,6 +371,34 @@ handle_view_update (void *cls, } +/** + * This function is called, when the service sends another peer from the biased + * stream. + * It calls the callback the caller provided + * and disconnects afterwards. + * + * TODO merge with check_view_update + * + * @param msg the message + */ +static int +check_stream_input (void *cls, + const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) +{ + uint16_t msize = ntohs (msg->header.size); + uint32_t num_peers = ntohl (msg->num_peers); + (void) cls; + + msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply); + if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || + (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + /** * This function is called, when the service sends another peer from the biased * stream. @@ -384,14 +412,17 @@ handle_stream_input (void *cls, const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) { struct GNUNET_RPS_Handle *h = cls; + const struct GNUNET_PeerIdentity *peers; /* Give the peers back */ LOG (GNUNET_ERROR_TYPE_DEBUG, - "New peer of biased input stream\n"); + "New peer of %" PRIu64 " biased input stream\n", + ntohl (msg->num_peers)); + peers = (struct GNUNET_PeerIdentity *) &msg[1]; GNUNET_assert (NULL != h); GNUNET_assert (NULL != h->stream_input_cb); - h->stream_input_cb (h->stream_input_cb, &msg->peer); + h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers); } @@ -444,10 +475,10 @@ reconnect (struct GNUNET_RPS_Handle *h) GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, struct GNUNET_RPS_CS_DEBUG_ViewReply, h), - GNUNET_MQ_hd_fixed_size (stream_input, - GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, - struct GNUNET_RPS_CS_DEBUG_StreamReply, - h), + GNUNET_MQ_hd_var_size (stream_input, + GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, + struct GNUNET_RPS_CS_DEBUG_StreamReply, + h), GNUNET_MQ_handler_end () }; -- cgit v1.2.3