aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2018-10-14 13:35:23 +0200
committerJulius Bünger <buenger@mytum.de>2018-10-14 13:35:23 +0200
commit6048d2a10a95822d06b5d7be640adc89a895b01a (patch)
treee53b7a7efcebe282b49dff4b258f26dcbb71442a /src
parent10a1964c7c8fc9d2f9eab2942fa71fa00318a1b4 (diff)
downloadgnunet-6048d2a10a95822d06b5d7be640adc89a895b01a.tar.gz
gnunet-6048d2a10a95822d06b5d7be640adc89a895b01a.zip
RPS API: Remove numer of peers from stream request
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_rps_service.h3
-rw-r--r--src/rps/gnunet-rps.c31
-rw-r--r--src/rps/rps_api.c79
-rw-r--r--src/rps/test_rps.c1
4 files changed, 10 insertions, 104 deletions
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h
index 7fdfe491e..274ca94a2 100644
--- a/src/include/gnunet_rps_service.h
+++ b/src/include/gnunet_rps_service.h
@@ -180,14 +180,11 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
180 * Request biased stream of peers that are being put into the sampler 180 * Request biased stream of peers that are being put into the sampler
181 * 181 *
182 * @param rps_handle handle to the rps service 182 * @param rps_handle handle to the rps service
183 * @param num_req_peers number of peers we want to receive
184 * (0 for infinite updates)
185 * @param cls a closure that will be given to the callback 183 * @param cls a closure that will be given to the callback
186 * @param ready_cb the callback called when the peers are available 184 * @param ready_cb the callback called when the peers are available
187 */ 185 */
188struct GNUNET_RPS_StreamRequestHandle * 186struct GNUNET_RPS_StreamRequestHandle *
189GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, 187GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
190 uint32_t num_updates,
191 GNUNET_RPS_NotifyReadyCB stream_input_cb, 188 GNUNET_RPS_NotifyReadyCB stream_input_cb,
192 void *cls); 189 void *cls);
193 190
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index d0f905f51..49189481f 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -58,11 +58,6 @@ static int stream_input;
58 */ 58 */
59static uint64_t num_view_updates; 59static uint64_t num_view_updates;
60 60
61/**
62 * @brief Number of peers we want to receive from stream
63 */
64static uint64_t num_stream_peers;
65
66 61
67/** 62/**
68 * Task run when user presses CTRL-C to abort. 63 * Task run when user presses CTRL-C to abort.
@@ -162,24 +157,13 @@ stream_input_handle (void *cls,
162 157
163 if (0 == num_peers) 158 if (0 == num_peers)
164 { 159 {
165 FPRINTF (stdout, "Empty view\n"); 160 FPRINTF (stdout, "No peer was returned\n");
166 } 161 }
167 req_handle = NULL; 162 req_handle = NULL;
168 for (i = 0; i < num_peers; i++) 163 for (i = 0; i < num_peers; i++)
169 { 164 {
170 FPRINTF (stdout, "%s\n", 165 FPRINTF (stdout, "%s\n",
171 GNUNET_i2s_full (&recv_peers[i])); 166 GNUNET_i2s_full (&recv_peers[i]));
172
173 if (1 == num_stream_peers)
174 {
175 ret = 0;
176 GNUNET_SCHEDULER_shutdown ();
177 break;
178 }
179 else if (1 < num_stream_peers)
180 {
181 num_stream_peers--;
182 }
183 } 167 }
184} 168}
185 169
@@ -243,18 +227,7 @@ run (void *cls,
243 } else if (stream_input) 227 } else if (stream_input)
244 { 228 {
245 /* Get updates of view */ 229 /* Get updates of view */
246 if (NULL == args[0] || 230 GNUNET_RPS_stream_request (rps_handle, stream_input_handle, NULL);
247 0 == sscanf (args[0], "%lu", &num_stream_peers))
248 {
249 num_stream_peers = 0;
250 }
251 GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
252 if (0 != num_stream_peers)
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254 "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
255 else
256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257 "Requesting continuous peers from biased stream\n");
258 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); 231 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
259 } 232 }
260 else 233 else
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 5c68e4337..02d833506 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -42,11 +42,6 @@ struct GNUNET_RPS_StreamRequestHandle
42 struct GNUNET_RPS_Handle *rps_handle; 42 struct GNUNET_RPS_Handle *rps_handle;
43 43
44 /** 44 /**
45 * The number of requested peers.
46 */
47 uint32_t num_peers_left;
48
49 /**
50 * The callback to be called when we receive an answer. 45 * The callback to be called when we receive an answer.
51 */ 46 */
52 GNUNET_RPS_NotifyReadyCB ready_cb; 47 GNUNET_RPS_NotifyReadyCB ready_cb;
@@ -188,7 +183,6 @@ struct cb_cls_pack
188 */ 183 */
189static struct GNUNET_RPS_StreamRequestHandle * 184static struct GNUNET_RPS_StreamRequestHandle *
190new_stream_request (struct GNUNET_RPS_Handle *rps_handle, 185new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
191 uint64_t num_peers,
192 GNUNET_RPS_NotifyReadyCB ready_cb, 186 GNUNET_RPS_NotifyReadyCB ready_cb,
193 void *cls) 187 void *cls)
194{ 188{
@@ -197,7 +191,6 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
197 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); 191 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
198 192
199 srh->rps_handle = rps_handle; 193 srh->rps_handle = rps_handle;
200 srh->num_peers_left = num_peers;
201 srh->ready_cb = ready_cb; 194 srh->ready_cb = ready_cb;
202 srh->ready_cb_cls = cls; 195 srh->ready_cb_cls = cls;
203 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, 196 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
@@ -327,14 +320,11 @@ GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
327 * Request biased stream of peers that are being put into the sampler 320 * Request biased stream of peers that are being put into the sampler
328 * 321 *
329 * @param rps_handle handle to the rps service 322 * @param rps_handle handle to the rps service
330 * @param num_req_peers number of peers we want to receive
331 * (0 for infinite updates)
332 * @param cls a closure that will be given to the callback 323 * @param cls a closure that will be given to the callback
333 * @param ready_cb the callback called when the peers are available 324 * @param ready_cb the callback called when the peers are available
334 */ 325 */
335struct GNUNET_RPS_StreamRequestHandle * 326struct GNUNET_RPS_StreamRequestHandle *
336GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, 327GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
337 uint32_t num_peers,
338 GNUNET_RPS_NotifyReadyCB stream_input_cb, 328 GNUNET_RPS_NotifyReadyCB stream_input_cb,
339 void *cls) 329 void *cls)
340{ 330{
@@ -343,12 +333,9 @@ GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
343 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; 333 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
344 334
345 srh = new_stream_request (rps_handle, 335 srh = new_stream_request (rps_handle,
346 num_peers, /* num requests */
347 stream_input_cb, 336 stream_input_cb,
348 cls); 337 cls);
349 LOG (GNUNET_ERROR_TYPE_DEBUG, 338 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
350 "Client requests %" PRIu32 " biased stream updates\n",
351 num_peers);
352 339
353 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); 340 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
354 GNUNET_MQ_send (rps_handle->mq, ev); 341 GNUNET_MQ_send (rps_handle->mq, ev);
@@ -492,68 +479,21 @@ handle_stream_input (void *cls,
492{ 479{
493 struct GNUNET_RPS_Handle *h = cls; 480 struct GNUNET_RPS_Handle *h = cls;
494 const struct GNUNET_PeerIdentity *peers; 481 const struct GNUNET_PeerIdentity *peers;
495 /* The following two pointers are used to prevent that new handles are
496 * inserted into the DLL, that is currently iterated over, from within a call
497 * to that handler_cb, are executed and in turn again add themselves to the
498 * iterated DLL infinitely */
499 struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp;
500 struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp;
501 uint64_t num_peers; 482 uint64_t num_peers;
502 uint64_t num_peers_return;
503 483
504 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 484 peers = (struct GNUNET_PeerIdentity *) &msg[1];
505 num_peers = ntohl (msg->num_peers); 485 num_peers = ntohl (msg->num_peers);
506 LOG (GNUNET_ERROR_TYPE_DEBUG, 486 LOG (GNUNET_ERROR_TYPE_DEBUG,
507 "Received %" PRIu64 " peer(s) from stream input.\n", 487 "Received %" PRIu64 " peer(s) from stream input.\n",
508 num_peers); 488 num_peers);
509 srh_head_tmp = h->stream_requests_head; 489 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
510 srh_tail_tmp = h->stream_requests_tail;
511 h->stream_requests_head = NULL;
512 h->stream_requests_tail = NULL;
513 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
514 NULL != srh_iter; 490 NULL != srh_iter;
515 srh_iter = srh_iter->next) 491 srh_iter = srh_iter->next)
516 { 492 {
517 LOG (GNUNET_ERROR_TYPE_DEBUG, 493 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
518 "Calling srh - left: %" PRIu64 "\n",
519 srh_iter->num_peers_left);
520 if (0 == srh_iter->num_peers_left) /* infinite updates */
521 {
522 num_peers_return = num_peers;
523 }
524 else if (num_peers > srh_iter->num_peers_left)
525 {
526 num_peers_return = num_peers - srh_iter->num_peers_left;
527 }
528 else /* num_peers <= srh_iter->num_peers_left */
529 {
530 num_peers_return = srh_iter->num_peers_left - num_peers;
531 }
532 srh_iter->ready_cb (srh_iter->ready_cb_cls, 494 srh_iter->ready_cb (srh_iter->ready_cb_cls,
533 num_peers_return, 495 num_peers,
534 peers); 496 peers);
535 if (0 == srh_iter->num_peers_left) ;
536 else if (num_peers_return >= srh_iter->num_peers_left)
537 {
538 remove_stream_request (srh_iter,
539 srh_head_tmp,
540 srh_tail_tmp);
541 }
542 else
543 {
544 srh_iter->num_peers_left -= num_peers_return;
545 }
546 }
547 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
548 NULL != srh_iter;
549 srh_iter = srh_iter->next)
550 {
551 GNUNET_CONTAINER_DLL_remove (srh_head_tmp,
552 srh_tail_tmp,
553 srh_iter);
554 GNUNET_CONTAINER_DLL_insert (h->stream_requests_head,
555 h->stream_requests_tail,
556 srh_iter);
557 } 497 }
558 498
559 if (NULL == h->stream_requests_head) 499 if (NULL == h->stream_requests_head)
@@ -738,7 +678,6 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
738 peers_ready_cb, 678 peers_ready_cb,
739 rh); 679 rh);
740 rh->srh = GNUNET_RPS_stream_request (rps_handle, 680 rh->srh = GNUNET_RPS_stream_request (rps_handle,
741 0, /* infinite updates */
742 collect_peers_cb, 681 collect_peers_cb,
743 rh); /* cls */ 682 rh); /* cls */
744 rh->ready_cb = ready_cb; 683 rh->ready_cb = ready_cb;
@@ -913,12 +852,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
913 struct GNUNET_RPS_Handle *h; 852 struct GNUNET_RPS_Handle *h;
914 853
915 h = rh->rps_handle; 854 h = rh->rps_handle;
916 if (NULL != rh->srh) 855 GNUNET_assert (NULL != rh->srh);
917 { 856 remove_stream_request (rh->srh,
918 remove_stream_request (rh->srh, 857 h->stream_requests_head,
919 h->stream_requests_head, 858 h->stream_requests_tail);
920 h->stream_requests_tail);
921 }
922 if (NULL == h->stream_requests_head) cancel_stream(h); 859 if (NULL == h->stream_requests_head) cancel_stream(h);
923 if (NULL != rh->sampler_rh) 860 if (NULL != rh->sampler_rh)
924 { 861 {
diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c
index 1c98a1e5e..0740d01df 100644
--- a/src/rps/test_rps.c
+++ b/src/rps/test_rps.c
@@ -1606,7 +1606,6 @@ sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1606 if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test"); 1606 if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test");
1607 else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */ 1607 else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */
1608 rps_peer->rps_srh = GNUNET_RPS_stream_request (h, 1608 rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
1609 0,
1610 &got_stream_peer_cb, 1609 &got_stream_peer_cb,
1611 rps_peer); 1610 rps_peer);
1612} 1611}