aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_rps_service.h1
-rw-r--r--src/rps/gnunet-rps.c32
-rw-r--r--src/rps/gnunet-service-rps.c64
-rw-r--r--src/rps/rps.h6
-rw-r--r--src/rps/rps_api.c43
5 files changed, 108 insertions, 38 deletions
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h
index 252188c62..eda012076 100644
--- a/src/include/gnunet_rps_service.h
+++ b/src/include/gnunet_rps_service.h
@@ -79,6 +79,7 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls,
79 * @param peer The received peer 79 * @param peer The received peer
80 */ 80 */
81typedef void (* GNUNET_RPS_StreamInputCB) (void *cls, 81typedef void (* GNUNET_RPS_StreamInputCB) (void *cls,
82 uint64_t num_peers,
82 const struct GNUNET_PeerIdentity *peer); 83 const struct GNUNET_PeerIdentity *peer);
83 84
84/** 85/**
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index d2c497fd4..d0f905f51 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -59,7 +59,7 @@ static int stream_input;
59static uint64_t num_view_updates; 59static uint64_t num_view_updates;
60 60
61/** 61/**
62 * @brief Number of updates we want to receive 62 * @brief Number of peers we want to receive from stream
63 */ 63 */
64static uint64_t num_stream_peers; 64static uint64_t num_stream_peers;
65 65
@@ -154,11 +154,33 @@ view_update_handle (void *cls,
154 */ 154 */
155static void 155static void
156stream_input_handle (void *cls, 156stream_input_handle (void *cls,
157 const struct GNUNET_PeerIdentity *recv_peer) 157 uint64_t num_peers,
158 const struct GNUNET_PeerIdentity *recv_peers)
158{ 159{
159 // TODO when source of peer is sent, also print source 160 uint64_t i;
160 FPRINTF (stdout, "%s\n", 161 (void) cls;
161 GNUNET_i2s_full (recv_peer)); 162
163 if (0 == num_peers)
164 {
165 FPRINTF (stdout, "Empty view\n");
166 }
167 req_handle = NULL;
168 for (i = 0; i < num_peers; i++)
169 {
170 FPRINTF (stdout, "%s\n",
171 GNUNET_i2s_full (&recv_peers[i]));
172
173 if (1 == num_stream_peers)
174 {
175 ret = 0;
176 GNUNET_SCHEDULER_shutdown ();
177 break;
178 }
179 else if (1 < num_stream_peers)
180 {
181 num_stream_peers--;
182 }
183 }
162} 184}
163 185
164 186
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 5b78bb4a8..4da73b09c 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -2208,8 +2208,8 @@ send_view (const struct ClientContext *cli_ctx,
2208 out_msg->num_peers = htonl (view_size); 2208 out_msg->num_peers = htonl (view_size);
2209 2209
2210 GNUNET_memcpy (&out_msg[1], 2210 GNUNET_memcpy (&out_msg[1],
2211 view_array, 2211 view_array,
2212 view_size * sizeof (struct GNUNET_PeerIdentity)); 2212 view_size * sizeof (struct GNUNET_PeerIdentity));
2213 GNUNET_MQ_send (cli_ctx->mq, ev); 2213 GNUNET_MQ_send (cli_ctx->mq, ev);
2214} 2214}
2215 2215
@@ -2217,25 +2217,30 @@ send_view (const struct ClientContext *cli_ctx,
2217/** 2217/**
2218 * @brief Send peer from biased stream to client. 2218 * @brief Send peer from biased stream to client.
2219 * 2219 *
2220 * TODO merge with send_view, parameterise
2221 *
2220 * @param cli_ctx the context of the client 2222 * @param cli_ctx the context of the client
2221 * @param view_array the peerids of the view as array (can be empty) 2223 * @param view_array the peerids of the view as array (can be empty)
2222 * @param view_size the size of the view array (can be 0) 2224 * @param view_size the size of the view array (can be 0)
2223 */ 2225 */
2224void 2226void
2225send_stream_peer (const struct ClientContext *cli_ctx, 2227send_stream_peer (const struct ClientContext *cli_ctx,
2226 const struct GNUNET_PeerIdentity *peer) 2228 uint64_t num_peers,
2229 const struct GNUNET_PeerIdentity *peers)
2227{ 2230{
2228 struct GNUNET_MQ_Envelope *ev; 2231 struct GNUNET_MQ_Envelope *ev;
2229 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; 2232 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2230 2233
2231 GNUNET_assert (NULL != peer); 2234 GNUNET_assert (NULL != peers);
2232 2235
2233 ev = GNUNET_MQ_msg (out_msg, 2236 ev = GNUNET_MQ_msg_extra (out_msg,
2234 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); 2237 num_peers * sizeof (struct GNUNET_PeerIdentity),
2238 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2239 out_msg->num_peers = htonl (num_peers);
2235 2240
2236 GNUNET_memcpy (&out_msg->peer, 2241 GNUNET_memcpy (&out_msg[1],
2237 peer, 2242 peers,
2238 sizeof (struct GNUNET_PeerIdentity)); 2243 num_peers * sizeof (struct GNUNET_PeerIdentity));
2239 GNUNET_MQ_send (cli_ctx->mq, ev); 2244 GNUNET_MQ_send (cli_ctx->mq, ev);
2240} 2245}
2241 2246
@@ -2288,36 +2293,45 @@ clients_notify_view_update (void)
2288 * @brief sends updates to clients that are interested 2293 * @brief sends updates to clients that are interested
2289 */ 2294 */
2290static void 2295static void
2291clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) 2296clients_notify_stream_peer (uint64_t num_peers,
2292 //enum StreamPeerSource) 2297 const struct GNUNET_PeerIdentity *peers)
2298 // TODO enum StreamPeerSource)
2293{ 2299{
2294 struct ClientContext *cli_ctx_iter; 2300 struct ClientContext *cli_ctx_iter;
2301 uint64_t num_peers_send;
2295 2302
2296 LOG (GNUNET_ERROR_TYPE_DEBUG, 2303 LOG (GNUNET_ERROR_TYPE_DEBUG,
2297 "Got peer (%s) from biased stream - update all clients\n", 2304 "Got peer (%s) from biased stream - update all clients\n",
2298 GNUNET_i2s (peer)); 2305 GNUNET_i2s (peers));
2299 2306
2300 /* check size of view is small enough */
2301 for (cli_ctx_iter = cli_ctx_head; 2307 for (cli_ctx_iter = cli_ctx_head;
2302 NULL != cli_ctx_iter; 2308 NULL != cli_ctx_iter;
2303 cli_ctx_iter = cli_ctx_head->next) 2309 cli_ctx_iter = cli_ctx_head->next)
2304 { 2310 {
2305 if (1 < cli_ctx_iter->stream_peers_left) 2311 if (0 < cli_ctx_iter->stream_peers_left)
2306 { 2312 {
2307 /* Client wants to receive limited amount of updates */ 2313 /* Client wants to receive limited amount of updates */
2308 cli_ctx_iter->stream_peers_left -= 1; 2314 if (num_peers > cli_ctx_iter->stream_peers_left)
2309 } else if (1 == cli_ctx_iter->stream_peers_left) 2315 {
2316 num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
2317 cli_ctx_iter->stream_peers_left = 0;
2318 }
2319 else
2320 {
2321 num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
2322 cli_ctx_iter->stream_peers_left -= num_peers_send;
2323 }
2324 } else if (0 > cli_ctx_iter->stream_peers_left)
2310 { 2325 {
2311 /* Last update of view for client */
2312 cli_ctx_iter->stream_peers_left = -1;
2313 } else if (0 > cli_ctx_iter->stream_peers_left) {
2314 /* Client is not interested in updates */ 2326 /* Client is not interested in updates */
2315 continue; 2327 continue;
2328 } else /* _updates_left == 0 - infinite amount of updates */
2329 {
2330 num_peers_send = num_peers;
2316 } 2331 }
2317 /* else _updates_left == 0 - infinite amount of updates */
2318 2332
2319 /* send view */ 2333 /* send view */
2320 send_stream_peer (cli_ctx_iter, peer); 2334 send_stream_peer (cli_ctx_iter, num_peers_send, peers);
2321 } 2335 }
2322} 2336}
2323 2337
@@ -2338,7 +2352,7 @@ hist_update (void *cls,
2338 inserted = insert_in_view (&ids[i]); 2352 inserted = insert_in_view (&ids[i]);
2339 if (GNUNET_OK == inserted) 2353 if (GNUNET_OK == inserted)
2340 { 2354 {
2341 clients_notify_stream_peer (&ids[i]); 2355 clients_notify_stream_peer (1, &ids[i]);
2342 } 2356 }
2343 to_file (file_name_view_log, 2357 to_file (file_name_view_log,
2344 "+%s\t(hist)", 2358 "+%s\t(hist)",
@@ -2549,7 +2563,7 @@ insert_in_view_op (void *cls,
2549 inserted = insert_in_view (peer); 2563 inserted = insert_in_view (peer);
2550 if (GNUNET_OK == inserted) 2564 if (GNUNET_OK == inserted)
2551 { 2565 {
2552 clients_notify_stream_peer (peer); 2566 clients_notify_stream_peer (1, peer);
2553 } 2567 }
2554} 2568}
2555 2569
@@ -3834,7 +3848,7 @@ do_round (void *cls)
3834 permut[i])); 3848 permut[i]));
3835 if (GNUNET_OK == inserted) 3849 if (GNUNET_OK == inserted)
3836 { 3850 {
3837 clients_notify_stream_peer ( 3851 clients_notify_stream_peer (1,
3838 CustomPeerMap_get_peer_by_index (push_map, permut[i])); 3852 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3839 } 3853 }
3840 to_file (file_name_view_log, 3854 to_file (file_name_view_log,
@@ -3855,7 +3869,7 @@ do_round (void *cls)
3855 permut[i - first_border])); 3869 permut[i - first_border]));
3856 if (GNUNET_OK == inserted) 3870 if (GNUNET_OK == inserted)
3857 { 3871 {
3858 clients_notify_stream_peer ( 3872 clients_notify_stream_peer (1,
3859 CustomPeerMap_get_peer_by_index (push_map, permut[i])); 3873 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3860 } 3874 }
3861 to_file (file_name_view_log, 3875 to_file (file_name_view_log,
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 66b2dd962..26615bfc5 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -250,11 +250,13 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply
250 uint32_t id GNUNET_PACKED; 250 uint32_t id GNUNET_PACKED;
251 251
252 /** 252 /**
253 * @brief The peer of the biased stream 253 * Number of peers
254 */ 254 */
255 struct GNUNET_PeerIdentity peer; 255 uint64_t num_peers GNUNET_PACKED;
256 256
257 // TODO maybe source of peer (pull/push list, peerinfo, ...) 257 // TODO maybe source of peer (pull/push list, peerinfo, ...)
258
259 /* Followed by num_peers * GNUNET_PeerIdentity */
258}; 260};
259 261
260GNUNET_NETWORK_STRUCT_END 262GNUNET_NETWORK_STRUCT_END
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index b7644540d..96660ded6 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -377,6 +377,34 @@ handle_view_update (void *cls,
377 * It calls the callback the caller provided 377 * It calls the callback the caller provided
378 * and disconnects afterwards. 378 * and disconnects afterwards.
379 * 379 *
380 * TODO merge with check_view_update
381 *
382 * @param msg the message
383 */
384static int
385check_stream_input (void *cls,
386 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
387{
388 uint16_t msize = ntohs (msg->header.size);
389 uint32_t num_peers = ntohl (msg->num_peers);
390 (void) cls;
391
392 msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
393 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
394 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
395 {
396 GNUNET_break (0);
397 return GNUNET_SYSERR;
398 }
399 return GNUNET_OK;
400}
401
402/**
403 * This function is called, when the service sends another peer from the biased
404 * stream.
405 * It calls the callback the caller provided
406 * and disconnects afterwards.
407 *
380 * @param msg the message 408 * @param msg the message
381 */ 409 */
382static void 410static void
@@ -384,14 +412,17 @@ handle_stream_input (void *cls,
384 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) 412 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
385{ 413{
386 struct GNUNET_RPS_Handle *h = cls; 414 struct GNUNET_RPS_Handle *h = cls;
415 const struct GNUNET_PeerIdentity *peers;
387 416
388 /* Give the peers back */ 417 /* Give the peers back */
389 LOG (GNUNET_ERROR_TYPE_DEBUG, 418 LOG (GNUNET_ERROR_TYPE_DEBUG,
390 "New peer of biased input stream\n"); 419 "New peer of %" PRIu64 " biased input stream\n",
420 ntohl (msg->num_peers));
391 421
422 peers = (struct GNUNET_PeerIdentity *) &msg[1];
392 GNUNET_assert (NULL != h); 423 GNUNET_assert (NULL != h);
393 GNUNET_assert (NULL != h->stream_input_cb); 424 GNUNET_assert (NULL != h->stream_input_cb);
394 h->stream_input_cb (h->stream_input_cb, &msg->peer); 425 h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers);
395} 426}
396 427
397 428
@@ -444,10 +475,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
444 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 475 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
445 struct GNUNET_RPS_CS_DEBUG_ViewReply, 476 struct GNUNET_RPS_CS_DEBUG_ViewReply,
446 h), 477 h),
447 GNUNET_MQ_hd_fixed_size (stream_input, 478 GNUNET_MQ_hd_var_size (stream_input,
448 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, 479 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
449 struct GNUNET_RPS_CS_DEBUG_StreamReply, 480 struct GNUNET_RPS_CS_DEBUG_StreamReply,
450 h), 481 h),
451 GNUNET_MQ_handler_end () 482 GNUNET_MQ_handler_end ()
452 }; 483 };
453 484