diff options
-rw-r--r-- | po/POTFILES.in | 1 | ||||
-rw-r--r-- | src/include/gnunet_rps_service.h | 2 | ||||
-rw-r--r-- | src/rps/Makefile.am | 3 | ||||
-rw-r--r-- | src/rps/gnunet-rps-profiler.c | 4 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps.c | 61 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 65 | ||||
-rw-r--r-- | src/rps/rps.h | 11 | ||||
-rw-r--r-- | src/rps/rps_api.c | 289 |
8 files changed, 293 insertions, 143 deletions
diff --git a/po/POTFILES.in b/po/POTFILES.in index 45471731c..44bd751e4 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in | |||
@@ -329,6 +329,7 @@ src/rps/gnunet-service-rps_sampler.c | |||
329 | src/rps/gnunet-service-rps_sampler_elem.c | 329 | src/rps/gnunet-service-rps_sampler_elem.c |
330 | src/rps/gnunet-service-rps_view.c | 330 | src/rps/gnunet-service-rps_view.c |
331 | src/rps/rps_api.c | 331 | src/rps/rps_api.c |
332 | src/rps/rps_test_lib.c | ||
332 | src/rps/rps-test_util.c | 333 | src/rps/rps-test_util.c |
333 | src/scalarproduct/gnunet-scalarproduct.c | 334 | src/scalarproduct/gnunet-scalarproduct.c |
334 | src/scalarproduct/gnunet-service-scalarproduct_alice.c | 335 | src/scalarproduct/gnunet-service-scalarproduct_alice.c |
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index 22e944d0f..f77c3dbc4 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h | |||
@@ -162,7 +162,7 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, | |||
162 | * @param cls a closure that will be given to the callback | 162 | * @param cls a closure that will be given to the callback |
163 | * @param ready_cb the callback called when the peers are available | 163 | * @param ready_cb the callback called when the peers are available |
164 | */ | 164 | */ |
165 | void | 165 | struct GNUNET_RPS_StreamRequestHandle * |
166 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 166 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
167 | uint32_t num_updates, | 167 | uint32_t num_updates, |
168 | GNUNET_RPS_NotifyReadyCB stream_input_cb, | 168 | GNUNET_RPS_NotifyReadyCB stream_input_cb, |
diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am index 2ed93ef7c..5e9fd09fa 100644 --- a/src/rps/Makefile.am +++ b/src/rps/Makefile.am | |||
@@ -90,7 +90,8 @@ endif | |||
90 | rps_test_src = \ | 90 | rps_test_src = \ |
91 | test_rps.c \ | 91 | test_rps.c \ |
92 | rps-test_util.h rps-test_util.c \ | 92 | rps-test_util.h rps-test_util.c \ |
93 | gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c | 93 | gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ |
94 | gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c | ||
94 | 95 | ||
95 | ld_rps_test_lib = \ | 96 | ld_rps_test_lib = \ |
96 | libgnunetrps.la \ | 97 | libgnunetrps.la \ |
diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c index d2640225a..f2a8083e7 100644 --- a/src/rps/gnunet-rps-profiler.c +++ b/src/rps/gnunet-rps-profiler.c | |||
@@ -1932,8 +1932,8 @@ static uint32_t binom (uint32_t n, uint32_t k) | |||
1932 | { | 1932 | { |
1933 | //GNUNET_assert (n >= k); | 1933 | //GNUNET_assert (n >= k); |
1934 | if (k > n) return 0; | 1934 | if (k > n) return 0; |
1935 | if (0 > n) return 0; /* just for clarity - always false */ | 1935 | /* if (0 > n) return 0; - always false */ |
1936 | if (0 > k) return 0; /* just for clarity - always false */ | 1936 | /* if (0 > k) return 0; - always false */ |
1937 | if (0 == k) return 1; | 1937 | if (0 == k) return 1; |
1938 | return fac (n) | 1938 | return fac (n) |
1939 | / | 1939 | / |
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 40f576d3e..862514264 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -47,8 +47,6 @@ | |||
47 | 47 | ||
48 | // TODO connect to friends | 48 | // TODO connect to friends |
49 | 49 | ||
50 | // TODO store peers somewhere persistent | ||
51 | |||
52 | // TODO blacklist? (-> mal peer detection on top of brahms) | 50 | // TODO blacklist? (-> mal peer detection on top of brahms) |
53 | 51 | ||
54 | // hist_size_init, hist_size_max | 52 | // hist_size_init, hist_size_max |
@@ -978,7 +976,6 @@ destroy_peer (struct PeerContext *peer_ctx) | |||
978 | peer_ctx->liveliness_check_pending, | 976 | peer_ctx->liveliness_check_pending, |
979 | sizeof (struct PendingMessage))) ) | 977 | sizeof (struct PendingMessage))) ) |
980 | { | 978 | { |
981 | // TODO this may leak memory | ||
982 | peer_ctx->liveliness_check_pending = NULL; | 979 | peer_ctx->liveliness_check_pending = NULL; |
983 | GNUNET_STATISTICS_update (stats, | 980 | GNUNET_STATISTICS_update (stats, |
984 | "# pending liveliness checks", | 981 | "# pending liveliness checks", |
@@ -1620,10 +1617,6 @@ check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer) | |||
1620 | * @brief Destroy the send channel of a peer e.g. stop indicating a sending | 1617 | * @brief Destroy the send channel of a peer e.g. stop indicating a sending |
1621 | * intention to another peer | 1618 | * intention to another peer |
1622 | * | 1619 | * |
1623 | * If there is also no channel to receive messages from that peer, remove it | ||
1624 | * from the peermap. | ||
1625 | * TODO really? | ||
1626 | * | ||
1627 | * @peer the peer identity of the peer whose sending channel to destroy | 1620 | * @peer the peer identity of the peer whose sending channel to destroy |
1628 | * @return #GNUNET_YES if channel was destroyed | 1621 | * @return #GNUNET_YES if channel was destroyed |
1629 | * #GNUNET_NO otherwise | 1622 | * #GNUNET_NO otherwise |
@@ -1777,10 +1770,10 @@ struct ClientContext | |||
1777 | int64_t view_updates_left; | 1770 | int64_t view_updates_left; |
1778 | 1771 | ||
1779 | /** | 1772 | /** |
1780 | * @brief How many peers from the biased | 1773 | * @brief Whether this client wants to receive stream updates. |
1781 | * stream this client expects to receive. | 1774 | * Either #GNUNET_YES or #GNUNET_NO |
1782 | */ | 1775 | */ |
1783 | int64_t stream_peers_left; | 1776 | int8_t stream_update; |
1784 | 1777 | ||
1785 | /** | 1778 | /** |
1786 | * The client handle to send the reply to | 1779 | * The client handle to send the reply to |
@@ -2232,9 +2225,9 @@ send_view (const struct ClientContext *cli_ctx, | |||
2232 | * @param view_size the size of the view array (can be 0) | 2225 | * @param view_size the size of the view array (can be 0) |
2233 | */ | 2226 | */ |
2234 | void | 2227 | void |
2235 | send_stream_peer (const struct ClientContext *cli_ctx, | 2228 | send_stream_peers (const struct ClientContext *cli_ctx, |
2236 | uint64_t num_peers, | 2229 | uint64_t num_peers, |
2237 | const struct GNUNET_PeerIdentity *peers) | 2230 | const struct GNUNET_PeerIdentity *peers) |
2238 | { | 2231 | { |
2239 | struct GNUNET_MQ_Envelope *ev; | 2232 | struct GNUNET_MQ_Envelope *ev; |
2240 | struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; | 2233 | struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; |
@@ -2275,7 +2268,7 @@ clients_notify_view_update (void) | |||
2275 | 2268 | ||
2276 | for (cli_ctx_iter = cli_ctx_head; | 2269 | for (cli_ctx_iter = cli_ctx_head; |
2277 | NULL != cli_ctx_iter; | 2270 | NULL != cli_ctx_iter; |
2278 | cli_ctx_iter = cli_ctx_head->next) | 2271 | cli_ctx_iter = cli_ctx_iter->next) |
2279 | { | 2272 | { |
2280 | if (1 < cli_ctx_iter->view_updates_left) | 2273 | if (1 < cli_ctx_iter->view_updates_left) |
2281 | { | 2274 | { |
@@ -2306,7 +2299,6 @@ clients_notify_stream_peer (uint64_t num_peers, | |||
2306 | // TODO enum StreamPeerSource) | 2299 | // TODO enum StreamPeerSource) |
2307 | { | 2300 | { |
2308 | struct ClientContext *cli_ctx_iter; | 2301 | struct ClientContext *cli_ctx_iter; |
2309 | uint64_t num_peers_send; | ||
2310 | 2302 | ||
2311 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2303 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2312 | "Got peer (%s) from biased stream - update all clients\n", | 2304 | "Got peer (%s) from biased stream - update all clients\n", |
@@ -2314,32 +2306,12 @@ clients_notify_stream_peer (uint64_t num_peers, | |||
2314 | 2306 | ||
2315 | for (cli_ctx_iter = cli_ctx_head; | 2307 | for (cli_ctx_iter = cli_ctx_head; |
2316 | NULL != cli_ctx_iter; | 2308 | NULL != cli_ctx_iter; |
2317 | cli_ctx_iter = cli_ctx_head->next) | 2309 | cli_ctx_iter = cli_ctx_iter->next) |
2318 | { | 2310 | { |
2319 | if (0 < cli_ctx_iter->stream_peers_left) | 2311 | if (GNUNET_YES == cli_ctx_iter->stream_update) |
2320 | { | ||
2321 | /* Client wants to receive limited amount of updates */ | ||
2322 | if (num_peers > cli_ctx_iter->stream_peers_left) | ||
2323 | { | ||
2324 | num_peers_send = num_peers - cli_ctx_iter->stream_peers_left; | ||
2325 | cli_ctx_iter->stream_peers_left = 0; | ||
2326 | } | ||
2327 | else | ||
2328 | { | ||
2329 | num_peers_send = cli_ctx_iter->stream_peers_left - num_peers; | ||
2330 | cli_ctx_iter->stream_peers_left -= num_peers_send; | ||
2331 | } | ||
2332 | } else if (0 > cli_ctx_iter->stream_peers_left) | ||
2333 | { | ||
2334 | /* Client is not interested in updates */ | ||
2335 | continue; | ||
2336 | } else /* _updates_left == 0 - infinite amount of updates */ | ||
2337 | { | 2312 | { |
2338 | num_peers_send = num_peers; | 2313 | send_stream_peers (cli_ctx_iter, num_peers, peers); |
2339 | } | 2314 | } |
2340 | |||
2341 | /* send view */ | ||
2342 | send_stream_peer (cli_ctx_iter, num_peers_send, peers); | ||
2343 | } | 2315 | } |
2344 | } | 2316 | } |
2345 | 2317 | ||
@@ -3076,16 +3048,13 @@ handle_client_stream_request (void *cls, | |||
3076 | const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) | 3048 | const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) |
3077 | { | 3049 | { |
3078 | struct ClientContext *cli_ctx = cls; | 3050 | struct ClientContext *cli_ctx = cls; |
3079 | uint64_t num_peers; | 3051 | (void) msg; |
3080 | |||
3081 | num_peers = ntohl (msg->num_peers); | ||
3082 | 3052 | ||
3083 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3053 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
3084 | "Client requested %" PRIu64 " peers from biased stream.\n", | 3054 | "Client requested peers from biased stream.\n"); |
3085 | num_peers); | 3055 | cli_ctx->stream_update = GNUNET_YES; |
3086 | 3056 | ||
3087 | GNUNET_assert (NULL != cli_ctx); | 3057 | GNUNET_assert (NULL != cli_ctx); |
3088 | cli_ctx->stream_peers_left = num_peers; | ||
3089 | GNUNET_SERVICE_client_continue (cli_ctx->client); | 3058 | GNUNET_SERVICE_client_continue (cli_ctx->client); |
3090 | } | 3059 | } |
3091 | 3060 | ||
@@ -3882,7 +3851,8 @@ do_round (void *cls) | |||
3882 | if (GNUNET_OK == inserted) | 3851 | if (GNUNET_OK == inserted) |
3883 | { | 3852 | { |
3884 | clients_notify_stream_peer (1, | 3853 | clients_notify_stream_peer (1, |
3885 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | 3854 | CustomPeerMap_get_peer_by_index (pull_map, |
3855 | permut[i - first_border])); | ||
3886 | } | 3856 | } |
3887 | to_file (file_name_view_log, | 3857 | to_file (file_name_view_log, |
3888 | "+%s\t(pull list)", | 3858 | "+%s\t(pull list)", |
@@ -4195,6 +4165,7 @@ client_connect_cb (void *cls, | |||
4195 | cli_ctx = GNUNET_new (struct ClientContext); | 4165 | cli_ctx = GNUNET_new (struct ClientContext); |
4196 | cli_ctx->mq = mq; | 4166 | cli_ctx->mq = mq; |
4197 | cli_ctx->view_updates_left = -1; | 4167 | cli_ctx->view_updates_left = -1; |
4168 | cli_ctx->stream_update = GNUNET_NO; | ||
4198 | cli_ctx->client = client; | 4169 | cli_ctx->client = client; |
4199 | GNUNET_CONTAINER_DLL_insert (cli_ctx_head, | 4170 | GNUNET_CONTAINER_DLL_insert (cli_ctx_head, |
4200 | cli_ctx_tail, | 4171 | cli_ctx_tail, |
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index ff4bc9e42..2cd4cb996 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c | |||
@@ -313,26 +313,9 @@ sampler_notify_on_update (struct RPS_Sampler *sampler, | |||
313 | notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); | 313 | notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); |
314 | notify_ctx->notify_cb = notify_cb; | 314 | notify_ctx->notify_cb = notify_cb; |
315 | notify_ctx->cls = cls; | 315 | notify_ctx->cls = cls; |
316 | if (NULL != sampler->notify_ctx_head) | ||
317 | { | ||
318 | for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head; | ||
319 | NULL != notify_iter->next; | ||
320 | notify_iter = notify_iter->next) | ||
321 | { | ||
322 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
323 | "Pre: Context\n"); | ||
324 | } | ||
325 | } | ||
326 | GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, | 316 | GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, |
327 | sampler->notify_ctx_tail, | 317 | sampler->notify_ctx_tail, |
328 | notify_ctx); | 318 | notify_ctx); |
329 | for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head; | ||
330 | NULL != notify_iter; | ||
331 | notify_iter = notify_iter->next) | ||
332 | { | ||
333 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
334 | "Post: Context\n"); | ||
335 | } | ||
336 | return notify_ctx; | 319 | return notify_ctx; |
337 | } | 320 | } |
338 | 321 | ||
@@ -559,27 +542,21 @@ RPS_sampler_mod_init (size_t init_size, | |||
559 | 542 | ||
560 | 543 | ||
561 | /** | 544 | /** |
562 | * Update every sampler element of this sampler with given peer | 545 | * @brief Notify about update of the sampler. |
563 | * | 546 | * |
564 | * @param sampler the sampler to update. | 547 | * Call the callbacks that are waiting for notification on updates to the |
565 | * @param id the PeerID that is put in the sampler | 548 | * sampler. |
549 | * | ||
550 | * @param sampler The sampler the updates are waiting for | ||
566 | */ | 551 | */ |
567 | void | 552 | static void |
568 | RPS_sampler_update (struct RPS_Sampler *sampler, | 553 | notify_update (struct RPS_Sampler *sampler) |
569 | const struct GNUNET_PeerIdentity *id) | ||
570 | { | 554 | { |
571 | struct SamplerNotifyUpdateCTX *tmp_notify_head; | 555 | struct SamplerNotifyUpdateCTX *tmp_notify_head; |
572 | struct SamplerNotifyUpdateCTX *tmp_notify_tail; | 556 | struct SamplerNotifyUpdateCTX *tmp_notify_tail; |
573 | 557 | ||
574 | to_file (sampler->file_name, | 558 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
575 | "Got %s", | 559 | "Calling callbacks waiting for update notification.\n"); |
576 | GNUNET_i2s_full (id)); | ||
577 | |||
578 | for (uint32_t i = 0; i < sampler->sampler_size; i++) | ||
579 | { | ||
580 | RPS_sampler_elem_next (sampler->sampler_elements[i], | ||
581 | id); | ||
582 | } | ||
583 | tmp_notify_head = sampler->notify_ctx_head; | 560 | tmp_notify_head = sampler->notify_ctx_head; |
584 | tmp_notify_tail = sampler->notify_ctx_tail; | 561 | tmp_notify_tail = sampler->notify_ctx_tail; |
585 | sampler->notify_ctx_head = NULL; | 562 | sampler->notify_ctx_head = NULL; |
@@ -599,6 +576,29 @@ RPS_sampler_update (struct RPS_Sampler *sampler, | |||
599 | 576 | ||
600 | 577 | ||
601 | /** | 578 | /** |
579 | * Update every sampler element of this sampler with given peer | ||
580 | * | ||
581 | * @param sampler the sampler to update. | ||
582 | * @param id the PeerID that is put in the sampler | ||
583 | */ | ||
584 | void | ||
585 | RPS_sampler_update (struct RPS_Sampler *sampler, | ||
586 | const struct GNUNET_PeerIdentity *id) | ||
587 | { | ||
588 | to_file (sampler->file_name, | ||
589 | "Got %s", | ||
590 | GNUNET_i2s_full (id)); | ||
591 | |||
592 | for (uint32_t i = 0; i < sampler->sampler_size; i++) | ||
593 | { | ||
594 | RPS_sampler_elem_next (sampler->sampler_elements[i], | ||
595 | id); | ||
596 | } | ||
597 | notify_update (sampler); | ||
598 | } | ||
599 | |||
600 | |||
601 | /** | ||
602 | * Reinitialise all previously initialised sampler elements with the given value. | 602 | * Reinitialise all previously initialised sampler elements with the given value. |
603 | * | 603 | * |
604 | * Used to get rid of a PeerID. | 604 | * Used to get rid of a PeerID. |
@@ -714,6 +714,7 @@ sampler_mod_get_rand_peer (void *cls) | |||
714 | /* Check whether we may use this sampler to give it back to the client */ | 714 | /* Check whether we may use this sampler to give it back to the client */ |
715 | if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) | 715 | if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) |
716 | { | 716 | { |
717 | // TODO remove this condition at least for the client sampler | ||
717 | last_request_diff = | 718 | last_request_diff = |
718 | GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, | 719 | GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, |
719 | GNUNET_TIME_absolute_get ()); | 720 | GNUNET_TIME_absolute_get ()); |
diff --git a/src/rps/rps.h b/src/rps/rps.h index 26615bfc5..9e4487f88 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h | |||
@@ -226,12 +226,6 @@ struct GNUNET_RPS_CS_DEBUG_StreamRequest | |||
226 | * Header including size and type in NBO | 226 | * Header including size and type in NBO |
227 | */ | 227 | */ |
228 | struct GNUNET_MessageHeader header; | 228 | struct GNUNET_MessageHeader header; |
229 | |||
230 | /** | ||
231 | * Number of peers | ||
232 | * 0 for sending updates until cancellation | ||
233 | */ | ||
234 | uint32_t num_peers GNUNET_PACKED; | ||
235 | }; | 229 | }; |
236 | 230 | ||
237 | /** | 231 | /** |
@@ -245,11 +239,6 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply | |||
245 | struct GNUNET_MessageHeader header; | 239 | struct GNUNET_MessageHeader header; |
246 | 240 | ||
247 | /** | 241 | /** |
248 | * Identifyer of the message. | ||
249 | */ | ||
250 | uint32_t id GNUNET_PACKED; | ||
251 | |||
252 | /** | ||
253 | * Number of peers | 242 | * Number of peers |
254 | */ | 243 | */ |
255 | uint64_t num_peers GNUNET_PACKED; | 244 | uint64_t num_peers GNUNET_PACKED; |
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index a558c8a35..e4f4db506 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -32,6 +32,43 @@ | |||
32 | #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) | 32 | #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) |
33 | 33 | ||
34 | /** | 34 | /** |
35 | * Handle for a request to get peers from biased stream of ids | ||
36 | */ | ||
37 | struct GNUNET_RPS_StreamRequestHandle | ||
38 | { | ||
39 | /** | ||
40 | * The client issuing the request. | ||
41 | */ | ||
42 | struct GNUNET_RPS_Handle *rps_handle; | ||
43 | |||
44 | /** | ||
45 | * The number of requested peers. | ||
46 | */ | ||
47 | uint32_t num_peers_left; | ||
48 | |||
49 | /** | ||
50 | * The callback to be called when we receive an answer. | ||
51 | */ | ||
52 | GNUNET_RPS_NotifyReadyCB ready_cb; | ||
53 | |||
54 | /** | ||
55 | * The closure for the callback. | ||
56 | */ | ||
57 | void *ready_cb_cls; | ||
58 | |||
59 | /** | ||
60 | * @brief Next element of the DLL | ||
61 | */ | ||
62 | struct GNUNET_RPS_StreamRequestHandle *next; | ||
63 | |||
64 | /** | ||
65 | * @brief Previous element of the DLL | ||
66 | */ | ||
67 | struct GNUNET_RPS_StreamRequestHandle *prev; | ||
68 | }; | ||
69 | |||
70 | |||
71 | /** | ||
35 | * Handler to handle requests from a client. | 72 | * Handler to handle requests from a client. |
36 | */ | 73 | */ |
37 | struct GNUNET_RPS_Handle | 74 | struct GNUNET_RPS_Handle |
@@ -67,14 +104,19 @@ struct GNUNET_RPS_Handle | |||
67 | void *view_update_cls; | 104 | void *view_update_cls; |
68 | 105 | ||
69 | /** | 106 | /** |
70 | * @brief Callback called on each peer of the biased input stream | 107 | * @brief Closure to each requested peer from the biased stream |
108 | */ | ||
109 | void *stream_input_cls; | ||
110 | |||
111 | /** | ||
112 | * @brief Head of the DLL of stream requests | ||
71 | */ | 113 | */ |
72 | GNUNET_RPS_NotifyReadyCB stream_input_cb; | 114 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_head; |
73 | 115 | ||
74 | /** | 116 | /** |
75 | * @brief Closure to each requested peer from the biased stream | 117 | * @brief Tail of the DLL of stream requests |
76 | */ | 118 | */ |
77 | void *stream_input_cls; | 119 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; |
78 | }; | 120 | }; |
79 | 121 | ||
80 | 122 | ||
@@ -139,6 +181,91 @@ struct cb_cls_pack | |||
139 | 181 | ||
140 | 182 | ||
141 | /** | 183 | /** |
184 | * @brief Create a new handle for a stream request | ||
185 | * | ||
186 | * @param rps_handle The rps handle | ||
187 | * @param num_peers The number of desired peers | ||
188 | * @param ready_cb The callback to be called, once all peers are ready | ||
189 | * @param cls The colsure to provide to the callback | ||
190 | * | ||
191 | * @return The handle to the stream request | ||
192 | */ | ||
193 | static struct GNUNET_RPS_StreamRequestHandle * | ||
194 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | ||
195 | uint64_t num_peers, | ||
196 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
197 | void *cls) | ||
198 | { | ||
199 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
200 | |||
201 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); | ||
202 | |||
203 | srh->rps_handle = rps_handle; | ||
204 | srh->num_peers_left = num_peers; | ||
205 | srh->ready_cb = ready_cb; | ||
206 | srh->ready_cb_cls = cls; | ||
207 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, | ||
208 | rps_handle->stream_requests_tail, | ||
209 | srh); | ||
210 | |||
211 | return srh; | ||
212 | } | ||
213 | |||
214 | |||
215 | /** | ||
216 | * @brief Remove the given stream request from the list of requests and memory | ||
217 | * | ||
218 | * @param srh The request to be removed | ||
219 | * @param srh_head Head of the DLL to remove request from | ||
220 | * @param srh_tail Tail of the DLL to remove request from | ||
221 | */ | ||
222 | static void | ||
223 | remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, | ||
224 | struct GNUNET_RPS_StreamRequestHandle *srh_head, | ||
225 | struct GNUNET_RPS_StreamRequestHandle *srh_tail) | ||
226 | { | ||
227 | GNUNET_CONTAINER_DLL_remove (srh_head, | ||
228 | srh_tail, | ||
229 | srh); | ||
230 | |||
231 | GNUNET_free (srh); | ||
232 | } | ||
233 | |||
234 | |||
235 | /** | ||
236 | * @brief Create new request handle | ||
237 | * | ||
238 | * @param rps_handle Handle to the service | ||
239 | * @param num_requests Number of requests | ||
240 | * @param ready_cb Callback | ||
241 | * @param cls Closure | ||
242 | * | ||
243 | * @return The newly created request handle | ||
244 | */ | ||
245 | static struct GNUNET_RPS_Request_Handle * | ||
246 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, | ||
247 | uint64_t num_requests, | ||
248 | struct RPS_Sampler *sampler, | ||
249 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
250 | void *cls) | ||
251 | { | ||
252 | struct GNUNET_RPS_Request_Handle *rh; | ||
253 | |||
254 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
255 | rh->rps_handle = rps_handle; | ||
256 | rh->id = rps_handle->current_request_id++; | ||
257 | rh->num_requests = num_requests; | ||
258 | rh->sampler = sampler; | ||
259 | rh->ready_cb = ready_cb; | ||
260 | rh->ready_cb_cls = cls; | ||
261 | GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, | ||
262 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
263 | |||
264 | return rh; | ||
265 | } | ||
266 | |||
267 | |||
268 | /** | ||
142 | * @brief Send a request to the service. | 269 | * @brief Send a request to the service. |
143 | * | 270 | * |
144 | * @param h rps handle | 271 | * @param h rps handle |
@@ -304,24 +431,27 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, | |||
304 | * @param cls a closure that will be given to the callback | 431 | * @param cls a closure that will be given to the callback |
305 | * @param ready_cb the callback called when the peers are available | 432 | * @param ready_cb the callback called when the peers are available |
306 | */ | 433 | */ |
307 | void | 434 | struct GNUNET_RPS_StreamRequestHandle * |
308 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | 435 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, |
309 | uint32_t num_peers, | 436 | uint32_t num_peers, |
310 | GNUNET_RPS_NotifyReadyCB stream_input_cb, | 437 | GNUNET_RPS_NotifyReadyCB stream_input_cb, |
311 | void *cls) | 438 | void *cls) |
312 | { | 439 | { |
440 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
313 | struct GNUNET_MQ_Envelope *ev; | 441 | struct GNUNET_MQ_Envelope *ev; |
314 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; | 442 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; |
315 | 443 | ||
444 | srh = new_stream_request (rps_handle, | ||
445 | num_peers, /* num requests */ | ||
446 | stream_input_cb, | ||
447 | cls); | ||
316 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 448 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
317 | "Client requests %" PRIu32 " biased stream updates\n", | 449 | "Client requests %" PRIu32 " biased stream updates\n", |
318 | num_peers); | 450 | num_peers); |
319 | rps_handle->stream_input_cb = stream_input_cb; | ||
320 | rps_handle->stream_input_cls = cls; | ||
321 | 451 | ||
322 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); | 452 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); |
323 | msg->num_peers = htonl (num_peers); | ||
324 | GNUNET_MQ_send (rps_handle->mq, ev); | 453 | GNUNET_MQ_send (rps_handle->mq, ev); |
454 | return srh; | ||
325 | } | 455 | } |
326 | 456 | ||
327 | 457 | ||
@@ -379,6 +509,41 @@ handle_view_update (void *cls, | |||
379 | 509 | ||
380 | 510 | ||
381 | /** | 511 | /** |
512 | * @brief Send message to service that this client does not want to receive | ||
513 | * further updates from the biased peer stream | ||
514 | * | ||
515 | * @param rps_handle The handle representing the service to the client | ||
516 | */ | ||
517 | static void | ||
518 | cancel_stream (struct GNUNET_RPS_Handle *rps_handle) | ||
519 | { | ||
520 | struct GNUNET_MQ_Envelope *ev; | ||
521 | |||
522 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL); | ||
523 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
524 | } | ||
525 | |||
526 | |||
527 | /** | ||
528 | * @brief Cancel a specific request for updates from the biased peer stream | ||
529 | * | ||
530 | * @param srh The request handle to cancel | ||
531 | */ | ||
532 | void | ||
533 | GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) | ||
534 | { | ||
535 | struct GNUNET_RPS_Handle *rps_handle; | ||
536 | |||
537 | rps_handle = srh->rps_handle; | ||
538 | GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, | ||
539 | rps_handle->stream_requests_tail, | ||
540 | srh); | ||
541 | GNUNET_free (srh); | ||
542 | if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); | ||
543 | } | ||
544 | |||
545 | |||
546 | /** | ||
382 | * This function is called, when the service sends another peer from the biased | 547 | * This function is called, when the service sends another peer from the biased |
383 | * stream. | 548 | * stream. |
384 | * It calls the callback the caller provided | 549 | * It calls the callback the caller provided |
@@ -420,16 +585,71 @@ handle_stream_input (void *cls, | |||
420 | { | 585 | { |
421 | struct GNUNET_RPS_Handle *h = cls; | 586 | struct GNUNET_RPS_Handle *h = cls; |
422 | const struct GNUNET_PeerIdentity *peers; | 587 | const struct GNUNET_PeerIdentity *peers; |
588 | /* The following two pointers are used to prevent that new handles are | ||
589 | * inserted into the DLL, that is currently iterated over, from within a call | ||
590 | * to that handler_cb, are executed and in turn again add themselves to the | ||
591 | * iterated DLL infinitely */ | ||
592 | struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp; | ||
593 | struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp; | ||
594 | uint64_t num_peers; | ||
595 | uint64_t num_peers_return; | ||
423 | 596 | ||
424 | /* Give the peers back */ | 597 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; |
598 | num_peers = ntohl (msg->num_peers); | ||
425 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 599 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
426 | "New peer of %" PRIu64 " biased input stream\n", | 600 | "Received %" PRIu64 " peer(s) from stream input.\n", |
427 | ntohl (msg->num_peers)); | 601 | num_peers); |
602 | srh_head_tmp = h->stream_requests_head; | ||
603 | srh_tail_tmp = h->stream_requests_tail; | ||
604 | h->stream_requests_head = NULL; | ||
605 | h->stream_requests_tail = NULL; | ||
606 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
607 | NULL != srh_iter; | ||
608 | srh_iter = srh_iter->next) | ||
609 | { | ||
610 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
611 | "Calling srh - left: %" PRIu64 "\n", | ||
612 | srh_iter->num_peers_left); | ||
613 | if (0 == srh_iter->num_peers_left) /* infinite updates */ | ||
614 | { | ||
615 | num_peers_return = num_peers; | ||
616 | } | ||
617 | else if (num_peers > srh_iter->num_peers_left) | ||
618 | { | ||
619 | num_peers_return = num_peers - srh_iter->num_peers_left; | ||
620 | } | ||
621 | else /* num_peers <= srh_iter->num_peers_left */ | ||
622 | { | ||
623 | num_peers_return = srh_iter->num_peers_left - num_peers; | ||
624 | } | ||
625 | srh_iter->ready_cb (srh_iter->ready_cb_cls, | ||
626 | num_peers_return, | ||
627 | peers); | ||
628 | if (0 == srh_iter->num_peers_left) ; | ||
629 | else if (num_peers_return >= srh_iter->num_peers_left) | ||
630 | { | ||
631 | remove_stream_request (srh_iter, | ||
632 | srh_head_tmp, | ||
633 | srh_tail_tmp); | ||
634 | } | ||
635 | else | ||
636 | { | ||
637 | srh_iter->num_peers_left -= num_peers_return; | ||
638 | } | ||
639 | } | ||
640 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; | ||
641 | NULL != srh_iter; | ||
642 | srh_iter = srh_iter->next) | ||
643 | { | ||
644 | GNUNET_CONTAINER_DLL_insert (h->stream_requests_head, | ||
645 | h->stream_requests_tail, | ||
646 | srh_iter); | ||
647 | } | ||
428 | 648 | ||
429 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 649 | if (NULL == h->stream_requests_head) |
430 | GNUNET_assert (NULL != h); | 650 | { |
431 | GNUNET_assert (NULL != h->stream_input_cb); | 651 | cancel_stream (h); |
432 | h->stream_input_cb (h->stream_input_cls, ntohl (msg->num_peers), peers); | 652 | } |
433 | } | 653 | } |
434 | 654 | ||
435 | 655 | ||
@@ -525,39 +745,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
525 | 745 | ||
526 | 746 | ||
527 | /** | 747 | /** |
528 | * @brief Create new request handle | ||
529 | * | ||
530 | * @param rps_handle Handle to the service | ||
531 | * @param num_requests Number of requests | ||
532 | * @param ready_cb Callback | ||
533 | * @param cls Closure | ||
534 | * | ||
535 | * @return The newly created request handle | ||
536 | */ | ||
537 | static struct GNUNET_RPS_Request_Handle * | ||
538 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, | ||
539 | uint64_t num_requests, | ||
540 | struct RPS_Sampler *sampler, | ||
541 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
542 | void *cls) | ||
543 | { | ||
544 | struct GNUNET_RPS_Request_Handle *rh; | ||
545 | |||
546 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
547 | rh->rps_handle = rps_handle; | ||
548 | rh->id = rps_handle->current_request_id++; | ||
549 | rh->num_requests = num_requests; | ||
550 | rh->sampler = sampler; | ||
551 | rh->ready_cb = ready_cb; | ||
552 | rh->ready_cb_cls = cls; | ||
553 | GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, | ||
554 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
555 | |||
556 | return rh; | ||
557 | } | ||
558 | |||
559 | |||
560 | /** | ||
561 | * Request n random peers. | 748 | * Request n random peers. |
562 | * | 749 | * |
563 | * @param rps_handle handle to the rps service | 750 | * @param rps_handle handle to the rps service |
@@ -646,9 +833,9 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, | |||
646 | */ | 833 | */ |
647 | struct GNUNET_RPS_Request_Handle * | 834 | struct GNUNET_RPS_Request_Handle * |
648 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | 835 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, |
649 | uint32_t num_req_peers, | 836 | uint32_t num_req_peers, |
650 | GNUNET_RPS_NotifyReadyCB ready_cb, | 837 | GNUNET_RPS_NotifyReadyCB ready_cb, |
651 | void *cls) | 838 | void *cls) |
652 | { | 839 | { |
653 | struct GNUNET_RPS_Request_Handle *rh; | 840 | struct GNUNET_RPS_Request_Handle *rh; |
654 | 841 | ||