aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rps/rps_api.c84
1 files changed, 69 insertions, 15 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index cfab06f17..420323c4b 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/ 17*/
@@ -52,6 +52,11 @@ struct GNUNET_RPS_StreamRequestHandle
52 void *ready_cb_cls; 52 void *ready_cb_cls;
53 53
54 /** 54 /**
55 * @brief Scheduler task for scheduled callback
56 */
57 struct GNUNET_SCHEDULER_Task *callback_task;
58
59 /**
55 * @brief Next element of the DLL 60 * @brief Next element of the DLL
56 */ 61 */
57 struct GNUNET_RPS_StreamRequestHandle *next; 62 struct GNUNET_RPS_StreamRequestHandle *next;
@@ -172,6 +177,19 @@ struct cb_cls_pack
172 177
173 178
174/** 179/**
180 * @brief Peers received from the biased stream to be passed to all
181 * srh_handlers
182 */
183static struct GNUNET_PeerIdentity *srh_callback_peers;
184
185/**
186 * @brief Number of peers in the biased stream that are to be passed to all
187 * srh_handlers
188 */
189static uint64_t srh_callback_num_peers;
190
191
192/**
175 * @brief Create a new handle for a stream request 193 * @brief Create a new handle for a stream request
176 * 194 *
177 * @param rps_handle The rps handle 195 * @param rps_handle The rps handle
@@ -213,6 +231,12 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
213 struct GNUNET_RPS_StreamRequestHandle *srh_head, 231 struct GNUNET_RPS_StreamRequestHandle *srh_head,
214 struct GNUNET_RPS_StreamRequestHandle *srh_tail) 232 struct GNUNET_RPS_StreamRequestHandle *srh_tail)
215{ 233{
234 GNUNET_assert (NULL != srh);
235 if (NULL != srh->callback_task)
236 {
237 GNUNET_SCHEDULER_cancel (srh->callback_task);
238 srh->callback_task = NULL;
239 }
216 GNUNET_CONTAINER_DLL_remove (srh_head, 240 GNUNET_CONTAINER_DLL_remove (srh_head,
217 srh_tail, 241 srh_tail,
218 srh); 242 srh);
@@ -425,12 +449,10 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
425{ 449{
426 struct GNUNET_RPS_Handle *rps_handle; 450 struct GNUNET_RPS_Handle *rps_handle;
427 451
428 GNUNET_assert (NULL != srh);
429 rps_handle = srh->rps_handle; 452 rps_handle = srh->rps_handle;
430 GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, 453 remove_stream_request (srh,
431 rps_handle->stream_requests_tail, 454 rps_handle->stream_requests_head,
432 srh); 455 rps_handle->stream_requests_tail);
433 GNUNET_free (srh);
434 if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); 456 if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
435} 457}
436 458
@@ -463,6 +485,24 @@ check_stream_input (void *cls,
463 return GNUNET_OK; 485 return GNUNET_OK;
464} 486}
465 487
488
489/**
490 * @brief Called by the scheduler to call the callbacks of the srh handlers
491 *
492 * @param cls Stream request handle
493 */
494static void
495srh_callback_scheduled (void *cls)
496{
497 struct GNUNET_RPS_StreamRequestHandle *srh = cls;
498
499 srh->callback_task = NULL;
500 srh->ready_cb (srh->ready_cb_cls,
501 srh_callback_num_peers,
502 srh_callback_peers);
503}
504
505
466/** 506/**
467 * This function is called, when the service sends another peer from the biased 507 * This function is called, when the service sends another peer from the biased
468 * stream. 508 * stream.
@@ -476,13 +516,20 @@ handle_stream_input (void *cls,
476 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) 516 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
477{ 517{
478 struct GNUNET_RPS_Handle *h = cls; 518 struct GNUNET_RPS_Handle *h = cls;
479 const struct GNUNET_PeerIdentity *peers; 519 //const struct GNUNET_PeerIdentity *peers;
480 uint64_t num_peers; 520 uint64_t num_peers;
481 struct GNUNET_RPS_StreamRequestHandle *srh_iter; 521 struct GNUNET_RPS_StreamRequestHandle *srh_iter;
482 struct GNUNET_RPS_StreamRequestHandle *srh_next; 522 struct GNUNET_RPS_StreamRequestHandle *srh_next;
483 523
484 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 524 //peers = (struct GNUNET_PeerIdentity *) &msg[1];
485 num_peers = ntohl (msg->num_peers); 525 num_peers = ntohl (msg->num_peers);
526 srh_callback_num_peers = num_peers;
527 if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers);
528 srh_callback_peers =
529 GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
530 GNUNET_memcpy (srh_callback_peers,
531 &msg[1],
532 num_peers * sizeof (struct GNUNET_PeerIdentity));
486 LOG (GNUNET_ERROR_TYPE_DEBUG, 533 LOG (GNUNET_ERROR_TYPE_DEBUG,
487 "Received %" PRIu64 " peer(s) from stream input.\n", 534 "Received %" PRIu64 " peer(s) from stream input.\n",
488 num_peers); 535 num_peers);
@@ -492,9 +539,12 @@ handle_stream_input (void *cls,
492 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); 539 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
493 /* Store next pointer - srh might be removed/freed in callback */ 540 /* Store next pointer - srh might be removed/freed in callback */
494 srh_next = srh_iter->next; 541 srh_next = srh_iter->next;
495 srh_iter->ready_cb (srh_iter->ready_cb_cls, 542 if (NULL != srh_iter->callback_task)
496 num_peers, 543 {
497 peers); 544 GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
545 }
546 srh_iter->callback_task =
547 GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter);
498 srh_iter = srh_next; 548 srh_iter = srh_next;
499 } 549 }
500 550
@@ -855,10 +905,9 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
855 905
856 h = rh->rps_handle; 906 h = rh->rps_handle;
857 GNUNET_assert (NULL != rh); 907 GNUNET_assert (NULL != rh);
858 GNUNET_assert (NULL != rh->srh); 908 GNUNET_assert (h == rh->srh->rps_handle);
859 remove_stream_request (rh->srh, 909 GNUNET_RPS_stream_cancel (rh->srh);
860 h->stream_requests_head, 910 rh->srh = NULL;
861 h->stream_requests_tail);
862 if (NULL == h->stream_requests_head) cancel_stream(h); 911 if (NULL == h->stream_requests_head) cancel_stream(h);
863 if (NULL != rh->sampler_rh) 912 if (NULL != rh->sampler_rh)
864 { 913 {
@@ -891,6 +940,11 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
891 GNUNET_RPS_stream_cancel (srh_tmp); 940 GNUNET_RPS_stream_cancel (srh_tmp);
892 } 941 }
893 } 942 }
943 if (NULL != srh_callback_peers)
944 {
945 GNUNET_free (srh_callback_peers);
946 srh_callback_peers = NULL;
947 }
894 if (NULL != h->view_update_cb) 948 if (NULL != h->view_update_cb)
895 { 949 {
896 LOG (GNUNET_ERROR_TYPE_WARNING, 950 LOG (GNUNET_ERROR_TYPE_WARNING,