diff options
Diffstat (limited to 'src/rps')
-rw-r--r-- | src/rps/gnunet-rps.c | 31 | ||||
-rw-r--r-- | src/rps/rps_api.c | 79 | ||||
-rw-r--r-- | src/rps/test_rps.c | 1 |
3 files changed, 10 insertions, 101 deletions
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c index d0f905f51..49189481f 100644 --- a/src/rps/gnunet-rps.c +++ b/src/rps/gnunet-rps.c | |||
@@ -58,11 +58,6 @@ static int stream_input; | |||
58 | */ | 58 | */ |
59 | static uint64_t num_view_updates; | 59 | static uint64_t num_view_updates; |
60 | 60 | ||
61 | /** | ||
62 | * @brief Number of peers we want to receive from stream | ||
63 | */ | ||
64 | static uint64_t num_stream_peers; | ||
65 | |||
66 | 61 | ||
67 | /** | 62 | /** |
68 | * Task run when user presses CTRL-C to abort. | 63 | * Task run when user presses CTRL-C to abort. |
@@ -162,24 +157,13 @@ stream_input_handle (void *cls, | |||
162 | 157 | ||
163 | if (0 == num_peers) | 158 | if (0 == num_peers) |
164 | { | 159 | { |
165 | FPRINTF (stdout, "Empty view\n"); | 160 | FPRINTF (stdout, "No peer was returned\n"); |
166 | } | 161 | } |
167 | req_handle = NULL; | 162 | req_handle = NULL; |
168 | for (i = 0; i < num_peers; i++) | 163 | for (i = 0; i < num_peers; i++) |
169 | { | 164 | { |
170 | FPRINTF (stdout, "%s\n", | 165 | FPRINTF (stdout, "%s\n", |
171 | GNUNET_i2s_full (&recv_peers[i])); | 166 | GNUNET_i2s_full (&recv_peers[i])); |
172 | |||
173 | if (1 == num_stream_peers) | ||
174 | { | ||
175 | ret = 0; | ||
176 | GNUNET_SCHEDULER_shutdown (); | ||
177 | break; | ||
178 | } | ||
179 | else if (1 < num_stream_peers) | ||
180 | { | ||
181 | num_stream_peers--; | ||
182 | } | ||
183 | } | 167 | } |
184 | } | 168 | } |
185 | 169 | ||
@@ -243,18 +227,7 @@ run (void *cls, | |||
243 | } else if (stream_input) | 227 | } else if (stream_input) |
244 | { | 228 | { |
245 | /* Get updates of view */ | 229 | /* Get updates of view */ |
246 | if (NULL == args[0] || | 230 | GNUNET_RPS_stream_request (rps_handle, stream_input_handle, NULL); |
247 | 0 == sscanf (args[0], "%lu", &num_stream_peers)) | ||
248 | { | ||
249 | num_stream_peers = 0; | ||
250 | } | ||
251 | GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL); | ||
252 | if (0 != num_stream_peers) | ||
253 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
254 | "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers); | ||
255 | else | ||
256 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
257 | "Requesting continuous peers from biased stream\n"); | ||
258 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); | 231 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); |
259 | } | 232 | } |
260 | else | 233 | else |
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 5c68e4337..02d833506 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -42,11 +42,6 @@ struct GNUNET_RPS_StreamRequestHandle | |||
42 | struct GNUNET_RPS_Handle *rps_handle; | 42 | struct GNUNET_RPS_Handle *rps_handle; |
43 | 43 | ||
44 | /** | 44 | /** |
45 | * The number of requested peers. | ||
46 | */ | ||
47 | uint32_t num_peers_left; | ||
48 | |||
49 | /** | ||
50 | * The callback to be called when we receive an answer. | 45 | * The callback to be called when we receive an answer. |
51 | */ | 46 | */ |
52 | GNUNET_RPS_NotifyReadyCB ready_cb; | 47 | GNUNET_RPS_NotifyReadyCB ready_cb; |
@@ -188,7 +183,6 @@ struct cb_cls_pack | |||
188 | */ | 183 | */ |
189 | static struct GNUNET_RPS_StreamRequestHandle * | 184 | static struct GNUNET_RPS_StreamRequestHandle * |
190 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 185 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
191 | uint64_t num_peers, | ||
192 | GNUNET_RPS_NotifyReadyCB ready_cb, | 186 | GNUNET_RPS_NotifyReadyCB ready_cb, |
193 | void *cls) | 187 | void *cls) |
194 | { | 188 | { |
@@ -197,7 +191,6 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | |||
197 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); | 191 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); |
198 | 192 | ||
199 | srh->rps_handle = rps_handle; | 193 | srh->rps_handle = rps_handle; |
200 | srh->num_peers_left = num_peers; | ||
201 | srh->ready_cb = ready_cb; | 194 | srh->ready_cb = ready_cb; |
202 | srh->ready_cb_cls = cls; | 195 | srh->ready_cb_cls = cls; |
203 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, | 196 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, |
@@ -327,14 +320,11 @@ GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle) | |||
327 | * Request biased stream of peers that are being put into the sampler | 320 | * Request biased stream of peers that are being put into the sampler |
328 | * | 321 | * |
329 | * @param rps_handle handle to the rps service | 322 | * @param rps_handle handle to the rps service |
330 | * @param num_req_peers number of peers we want to receive | ||
331 | * (0 for infinite updates) | ||
332 | * @param cls a closure that will be given to the callback | 323 | * @param cls a closure that will be given to the callback |
333 | * @param ready_cb the callback called when the peers are available | 324 | * @param ready_cb the callback called when the peers are available |
334 | */ | 325 | */ |
335 | struct GNUNET_RPS_StreamRequestHandle * | 326 | struct GNUNET_RPS_StreamRequestHandle * |
336 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 327 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
337 | uint32_t num_peers, | ||
338 | GNUNET_RPS_NotifyReadyCB stream_input_cb, | 328 | GNUNET_RPS_NotifyReadyCB stream_input_cb, |
339 | void *cls) | 329 | void *cls) |
340 | { | 330 | { |
@@ -343,12 +333,9 @@ GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | |||
343 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; | 333 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; |
344 | 334 | ||
345 | srh = new_stream_request (rps_handle, | 335 | srh = new_stream_request (rps_handle, |
346 | num_peers, /* num requests */ | ||
347 | stream_input_cb, | 336 | stream_input_cb, |
348 | cls); | 337 | cls); |
349 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 338 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n"); |
350 | "Client requests %" PRIu32 " biased stream updates\n", | ||
351 | num_peers); | ||
352 | 339 | ||
353 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); | 340 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); |
354 | GNUNET_MQ_send (rps_handle->mq, ev); | 341 | GNUNET_MQ_send (rps_handle->mq, ev); |
@@ -492,68 +479,21 @@ handle_stream_input (void *cls, | |||
492 | { | 479 | { |
493 | struct GNUNET_RPS_Handle *h = cls; | 480 | struct GNUNET_RPS_Handle *h = cls; |
494 | const struct GNUNET_PeerIdentity *peers; | 481 | const struct GNUNET_PeerIdentity *peers; |
495 | /* The following two pointers are used to prevent that new handles are | ||
496 | * inserted into the DLL, that is currently iterated over, from within a call | ||
497 | * to that handler_cb, are executed and in turn again add themselves to the | ||
498 | * iterated DLL infinitely */ | ||
499 | struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp; | ||
500 | struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp; | ||
501 | uint64_t num_peers; | 482 | uint64_t num_peers; |
502 | uint64_t num_peers_return; | ||
503 | 483 | ||
504 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 484 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; |
505 | num_peers = ntohl (msg->num_peers); | 485 | num_peers = ntohl (msg->num_peers); |
506 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 486 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
507 | "Received %" PRIu64 " peer(s) from stream input.\n", | 487 | "Received %" PRIu64 " peer(s) from stream input.\n", |
508 | num_peers); | 488 | num_peers); |
509 | srh_head_tmp = h->stream_requests_head; | 489 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; |
510 | srh_tail_tmp = h->stream_requests_tail; | ||
511 | h->stream_requests_head = NULL; | ||
512 | h->stream_requests_tail = NULL; | ||
513 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
514 | NULL != srh_iter; | 490 | NULL != srh_iter; |
515 | srh_iter = srh_iter->next) | 491 | srh_iter = srh_iter->next) |
516 | { | 492 | { |
517 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 493 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); |
518 | "Calling srh - left: %" PRIu64 "\n", | ||
519 | srh_iter->num_peers_left); | ||
520 | if (0 == srh_iter->num_peers_left) /* infinite updates */ | ||
521 | { | ||
522 | num_peers_return = num_peers; | ||
523 | } | ||
524 | else if (num_peers > srh_iter->num_peers_left) | ||
525 | { | ||
526 | num_peers_return = num_peers - srh_iter->num_peers_left; | ||
527 | } | ||
528 | else /* num_peers <= srh_iter->num_peers_left */ | ||
529 | { | ||
530 | num_peers_return = srh_iter->num_peers_left - num_peers; | ||
531 | } | ||
532 | srh_iter->ready_cb (srh_iter->ready_cb_cls, | 494 | srh_iter->ready_cb (srh_iter->ready_cb_cls, |
533 | num_peers_return, | 495 | num_peers, |
534 | peers); | 496 | peers); |
535 | if (0 == srh_iter->num_peers_left) ; | ||
536 | else if (num_peers_return >= srh_iter->num_peers_left) | ||
537 | { | ||
538 | remove_stream_request (srh_iter, | ||
539 | srh_head_tmp, | ||
540 | srh_tail_tmp); | ||
541 | } | ||
542 | else | ||
543 | { | ||
544 | srh_iter->num_peers_left -= num_peers_return; | ||
545 | } | ||
546 | } | ||
547 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
548 | NULL != srh_iter; | ||
549 | srh_iter = srh_iter->next) | ||
550 | { | ||
551 | GNUNET_CONTAINER_DLL_remove (srh_head_tmp, | ||
552 | srh_tail_tmp, | ||
553 | srh_iter); | ||
554 | GNUNET_CONTAINER_DLL_insert (h->stream_requests_head, | ||
555 | h->stream_requests_tail, | ||
556 | srh_iter); | ||
557 | } | 497 | } |
558 | 498 | ||
559 | if (NULL == h->stream_requests_head) | 499 | if (NULL == h->stream_requests_head) |
@@ -738,7 +678,6 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | |||
738 | peers_ready_cb, | 678 | peers_ready_cb, |
739 | rh); | 679 | rh); |
740 | rh->srh = GNUNET_RPS_stream_request (rps_handle, | 680 | rh->srh = GNUNET_RPS_stream_request (rps_handle, |
741 | 0, /* infinite updates */ | ||
742 | collect_peers_cb, | 681 | collect_peers_cb, |
743 | rh); /* cls */ | 682 | rh); /* cls */ |
744 | rh->ready_cb = ready_cb; | 683 | rh->ready_cb = ready_cb; |
@@ -913,12 +852,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | |||
913 | struct GNUNET_RPS_Handle *h; | 852 | struct GNUNET_RPS_Handle *h; |
914 | 853 | ||
915 | h = rh->rps_handle; | 854 | h = rh->rps_handle; |
916 | if (NULL != rh->srh) | 855 | GNUNET_assert (NULL != rh->srh); |
917 | { | 856 | remove_stream_request (rh->srh, |
918 | remove_stream_request (rh->srh, | 857 | h->stream_requests_head, |
919 | h->stream_requests_head, | 858 | h->stream_requests_tail); |
920 | h->stream_requests_tail); | ||
921 | } | ||
922 | if (NULL == h->stream_requests_head) cancel_stream(h); | 859 | if (NULL == h->stream_requests_head) cancel_stream(h); |
923 | if (NULL != rh->sampler_rh) | 860 | if (NULL != rh->sampler_rh) |
924 | { | 861 | { |
diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c index 1c98a1e5e..0740d01df 100644 --- a/src/rps/test_rps.c +++ b/src/rps/test_rps.c | |||
@@ -1606,7 +1606,6 @@ sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h) | |||
1606 | if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test"); | 1606 | if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test"); |
1607 | else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */ | 1607 | else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */ |
1608 | rps_peer->rps_srh = GNUNET_RPS_stream_request (h, | 1608 | rps_peer->rps_srh = GNUNET_RPS_stream_request (h, |
1609 | 0, | ||
1610 | &got_stream_peer_cb, | 1609 | &got_stream_peer_cb, |
1611 | rps_peer); | 1610 | rps_peer); |
1612 | } | 1611 | } |