diff options
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r-- | src/rps/rps_api.c | 331 |
1 files changed, 80 insertions, 251 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index e4f4db506..ee65c2a82 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -84,16 +84,6 @@ struct GNUNET_RPS_Handle | |||
84 | struct GNUNET_MQ_Handle *mq; | 84 | struct GNUNET_MQ_Handle *mq; |
85 | 85 | ||
86 | /** | 86 | /** |
87 | * Array of Request_Handles. | ||
88 | */ | ||
89 | struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers; | ||
90 | |||
91 | /** | ||
92 | * The id of the last request. | ||
93 | */ | ||
94 | uint32_t current_request_id; | ||
95 | |||
96 | /** | ||
97 | * @brief Callback called on each update of the view | 87 | * @brief Callback called on each update of the view |
98 | */ | 88 | */ |
99 | GNUNET_RPS_NotifyReadyCB view_update_cb; | 89 | GNUNET_RPS_NotifyReadyCB view_update_cb; |
@@ -131,11 +121,6 @@ struct GNUNET_RPS_Request_Handle | |||
131 | struct GNUNET_RPS_Handle *rps_handle; | 121 | struct GNUNET_RPS_Handle *rps_handle; |
132 | 122 | ||
133 | /** | 123 | /** |
134 | * The id of the request. | ||
135 | */ | ||
136 | uint32_t id; | ||
137 | |||
138 | /** | ||
139 | * The number of requested peers. | 124 | * The number of requested peers. |
140 | */ | 125 | */ |
141 | uint32_t num_requests; | 126 | uint32_t num_requests; |
@@ -146,6 +131,17 @@ struct GNUNET_RPS_Request_Handle | |||
146 | struct RPS_Sampler *sampler; | 131 | struct RPS_Sampler *sampler; |
147 | 132 | ||
148 | /** | 133 | /** |
134 | * @brief Request handle of the request to the sampler - needed to cancel the request | ||
135 | */ | ||
136 | struct RPS_SamplerRequestHandle *sampler_rh; | ||
137 | |||
138 | /** | ||
139 | * @brief Request handle of the request of the biased stream of peers - | ||
140 | * needed to cancel the request | ||
141 | */ | ||
142 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
143 | |||
144 | /** | ||
149 | * The callback to be called when we receive an answer. | 145 | * The callback to be called when we receive an answer. |
150 | */ | 146 | */ |
151 | GNUNET_RPS_NotifyReadyCB ready_cb; | 147 | GNUNET_RPS_NotifyReadyCB ready_cb; |
@@ -233,160 +229,86 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, | |||
233 | 229 | ||
234 | 230 | ||
235 | /** | 231 | /** |
236 | * @brief Create new request handle | 232 | * @brief Called once the sampler has collected all requested peers. |
237 | * | ||
238 | * @param rps_handle Handle to the service | ||
239 | * @param num_requests Number of requests | ||
240 | * @param ready_cb Callback | ||
241 | * @param cls Closure | ||
242 | * | 233 | * |
243 | * @return The newly created request handle | 234 | * Calls the callback provided by the client with the corresponding cls. |
244 | */ | ||
245 | static struct GNUNET_RPS_Request_Handle * | ||
246 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, | ||
247 | uint64_t num_requests, | ||
248 | struct RPS_Sampler *sampler, | ||
249 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
250 | void *cls) | ||
251 | { | ||
252 | struct GNUNET_RPS_Request_Handle *rh; | ||
253 | |||
254 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
255 | rh->rps_handle = rps_handle; | ||
256 | rh->id = rps_handle->current_request_id++; | ||
257 | rh->num_requests = num_requests; | ||
258 | rh->sampler = sampler; | ||
259 | rh->ready_cb = ready_cb; | ||
260 | rh->ready_cb_cls = cls; | ||
261 | GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, | ||
262 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
263 | |||
264 | return rh; | ||
265 | } | ||
266 | |||
267 | |||
268 | /** | ||
269 | * @brief Send a request to the service. | ||
270 | * | 235 | * |
271 | * @param h rps handle | 236 | * @param peers The array of @a num_peers that has been returned. |
272 | * @param id id of the request | 237 | * @param num_peers The number of peers that have been returned |
273 | * @param num_req_peers number of peers | 238 | * @param cls The #GNUNET_RPS_Request_Handle |
274 | */ | 239 | */ |
275 | void | 240 | void |
276 | send_request (const struct GNUNET_RPS_Handle *h, | 241 | peers_ready_cb (const struct GNUNET_PeerIdentity *peers, |
277 | uint32_t id, | 242 | uint32_t num_peers, |
278 | uint32_t num_req_peers) | 243 | void *cls) |
279 | { | 244 | { |
280 | struct GNUNET_MQ_Envelope *ev; | 245 | struct GNUNET_RPS_Request_Handle *rh = cls; |
281 | struct GNUNET_RPS_CS_RequestMessage *msg; | ||
282 | 246 | ||
283 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST); | 247 | rh->ready_cb (rh->ready_cb_cls, |
284 | msg->num_peers = htonl (num_req_peers); | 248 | num_peers, |
285 | msg->id = htonl (id); | 249 | peers); |
286 | GNUNET_MQ_send (h->mq, ev); | 250 | // TODO cleanup, sampler, rh, cancel stuff |
251 | // TODO screw this function. We can give the cb,cls directly to the sampler. | ||
287 | } | 252 | } |
288 | 253 | ||
289 | /** | ||
290 | * @brief Iterator function over pending requests | ||
291 | * | ||
292 | * Implements #GNUNET_CONTAINER_HashMapIterator32 | ||
293 | * | ||
294 | * @param cls rps handle | ||
295 | * @param key id of the request | ||
296 | * @param value request handle | ||
297 | * | ||
298 | * @return GNUNET_YES to continue iteration | ||
299 | */ | ||
300 | int | ||
301 | resend_requests_iterator (void *cls, uint32_t key, void *value) | ||
302 | { | ||
303 | const struct GNUNET_RPS_Handle *h = cls; | ||
304 | const struct GNUNET_RPS_Request_Handle *req_handle = value; | ||
305 | (void) key; | ||
306 | |||
307 | send_request (h, req_handle->id, req_handle->num_requests); | ||
308 | return GNUNET_YES; /* continue iterating */ | ||
309 | } | ||
310 | 254 | ||
311 | /** | 255 | /** |
312 | * @brief Resend all pending requests | 256 | * @brief Callback to collect the peers from the biased stream and put those |
313 | * | 257 | * into the sampler. |
314 | * This is used to resend all pending requests after the client | ||
315 | * reconnected to the service, because the service cancels all | ||
316 | * pending requests after reconnection. | ||
317 | * | 258 | * |
318 | * @param h rps handle | 259 | * @param cls The #GNUNET_RPS_Request_Handle |
260 | * @param num_peers The number of peer that have been returned | ||
261 | * @param peers The array of @a num_peers that have been returned | ||
319 | */ | 262 | */ |
320 | void | 263 | void |
321 | resend_requests (struct GNUNET_RPS_Handle *h) | 264 | collect_peers_cb (void *cls, |
322 | { | 265 | uint64_t num_peers, |
323 | GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers, | 266 | const struct GNUNET_PeerIdentity *peers) |
324 | resend_requests_iterator, | ||
325 | h); | ||
326 | } | ||
327 | |||
328 | |||
329 | /** | ||
330 | * This function is called, when the service replies to our request. | ||
331 | * It verifies that @a msg is well-formed. | ||
332 | * | ||
333 | * @param cls the closure | ||
334 | * @param msg the message | ||
335 | * @return #GNUNET_OK if @a msg is well-formed | ||
336 | */ | ||
337 | static int | ||
338 | check_reply (void *cls, | ||
339 | const struct GNUNET_RPS_CS_ReplyMessage *msg) | ||
340 | { | 267 | { |
341 | uint16_t msize = ntohs (msg->header.size); | 268 | struct GNUNET_RPS_Request_Handle *rh = cls; |
342 | uint32_t num_peers = ntohl (msg->num_peers); | ||
343 | (void) cls; | ||
344 | 269 | ||
345 | msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage); | 270 | for (uint64_t i = 0; i < num_peers; i++) |
346 | if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || | ||
347 | (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) | ||
348 | { | 271 | { |
349 | GNUNET_break (0); | 272 | RPS_sampler_update (rh->sampler, &peers[i]); |
350 | return GNUNET_SYSERR; | ||
351 | } | 273 | } |
352 | return GNUNET_OK; | ||
353 | } | 274 | } |
354 | 275 | ||
355 | 276 | ||
356 | /** | 277 | /** |
357 | * This function is called, when the service replies to our request. | 278 | * @brief Create new request handle |
358 | * It calls the callback the caller gave us with the provided closure | ||
359 | * and disconnects afterwards. | ||
360 | * | 279 | * |
361 | * @param cls the closure | 280 | * @param rps_handle Handle to the service |
362 | * @param msg the message | 281 | * @param num_requests Number of requests |
282 | * @param ready_cb Callback | ||
283 | * @param cls Closure | ||
284 | * | ||
285 | * @return The newly created request handle | ||
363 | */ | 286 | */ |
364 | static void | 287 | static struct GNUNET_RPS_Request_Handle * |
365 | handle_reply (void *cls, | 288 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, |
366 | const struct GNUNET_RPS_CS_ReplyMessage *msg) | 289 | uint64_t num_requests, |
290 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
291 | void *cls) | ||
367 | { | 292 | { |
368 | struct GNUNET_RPS_Handle *h = cls; | ||
369 | struct GNUNET_PeerIdentity *peers; | ||
370 | struct GNUNET_RPS_Request_Handle *rh; | 293 | struct GNUNET_RPS_Request_Handle *rh; |
371 | uint32_t id; | ||
372 | 294 | ||
373 | /* Give the peers back */ | 295 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); |
374 | id = ntohl (msg->id); | 296 | rh->rps_handle = rps_handle; |
375 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 297 | rh->num_requests = num_requests; |
376 | "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n", | 298 | rh->sampler = RPS_sampler_mod_init (num_requests, |
377 | ntohl (msg->num_peers), | 299 | GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff |
378 | id); | 300 | rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, |
301 | num_requests, | ||
302 | peers_ready_cb, | ||
303 | rh); | ||
304 | rh->srh = GNUNET_RPS_stream_request (rps_handle, | ||
305 | 0, /* infinite updates */ | ||
306 | collect_peers_cb, | ||
307 | rh); /* cls */ | ||
308 | rh->ready_cb = ready_cb; | ||
309 | rh->ready_cb_cls = cls; | ||
379 | 310 | ||
380 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 311 | return rh; |
381 | GNUNET_assert (GNUNET_YES == | ||
382 | GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id)); | ||
383 | rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id); | ||
384 | GNUNET_assert (NULL != rh); | ||
385 | GNUNET_assert (rh->num_requests == ntohl (msg->num_peers)); | ||
386 | GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id); | ||
387 | rh->ready_cb (rh->ready_cb_cls, | ||
388 | ntohl (msg->num_peers), | ||
389 | peers); | ||
390 | } | 312 | } |
391 | 313 | ||
392 | 314 | ||
@@ -683,7 +605,6 @@ mq_error_handler (void *cls, | |||
683 | reconnect (h); | 605 | reconnect (h); |
684 | /* Resend all pending request as the service destroyed its knowledge | 606 | /* Resend all pending request as the service destroyed its knowledge |
685 | * about them */ | 607 | * about them */ |
686 | resend_requests (h); | ||
687 | } | 608 | } |
688 | 609 | ||
689 | 610 | ||
@@ -694,10 +615,6 @@ static void | |||
694 | reconnect (struct GNUNET_RPS_Handle *h) | 615 | reconnect (struct GNUNET_RPS_Handle *h) |
695 | { | 616 | { |
696 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 617 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
697 | GNUNET_MQ_hd_var_size (reply, | ||
698 | GNUNET_MESSAGE_TYPE_RPS_CS_REPLY, | ||
699 | struct GNUNET_RPS_CS_ReplyMessage, | ||
700 | h), | ||
701 | GNUNET_MQ_hd_var_size (view_update, | 618 | GNUNET_MQ_hd_var_size (view_update, |
702 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, | 619 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, |
703 | struct GNUNET_RPS_CS_DEBUG_ViewReply, | 620 | struct GNUNET_RPS_CS_DEBUG_ViewReply, |
@@ -731,7 +648,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
731 | struct GNUNET_RPS_Handle *h; | 648 | struct GNUNET_RPS_Handle *h; |
732 | 649 | ||
733 | h = GNUNET_new (struct GNUNET_RPS_Handle); | 650 | h = GNUNET_new (struct GNUNET_RPS_Handle); |
734 | h->current_request_id = 0; | ||
735 | h->cfg = cfg; | 651 | h->cfg = cfg; |
736 | reconnect (h); | 652 | reconnect (h); |
737 | if (NULL == h->mq) | 653 | if (NULL == h->mq) |
@@ -739,7 +655,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
739 | GNUNET_free (h); | 655 | GNUNET_free (h); |
740 | return NULL; | 656 | return NULL; |
741 | } | 657 | } |
742 | h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4); | ||
743 | return h; | 658 | return h; |
744 | } | 659 | } |
745 | 660 | ||
@@ -754,84 +669,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
754 | * @return a handle to cancel this request | 669 | * @return a handle to cancel this request |
755 | */ | 670 | */ |
756 | struct GNUNET_RPS_Request_Handle * | 671 | struct GNUNET_RPS_Request_Handle * |
757 | GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle, | ||
758 | uint32_t num_req_peers, | ||
759 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
760 | void *cls) | ||
761 | { | ||
762 | struct GNUNET_RPS_Request_Handle *rh; | ||
763 | |||
764 | rh = new_request_handle (rps_handle, | ||
765 | num_req_peers, | ||
766 | NULL, /* no sampler needed */ | ||
767 | ready_cb, | ||
768 | cls); | ||
769 | |||
770 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
771 | "Requesting %" PRIu32 " peers with id %" PRIu32 "\n", | ||
772 | num_req_peers, | ||
773 | rh->id); | ||
774 | |||
775 | send_request (rps_handle, rh->id, num_req_peers); | ||
776 | return rh; | ||
777 | } | ||
778 | |||
779 | |||
780 | /** | ||
781 | * @brief Callback to collect the peers from the biased stream and put those | ||
782 | * into the sampler. | ||
783 | * | ||
784 | * @param cls The #GNUNET_RPS_Request_Handle | ||
785 | * @param num_peers The number of peer that have been returned | ||
786 | * @param peers The array of @a num_peers that have been returned | ||
787 | */ | ||
788 | void | ||
789 | collect_peers_cb (void *cls, | ||
790 | uint64_t num_peers, | ||
791 | const struct GNUNET_PeerIdentity *peers) | ||
792 | { | ||
793 | struct GNUNET_RPS_Request_Handle *rh = cls; | ||
794 | |||
795 | for (uint64_t i = 0; i < num_peers; i++) | ||
796 | { | ||
797 | RPS_sampler_update (rh->sampler, &peers[i]); | ||
798 | } | ||
799 | } | ||
800 | |||
801 | |||
802 | /** | ||
803 | * @brief Called once the sampler has collected all requested peers. | ||
804 | * | ||
805 | * Calls the callback provided by the client with the corresponding cls. | ||
806 | * | ||
807 | * @param peers The array of @a num_peers that has been returned. | ||
808 | * @param num_peers The number of peers that have been returned | ||
809 | * @param cls The #GNUNET_RPS_Request_Handle | ||
810 | */ | ||
811 | void | ||
812 | peers_ready_cb (const struct GNUNET_PeerIdentity *peers, | ||
813 | uint32_t num_peers, | ||
814 | void *cls) | ||
815 | { | ||
816 | struct GNUNET_RPS_Request_Handle *rh = cls; | ||
817 | |||
818 | rh->ready_cb (rh->ready_cb_cls, | ||
819 | num_peers, | ||
820 | peers); | ||
821 | // TODO cleanup, sampler, rh, cancel stuff | ||
822 | // TODO screw this function. We can give the cb,cls directly to the sampler. | ||
823 | } | ||
824 | |||
825 | /** | ||
826 | * Request n random peers. | ||
827 | * | ||
828 | * @param rps_handle handle to the rps service | ||
829 | * @param num_req_peers number of peers we want to receive | ||
830 | * @param ready_cb the callback called when the peers are available | ||
831 | * @param cls closure given to the callback | ||
832 | * @return a handle to cancel this request | ||
833 | */ | ||
834 | struct GNUNET_RPS_Request_Handle * | ||
835 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | 672 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, |
836 | uint32_t num_req_peers, | 673 | uint32_t num_req_peers, |
837 | GNUNET_RPS_NotifyReadyCB ready_cb, | 674 | GNUNET_RPS_NotifyReadyCB ready_cb, |
@@ -841,19 +678,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | |||
841 | 678 | ||
842 | rh = new_request_handle (rps_handle, | 679 | rh = new_request_handle (rps_handle, |
843 | num_req_peers, | 680 | num_req_peers, |
844 | RPS_sampler_mod_init (num_req_peers, | ||
845 | GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff | ||
846 | ready_cb, | 681 | ready_cb, |
847 | cls); | 682 | cls); |
848 | RPS_sampler_get_n_rand_peers (rh->sampler, | ||
849 | num_req_peers, | ||
850 | peers_ready_cb, | ||
851 | rh); | ||
852 | 683 | ||
853 | GNUNET_RPS_stream_request (rps_handle, | ||
854 | 0, /* infinite updates */ | ||
855 | collect_peers_cb, | ||
856 | rh); /* cls */ | ||
857 | 684 | ||
858 | return rh; | 685 | return rh; |
859 | } | 686 | } |
@@ -1022,20 +849,21 @@ void | |||
1022 | GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | 849 | GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) |
1023 | { | 850 | { |
1024 | struct GNUNET_RPS_Handle *h; | 851 | struct GNUNET_RPS_Handle *h; |
1025 | struct GNUNET_MQ_Envelope *ev; | ||
1026 | struct GNUNET_RPS_CS_RequestCancelMessage*msg; | ||
1027 | |||
1028 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1029 | "Cancelling request with id %" PRIu32 "\n", | ||
1030 | rh->id); | ||
1031 | 852 | ||
1032 | h = rh->rps_handle; | 853 | h = rh->rps_handle; |
1033 | GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, | 854 | if (NULL != rh->srh) |
1034 | rh->id)); | 855 | { |
1035 | GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id); | 856 | remove_stream_request (rh->srh, |
1036 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); | 857 | h->stream_requests_head, |
1037 | msg->id = htonl (rh->id); | 858 | h->stream_requests_tail); |
1038 | GNUNET_MQ_send (rh->rps_handle->mq, ev); | 859 | } |
860 | if (NULL == h->stream_requests_head) cancel_stream(h); | ||
861 | if (NULL != rh->sampler_rh) | ||
862 | { | ||
863 | RPS_sampler_request_cancel (rh->sampler_rh); | ||
864 | } | ||
865 | RPS_sampler_destroy (rh->sampler); | ||
866 | GNUNET_free (rh); | ||
1039 | } | 867 | } |
1040 | 868 | ||
1041 | 869 | ||
@@ -1048,10 +876,11 @@ void | |||
1048 | GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) | 876 | GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) |
1049 | { | 877 | { |
1050 | GNUNET_MQ_destroy (h->mq); | 878 | GNUNET_MQ_destroy (h->mq); |
1051 | if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers)) | 879 | if (NULL != h->stream_requests_head) |
880 | { | ||
1052 | LOG (GNUNET_ERROR_TYPE_WARNING, | 881 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1053 | "Still waiting for requests\n"); | 882 | "Still waiting for requests\n"); |
1054 | GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers); | 883 | } |
1055 | GNUNET_free (h); | 884 | GNUNET_free (h); |
1056 | } | 885 | } |
1057 | 886 | ||