diff options
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r-- | src/rps/rps_api.c | 289 |
1 files changed, 238 insertions, 51 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index a558c8a35..e4f4db506 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -32,6 +32,43 @@ | |||
32 | #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) | 32 | #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) |
33 | 33 | ||
34 | /** | 34 | /** |
35 | * Handle for a request to get peers from biased stream of ids | ||
36 | */ | ||
37 | struct GNUNET_RPS_StreamRequestHandle | ||
38 | { | ||
39 | /** | ||
40 | * The client issuing the request. | ||
41 | */ | ||
42 | struct GNUNET_RPS_Handle *rps_handle; | ||
43 | |||
44 | /** | ||
45 | * The number of requested peers. | ||
46 | */ | ||
47 | uint32_t num_peers_left; | ||
48 | |||
49 | /** | ||
50 | * The callback to be called when we receive an answer. | ||
51 | */ | ||
52 | GNUNET_RPS_NotifyReadyCB ready_cb; | ||
53 | |||
54 | /** | ||
55 | * The closure for the callback. | ||
56 | */ | ||
57 | void *ready_cb_cls; | ||
58 | |||
59 | /** | ||
60 | * @brief Next element of the DLL | ||
61 | */ | ||
62 | struct GNUNET_RPS_StreamRequestHandle *next; | ||
63 | |||
64 | /** | ||
65 | * @brief Previous element of the DLL | ||
66 | */ | ||
67 | struct GNUNET_RPS_StreamRequestHandle *prev; | ||
68 | }; | ||
69 | |||
70 | |||
71 | /** | ||
35 | * Handler to handle requests from a client. | 72 | * Handler to handle requests from a client. |
36 | */ | 73 | */ |
37 | struct GNUNET_RPS_Handle | 74 | struct GNUNET_RPS_Handle |
@@ -67,14 +104,19 @@ struct GNUNET_RPS_Handle | |||
67 | void *view_update_cls; | 104 | void *view_update_cls; |
68 | 105 | ||
69 | /** | 106 | /** |
70 | * @brief Callback called on each peer of the biased input stream | 107 | * @brief Closure to each requested peer from the biased stream |
108 | */ | ||
109 | void *stream_input_cls; | ||
110 | |||
111 | /** | ||
112 | * @brief Head of the DLL of stream requests | ||
71 | */ | 113 | */ |
72 | GNUNET_RPS_NotifyReadyCB stream_input_cb; | 114 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_head; |
73 | 115 | ||
74 | /** | 116 | /** |
75 | * @brief Closure to each requested peer from the biased stream | 117 | * @brief Tail of the DLL of stream requests |
76 | */ | 118 | */ |
77 | void *stream_input_cls; | 119 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; |
78 | }; | 120 | }; |
79 | 121 | ||
80 | 122 | ||
@@ -139,6 +181,91 @@ struct cb_cls_pack | |||
139 | 181 | ||
140 | 182 | ||
141 | /** | 183 | /** |
184 | * @brief Create a new handle for a stream request | ||
185 | * | ||
186 | * @param rps_handle The rps handle | ||
187 | * @param num_peers The number of desired peers | ||
188 | * @param ready_cb The callback to be called, once all peers are ready | ||
189 | * @param cls The colsure to provide to the callback | ||
190 | * | ||
191 | * @return The handle to the stream request | ||
192 | */ | ||
193 | static struct GNUNET_RPS_StreamRequestHandle * | ||
194 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | ||
195 | uint64_t num_peers, | ||
196 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
197 | void *cls) | ||
198 | { | ||
199 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
200 | |||
201 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); | ||
202 | |||
203 | srh->rps_handle = rps_handle; | ||
204 | srh->num_peers_left = num_peers; | ||
205 | srh->ready_cb = ready_cb; | ||
206 | srh->ready_cb_cls = cls; | ||
207 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, | ||
208 | rps_handle->stream_requests_tail, | ||
209 | srh); | ||
210 | |||
211 | return srh; | ||
212 | } | ||
213 | |||
214 | |||
215 | /** | ||
216 | * @brief Remove the given stream request from the list of requests and memory | ||
217 | * | ||
218 | * @param srh The request to be removed | ||
219 | * @param srh_head Head of the DLL to remove request from | ||
220 | * @param srh_tail Tail of the DLL to remove request from | ||
221 | */ | ||
222 | static void | ||
223 | remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, | ||
224 | struct GNUNET_RPS_StreamRequestHandle *srh_head, | ||
225 | struct GNUNET_RPS_StreamRequestHandle *srh_tail) | ||
226 | { | ||
227 | GNUNET_CONTAINER_DLL_remove (srh_head, | ||
228 | srh_tail, | ||
229 | srh); | ||
230 | |||
231 | GNUNET_free (srh); | ||
232 | } | ||
233 | |||
234 | |||
235 | /** | ||
236 | * @brief Create new request handle | ||
237 | * | ||
238 | * @param rps_handle Handle to the service | ||
239 | * @param num_requests Number of requests | ||
240 | * @param ready_cb Callback | ||
241 | * @param cls Closure | ||
242 | * | ||
243 | * @return The newly created request handle | ||
244 | */ | ||
245 | static struct GNUNET_RPS_Request_Handle * | ||
246 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, | ||
247 | uint64_t num_requests, | ||
248 | struct RPS_Sampler *sampler, | ||
249 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
250 | void *cls) | ||
251 | { | ||
252 | struct GNUNET_RPS_Request_Handle *rh; | ||
253 | |||
254 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
255 | rh->rps_handle = rps_handle; | ||
256 | rh->id = rps_handle->current_request_id++; | ||
257 | rh->num_requests = num_requests; | ||
258 | rh->sampler = sampler; | ||
259 | rh->ready_cb = ready_cb; | ||
260 | rh->ready_cb_cls = cls; | ||
261 | GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, | ||
262 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
263 | |||
264 | return rh; | ||
265 | } | ||
266 | |||
267 | |||
268 | /** | ||
142 | * @brief Send a request to the service. | 269 | * @brief Send a request to the service. |
143 | * | 270 | * |
144 | * @param h rps handle | 271 | * @param h rps handle |
@@ -304,24 +431,27 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, | |||
304 | * @param cls a closure that will be given to the callback | 431 | * @param cls a closure that will be given to the callback |
305 | * @param ready_cb the callback called when the peers are available | 432 | * @param ready_cb the callback called when the peers are available |
306 | */ | 433 | */ |
307 | void | 434 | struct GNUNET_RPS_StreamRequestHandle * |
308 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 435 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
309 | uint32_t num_peers, | 436 | uint32_t num_peers, |
310 | GNUNET_RPS_NotifyReadyCB stream_input_cb, | 437 | GNUNET_RPS_NotifyReadyCB stream_input_cb, |
311 | void *cls) | 438 | void *cls) |
312 | { | 439 | { |
440 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
313 | struct GNUNET_MQ_Envelope *ev; | 441 | struct GNUNET_MQ_Envelope *ev; |
314 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; | 442 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; |
315 | 443 | ||
444 | srh = new_stream_request (rps_handle, | ||
445 | num_peers, /* num requests */ | ||
446 | stream_input_cb, | ||
447 | cls); | ||
316 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 448 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
317 | "Client requests %" PRIu32 " biased stream updates\n", | 449 | "Client requests %" PRIu32 " biased stream updates\n", |
318 | num_peers); | 450 | num_peers); |
319 | rps_handle->stream_input_cb = stream_input_cb; | ||
320 | rps_handle->stream_input_cls = cls; | ||
321 | 451 | ||
322 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); | 452 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); |
323 | msg->num_peers = htonl (num_peers); | ||
324 | GNUNET_MQ_send (rps_handle->mq, ev); | 453 | GNUNET_MQ_send (rps_handle->mq, ev); |
454 | return srh; | ||
325 | } | 455 | } |
326 | 456 | ||
327 | 457 | ||
@@ -379,6 +509,41 @@ handle_view_update (void *cls, | |||
379 | 509 | ||
380 | 510 | ||
381 | /** | 511 | /** |
512 | * @brief Send message to service that this client does not want to receive | ||
513 | * further updates from the biased peer stream | ||
514 | * | ||
515 | * @param rps_handle The handle representing the service to the client | ||
516 | */ | ||
517 | static void | ||
518 | cancel_stream (struct GNUNET_RPS_Handle *rps_handle) | ||
519 | { | ||
520 | struct GNUNET_MQ_Envelope *ev; | ||
521 | |||
522 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL); | ||
523 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
524 | } | ||
525 | |||
526 | |||
527 | /** | ||
528 | * @brief Cancel a specific request for updates from the biased peer stream | ||
529 | * | ||
530 | * @param srh The request handle to cancel | ||
531 | */ | ||
532 | void | ||
533 | GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) | ||
534 | { | ||
535 | struct GNUNET_RPS_Handle *rps_handle; | ||
536 | |||
537 | rps_handle = srh->rps_handle; | ||
538 | GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, | ||
539 | rps_handle->stream_requests_tail, | ||
540 | srh); | ||
541 | GNUNET_free (srh); | ||
542 | if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); | ||
543 | } | ||
544 | |||
545 | |||
546 | /** | ||
382 | * This function is called, when the service sends another peer from the biased | 547 | * This function is called, when the service sends another peer from the biased |
383 | * stream. | 548 | * stream. |
384 | * It calls the callback the caller provided | 549 | * It calls the callback the caller provided |
@@ -420,16 +585,71 @@ handle_stream_input (void *cls, | |||
420 | { | 585 | { |
421 | struct GNUNET_RPS_Handle *h = cls; | 586 | struct GNUNET_RPS_Handle *h = cls; |
422 | const struct GNUNET_PeerIdentity *peers; | 587 | const struct GNUNET_PeerIdentity *peers; |
588 | /* The following two pointers are used to prevent that new handles are | ||
589 | * inserted into the DLL, that is currently iterated over, from within a call | ||
590 | * to that handler_cb, are executed and in turn again add themselves to the | ||
591 | * iterated DLL infinitely */ | ||
592 | struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp; | ||
593 | struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp; | ||
594 | uint64_t num_peers; | ||
595 | uint64_t num_peers_return; | ||
423 | 596 | ||
424 | /* Give the peers back */ | 597 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; |
598 | num_peers = ntohl (msg->num_peers); | ||
425 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 599 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
426 | "New peer of %" PRIu64 " biased input stream\n", | 600 | "Received %" PRIu64 " peer(s) from stream input.\n", |
427 | ntohl (msg->num_peers)); | 601 | num_peers); |
602 | srh_head_tmp = h->stream_requests_head; | ||
603 | srh_tail_tmp = h->stream_requests_tail; | ||
604 | h->stream_requests_head = NULL; | ||
605 | h->stream_requests_tail = NULL; | ||
606 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
607 | NULL != srh_iter; | ||
608 | srh_iter = srh_iter->next) | ||
609 | { | ||
610 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
611 | "Calling srh - left: %" PRIu64 "\n", | ||
612 | srh_iter->num_peers_left); | ||
613 | if (0 == srh_iter->num_peers_left) /* infinite updates */ | ||
614 | { | ||
615 | num_peers_return = num_peers; | ||
616 | } | ||
617 | else if (num_peers > srh_iter->num_peers_left) | ||
618 | { | ||
619 | num_peers_return = num_peers - srh_iter->num_peers_left; | ||
620 | } | ||
621 | else /* num_peers <= srh_iter->num_peers_left */ | ||
622 | { | ||
623 | num_peers_return = srh_iter->num_peers_left - num_peers; | ||
624 | } | ||
625 | srh_iter->ready_cb (srh_iter->ready_cb_cls, | ||
626 | num_peers_return, | ||
627 | peers); | ||
628 | if (0 == srh_iter->num_peers_left) ; | ||
629 | else if (num_peers_return >= srh_iter->num_peers_left) | ||
630 | { | ||
631 | remove_stream_request (srh_iter, | ||
632 | srh_head_tmp, | ||
633 | srh_tail_tmp); | ||
634 | } | ||
635 | else | ||
636 | { | ||
637 | srh_iter->num_peers_left -= num_peers_return; | ||
638 | } | ||
639 | } | ||
640 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
641 | NULL != srh_iter; | ||
642 | srh_iter = srh_iter->next) | ||
643 | { | ||
644 | GNUNET_CONTAINER_DLL_insert (h->stream_requests_head, | ||
645 | h->stream_requests_tail, | ||
646 | srh_iter); | ||
647 | } | ||
428 | 648 | ||
429 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 649 | if (NULL == h->stream_requests_head) |
430 | GNUNET_assert (NULL != h); | 650 | { |
431 | GNUNET_assert (NULL != h->stream_input_cb); | 651 | cancel_stream (h); |
432 | h->stream_input_cb (h->stream_input_cls, ntohl (msg->num_peers), peers); | 652 | } |
433 | } | 653 | } |
434 | 654 | ||
435 | 655 | ||
@@ -525,39 +745,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
525 | 745 | ||
526 | 746 | ||
527 | /** | 747 | /** |
528 | * @brief Create new request handle | ||
529 | * | ||
530 | * @param rps_handle Handle to the service | ||
531 | * @param num_requests Number of requests | ||
532 | * @param ready_cb Callback | ||
533 | * @param cls Closure | ||
534 | * | ||
535 | * @return The newly created request handle | ||
536 | */ | ||
537 | static struct GNUNET_RPS_Request_Handle * | ||
538 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, | ||
539 | uint64_t num_requests, | ||
540 | struct RPS_Sampler *sampler, | ||
541 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
542 | void *cls) | ||
543 | { | ||
544 | struct GNUNET_RPS_Request_Handle *rh; | ||
545 | |||
546 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
547 | rh->rps_handle = rps_handle; | ||
548 | rh->id = rps_handle->current_request_id++; | ||
549 | rh->num_requests = num_requests; | ||
550 | rh->sampler = sampler; | ||
551 | rh->ready_cb = ready_cb; | ||
552 | rh->ready_cb_cls = cls; | ||
553 | GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, | ||
554 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
555 | |||
556 | return rh; | ||
557 | } | ||
558 | |||
559 | |||
560 | /** | ||
561 | * Request n random peers. | 748 | * Request n random peers. |
562 | * | 749 | * |
563 | * @param rps_handle handle to the rps service | 750 | * @param rps_handle handle to the rps service |
@@ -646,9 +833,9 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, | |||
646 | */ | 833 | */ |
647 | struct GNUNET_RPS_Request_Handle * | 834 | struct GNUNET_RPS_Request_Handle * |
648 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | 835 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, |
649 | uint32_t num_req_peers, | 836 | uint32_t num_req_peers, |
650 | GNUNET_RPS_NotifyReadyCB ready_cb, | 837 | GNUNET_RPS_NotifyReadyCB ready_cb, |
651 | void *cls) | 838 | void *cls) |
652 | { | 839 | { |
653 | struct GNUNET_RPS_Request_Handle *rh; | 840 | struct GNUNET_RPS_Request_Handle *rh; |
654 | 841 | ||