diff options
author | Julius Bünger <buenger@mytum.de> | 2018-09-18 13:59:37 +0200 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2018-09-18 17:20:26 +0200 |
commit | bd6822783a5daa6d03f1af13e0b4f05ba56df42a (patch) | |
tree | efb66dd0dbae17a66783c751d3e16ef226a84363 /src/rps | |
parent | 2db3016405377f2d600f6029616eb1b5b391b685 (diff) | |
download | gnunet-bd6822783a5daa6d03f1af13e0b4f05ba56df42a.tar.gz gnunet-bd6822783a5daa6d03f1af13e0b4f05ba56df42a.zip |
Add possibility to send multiple peers to client
Diffstat (limited to 'src/rps')
-rw-r--r-- | src/rps/gnunet-rps.c | 32 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps.c | 64 | ||||
-rw-r--r-- | src/rps/rps.h | 6 | ||||
-rw-r--r-- | src/rps/rps_api.c | 43 |
4 files changed, 107 insertions, 38 deletions
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c index d2c497fd4..d0f905f51 100644 --- a/src/rps/gnunet-rps.c +++ b/src/rps/gnunet-rps.c | |||
@@ -59,7 +59,7 @@ static int stream_input; | |||
59 | static uint64_t num_view_updates; | 59 | static uint64_t num_view_updates; |
60 | 60 | ||
61 | /** | 61 | /** |
62 | * @brief Number of updates we want to receive | 62 | * @brief Number of peers we want to receive from stream |
63 | */ | 63 | */ |
64 | static uint64_t num_stream_peers; | 64 | static uint64_t num_stream_peers; |
65 | 65 | ||
@@ -154,11 +154,33 @@ view_update_handle (void *cls, | |||
154 | */ | 154 | */ |
155 | static void | 155 | static void |
156 | stream_input_handle (void *cls, | 156 | stream_input_handle (void *cls, |
157 | const struct GNUNET_PeerIdentity *recv_peer) | 157 | uint64_t num_peers, |
158 | const struct GNUNET_PeerIdentity *recv_peers) | ||
158 | { | 159 | { |
159 | // TODO when source of peer is sent, also print source | 160 | uint64_t i; |
160 | FPRINTF (stdout, "%s\n", | 161 | (void) cls; |
161 | GNUNET_i2s_full (recv_peer)); | 162 | |
163 | if (0 == num_peers) | ||
164 | { | ||
165 | FPRINTF (stdout, "Empty view\n"); | ||
166 | } | ||
167 | req_handle = NULL; | ||
168 | for (i = 0; i < num_peers; i++) | ||
169 | { | ||
170 | FPRINTF (stdout, "%s\n", | ||
171 | GNUNET_i2s_full (&recv_peers[i])); | ||
172 | |||
173 | if (1 == num_stream_peers) | ||
174 | { | ||
175 | ret = 0; | ||
176 | GNUNET_SCHEDULER_shutdown (); | ||
177 | break; | ||
178 | } | ||
179 | else if (1 < num_stream_peers) | ||
180 | { | ||
181 | num_stream_peers--; | ||
182 | } | ||
183 | } | ||
162 | } | 184 | } |
163 | 185 | ||
164 | 186 | ||
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 5b78bb4a8..4da73b09c 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -2208,8 +2208,8 @@ send_view (const struct ClientContext *cli_ctx, | |||
2208 | out_msg->num_peers = htonl (view_size); | 2208 | out_msg->num_peers = htonl (view_size); |
2209 | 2209 | ||
2210 | GNUNET_memcpy (&out_msg[1], | 2210 | GNUNET_memcpy (&out_msg[1], |
2211 | view_array, | 2211 | view_array, |
2212 | view_size * sizeof (struct GNUNET_PeerIdentity)); | 2212 | view_size * sizeof (struct GNUNET_PeerIdentity)); |
2213 | GNUNET_MQ_send (cli_ctx->mq, ev); | 2213 | GNUNET_MQ_send (cli_ctx->mq, ev); |
2214 | } | 2214 | } |
2215 | 2215 | ||
@@ -2217,25 +2217,30 @@ send_view (const struct ClientContext *cli_ctx, | |||
2217 | /** | 2217 | /** |
2218 | * @brief Send peer from biased stream to client. | 2218 | * @brief Send peer from biased stream to client. |
2219 | * | 2219 | * |
2220 | * TODO merge with send_view, parameterise | ||
2221 | * | ||
2220 | * @param cli_ctx the context of the client | 2222 | * @param cli_ctx the context of the client |
2221 | * @param view_array the peerids of the view as array (can be empty) | 2223 | * @param view_array the peerids of the view as array (can be empty) |
2222 | * @param view_size the size of the view array (can be 0) | 2224 | * @param view_size the size of the view array (can be 0) |
2223 | */ | 2225 | */ |
2224 | void | 2226 | void |
2225 | send_stream_peer (const struct ClientContext *cli_ctx, | 2227 | send_stream_peer (const struct ClientContext *cli_ctx, |
2226 | const struct GNUNET_PeerIdentity *peer) | 2228 | uint64_t num_peers, |
2229 | const struct GNUNET_PeerIdentity *peers) | ||
2227 | { | 2230 | { |
2228 | struct GNUNET_MQ_Envelope *ev; | 2231 | struct GNUNET_MQ_Envelope *ev; |
2229 | struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; | 2232 | struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; |
2230 | 2233 | ||
2231 | GNUNET_assert (NULL != peer); | 2234 | GNUNET_assert (NULL != peers); |
2232 | 2235 | ||
2233 | ev = GNUNET_MQ_msg (out_msg, | 2236 | ev = GNUNET_MQ_msg_extra (out_msg, |
2234 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); | 2237 | num_peers * sizeof (struct GNUNET_PeerIdentity), |
2238 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); | ||
2239 | out_msg->num_peers = htonl (num_peers); | ||
2235 | 2240 | ||
2236 | GNUNET_memcpy (&out_msg->peer, | 2241 | GNUNET_memcpy (&out_msg[1], |
2237 | peer, | 2242 | peers, |
2238 | sizeof (struct GNUNET_PeerIdentity)); | 2243 | num_peers * sizeof (struct GNUNET_PeerIdentity)); |
2239 | GNUNET_MQ_send (cli_ctx->mq, ev); | 2244 | GNUNET_MQ_send (cli_ctx->mq, ev); |
2240 | } | 2245 | } |
2241 | 2246 | ||
@@ -2288,36 +2293,45 @@ clients_notify_view_update (void) | |||
2288 | * @brief sends updates to clients that are interested | 2293 | * @brief sends updates to clients that are interested |
2289 | */ | 2294 | */ |
2290 | static void | 2295 | static void |
2291 | clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) | 2296 | clients_notify_stream_peer (uint64_t num_peers, |
2292 | //enum StreamPeerSource) | 2297 | const struct GNUNET_PeerIdentity *peers) |
2298 | // TODO enum StreamPeerSource) | ||
2293 | { | 2299 | { |
2294 | struct ClientContext *cli_ctx_iter; | 2300 | struct ClientContext *cli_ctx_iter; |
2301 | uint64_t num_peers_send; | ||
2295 | 2302 | ||
2296 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2303 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2297 | "Got peer (%s) from biased stream - update all clients\n", | 2304 | "Got peer (%s) from biased stream - update all clients\n", |
2298 | GNUNET_i2s (peer)); | 2305 | GNUNET_i2s (peers)); |
2299 | 2306 | ||
2300 | /* check size of view is small enough */ | ||
2301 | for (cli_ctx_iter = cli_ctx_head; | 2307 | for (cli_ctx_iter = cli_ctx_head; |
2302 | NULL != cli_ctx_iter; | 2308 | NULL != cli_ctx_iter; |
2303 | cli_ctx_iter = cli_ctx_head->next) | 2309 | cli_ctx_iter = cli_ctx_head->next) |
2304 | { | 2310 | { |
2305 | if (1 < cli_ctx_iter->stream_peers_left) | 2311 | if (0 < cli_ctx_iter->stream_peers_left) |
2306 | { | 2312 | { |
2307 | /* Client wants to receive limited amount of updates */ | 2313 | /* Client wants to receive limited amount of updates */ |
2308 | cli_ctx_iter->stream_peers_left -= 1; | 2314 | if (num_peers > cli_ctx_iter->stream_peers_left) |
2309 | } else if (1 == cli_ctx_iter->stream_peers_left) | 2315 | { |
2316 | num_peers_send = num_peers - cli_ctx_iter->stream_peers_left; | ||
2317 | cli_ctx_iter->stream_peers_left = 0; | ||
2318 | } | ||
2319 | else | ||
2320 | { | ||
2321 | num_peers_send = cli_ctx_iter->stream_peers_left - num_peers; | ||
2322 | cli_ctx_iter->stream_peers_left -= num_peers_send; | ||
2323 | } | ||
2324 | } else if (0 > cli_ctx_iter->stream_peers_left) | ||
2310 | { | 2325 | { |
2311 | /* Last update of view for client */ | ||
2312 | cli_ctx_iter->stream_peers_left = -1; | ||
2313 | } else if (0 > cli_ctx_iter->stream_peers_left) { | ||
2314 | /* Client is not interested in updates */ | 2326 | /* Client is not interested in updates */ |
2315 | continue; | 2327 | continue; |
2328 | } else /* _updates_left == 0 - infinite amount of updates */ | ||
2329 | { | ||
2330 | num_peers_send = num_peers; | ||
2316 | } | 2331 | } |
2317 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2318 | 2332 | ||
2319 | /* send view */ | 2333 | /* send view */ |
2320 | send_stream_peer (cli_ctx_iter, peer); | 2334 | send_stream_peer (cli_ctx_iter, num_peers_send, peers); |
2321 | } | 2335 | } |
2322 | } | 2336 | } |
2323 | 2337 | ||
@@ -2338,7 +2352,7 @@ hist_update (void *cls, | |||
2338 | inserted = insert_in_view (&ids[i]); | 2352 | inserted = insert_in_view (&ids[i]); |
2339 | if (GNUNET_OK == inserted) | 2353 | if (GNUNET_OK == inserted) |
2340 | { | 2354 | { |
2341 | clients_notify_stream_peer (&ids[i]); | 2355 | clients_notify_stream_peer (1, &ids[i]); |
2342 | } | 2356 | } |
2343 | to_file (file_name_view_log, | 2357 | to_file (file_name_view_log, |
2344 | "+%s\t(hist)", | 2358 | "+%s\t(hist)", |
@@ -2549,7 +2563,7 @@ insert_in_view_op (void *cls, | |||
2549 | inserted = insert_in_view (peer); | 2563 | inserted = insert_in_view (peer); |
2550 | if (GNUNET_OK == inserted) | 2564 | if (GNUNET_OK == inserted) |
2551 | { | 2565 | { |
2552 | clients_notify_stream_peer (peer); | 2566 | clients_notify_stream_peer (1, peer); |
2553 | } | 2567 | } |
2554 | } | 2568 | } |
2555 | 2569 | ||
@@ -3834,7 +3848,7 @@ do_round (void *cls) | |||
3834 | permut[i])); | 3848 | permut[i])); |
3835 | if (GNUNET_OK == inserted) | 3849 | if (GNUNET_OK == inserted) |
3836 | { | 3850 | { |
3837 | clients_notify_stream_peer ( | 3851 | clients_notify_stream_peer (1, |
3838 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | 3852 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); |
3839 | } | 3853 | } |
3840 | to_file (file_name_view_log, | 3854 | to_file (file_name_view_log, |
@@ -3855,7 +3869,7 @@ do_round (void *cls) | |||
3855 | permut[i - first_border])); | 3869 | permut[i - first_border])); |
3856 | if (GNUNET_OK == inserted) | 3870 | if (GNUNET_OK == inserted) |
3857 | { | 3871 | { |
3858 | clients_notify_stream_peer ( | 3872 | clients_notify_stream_peer (1, |
3859 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | 3873 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); |
3860 | } | 3874 | } |
3861 | to_file (file_name_view_log, | 3875 | to_file (file_name_view_log, |
diff --git a/src/rps/rps.h b/src/rps/rps.h index 66b2dd962..26615bfc5 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h | |||
@@ -250,11 +250,13 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply | |||
250 | uint32_t id GNUNET_PACKED; | 250 | uint32_t id GNUNET_PACKED; |
251 | 251 | ||
252 | /** | 252 | /** |
253 | * @brief The peer of the biased stream | 253 | * Number of peers |
254 | */ | 254 | */ |
255 | struct GNUNET_PeerIdentity peer; | 255 | uint64_t num_peers GNUNET_PACKED; |
256 | 256 | ||
257 | // TODO maybe source of peer (pull/push list, peerinfo, ...) | 257 | // TODO maybe source of peer (pull/push list, peerinfo, ...) |
258 | |||
259 | /* Followed by num_peers * GNUNET_PeerIdentity */ | ||
258 | }; | 260 | }; |
259 | 261 | ||
260 | GNUNET_NETWORK_STRUCT_END | 262 | GNUNET_NETWORK_STRUCT_END |
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index b7644540d..96660ded6 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -377,6 +377,34 @@ handle_view_update (void *cls, | |||
377 | * It calls the callback the caller provided | 377 | * It calls the callback the caller provided |
378 | * and disconnects afterwards. | 378 | * and disconnects afterwards. |
379 | * | 379 | * |
380 | * TODO merge with check_view_update | ||
381 | * | ||
382 | * @param msg the message | ||
383 | */ | ||
384 | static int | ||
385 | check_stream_input (void *cls, | ||
386 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) | ||
387 | { | ||
388 | uint16_t msize = ntohs (msg->header.size); | ||
389 | uint32_t num_peers = ntohl (msg->num_peers); | ||
390 | (void) cls; | ||
391 | |||
392 | msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply); | ||
393 | if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || | ||
394 | (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) | ||
395 | { | ||
396 | GNUNET_break (0); | ||
397 | return GNUNET_SYSERR; | ||
398 | } | ||
399 | return GNUNET_OK; | ||
400 | } | ||
401 | |||
402 | /** | ||
403 | * This function is called, when the service sends another peer from the biased | ||
404 | * stream. | ||
405 | * It calls the callback the caller provided | ||
406 | * and disconnects afterwards. | ||
407 | * | ||
380 | * @param msg the message | 408 | * @param msg the message |
381 | */ | 409 | */ |
382 | static void | 410 | static void |
@@ -384,14 +412,17 @@ handle_stream_input (void *cls, | |||
384 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) | 412 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) |
385 | { | 413 | { |
386 | struct GNUNET_RPS_Handle *h = cls; | 414 | struct GNUNET_RPS_Handle *h = cls; |
415 | const struct GNUNET_PeerIdentity *peers; | ||
387 | 416 | ||
388 | /* Give the peers back */ | 417 | /* Give the peers back */ |
389 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 418 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
390 | "New peer of biased input stream\n"); | 419 | "New peer of %" PRIu64 " biased input stream\n", |
420 | ntohl (msg->num_peers)); | ||
391 | 421 | ||
422 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
392 | GNUNET_assert (NULL != h); | 423 | GNUNET_assert (NULL != h); |
393 | GNUNET_assert (NULL != h->stream_input_cb); | 424 | GNUNET_assert (NULL != h->stream_input_cb); |
394 | h->stream_input_cb (h->stream_input_cb, &msg->peer); | 425 | h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers); |
395 | } | 426 | } |
396 | 427 | ||
397 | 428 | ||
@@ -444,10 +475,10 @@ reconnect (struct GNUNET_RPS_Handle *h) | |||
444 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, | 475 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, |
445 | struct GNUNET_RPS_CS_DEBUG_ViewReply, | 476 | struct GNUNET_RPS_CS_DEBUG_ViewReply, |
446 | h), | 477 | h), |
447 | GNUNET_MQ_hd_fixed_size (stream_input, | 478 | GNUNET_MQ_hd_var_size (stream_input, |
448 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, | 479 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, |
449 | struct GNUNET_RPS_CS_DEBUG_StreamReply, | 480 | struct GNUNET_RPS_CS_DEBUG_StreamReply, |
450 | h), | 481 | h), |
451 | GNUNET_MQ_handler_end () | 482 | GNUNET_MQ_handler_end () |
452 | }; | 483 | }; |
453 | 484 | ||