summaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c84
1 files changed, 69 insertions, 15 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index cfab06f17..420323c4b 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -11,7 +11,7 @@
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
@@ -52,6 +52,11 @@ struct GNUNET_RPS_StreamRequestHandle
void *ready_cb_cls;
/**
+ * @brief Scheduler task for scheduled callback
+ */
+ struct GNUNET_SCHEDULER_Task *callback_task;
+
+ /**
* @brief Next element of the DLL
*/
struct GNUNET_RPS_StreamRequestHandle *next;
@@ -172,6 +177,19 @@ struct cb_cls_pack
/**
+ * @brief Peers received from the biased stream to be passed to all
+ * srh_handlers
+ */
+static struct GNUNET_PeerIdentity *srh_callback_peers;
+
+/**
+ * @brief Number of peers in the biased stream that are to be passed to all
+ * srh_handlers
+ */
+static uint64_t srh_callback_num_peers;
+
+
+/**
* @brief Create a new handle for a stream request
*
* @param rps_handle The rps handle
@@ -213,6 +231,12 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
struct GNUNET_RPS_StreamRequestHandle *srh_head,
struct GNUNET_RPS_StreamRequestHandle *srh_tail)
{
+ GNUNET_assert (NULL != srh);
+ if (NULL != srh->callback_task)
+ {
+ GNUNET_SCHEDULER_cancel (srh->callback_task);
+ srh->callback_task = NULL;
+ }
GNUNET_CONTAINER_DLL_remove (srh_head,
srh_tail,
srh);
@@ -425,12 +449,10 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
{
struct GNUNET_RPS_Handle *rps_handle;
- GNUNET_assert (NULL != srh);
rps_handle = srh->rps_handle;
- GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
- rps_handle->stream_requests_tail,
- srh);
- GNUNET_free (srh);
+ remove_stream_request (srh,
+ rps_handle->stream_requests_head,
+ rps_handle->stream_requests_tail);
if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
}
@@ -463,6 +485,24 @@ check_stream_input (void *cls,
return GNUNET_OK;
}
+
+/**
+ * @brief Called by the scheduler to call the callbacks of the srh handlers
+ *
+ * @param cls Stream request handle
+ */
+static void
+srh_callback_scheduled (void *cls)
+{
+ struct GNUNET_RPS_StreamRequestHandle *srh = cls;
+
+ srh->callback_task = NULL;
+ srh->ready_cb (srh->ready_cb_cls,
+ srh_callback_num_peers,
+ srh_callback_peers);
+}
+
+
/**
* This function is called, when the service sends another peer from the biased
* stream.
@@ -476,13 +516,20 @@ handle_stream_input (void *cls,
const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
{
struct GNUNET_RPS_Handle *h = cls;
- const struct GNUNET_PeerIdentity *peers;
+ //const struct GNUNET_PeerIdentity *peers;
uint64_t num_peers;
struct GNUNET_RPS_StreamRequestHandle *srh_iter;
struct GNUNET_RPS_StreamRequestHandle *srh_next;
- peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ //peers = (struct GNUNET_PeerIdentity *) &msg[1];
num_peers = ntohl (msg->num_peers);
+ srh_callback_num_peers = num_peers;
+ if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers);
+ srh_callback_peers =
+ GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (srh_callback_peers,
+ &msg[1],
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received %" PRIu64 " peer(s) from stream input.\n",
num_peers);
@@ -492,9 +539,12 @@ handle_stream_input (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
/* Store next pointer - srh might be removed/freed in callback */
srh_next = srh_iter->next;
- srh_iter->ready_cb (srh_iter->ready_cb_cls,
- num_peers,
- peers);
+ if (NULL != srh_iter->callback_task)
+ {
+ GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
+ }
+ srh_iter->callback_task =
+ GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter);
srh_iter = srh_next;
}
@@ -855,10 +905,9 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
h = rh->rps_handle;
GNUNET_assert (NULL != rh);
- GNUNET_assert (NULL != rh->srh);
- remove_stream_request (rh->srh,
- h->stream_requests_head,
- h->stream_requests_tail);
+ GNUNET_assert (h == rh->srh->rps_handle);
+ GNUNET_RPS_stream_cancel (rh->srh);
+ rh->srh = NULL;
if (NULL == h->stream_requests_head) cancel_stream(h);
if (NULL != rh->sampler_rh)
{
@@ -891,6 +940,11 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
GNUNET_RPS_stream_cancel (srh_tmp);
}
}
+ if (NULL != srh_callback_peers)
+ {
+ GNUNET_free (srh_callback_peers);
+ srh_callback_peers = NULL;
+ }
if (NULL != h->view_update_cb)
{
LOG (GNUNET_ERROR_TYPE_WARNING,