diff options
author | Julius Bünger <buenger@mytum.de> | 2018-10-14 13:35:23 +0200 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2018-10-14 13:35:23 +0200 |
commit | 6048d2a10a95822d06b5d7be640adc89a895b01a (patch) | |
tree | e53b7a7efcebe282b49dff4b258f26dcbb71442a /src/rps/rps_api.c | |
parent | 10a1964c7c8fc9d2f9eab2942fa71fa00318a1b4 (diff) | |
download | gnunet-6048d2a10a95822d06b5d7be640adc89a895b01a.tar.gz gnunet-6048d2a10a95822d06b5d7be640adc89a895b01a.zip |
RPS API: Remove numer of peers from stream request
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r-- | src/rps/rps_api.c | 79 |
1 files changed, 8 insertions, 71 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 5c68e4337..02d833506 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -42,11 +42,6 @@ struct GNUNET_RPS_StreamRequestHandle | |||
42 | struct GNUNET_RPS_Handle *rps_handle; | 42 | struct GNUNET_RPS_Handle *rps_handle; |
43 | 43 | ||
44 | /** | 44 | /** |
45 | * The number of requested peers. | ||
46 | */ | ||
47 | uint32_t num_peers_left; | ||
48 | |||
49 | /** | ||
50 | * The callback to be called when we receive an answer. | 45 | * The callback to be called when we receive an answer. |
51 | */ | 46 | */ |
52 | GNUNET_RPS_NotifyReadyCB ready_cb; | 47 | GNUNET_RPS_NotifyReadyCB ready_cb; |
@@ -188,7 +183,6 @@ struct cb_cls_pack | |||
188 | */ | 183 | */ |
189 | static struct GNUNET_RPS_StreamRequestHandle * | 184 | static struct GNUNET_RPS_StreamRequestHandle * |
190 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 185 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
191 | uint64_t num_peers, | ||
192 | GNUNET_RPS_NotifyReadyCB ready_cb, | 186 | GNUNET_RPS_NotifyReadyCB ready_cb, |
193 | void *cls) | 187 | void *cls) |
194 | { | 188 | { |
@@ -197,7 +191,6 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | |||
197 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); | 191 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); |
198 | 192 | ||
199 | srh->rps_handle = rps_handle; | 193 | srh->rps_handle = rps_handle; |
200 | srh->num_peers_left = num_peers; | ||
201 | srh->ready_cb = ready_cb; | 194 | srh->ready_cb = ready_cb; |
202 | srh->ready_cb_cls = cls; | 195 | srh->ready_cb_cls = cls; |
203 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, | 196 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, |
@@ -327,14 +320,11 @@ GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle) | |||
327 | * Request biased stream of peers that are being put into the sampler | 320 | * Request biased stream of peers that are being put into the sampler |
328 | * | 321 | * |
329 | * @param rps_handle handle to the rps service | 322 | * @param rps_handle handle to the rps service |
330 | * @param num_req_peers number of peers we want to receive | ||
331 | * (0 for infinite updates) | ||
332 | * @param cls a closure that will be given to the callback | 323 | * @param cls a closure that will be given to the callback |
333 | * @param ready_cb the callback called when the peers are available | 324 | * @param ready_cb the callback called when the peers are available |
334 | */ | 325 | */ |
335 | struct GNUNET_RPS_StreamRequestHandle * | 326 | struct GNUNET_RPS_StreamRequestHandle * |
336 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 327 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
337 | uint32_t num_peers, | ||
338 | GNUNET_RPS_NotifyReadyCB stream_input_cb, | 328 | GNUNET_RPS_NotifyReadyCB stream_input_cb, |
339 | void *cls) | 329 | void *cls) |
340 | { | 330 | { |
@@ -343,12 +333,9 @@ GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | |||
343 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; | 333 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; |
344 | 334 | ||
345 | srh = new_stream_request (rps_handle, | 335 | srh = new_stream_request (rps_handle, |
346 | num_peers, /* num requests */ | ||
347 | stream_input_cb, | 336 | stream_input_cb, |
348 | cls); | 337 | cls); |
349 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 338 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n"); |
350 | "Client requests %" PRIu32 " biased stream updates\n", | ||
351 | num_peers); | ||
352 | 339 | ||
353 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); | 340 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); |
354 | GNUNET_MQ_send (rps_handle->mq, ev); | 341 | GNUNET_MQ_send (rps_handle->mq, ev); |
@@ -492,68 +479,21 @@ handle_stream_input (void *cls, | |||
492 | { | 479 | { |
493 | struct GNUNET_RPS_Handle *h = cls; | 480 | struct GNUNET_RPS_Handle *h = cls; |
494 | const struct GNUNET_PeerIdentity *peers; | 481 | const struct GNUNET_PeerIdentity *peers; |
495 | /* The following two pointers are used to prevent that new handles are | ||
496 | * inserted into the DLL, that is currently iterated over, from within a call | ||
497 | * to that handler_cb, are executed and in turn again add themselves to the | ||
498 | * iterated DLL infinitely */ | ||
499 | struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp; | ||
500 | struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp; | ||
501 | uint64_t num_peers; | 482 | uint64_t num_peers; |
502 | uint64_t num_peers_return; | ||
503 | 483 | ||
504 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 484 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; |
505 | num_peers = ntohl (msg->num_peers); | 485 | num_peers = ntohl (msg->num_peers); |
506 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 486 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
507 | "Received %" PRIu64 " peer(s) from stream input.\n", | 487 | "Received %" PRIu64 " peer(s) from stream input.\n", |
508 | num_peers); | 488 | num_peers); |
509 | srh_head_tmp = h->stream_requests_head; | 489 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; |
510 | srh_tail_tmp = h->stream_requests_tail; | ||
511 | h->stream_requests_head = NULL; | ||
512 | h->stream_requests_tail = NULL; | ||
513 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
514 | NULL != srh_iter; | 490 | NULL != srh_iter; |
515 | srh_iter = srh_iter->next) | 491 | srh_iter = srh_iter->next) |
516 | { | 492 | { |
517 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 493 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); |
518 | "Calling srh - left: %" PRIu64 "\n", | ||
519 | srh_iter->num_peers_left); | ||
520 | if (0 == srh_iter->num_peers_left) /* infinite updates */ | ||
521 | { | ||
522 | num_peers_return = num_peers; | ||
523 | } | ||
524 | else if (num_peers > srh_iter->num_peers_left) | ||
525 | { | ||
526 | num_peers_return = num_peers - srh_iter->num_peers_left; | ||
527 | } | ||
528 | else /* num_peers <= srh_iter->num_peers_left */ | ||
529 | { | ||
530 | num_peers_return = srh_iter->num_peers_left - num_peers; | ||
531 | } | ||
532 | srh_iter->ready_cb (srh_iter->ready_cb_cls, | 494 | srh_iter->ready_cb (srh_iter->ready_cb_cls, |
533 | num_peers_return, | 495 | num_peers, |
534 | peers); | 496 | peers); |
535 | if (0 == srh_iter->num_peers_left) ; | ||
536 | else if (num_peers_return >= srh_iter->num_peers_left) | ||
537 | { | ||
538 | remove_stream_request (srh_iter, | ||
539 | srh_head_tmp, | ||
540 | srh_tail_tmp); | ||
541 | } | ||
542 | else | ||
543 | { | ||
544 | srh_iter->num_peers_left -= num_peers_return; | ||
545 | } | ||
546 | } | ||
547 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
548 | NULL != srh_iter; | ||
549 | srh_iter = srh_iter->next) | ||
550 | { | ||
551 | GNUNET_CONTAINER_DLL_remove (srh_head_tmp, | ||
552 | srh_tail_tmp, | ||
553 | srh_iter); | ||
554 | GNUNET_CONTAINER_DLL_insert (h->stream_requests_head, | ||
555 | h->stream_requests_tail, | ||
556 | srh_iter); | ||
557 | } | 497 | } |
558 | 498 | ||
559 | if (NULL == h->stream_requests_head) | 499 | if (NULL == h->stream_requests_head) |
@@ -738,7 +678,6 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | |||
738 | peers_ready_cb, | 678 | peers_ready_cb, |
739 | rh); | 679 | rh); |
740 | rh->srh = GNUNET_RPS_stream_request (rps_handle, | 680 | rh->srh = GNUNET_RPS_stream_request (rps_handle, |
741 | 0, /* infinite updates */ | ||
742 | collect_peers_cb, | 681 | collect_peers_cb, |
743 | rh); /* cls */ | 682 | rh); /* cls */ |
744 | rh->ready_cb = ready_cb; | 683 | rh->ready_cb = ready_cb; |
@@ -913,12 +852,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | |||
913 | struct GNUNET_RPS_Handle *h; | 852 | struct GNUNET_RPS_Handle *h; |
914 | 853 | ||
915 | h = rh->rps_handle; | 854 | h = rh->rps_handle; |
916 | if (NULL != rh->srh) | 855 | GNUNET_assert (NULL != rh->srh); |
917 | { | 856 | remove_stream_request (rh->srh, |
918 | remove_stream_request (rh->srh, | 857 | h->stream_requests_head, |
919 | h->stream_requests_head, | 858 | h->stream_requests_tail); |
920 | h->stream_requests_tail); | ||
921 | } | ||
922 | if (NULL == h->stream_requests_head) cancel_stream(h); | 859 | if (NULL == h->stream_requests_head) cancel_stream(h); |
923 | if (NULL != rh->sampler_rh) | 860 | if (NULL != rh->sampler_rh) |
924 | { | 861 | { |