diff options
author | Julius Bünger <buenger@mytum.de> | 2018-11-22 13:07:47 +0100 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2018-11-23 00:22:29 +0100 |
commit | 101694a7120bc90c8c02024294b6f098b1bca9ff (patch) | |
tree | cec008a9a6ae9696871c166e4654e838221743a3 /src/rps/rps_api.c | |
parent | 7b3d14aa405f2060132aee4987d085ade74bad37 (diff) | |
download | gnunet-101694a7120bc90c8c02024294b6f098b1bca9ff.tar.gz gnunet-101694a7120bc90c8c02024294b6f098b1bca9ff.zip |
RPS api: Schedule callback
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r-- | src/rps/rps_api.c | 84 |
1 files changed, 69 insertions, 15 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index cfab06f17..420323c4b 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -11,7 +11,7 @@ | |||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | 11 | WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | 12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
13 | Affero General Public License for more details. | 13 | Affero General Public License for more details. |
14 | 14 | ||
15 | You should have received a copy of the GNU Affero General Public License | 15 | You should have received a copy of the GNU Affero General Public License |
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | 16 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | */ | 17 | */ |
@@ -52,6 +52,11 @@ struct GNUNET_RPS_StreamRequestHandle | |||
52 | void *ready_cb_cls; | 52 | void *ready_cb_cls; |
53 | 53 | ||
54 | /** | 54 | /** |
55 | * @brief Scheduler task for scheduled callback | ||
56 | */ | ||
57 | struct GNUNET_SCHEDULER_Task *callback_task; | ||
58 | |||
59 | /** | ||
55 | * @brief Next element of the DLL | 60 | * @brief Next element of the DLL |
56 | */ | 61 | */ |
57 | struct GNUNET_RPS_StreamRequestHandle *next; | 62 | struct GNUNET_RPS_StreamRequestHandle *next; |
@@ -172,6 +177,19 @@ struct cb_cls_pack | |||
172 | 177 | ||
173 | 178 | ||
174 | /** | 179 | /** |
180 | * @brief Peers received from the biased stream to be passed to all | ||
181 | * srh_handlers | ||
182 | */ | ||
183 | static struct GNUNET_PeerIdentity *srh_callback_peers; | ||
184 | |||
185 | /** | ||
186 | * @brief Number of peers in the biased stream that are to be passed to all | ||
187 | * srh_handlers | ||
188 | */ | ||
189 | static uint64_t srh_callback_num_peers; | ||
190 | |||
191 | |||
192 | /** | ||
175 | * @brief Create a new handle for a stream request | 193 | * @brief Create a new handle for a stream request |
176 | * | 194 | * |
177 | * @param rps_handle The rps handle | 195 | * @param rps_handle The rps handle |
@@ -213,6 +231,12 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, | |||
213 | struct GNUNET_RPS_StreamRequestHandle *srh_head, | 231 | struct GNUNET_RPS_StreamRequestHandle *srh_head, |
214 | struct GNUNET_RPS_StreamRequestHandle *srh_tail) | 232 | struct GNUNET_RPS_StreamRequestHandle *srh_tail) |
215 | { | 233 | { |
234 | GNUNET_assert (NULL != srh); | ||
235 | if (NULL != srh->callback_task) | ||
236 | { | ||
237 | GNUNET_SCHEDULER_cancel (srh->callback_task); | ||
238 | srh->callback_task = NULL; | ||
239 | } | ||
216 | GNUNET_CONTAINER_DLL_remove (srh_head, | 240 | GNUNET_CONTAINER_DLL_remove (srh_head, |
217 | srh_tail, | 241 | srh_tail, |
218 | srh); | 242 | srh); |
@@ -425,12 +449,10 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) | |||
425 | { | 449 | { |
426 | struct GNUNET_RPS_Handle *rps_handle; | 450 | struct GNUNET_RPS_Handle *rps_handle; |
427 | 451 | ||
428 | GNUNET_assert (NULL != srh); | ||
429 | rps_handle = srh->rps_handle; | 452 | rps_handle = srh->rps_handle; |
430 | GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, | 453 | remove_stream_request (srh, |
431 | rps_handle->stream_requests_tail, | 454 | rps_handle->stream_requests_head, |
432 | srh); | 455 | rps_handle->stream_requests_tail); |
433 | GNUNET_free (srh); | ||
434 | if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); | 456 | if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); |
435 | } | 457 | } |
436 | 458 | ||
@@ -463,6 +485,24 @@ check_stream_input (void *cls, | |||
463 | return GNUNET_OK; | 485 | return GNUNET_OK; |
464 | } | 486 | } |
465 | 487 | ||
488 | |||
489 | /** | ||
490 | * @brief Called by the scheduler to call the callbacks of the srh handlers | ||
491 | * | ||
492 | * @param cls Stream request handle | ||
493 | */ | ||
494 | static void | ||
495 | srh_callback_scheduled (void *cls) | ||
496 | { | ||
497 | struct GNUNET_RPS_StreamRequestHandle *srh = cls; | ||
498 | |||
499 | srh->callback_task = NULL; | ||
500 | srh->ready_cb (srh->ready_cb_cls, | ||
501 | srh_callback_num_peers, | ||
502 | srh_callback_peers); | ||
503 | } | ||
504 | |||
505 | |||
466 | /** | 506 | /** |
467 | * This function is called, when the service sends another peer from the biased | 507 | * This function is called, when the service sends another peer from the biased |
468 | * stream. | 508 | * stream. |
@@ -476,13 +516,20 @@ handle_stream_input (void *cls, | |||
476 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) | 516 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) |
477 | { | 517 | { |
478 | struct GNUNET_RPS_Handle *h = cls; | 518 | struct GNUNET_RPS_Handle *h = cls; |
479 | const struct GNUNET_PeerIdentity *peers; | 519 | //const struct GNUNET_PeerIdentity *peers; |
480 | uint64_t num_peers; | 520 | uint64_t num_peers; |
481 | struct GNUNET_RPS_StreamRequestHandle *srh_iter; | 521 | struct GNUNET_RPS_StreamRequestHandle *srh_iter; |
482 | struct GNUNET_RPS_StreamRequestHandle *srh_next; | 522 | struct GNUNET_RPS_StreamRequestHandle *srh_next; |
483 | 523 | ||
484 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 524 | //peers = (struct GNUNET_PeerIdentity *) &msg[1]; |
485 | num_peers = ntohl (msg->num_peers); | 525 | num_peers = ntohl (msg->num_peers); |
526 | srh_callback_num_peers = num_peers; | ||
527 | if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers); | ||
528 | srh_callback_peers = | ||
529 | GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
530 | GNUNET_memcpy (srh_callback_peers, | ||
531 | &msg[1], | ||
532 | num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
486 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 533 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
487 | "Received %" PRIu64 " peer(s) from stream input.\n", | 534 | "Received %" PRIu64 " peer(s) from stream input.\n", |
488 | num_peers); | 535 | num_peers); |
@@ -492,9 +539,12 @@ handle_stream_input (void *cls, | |||
492 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); | 539 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); |
493 | /* Store next pointer - srh might be removed/freed in callback */ | 540 | /* Store next pointer - srh might be removed/freed in callback */ |
494 | srh_next = srh_iter->next; | 541 | srh_next = srh_iter->next; |
495 | srh_iter->ready_cb (srh_iter->ready_cb_cls, | 542 | if (NULL != srh_iter->callback_task) |
496 | num_peers, | 543 | { |
497 | peers); | 544 | GNUNET_SCHEDULER_cancel (srh_iter->callback_task); |
545 | } | ||
546 | srh_iter->callback_task = | ||
547 | GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter); | ||
498 | srh_iter = srh_next; | 548 | srh_iter = srh_next; |
499 | } | 549 | } |
500 | 550 | ||
@@ -855,10 +905,9 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | |||
855 | 905 | ||
856 | h = rh->rps_handle; | 906 | h = rh->rps_handle; |
857 | GNUNET_assert (NULL != rh); | 907 | GNUNET_assert (NULL != rh); |
858 | GNUNET_assert (NULL != rh->srh); | 908 | GNUNET_assert (h == rh->srh->rps_handle); |
859 | remove_stream_request (rh->srh, | 909 | GNUNET_RPS_stream_cancel (rh->srh); |
860 | h->stream_requests_head, | 910 | rh->srh = NULL; |
861 | h->stream_requests_tail); | ||
862 | if (NULL == h->stream_requests_head) cancel_stream(h); | 911 | if (NULL == h->stream_requests_head) cancel_stream(h); |
863 | if (NULL != rh->sampler_rh) | 912 | if (NULL != rh->sampler_rh) |
864 | { | 913 | { |
@@ -891,6 +940,11 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) | |||
891 | GNUNET_RPS_stream_cancel (srh_tmp); | 940 | GNUNET_RPS_stream_cancel (srh_tmp); |
892 | } | 941 | } |
893 | } | 942 | } |
943 | if (NULL != srh_callback_peers) | ||
944 | { | ||
945 | GNUNET_free (srh_callback_peers); | ||
946 | srh_callback_peers = NULL; | ||
947 | } | ||
894 | if (NULL != h->view_update_cb) | 948 | if (NULL != h->view_update_cb) |
895 | { | 949 | { |
896 | LOG (GNUNET_ERROR_TYPE_WARNING, | 950 | LOG (GNUNET_ERROR_TYPE_WARNING, |