aboutsummaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c331
1 files changed, 80 insertions, 251 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index e4f4db506..ee65c2a82 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -84,16 +84,6 @@ struct GNUNET_RPS_Handle
84 struct GNUNET_MQ_Handle *mq; 84 struct GNUNET_MQ_Handle *mq;
85 85
86 /** 86 /**
87 * Array of Request_Handles.
88 */
89 struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers;
90
91 /**
92 * The id of the last request.
93 */
94 uint32_t current_request_id;
95
96 /**
97 * @brief Callback called on each update of the view 87 * @brief Callback called on each update of the view
98 */ 88 */
99 GNUNET_RPS_NotifyReadyCB view_update_cb; 89 GNUNET_RPS_NotifyReadyCB view_update_cb;
@@ -131,11 +121,6 @@ struct GNUNET_RPS_Request_Handle
131 struct GNUNET_RPS_Handle *rps_handle; 121 struct GNUNET_RPS_Handle *rps_handle;
132 122
133 /** 123 /**
134 * The id of the request.
135 */
136 uint32_t id;
137
138 /**
139 * The number of requested peers. 124 * The number of requested peers.
140 */ 125 */
141 uint32_t num_requests; 126 uint32_t num_requests;
@@ -146,6 +131,17 @@ struct GNUNET_RPS_Request_Handle
146 struct RPS_Sampler *sampler; 131 struct RPS_Sampler *sampler;
147 132
148 /** 133 /**
134 * @brief Request handle of the request to the sampler - needed to cancel the request
135 */
136 struct RPS_SamplerRequestHandle *sampler_rh;
137
138 /**
139 * @brief Request handle of the request of the biased stream of peers -
140 * needed to cancel the request
141 */
142 struct GNUNET_RPS_StreamRequestHandle *srh;
143
144 /**
149 * The callback to be called when we receive an answer. 145 * The callback to be called when we receive an answer.
150 */ 146 */
151 GNUNET_RPS_NotifyReadyCB ready_cb; 147 GNUNET_RPS_NotifyReadyCB ready_cb;
@@ -233,160 +229,86 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
233 229
234 230
235/** 231/**
236 * @brief Create new request handle 232 * @brief Called once the sampler has collected all requested peers.
237 *
238 * @param rps_handle Handle to the service
239 * @param num_requests Number of requests
240 * @param ready_cb Callback
241 * @param cls Closure
242 * 233 *
243 * @return The newly created request handle 234 * Calls the callback provided by the client with the corresponding cls.
244 */
245static struct GNUNET_RPS_Request_Handle *
246new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
247 uint64_t num_requests,
248 struct RPS_Sampler *sampler,
249 GNUNET_RPS_NotifyReadyCB ready_cb,
250 void *cls)
251{
252 struct GNUNET_RPS_Request_Handle *rh;
253
254 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
255 rh->rps_handle = rps_handle;
256 rh->id = rps_handle->current_request_id++;
257 rh->num_requests = num_requests;
258 rh->sampler = sampler;
259 rh->ready_cb = ready_cb;
260 rh->ready_cb_cls = cls;
261 GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
262 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
263
264 return rh;
265}
266
267
268/**
269 * @brief Send a request to the service.
270 * 235 *
271 * @param h rps handle 236 * @param peers The array of @a num_peers that has been returned.
272 * @param id id of the request 237 * @param num_peers The number of peers that have been returned
273 * @param num_req_peers number of peers 238 * @param cls The #GNUNET_RPS_Request_Handle
274 */ 239 */
275void 240void
276send_request (const struct GNUNET_RPS_Handle *h, 241peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
277 uint32_t id, 242 uint32_t num_peers,
278 uint32_t num_req_peers) 243 void *cls)
279{ 244{
280 struct GNUNET_MQ_Envelope *ev; 245 struct GNUNET_RPS_Request_Handle *rh = cls;
281 struct GNUNET_RPS_CS_RequestMessage *msg;
282 246
283 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST); 247 rh->ready_cb (rh->ready_cb_cls,
284 msg->num_peers = htonl (num_req_peers); 248 num_peers,
285 msg->id = htonl (id); 249 peers);
286 GNUNET_MQ_send (h->mq, ev); 250 // TODO cleanup, sampler, rh, cancel stuff
251 // TODO screw this function. We can give the cb,cls directly to the sampler.
287} 252}
288 253
289/**
290 * @brief Iterator function over pending requests
291 *
292 * Implements #GNUNET_CONTAINER_HashMapIterator32
293 *
294 * @param cls rps handle
295 * @param key id of the request
296 * @param value request handle
297 *
298 * @return GNUNET_YES to continue iteration
299 */
300int
301resend_requests_iterator (void *cls, uint32_t key, void *value)
302{
303 const struct GNUNET_RPS_Handle *h = cls;
304 const struct GNUNET_RPS_Request_Handle *req_handle = value;
305 (void) key;
306
307 send_request (h, req_handle->id, req_handle->num_requests);
308 return GNUNET_YES; /* continue iterating */
309}
310 254
311/** 255/**
312 * @brief Resend all pending requests 256 * @brief Callback to collect the peers from the biased stream and put those
313 * 257 * into the sampler.
314 * This is used to resend all pending requests after the client
315 * reconnected to the service, because the service cancels all
316 * pending requests after reconnection.
317 * 258 *
318 * @param h rps handle 259 * @param cls The #GNUNET_RPS_Request_Handle
260 * @param num_peers The number of peer that have been returned
261 * @param peers The array of @a num_peers that have been returned
319 */ 262 */
320void 263void
321resend_requests (struct GNUNET_RPS_Handle *h) 264collect_peers_cb (void *cls,
322{ 265 uint64_t num_peers,
323 GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers, 266 const struct GNUNET_PeerIdentity *peers)
324 resend_requests_iterator,
325 h);
326}
327
328
329/**
330 * This function is called, when the service replies to our request.
331 * It verifies that @a msg is well-formed.
332 *
333 * @param cls the closure
334 * @param msg the message
335 * @return #GNUNET_OK if @a msg is well-formed
336 */
337static int
338check_reply (void *cls,
339 const struct GNUNET_RPS_CS_ReplyMessage *msg)
340{ 267{
341 uint16_t msize = ntohs (msg->header.size); 268 struct GNUNET_RPS_Request_Handle *rh = cls;
342 uint32_t num_peers = ntohl (msg->num_peers);
343 (void) cls;
344 269
345 msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage); 270 for (uint64_t i = 0; i < num_peers; i++)
346 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
347 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
348 { 271 {
349 GNUNET_break (0); 272 RPS_sampler_update (rh->sampler, &peers[i]);
350 return GNUNET_SYSERR;
351 } 273 }
352 return GNUNET_OK;
353} 274}
354 275
355 276
356/** 277/**
357 * This function is called, when the service replies to our request. 278 * @brief Create new request handle
358 * It calls the callback the caller gave us with the provided closure
359 * and disconnects afterwards.
360 * 279 *
361 * @param cls the closure 280 * @param rps_handle Handle to the service
362 * @param msg the message 281 * @param num_requests Number of requests
282 * @param ready_cb Callback
283 * @param cls Closure
284 *
285 * @return The newly created request handle
363 */ 286 */
364static void 287static struct GNUNET_RPS_Request_Handle *
365handle_reply (void *cls, 288new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
366 const struct GNUNET_RPS_CS_ReplyMessage *msg) 289 uint64_t num_requests,
290 GNUNET_RPS_NotifyReadyCB ready_cb,
291 void *cls)
367{ 292{
368 struct GNUNET_RPS_Handle *h = cls;
369 struct GNUNET_PeerIdentity *peers;
370 struct GNUNET_RPS_Request_Handle *rh; 293 struct GNUNET_RPS_Request_Handle *rh;
371 uint32_t id;
372 294
373 /* Give the peers back */ 295 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
374 id = ntohl (msg->id); 296 rh->rps_handle = rps_handle;
375 LOG (GNUNET_ERROR_TYPE_DEBUG, 297 rh->num_requests = num_requests;
376 "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n", 298 rh->sampler = RPS_sampler_mod_init (num_requests,
377 ntohl (msg->num_peers), 299 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
378 id); 300 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
301 num_requests,
302 peers_ready_cb,
303 rh);
304 rh->srh = GNUNET_RPS_stream_request (rps_handle,
305 0, /* infinite updates */
306 collect_peers_cb,
307 rh); /* cls */
308 rh->ready_cb = ready_cb;
309 rh->ready_cb_cls = cls;
379 310
380 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 311 return rh;
381 GNUNET_assert (GNUNET_YES ==
382 GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id));
383 rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id);
384 GNUNET_assert (NULL != rh);
385 GNUNET_assert (rh->num_requests == ntohl (msg->num_peers));
386 GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id);
387 rh->ready_cb (rh->ready_cb_cls,
388 ntohl (msg->num_peers),
389 peers);
390} 312}
391 313
392 314
@@ -683,7 +605,6 @@ mq_error_handler (void *cls,
683 reconnect (h); 605 reconnect (h);
684 /* Resend all pending request as the service destroyed its knowledge 606 /* Resend all pending request as the service destroyed its knowledge
685 * about them */ 607 * about them */
686 resend_requests (h);
687} 608}
688 609
689 610
@@ -694,10 +615,6 @@ static void
694reconnect (struct GNUNET_RPS_Handle *h) 615reconnect (struct GNUNET_RPS_Handle *h)
695{ 616{
696 struct GNUNET_MQ_MessageHandler mq_handlers[] = { 617 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
697 GNUNET_MQ_hd_var_size (reply,
698 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY,
699 struct GNUNET_RPS_CS_ReplyMessage,
700 h),
701 GNUNET_MQ_hd_var_size (view_update, 618 GNUNET_MQ_hd_var_size (view_update,
702 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 619 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
703 struct GNUNET_RPS_CS_DEBUG_ViewReply, 620 struct GNUNET_RPS_CS_DEBUG_ViewReply,
@@ -731,7 +648,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
731 struct GNUNET_RPS_Handle *h; 648 struct GNUNET_RPS_Handle *h;
732 649
733 h = GNUNET_new (struct GNUNET_RPS_Handle); 650 h = GNUNET_new (struct GNUNET_RPS_Handle);
734 h->current_request_id = 0;
735 h->cfg = cfg; 651 h->cfg = cfg;
736 reconnect (h); 652 reconnect (h);
737 if (NULL == h->mq) 653 if (NULL == h->mq)
@@ -739,7 +655,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
739 GNUNET_free (h); 655 GNUNET_free (h);
740 return NULL; 656 return NULL;
741 } 657 }
742 h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4);
743 return h; 658 return h;
744} 659}
745 660
@@ -754,84 +669,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
754 * @return a handle to cancel this request 669 * @return a handle to cancel this request
755 */ 670 */
756struct GNUNET_RPS_Request_Handle * 671struct GNUNET_RPS_Request_Handle *
757GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle,
758 uint32_t num_req_peers,
759 GNUNET_RPS_NotifyReadyCB ready_cb,
760 void *cls)
761{
762 struct GNUNET_RPS_Request_Handle *rh;
763
764 rh = new_request_handle (rps_handle,
765 num_req_peers,
766 NULL, /* no sampler needed */
767 ready_cb,
768 cls);
769
770 LOG (GNUNET_ERROR_TYPE_DEBUG,
771 "Requesting %" PRIu32 " peers with id %" PRIu32 "\n",
772 num_req_peers,
773 rh->id);
774
775 send_request (rps_handle, rh->id, num_req_peers);
776 return rh;
777}
778
779
780/**
781 * @brief Callback to collect the peers from the biased stream and put those
782 * into the sampler.
783 *
784 * @param cls The #GNUNET_RPS_Request_Handle
785 * @param num_peers The number of peer that have been returned
786 * @param peers The array of @a num_peers that have been returned
787 */
788void
789collect_peers_cb (void *cls,
790 uint64_t num_peers,
791 const struct GNUNET_PeerIdentity *peers)
792{
793 struct GNUNET_RPS_Request_Handle *rh = cls;
794
795 for (uint64_t i = 0; i < num_peers; i++)
796 {
797 RPS_sampler_update (rh->sampler, &peers[i]);
798 }
799}
800
801
802/**
803 * @brief Called once the sampler has collected all requested peers.
804 *
805 * Calls the callback provided by the client with the corresponding cls.
806 *
807 * @param peers The array of @a num_peers that has been returned.
808 * @param num_peers The number of peers that have been returned
809 * @param cls The #GNUNET_RPS_Request_Handle
810 */
811void
812peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
813 uint32_t num_peers,
814 void *cls)
815{
816 struct GNUNET_RPS_Request_Handle *rh = cls;
817
818 rh->ready_cb (rh->ready_cb_cls,
819 num_peers,
820 peers);
821 // TODO cleanup, sampler, rh, cancel stuff
822 // TODO screw this function. We can give the cb,cls directly to the sampler.
823}
824
825/**
826 * Request n random peers.
827 *
828 * @param rps_handle handle to the rps service
829 * @param num_req_peers number of peers we want to receive
830 * @param ready_cb the callback called when the peers are available
831 * @param cls closure given to the callback
832 * @return a handle to cancel this request
833 */
834struct GNUNET_RPS_Request_Handle *
835GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, 672GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
836 uint32_t num_req_peers, 673 uint32_t num_req_peers,
837 GNUNET_RPS_NotifyReadyCB ready_cb, 674 GNUNET_RPS_NotifyReadyCB ready_cb,
@@ -841,19 +678,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
841 678
842 rh = new_request_handle (rps_handle, 679 rh = new_request_handle (rps_handle,
843 num_req_peers, 680 num_req_peers,
844 RPS_sampler_mod_init (num_req_peers,
845 GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff
846 ready_cb, 681 ready_cb,
847 cls); 682 cls);
848 RPS_sampler_get_n_rand_peers (rh->sampler,
849 num_req_peers,
850 peers_ready_cb,
851 rh);
852 683
853 GNUNET_RPS_stream_request (rps_handle,
854 0, /* infinite updates */
855 collect_peers_cb,
856 rh); /* cls */
857 684
858 return rh; 685 return rh;
859} 686}
@@ -1022,20 +849,21 @@ void
1022GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) 849GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1023{ 850{
1024 struct GNUNET_RPS_Handle *h; 851 struct GNUNET_RPS_Handle *h;
1025 struct GNUNET_MQ_Envelope *ev;
1026 struct GNUNET_RPS_CS_RequestCancelMessage*msg;
1027
1028 LOG (GNUNET_ERROR_TYPE_DEBUG,
1029 "Cancelling request with id %" PRIu32 "\n",
1030 rh->id);
1031 852
1032 h = rh->rps_handle; 853 h = rh->rps_handle;
1033 GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, 854 if (NULL != rh->srh)
1034 rh->id)); 855 {
1035 GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id); 856 remove_stream_request (rh->srh,
1036 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); 857 h->stream_requests_head,
1037 msg->id = htonl (rh->id); 858 h->stream_requests_tail);
1038 GNUNET_MQ_send (rh->rps_handle->mq, ev); 859 }
860 if (NULL == h->stream_requests_head) cancel_stream(h);
861 if (NULL != rh->sampler_rh)
862 {
863 RPS_sampler_request_cancel (rh->sampler_rh);
864 }
865 RPS_sampler_destroy (rh->sampler);
866 GNUNET_free (rh);
1039} 867}
1040 868
1041 869
@@ -1048,10 +876,11 @@ void
1048GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) 876GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1049{ 877{
1050 GNUNET_MQ_destroy (h->mq); 878 GNUNET_MQ_destroy (h->mq);
1051 if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers)) 879 if (NULL != h->stream_requests_head)
880 {
1052 LOG (GNUNET_ERROR_TYPE_WARNING, 881 LOG (GNUNET_ERROR_TYPE_WARNING,
1053 "Still waiting for requests\n"); 882 "Still waiting for requests\n");
1054 GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers); 883 }
1055 GNUNET_free (h); 884 GNUNET_free (h);
1056} 885}
1057 886