From 101694a7120bc90c8c02024294b6f098b1bca9ff Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Thu, 22 Nov 2018 13:07:47 +0100 Subject: RPS api: Schedule callback --- src/rps/rps_api.c | 84 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 15 deletions(-) (limited to 'src/rps/rps_api.c') diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index cfab06f17..420323c4b 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ @@ -51,6 +51,11 @@ struct GNUNET_RPS_StreamRequestHandle */ void *ready_cb_cls; + /** + * @brief Scheduler task for scheduled callback + */ + struct GNUNET_SCHEDULER_Task *callback_task; + /** * @brief Next element of the DLL */ @@ -171,6 +176,19 @@ struct cb_cls_pack }; +/** + * @brief Peers received from the biased stream to be passed to all + * srh_handlers + */ +static struct GNUNET_PeerIdentity *srh_callback_peers; + +/** + * @brief Number of peers in the biased stream that are to be passed to all + * srh_handlers + */ +static uint64_t srh_callback_num_peers; + + /** * @brief Create a new handle for a stream request * @@ -213,6 +231,12 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, struct GNUNET_RPS_StreamRequestHandle *srh_head, struct GNUNET_RPS_StreamRequestHandle *srh_tail) { + GNUNET_assert (NULL != srh); + if (NULL != srh->callback_task) + { + GNUNET_SCHEDULER_cancel (srh->callback_task); + srh->callback_task = NULL; + } GNUNET_CONTAINER_DLL_remove (srh_head, srh_tail, srh); @@ -425,12 +449,10 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) { struct GNUNET_RPS_Handle *rps_handle; - GNUNET_assert (NULL != srh); rps_handle = srh->rps_handle; - GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, - rps_handle->stream_requests_tail, - srh); - GNUNET_free (srh); + remove_stream_request (srh, + rps_handle->stream_requests_head, + rps_handle->stream_requests_tail); if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); } @@ -463,6 +485,24 @@ check_stream_input (void *cls, return GNUNET_OK; } + +/** + * @brief Called by the scheduler to call the callbacks of the srh handlers + * + * @param cls Stream request handle + */ +static void +srh_callback_scheduled (void *cls) +{ + struct GNUNET_RPS_StreamRequestHandle *srh = cls; + + srh->callback_task = NULL; + srh->ready_cb (srh->ready_cb_cls, + srh_callback_num_peers, + srh_callback_peers); +} + + /** * This function is called, when the service sends another peer from the biased * stream. @@ -476,13 +516,20 @@ handle_stream_input (void *cls, const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) { struct GNUNET_RPS_Handle *h = cls; - const struct GNUNET_PeerIdentity *peers; + //const struct GNUNET_PeerIdentity *peers; uint64_t num_peers; struct GNUNET_RPS_StreamRequestHandle *srh_iter; struct GNUNET_RPS_StreamRequestHandle *srh_next; - peers = (struct GNUNET_PeerIdentity *) &msg[1]; + //peers = (struct GNUNET_PeerIdentity *) &msg[1]; num_peers = ntohl (msg->num_peers); + srh_callback_num_peers = num_peers; + if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers); + srh_callback_peers = + GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (srh_callback_peers, + &msg[1], + num_peers * sizeof (struct GNUNET_PeerIdentity)); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received %" PRIu64 " peer(s) from stream input.\n", num_peers); @@ -492,9 +539,12 @@ handle_stream_input (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); /* Store next pointer - srh might be removed/freed in callback */ srh_next = srh_iter->next; - srh_iter->ready_cb (srh_iter->ready_cb_cls, - num_peers, - peers); + if (NULL != srh_iter->callback_task) + { + GNUNET_SCHEDULER_cancel (srh_iter->callback_task); + } + srh_iter->callback_task = + GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter); srh_iter = srh_next; } @@ -855,10 +905,9 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) h = rh->rps_handle; GNUNET_assert (NULL != rh); - GNUNET_assert (NULL != rh->srh); - remove_stream_request (rh->srh, - h->stream_requests_head, - h->stream_requests_tail); + GNUNET_assert (h == rh->srh->rps_handle); + GNUNET_RPS_stream_cancel (rh->srh); + rh->srh = NULL; if (NULL == h->stream_requests_head) cancel_stream(h); if (NULL != rh->sampler_rh) { @@ -891,6 +940,11 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) GNUNET_RPS_stream_cancel (srh_tmp); } } + if (NULL != srh_callback_peers) + { + GNUNET_free (srh_callback_peers); + srh_callback_peers = NULL; + } if (NULL != h->view_update_cb) { LOG (GNUNET_ERROR_TYPE_WARNING, -- cgit v1.2.3