diff options
-rw-r--r-- | src/include/gnunet_protocols.h | 18 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps.c | 283 | ||||
-rw-r--r-- | src/rps/rps.h | 60 | ||||
-rw-r--r-- | src/rps/rps_api.c | 331 |
4 files changed, 82 insertions, 610 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 56e049608..7a84bda0c 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -2619,31 +2619,17 @@ extern "C" | |||
2619 | 2619 | ||
2620 | /* Client-Service Messages */ | 2620 | /* Client-Service Messages */ |
2621 | 2621 | ||
2622 | /** | ||
2623 | * RPS CS REQUEST Message for the Client to request (a) random peer(s) | ||
2624 | */ | ||
2625 | #define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 954 | ||
2626 | |||
2627 | /** | ||
2628 | * RPS CS REPLY Message for the Server to send (a) random peer(s) | ||
2629 | */ | ||
2630 | #define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 955 | ||
2631 | |||
2632 | /** | ||
2633 | * RPS CS REQUEST CANCEL Message for the Client to cancel a request | ||
2634 | */ | ||
2635 | #define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 956 | ||
2636 | 2622 | ||
2637 | /** | 2623 | /** |
2638 | * RPS CS SEED Message for the Client to seed peers into rps | 2624 | * RPS CS SEED Message for the Client to seed peers into rps |
2639 | */ | 2625 | */ |
2640 | #define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 957 | 2626 | #define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 954 |
2641 | 2627 | ||
2642 | #ifdef ENABLE_MALICIOUS | 2628 | #ifdef ENABLE_MALICIOUS |
2643 | /** | 2629 | /** |
2644 | * Turn RPS service malicious | 2630 | * Turn RPS service malicious |
2645 | */ | 2631 | */ |
2646 | #define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 958 | 2632 | #define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 955 |
2647 | 2633 | ||
2648 | #endif /* ENABLE_MALICIOUS */ | 2634 | #endif /* ENABLE_MALICIOUS */ |
2649 | 2635 | ||
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 862514264..e3714bdba 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -1805,11 +1805,6 @@ struct ClientContext *cli_ctx_tail; | |||
1805 | static struct RPS_Sampler *prot_sampler; | 1805 | static struct RPS_Sampler *prot_sampler; |
1806 | 1806 | ||
1807 | /** | 1807 | /** |
1808 | * Sampler used for the clients. | ||
1809 | */ | ||
1810 | static struct RPS_Sampler *client_sampler; | ||
1811 | |||
1812 | /** | ||
1813 | * Name to log view to | 1808 | * Name to log view to |
1814 | */ | 1809 | */ |
1815 | static const char *file_name_view_log; | 1810 | static const char *file_name_view_log; |
@@ -1832,12 +1827,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers; | |||
1832 | #endif /* TO_FILE */ | 1827 | #endif /* TO_FILE */ |
1833 | 1828 | ||
1834 | /** | 1829 | /** |
1835 | * The size of sampler we need to be able to satisfy the client's need | ||
1836 | * of random peers. | ||
1837 | */ | ||
1838 | static unsigned int sampler_size_client_need; | ||
1839 | |||
1840 | /** | ||
1841 | * The size of sampler we need to be able to satisfy the Brahms protocol's | 1830 | * The size of sampler we need to be able to satisfy the Brahms protocol's |
1842 | * need of random peers. | 1831 | * need of random peers. |
1843 | * | 1832 | * |
@@ -1923,38 +1912,6 @@ static struct GNUNET_PEERINFO_Handle *peerinfo_handle; | |||
1923 | */ | 1912 | */ |
1924 | static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle; | 1913 | static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle; |
1925 | 1914 | ||
1926 | /** | ||
1927 | * Request counter. | ||
1928 | * | ||
1929 | * Counts how many requets clients already issued. | ||
1930 | * Only needed in the beginning to check how many of the 64 deltas | ||
1931 | * we already have | ||
1932 | */ | ||
1933 | static unsigned int req_counter; | ||
1934 | |||
1935 | /** | ||
1936 | * Time of the last request we received. | ||
1937 | * | ||
1938 | * Used to compute the expected request rate. | ||
1939 | */ | ||
1940 | static struct GNUNET_TIME_Absolute last_request; | ||
1941 | |||
1942 | /** | ||
1943 | * Size of #request_deltas. | ||
1944 | */ | ||
1945 | #define REQUEST_DELTAS_SIZE 64 | ||
1946 | static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE; | ||
1947 | |||
1948 | /** | ||
1949 | * Last 64 deltas between requests | ||
1950 | */ | ||
1951 | static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE]; | ||
1952 | |||
1953 | /** | ||
1954 | * The prediction of the rate of requests | ||
1955 | */ | ||
1956 | static struct GNUNET_TIME_Relative request_rate; | ||
1957 | |||
1958 | 1915 | ||
1959 | #ifdef ENABLE_MALICIOUS | 1916 | #ifdef ENABLE_MALICIOUS |
1960 | /** | 1917 | /** |
@@ -2108,38 +2065,6 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list, | |||
2108 | 2065 | ||
2109 | 2066 | ||
2110 | /** | 2067 | /** |
2111 | * Sum all time relatives of an array. | ||
2112 | */ | ||
2113 | static struct GNUNET_TIME_Relative | ||
2114 | T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, | ||
2115 | uint32_t arr_size) | ||
2116 | { | ||
2117 | struct GNUNET_TIME_Relative sum; | ||
2118 | uint32_t i; | ||
2119 | |||
2120 | sum = GNUNET_TIME_UNIT_ZERO; | ||
2121 | for ( i = 0 ; i < arr_size ; i++ ) | ||
2122 | { | ||
2123 | sum = GNUNET_TIME_relative_add (sum, rel_array[i]); | ||
2124 | } | ||
2125 | return sum; | ||
2126 | } | ||
2127 | |||
2128 | |||
2129 | /** | ||
2130 | * Compute the average of given time relatives. | ||
2131 | */ | ||
2132 | static struct GNUNET_TIME_Relative | ||
2133 | T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, | ||
2134 | uint32_t arr_size) | ||
2135 | { | ||
2136 | return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, | ||
2137 | arr_size), | ||
2138 | arr_size); | ||
2139 | } | ||
2140 | |||
2141 | |||
2142 | /** | ||
2143 | * Insert PeerID in #view | 2068 | * Insert PeerID in #view |
2144 | * | 2069 | * |
2145 | * Called once we know a peer is live. | 2070 | * Called once we know a peer is live. |
@@ -2369,69 +2294,6 @@ resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size) | |||
2369 | 2294 | ||
2370 | 2295 | ||
2371 | /** | 2296 | /** |
2372 | * Wrapper around #RPS_sampler_resize() resizing the client sampler | ||
2373 | */ | ||
2374 | static void | ||
2375 | client_resize_wrapper () | ||
2376 | { | ||
2377 | uint32_t bigger_size; | ||
2378 | |||
2379 | // TODO statistics | ||
2380 | |||
2381 | bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need); | ||
2382 | |||
2383 | // TODO respect the min, max | ||
2384 | resize_wrapper (client_sampler, bigger_size); | ||
2385 | LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n", | ||
2386 | bigger_size); | ||
2387 | } | ||
2388 | |||
2389 | |||
2390 | /** | ||
2391 | * Estimate request rate | ||
2392 | * | ||
2393 | * Called every time we receive a request from the client. | ||
2394 | */ | ||
2395 | static void | ||
2396 | est_request_rate() | ||
2397 | { | ||
2398 | struct GNUNET_TIME_Relative max_round_duration; | ||
2399 | |||
2400 | if (request_deltas_size > req_counter) | ||
2401 | req_counter++; | ||
2402 | if ( 1 < req_counter) | ||
2403 | { | ||
2404 | /* Shift last request deltas to the right */ | ||
2405 | memmove (&request_deltas[1], | ||
2406 | request_deltas, | ||
2407 | (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative)); | ||
2408 | |||
2409 | /* Add current delta to beginning */ | ||
2410 | request_deltas[0] = | ||
2411 | GNUNET_TIME_absolute_get_difference (last_request, | ||
2412 | GNUNET_TIME_absolute_get ()); | ||
2413 | request_rate = T_relative_avg (request_deltas, req_counter); | ||
2414 | request_rate = (request_rate.rel_value_us < 1) ? | ||
2415 | GNUNET_TIME_relative_get_unit_ () : request_rate; | ||
2416 | |||
2417 | /* Compute the duration a round will maximally take */ | ||
2418 | max_round_duration = | ||
2419 | GNUNET_TIME_relative_add (round_interval, | ||
2420 | GNUNET_TIME_relative_divide (round_interval, 2)); | ||
2421 | |||
2422 | /* Set the estimated size the sampler has to have to | ||
2423 | * satisfy the current client request rate */ | ||
2424 | sampler_size_client_need = | ||
2425 | max_round_duration.rel_value_us / request_rate.rel_value_us; | ||
2426 | |||
2427 | /* Resize the sampler */ | ||
2428 | client_resize_wrapper (); | ||
2429 | } | ||
2430 | last_request = GNUNET_TIME_absolute_get (); | ||
2431 | } | ||
2432 | |||
2433 | |||
2434 | /** | ||
2435 | * Add all peers in @a peer_array to @a peer_map used as set. | 2297 | * Add all peers in @a peer_array to @a peer_map used as set. |
2436 | * | 2298 | * |
2437 | * @param peer_array array containing the peers | 2299 | * @param peer_array array containing the peers |
@@ -2565,7 +2427,6 @@ insert_in_sampler (void *cls, | |||
2565 | "Updating samplers with peer %s from insert_in_sampler()\n", | 2427 | "Updating samplers with peer %s from insert_in_sampler()\n", |
2566 | GNUNET_i2s (peer)); | 2428 | GNUNET_i2s (peer)); |
2567 | RPS_sampler_update (prot_sampler, peer); | 2429 | RPS_sampler_update (prot_sampler, peer); |
2568 | RPS_sampler_update (client_sampler, peer); | ||
2569 | if (0 < RPS_sampler_count_id (prot_sampler, peer)) | 2430 | if (0 < RPS_sampler_count_id (prot_sampler, peer)) |
2570 | { | 2431 | { |
2571 | /* Make sure we 'know' about this peer */ | 2432 | /* Make sure we 'know' about this peer */ |
@@ -2658,7 +2519,6 @@ remove_peer (const struct GNUNET_PeerIdentity *peer) | |||
2658 | CustomPeerMap_remove_peer (pull_map, peer); | 2519 | CustomPeerMap_remove_peer (pull_map, peer); |
2659 | CustomPeerMap_remove_peer (push_map, peer); | 2520 | CustomPeerMap_remove_peer (push_map, peer); |
2660 | RPS_sampler_reinitialise_by_value (prot_sampler, peer); | 2521 | RPS_sampler_reinitialise_by_value (prot_sampler, peer); |
2661 | RPS_sampler_reinitialise_by_value (client_sampler, peer); | ||
2662 | destroy_peer (get_peer_ctx (peer)); | 2522 | destroy_peer (get_peer_ctx (peer)); |
2663 | } | 2523 | } |
2664 | 2524 | ||
@@ -2691,7 +2551,6 @@ clean_peer (const struct GNUNET_PeerIdentity *peer) | |||
2691 | (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && | 2551 | (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && |
2692 | (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && | 2552 | (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && |
2693 | (0 == RPS_sampler_count_id (prot_sampler, peer)) && | 2553 | (0 == RPS_sampler_count_id (prot_sampler, peer)) && |
2694 | (0 == RPS_sampler_count_id (client_sampler, peer)) && | ||
2695 | (GNUNET_NO != check_removable (peer)) ) | 2554 | (GNUNET_NO != check_removable (peer)) ) |
2696 | { /* We can safely remove this peer */ | 2555 | { /* We can safely remove this peer */ |
2697 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2556 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -2813,143 +2672,11 @@ nse_callback (void *cls, | |||
2813 | 2672 | ||
2814 | /* If the NSE has changed adapt the lists accordingly */ | 2673 | /* If the NSE has changed adapt the lists accordingly */ |
2815 | resize_wrapper (prot_sampler, sampler_size_est_need); | 2674 | resize_wrapper (prot_sampler, sampler_size_est_need); |
2816 | client_resize_wrapper (); | ||
2817 | View_change_len (view_size_est_need); | 2675 | View_change_len (view_size_est_need); |
2818 | } | 2676 | } |
2819 | 2677 | ||
2820 | 2678 | ||
2821 | /** | 2679 | /** |
2822 | * Callback called once the requested PeerIDs are ready. | ||
2823 | * | ||
2824 | * Sends those to the requesting client. | ||
2825 | */ | ||
2826 | static void | ||
2827 | client_respond (const struct GNUNET_PeerIdentity *peer_ids, | ||
2828 | uint32_t num_peers, | ||
2829 | void *cls) | ||
2830 | { | ||
2831 | struct ReplyCls *reply_cls = cls; | ||
2832 | uint32_t i; | ||
2833 | struct GNUNET_MQ_Envelope *ev; | ||
2834 | struct GNUNET_RPS_CS_ReplyMessage *out_msg; | ||
2835 | uint32_t size_needed; | ||
2836 | struct ClientContext *cli_ctx; | ||
2837 | |||
2838 | GNUNET_assert (NULL != reply_cls); | ||
2839 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2840 | "sampler returned %" PRIu32 " peers:\n", | ||
2841 | num_peers); | ||
2842 | for (i = 0; i < num_peers; i++) | ||
2843 | { | ||
2844 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2845 | " %" PRIu32 ": %s\n", | ||
2846 | i, | ||
2847 | GNUNET_i2s (&peer_ids[i])); | ||
2848 | } | ||
2849 | |||
2850 | size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) + | ||
2851 | num_peers * sizeof (struct GNUNET_PeerIdentity); | ||
2852 | |||
2853 | GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed); | ||
2854 | |||
2855 | ev = GNUNET_MQ_msg_extra (out_msg, | ||
2856 | num_peers * sizeof (struct GNUNET_PeerIdentity), | ||
2857 | GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); | ||
2858 | out_msg->num_peers = htonl (num_peers); | ||
2859 | out_msg->id = htonl (reply_cls->id); | ||
2860 | |||
2861 | GNUNET_memcpy (&out_msg[1], | ||
2862 | peer_ids, | ||
2863 | num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
2864 | |||
2865 | cli_ctx = reply_cls->cli_ctx; | ||
2866 | GNUNET_assert (NULL != cli_ctx); | ||
2867 | reply_cls->req_handle = NULL; | ||
2868 | destroy_reply_cls (reply_cls); | ||
2869 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2870 | } | ||
2871 | |||
2872 | |||
2873 | /** | ||
2874 | * Handle RPS request from the client. | ||
2875 | * | ||
2876 | * @param cls closure | ||
2877 | * @param message the actual message | ||
2878 | */ | ||
2879 | static void | ||
2880 | handle_client_request (void *cls, | ||
2881 | const struct GNUNET_RPS_CS_RequestMessage *msg) | ||
2882 | { | ||
2883 | struct ClientContext *cli_ctx = cls; | ||
2884 | uint32_t num_peers; | ||
2885 | uint32_t size_needed; | ||
2886 | struct ReplyCls *reply_cls; | ||
2887 | uint32_t i; | ||
2888 | |||
2889 | num_peers = ntohl (msg->num_peers); | ||
2890 | size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) + | ||
2891 | num_peers * sizeof (struct GNUNET_PeerIdentity); | ||
2892 | |||
2893 | if (GNUNET_MAX_MESSAGE_SIZE < size_needed) | ||
2894 | { | ||
2895 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2896 | "Message received from client has size larger than expected\n"); | ||
2897 | GNUNET_SERVICE_client_drop (cli_ctx->client); | ||
2898 | return; | ||
2899 | } | ||
2900 | |||
2901 | for (i = 0 ; i < num_peers ; i++) | ||
2902 | est_request_rate(); | ||
2903 | |||
2904 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2905 | "Client requested %" PRIu32 " random peer(s).\n", | ||
2906 | num_peers); | ||
2907 | |||
2908 | reply_cls = GNUNET_new (struct ReplyCls); | ||
2909 | reply_cls->id = ntohl (msg->id); | ||
2910 | reply_cls->cli_ctx = cli_ctx; | ||
2911 | reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler, | ||
2912 | num_peers, | ||
2913 | client_respond, | ||
2914 | reply_cls); | ||
2915 | |||
2916 | GNUNET_assert (NULL != cli_ctx); | ||
2917 | GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head, | ||
2918 | cli_ctx->rep_cls_tail, | ||
2919 | reply_cls); | ||
2920 | GNUNET_SERVICE_client_continue (cli_ctx->client); | ||
2921 | } | ||
2922 | |||
2923 | |||
2924 | /** | ||
2925 | * @brief Handle a message that requests the cancellation of a request | ||
2926 | * | ||
2927 | * @param cls unused | ||
2928 | * @param message the message containing the id of the request | ||
2929 | */ | ||
2930 | static void | ||
2931 | handle_client_request_cancel (void *cls, | ||
2932 | const struct GNUNET_RPS_CS_RequestCancelMessage *msg) | ||
2933 | { | ||
2934 | struct ClientContext *cli_ctx = cls; | ||
2935 | struct ReplyCls *rep_cls; | ||
2936 | |||
2937 | GNUNET_assert (NULL != cli_ctx); | ||
2938 | GNUNET_assert (NULL != cli_ctx->rep_cls_head); | ||
2939 | rep_cls = cli_ctx->rep_cls_head; | ||
2940 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2941 | "Client cancels request with id %" PRIu32 "\n", | ||
2942 | ntohl (msg->id)); | ||
2943 | while ( (NULL != rep_cls->next) && | ||
2944 | (rep_cls->id != ntohl (msg->id)) ) | ||
2945 | rep_cls = rep_cls->next; | ||
2946 | GNUNET_assert (rep_cls->id == ntohl (msg->id)); | ||
2947 | destroy_reply_cls (rep_cls); | ||
2948 | GNUNET_SERVICE_client_continue (cli_ctx->client); | ||
2949 | } | ||
2950 | |||
2951 | |||
2952 | /** | ||
2953 | * @brief This function is called, when the client seeds peers. | 2680 | * @brief This function is called, when the client seeds peers. |
2954 | * It verifies that @a msg is well-formed. | 2681 | * It verifies that @a msg is well-formed. |
2955 | * | 2682 | * |
@@ -4102,7 +3829,6 @@ shutdown_task (void *cls) | |||
4102 | 3829 | ||
4103 | GNUNET_NSE_disconnect (nse); | 3830 | GNUNET_NSE_disconnect (nse); |
4104 | RPS_sampler_destroy (prot_sampler); | 3831 | RPS_sampler_destroy (prot_sampler); |
4105 | RPS_sampler_destroy (client_sampler); | ||
4106 | GNUNET_CADET_close_port (cadet_port); | 3832 | GNUNET_CADET_close_port (cadet_port); |
4107 | GNUNET_CADET_disconnect (cadet_handle); | 3833 | GNUNET_CADET_disconnect (cadet_handle); |
4108 | cadet_handle = NULL; | 3834 | cadet_handle = NULL; |
@@ -4345,7 +4071,6 @@ run (void *cls, | |||
4345 | max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); | 4071 | max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); |
4346 | 4072 | ||
4347 | prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval); | 4073 | prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval); |
4348 | client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval); | ||
4349 | 4074 | ||
4350 | /* Initialise push and pull maps */ | 4075 | /* Initialise push and pull maps */ |
4351 | push_map = CustomPeerMap_create (4); | 4076 | push_map = CustomPeerMap_create (4); |
@@ -4385,14 +4110,6 @@ GNUNET_SERVICE_MAIN | |||
4385 | &client_connect_cb, | 4110 | &client_connect_cb, |
4386 | &client_disconnect_cb, | 4111 | &client_disconnect_cb, |
4387 | NULL, | 4112 | NULL, |
4388 | GNUNET_MQ_hd_fixed_size (client_request, | ||
4389 | GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, | ||
4390 | struct GNUNET_RPS_CS_RequestMessage, | ||
4391 | NULL), | ||
4392 | GNUNET_MQ_hd_fixed_size (client_request_cancel, | ||
4393 | GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL, | ||
4394 | struct GNUNET_RPS_CS_RequestCancelMessage, | ||
4395 | NULL), | ||
4396 | GNUNET_MQ_hd_var_size (client_seed, | 4113 | GNUNET_MQ_hd_var_size (client_seed, |
4397 | GNUNET_MESSAGE_TYPE_RPS_CS_SEED, | 4114 | GNUNET_MESSAGE_TYPE_RPS_CS_SEED, |
4398 | struct GNUNET_RPS_CS_SeedMessage, | 4115 | struct GNUNET_RPS_CS_SeedMessage, |
diff --git a/src/rps/rps.h b/src/rps/rps.h index 9e4487f88..915524f88 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h | |||
@@ -60,66 +60,6 @@ struct GNUNET_RPS_P2P_PullReplyMessage | |||
60 | ***********************************************************************/ | 60 | ***********************************************************************/ |
61 | 61 | ||
62 | /** | 62 | /** |
63 | * Message from client to RPS service to request random peer(s). | ||
64 | */ | ||
65 | struct GNUNET_RPS_CS_RequestMessage | ||
66 | { | ||
67 | /** | ||
68 | * Header including size and type in NBO | ||
69 | */ | ||
70 | struct GNUNET_MessageHeader header; | ||
71 | |||
72 | /** | ||
73 | * Identifyer of the message. | ||
74 | */ | ||
75 | uint32_t id GNUNET_PACKED; | ||
76 | |||
77 | /** | ||
78 | * Number of random peer requested | ||
79 | */ | ||
80 | uint32_t num_peers GNUNET_PACKED; | ||
81 | }; | ||
82 | |||
83 | /** | ||
84 | * Message from RPS service to client to reply with random peer(s). | ||
85 | */ | ||
86 | struct GNUNET_RPS_CS_ReplyMessage | ||
87 | { | ||
88 | /** | ||
89 | * Type is #GNUNET_MESSAGE_TYPE_RPS_CS_REPLY. | ||
90 | */ | ||
91 | struct GNUNET_MessageHeader header; | ||
92 | |||
93 | /** | ||
94 | * Identifyer of the message. | ||
95 | */ | ||
96 | uint32_t id GNUNET_PACKED; | ||
97 | |||
98 | /** | ||
99 | * Number of random peer replied | ||
100 | */ | ||
101 | uint32_t num_peers GNUNET_PACKED; | ||
102 | |||
103 | /* Followed by num_peers * GNUNET_PeerIdentity */ | ||
104 | }; | ||
105 | |||
106 | /** | ||
107 | * Message from client to RPS service to cancel request. | ||
108 | */ | ||
109 | struct GNUNET_RPS_CS_RequestCancelMessage | ||
110 | { | ||
111 | /** | ||
112 | * Header including size and type in NBO | ||
113 | */ | ||
114 | struct GNUNET_MessageHeader header; | ||
115 | |||
116 | /** | ||
117 | * Identifyer of the message. | ||
118 | */ | ||
119 | uint32_t id GNUNET_PACKED; | ||
120 | }; | ||
121 | |||
122 | /** | ||
123 | * Message from client to service with seed of peers. | 63 | * Message from client to service with seed of peers. |
124 | */ | 64 | */ |
125 | struct GNUNET_RPS_CS_SeedMessage | 65 | struct GNUNET_RPS_CS_SeedMessage |
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index e4f4db506..ee65c2a82 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c | |||
@@ -84,16 +84,6 @@ struct GNUNET_RPS_Handle | |||
84 | struct GNUNET_MQ_Handle *mq; | 84 | struct GNUNET_MQ_Handle *mq; |
85 | 85 | ||
86 | /** | 86 | /** |
87 | * Array of Request_Handles. | ||
88 | */ | ||
89 | struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers; | ||
90 | |||
91 | /** | ||
92 | * The id of the last request. | ||
93 | */ | ||
94 | uint32_t current_request_id; | ||
95 | |||
96 | /** | ||
97 | * @brief Callback called on each update of the view | 87 | * @brief Callback called on each update of the view |
98 | */ | 88 | */ |
99 | GNUNET_RPS_NotifyReadyCB view_update_cb; | 89 | GNUNET_RPS_NotifyReadyCB view_update_cb; |
@@ -131,11 +121,6 @@ struct GNUNET_RPS_Request_Handle | |||
131 | struct GNUNET_RPS_Handle *rps_handle; | 121 | struct GNUNET_RPS_Handle *rps_handle; |
132 | 122 | ||
133 | /** | 123 | /** |
134 | * The id of the request. | ||
135 | */ | ||
136 | uint32_t id; | ||
137 | |||
138 | /** | ||
139 | * The number of requested peers. | 124 | * The number of requested peers. |
140 | */ | 125 | */ |
141 | uint32_t num_requests; | 126 | uint32_t num_requests; |
@@ -146,6 +131,17 @@ struct GNUNET_RPS_Request_Handle | |||
146 | struct RPS_Sampler *sampler; | 131 | struct RPS_Sampler *sampler; |
147 | 132 | ||
148 | /** | 133 | /** |
134 | * @brief Request handle of the request to the sampler - needed to cancel the request | ||
135 | */ | ||
136 | struct RPS_SamplerRequestHandle *sampler_rh; | ||
137 | |||
138 | /** | ||
139 | * @brief Request handle of the request of the biased stream of peers - | ||
140 | * needed to cancel the request | ||
141 | */ | ||
142 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
143 | |||
144 | /** | ||
149 | * The callback to be called when we receive an answer. | 145 | * The callback to be called when we receive an answer. |
150 | */ | 146 | */ |
151 | GNUNET_RPS_NotifyReadyCB ready_cb; | 147 | GNUNET_RPS_NotifyReadyCB ready_cb; |
@@ -233,160 +229,86 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, | |||
233 | 229 | ||
234 | 230 | ||
235 | /** | 231 | /** |
236 | * @brief Create new request handle | 232 | * @brief Called once the sampler has collected all requested peers. |
237 | * | ||
238 | * @param rps_handle Handle to the service | ||
239 | * @param num_requests Number of requests | ||
240 | * @param ready_cb Callback | ||
241 | * @param cls Closure | ||
242 | * | 233 | * |
243 | * @return The newly created request handle | 234 | * Calls the callback provided by the client with the corresponding cls. |
244 | */ | ||
245 | static struct GNUNET_RPS_Request_Handle * | ||
246 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, | ||
247 | uint64_t num_requests, | ||
248 | struct RPS_Sampler *sampler, | ||
249 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
250 | void *cls) | ||
251 | { | ||
252 | struct GNUNET_RPS_Request_Handle *rh; | ||
253 | |||
254 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
255 | rh->rps_handle = rps_handle; | ||
256 | rh->id = rps_handle->current_request_id++; | ||
257 | rh->num_requests = num_requests; | ||
258 | rh->sampler = sampler; | ||
259 | rh->ready_cb = ready_cb; | ||
260 | rh->ready_cb_cls = cls; | ||
261 | GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, | ||
262 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
263 | |||
264 | return rh; | ||
265 | } | ||
266 | |||
267 | |||
268 | /** | ||
269 | * @brief Send a request to the service. | ||
270 | * | 235 | * |
271 | * @param h rps handle | 236 | * @param peers The array of @a num_peers that has been returned. |
272 | * @param id id of the request | 237 | * @param num_peers The number of peers that have been returned |
273 | * @param num_req_peers number of peers | 238 | * @param cls The #GNUNET_RPS_Request_Handle |
274 | */ | 239 | */ |
275 | void | 240 | void |
276 | send_request (const struct GNUNET_RPS_Handle *h, | 241 | peers_ready_cb (const struct GNUNET_PeerIdentity *peers, |
277 | uint32_t id, | 242 | uint32_t num_peers, |
278 | uint32_t num_req_peers) | 243 | void *cls) |
279 | { | 244 | { |
280 | struct GNUNET_MQ_Envelope *ev; | 245 | struct GNUNET_RPS_Request_Handle *rh = cls; |
281 | struct GNUNET_RPS_CS_RequestMessage *msg; | ||
282 | 246 | ||
283 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST); | 247 | rh->ready_cb (rh->ready_cb_cls, |
284 | msg->num_peers = htonl (num_req_peers); | 248 | num_peers, |
285 | msg->id = htonl (id); | 249 | peers); |
286 | GNUNET_MQ_send (h->mq, ev); | 250 | // TODO cleanup, sampler, rh, cancel stuff |
251 | // TODO screw this function. We can give the cb,cls directly to the sampler. | ||
287 | } | 252 | } |
288 | 253 | ||
289 | /** | ||
290 | * @brief Iterator function over pending requests | ||
291 | * | ||
292 | * Implements #GNUNET_CONTAINER_HashMapIterator32 | ||
293 | * | ||
294 | * @param cls rps handle | ||
295 | * @param key id of the request | ||
296 | * @param value request handle | ||
297 | * | ||
298 | * @return GNUNET_YES to continue iteration | ||
299 | */ | ||
300 | int | ||
301 | resend_requests_iterator (void *cls, uint32_t key, void *value) | ||
302 | { | ||
303 | const struct GNUNET_RPS_Handle *h = cls; | ||
304 | const struct GNUNET_RPS_Request_Handle *req_handle = value; | ||
305 | (void) key; | ||
306 | |||
307 | send_request (h, req_handle->id, req_handle->num_requests); | ||
308 | return GNUNET_YES; /* continue iterating */ | ||
309 | } | ||
310 | 254 | ||
311 | /** | 255 | /** |
312 | * @brief Resend all pending requests | 256 | * @brief Callback to collect the peers from the biased stream and put those |
313 | * | 257 | * into the sampler. |
314 | * This is used to resend all pending requests after the client | ||
315 | * reconnected to the service, because the service cancels all | ||
316 | * pending requests after reconnection. | ||
317 | * | 258 | * |
318 | * @param h rps handle | 259 | * @param cls The #GNUNET_RPS_Request_Handle |
260 | * @param num_peers The number of peer that have been returned | ||
261 | * @param peers The array of @a num_peers that have been returned | ||
319 | */ | 262 | */ |
320 | void | 263 | void |
321 | resend_requests (struct GNUNET_RPS_Handle *h) | 264 | collect_peers_cb (void *cls, |
322 | { | 265 | uint64_t num_peers, |
323 | GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers, | 266 | const struct GNUNET_PeerIdentity *peers) |
324 | resend_requests_iterator, | ||
325 | h); | ||
326 | } | ||
327 | |||
328 | |||
329 | /** | ||
330 | * This function is called, when the service replies to our request. | ||
331 | * It verifies that @a msg is well-formed. | ||
332 | * | ||
333 | * @param cls the closure | ||
334 | * @param msg the message | ||
335 | * @return #GNUNET_OK if @a msg is well-formed | ||
336 | */ | ||
337 | static int | ||
338 | check_reply (void *cls, | ||
339 | const struct GNUNET_RPS_CS_ReplyMessage *msg) | ||
340 | { | 267 | { |
341 | uint16_t msize = ntohs (msg->header.size); | 268 | struct GNUNET_RPS_Request_Handle *rh = cls; |
342 | uint32_t num_peers = ntohl (msg->num_peers); | ||
343 | (void) cls; | ||
344 | 269 | ||
345 | msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage); | 270 | for (uint64_t i = 0; i < num_peers; i++) |
346 | if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || | ||
347 | (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) | ||
348 | { | 271 | { |
349 | GNUNET_break (0); | 272 | RPS_sampler_update (rh->sampler, &peers[i]); |
350 | return GNUNET_SYSERR; | ||
351 | } | 273 | } |
352 | return GNUNET_OK; | ||
353 | } | 274 | } |
354 | 275 | ||
355 | 276 | ||
356 | /** | 277 | /** |
357 | * This function is called, when the service replies to our request. | 278 | * @brief Create new request handle |
358 | * It calls the callback the caller gave us with the provided closure | ||
359 | * and disconnects afterwards. | ||
360 | * | 279 | * |
361 | * @param cls the closure | 280 | * @param rps_handle Handle to the service |
362 | * @param msg the message | 281 | * @param num_requests Number of requests |
282 | * @param ready_cb Callback | ||
283 | * @param cls Closure | ||
284 | * | ||
285 | * @return The newly created request handle | ||
363 | */ | 286 | */ |
364 | static void | 287 | static struct GNUNET_RPS_Request_Handle * |
365 | handle_reply (void *cls, | 288 | new_request_handle (struct GNUNET_RPS_Handle *rps_handle, |
366 | const struct GNUNET_RPS_CS_ReplyMessage *msg) | 289 | uint64_t num_requests, |
290 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
291 | void *cls) | ||
367 | { | 292 | { |
368 | struct GNUNET_RPS_Handle *h = cls; | ||
369 | struct GNUNET_PeerIdentity *peers; | ||
370 | struct GNUNET_RPS_Request_Handle *rh; | 293 | struct GNUNET_RPS_Request_Handle *rh; |
371 | uint32_t id; | ||
372 | 294 | ||
373 | /* Give the peers back */ | 295 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); |
374 | id = ntohl (msg->id); | 296 | rh->rps_handle = rps_handle; |
375 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 297 | rh->num_requests = num_requests; |
376 | "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n", | 298 | rh->sampler = RPS_sampler_mod_init (num_requests, |
377 | ntohl (msg->num_peers), | 299 | GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff |
378 | id); | 300 | rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, |
301 | num_requests, | ||
302 | peers_ready_cb, | ||
303 | rh); | ||
304 | rh->srh = GNUNET_RPS_stream_request (rps_handle, | ||
305 | 0, /* infinite updates */ | ||
306 | collect_peers_cb, | ||
307 | rh); /* cls */ | ||
308 | rh->ready_cb = ready_cb; | ||
309 | rh->ready_cb_cls = cls; | ||
379 | 310 | ||
380 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | 311 | return rh; |
381 | GNUNET_assert (GNUNET_YES == | ||
382 | GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id)); | ||
383 | rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id); | ||
384 | GNUNET_assert (NULL != rh); | ||
385 | GNUNET_assert (rh->num_requests == ntohl (msg->num_peers)); | ||
386 | GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id); | ||
387 | rh->ready_cb (rh->ready_cb_cls, | ||
388 | ntohl (msg->num_peers), | ||
389 | peers); | ||
390 | } | 312 | } |
391 | 313 | ||
392 | 314 | ||
@@ -683,7 +605,6 @@ mq_error_handler (void *cls, | |||
683 | reconnect (h); | 605 | reconnect (h); |
684 | /* Resend all pending request as the service destroyed its knowledge | 606 | /* Resend all pending request as the service destroyed its knowledge |
685 | * about them */ | 607 | * about them */ |
686 | resend_requests (h); | ||
687 | } | 608 | } |
688 | 609 | ||
689 | 610 | ||
@@ -694,10 +615,6 @@ static void | |||
694 | reconnect (struct GNUNET_RPS_Handle *h) | 615 | reconnect (struct GNUNET_RPS_Handle *h) |
695 | { | 616 | { |
696 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 617 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
697 | GNUNET_MQ_hd_var_size (reply, | ||
698 | GNUNET_MESSAGE_TYPE_RPS_CS_REPLY, | ||
699 | struct GNUNET_RPS_CS_ReplyMessage, | ||
700 | h), | ||
701 | GNUNET_MQ_hd_var_size (view_update, | 618 | GNUNET_MQ_hd_var_size (view_update, |
702 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, | 619 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, |
703 | struct GNUNET_RPS_CS_DEBUG_ViewReply, | 620 | struct GNUNET_RPS_CS_DEBUG_ViewReply, |
@@ -731,7 +648,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
731 | struct GNUNET_RPS_Handle *h; | 648 | struct GNUNET_RPS_Handle *h; |
732 | 649 | ||
733 | h = GNUNET_new (struct GNUNET_RPS_Handle); | 650 | h = GNUNET_new (struct GNUNET_RPS_Handle); |
734 | h->current_request_id = 0; | ||
735 | h->cfg = cfg; | 651 | h->cfg = cfg; |
736 | reconnect (h); | 652 | reconnect (h); |
737 | if (NULL == h->mq) | 653 | if (NULL == h->mq) |
@@ -739,7 +655,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
739 | GNUNET_free (h); | 655 | GNUNET_free (h); |
740 | return NULL; | 656 | return NULL; |
741 | } | 657 | } |
742 | h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4); | ||
743 | return h; | 658 | return h; |
744 | } | 659 | } |
745 | 660 | ||
@@ -754,84 +669,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
754 | * @return a handle to cancel this request | 669 | * @return a handle to cancel this request |
755 | */ | 670 | */ |
756 | struct GNUNET_RPS_Request_Handle * | 671 | struct GNUNET_RPS_Request_Handle * |
757 | GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle, | ||
758 | uint32_t num_req_peers, | ||
759 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
760 | void *cls) | ||
761 | { | ||
762 | struct GNUNET_RPS_Request_Handle *rh; | ||
763 | |||
764 | rh = new_request_handle (rps_handle, | ||
765 | num_req_peers, | ||
766 | NULL, /* no sampler needed */ | ||
767 | ready_cb, | ||
768 | cls); | ||
769 | |||
770 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
771 | "Requesting %" PRIu32 " peers with id %" PRIu32 "\n", | ||
772 | num_req_peers, | ||
773 | rh->id); | ||
774 | |||
775 | send_request (rps_handle, rh->id, num_req_peers); | ||
776 | return rh; | ||
777 | } | ||
778 | |||
779 | |||
780 | /** | ||
781 | * @brief Callback to collect the peers from the biased stream and put those | ||
782 | * into the sampler. | ||
783 | * | ||
784 | * @param cls The #GNUNET_RPS_Request_Handle | ||
785 | * @param num_peers The number of peer that have been returned | ||
786 | * @param peers The array of @a num_peers that have been returned | ||
787 | */ | ||
788 | void | ||
789 | collect_peers_cb (void *cls, | ||
790 | uint64_t num_peers, | ||
791 | const struct GNUNET_PeerIdentity *peers) | ||
792 | { | ||
793 | struct GNUNET_RPS_Request_Handle *rh = cls; | ||
794 | |||
795 | for (uint64_t i = 0; i < num_peers; i++) | ||
796 | { | ||
797 | RPS_sampler_update (rh->sampler, &peers[i]); | ||
798 | } | ||
799 | } | ||
800 | |||
801 | |||
802 | /** | ||
803 | * @brief Called once the sampler has collected all requested peers. | ||
804 | * | ||
805 | * Calls the callback provided by the client with the corresponding cls. | ||
806 | * | ||
807 | * @param peers The array of @a num_peers that has been returned. | ||
808 | * @param num_peers The number of peers that have been returned | ||
809 | * @param cls The #GNUNET_RPS_Request_Handle | ||
810 | */ | ||
811 | void | ||
812 | peers_ready_cb (const struct GNUNET_PeerIdentity *peers, | ||
813 | uint32_t num_peers, | ||
814 | void *cls) | ||
815 | { | ||
816 | struct GNUNET_RPS_Request_Handle *rh = cls; | ||
817 | |||
818 | rh->ready_cb (rh->ready_cb_cls, | ||
819 | num_peers, | ||
820 | peers); | ||
821 | // TODO cleanup, sampler, rh, cancel stuff | ||
822 | // TODO screw this function. We can give the cb,cls directly to the sampler. | ||
823 | } | ||
824 | |||
825 | /** | ||
826 | * Request n random peers. | ||
827 | * | ||
828 | * @param rps_handle handle to the rps service | ||
829 | * @param num_req_peers number of peers we want to receive | ||
830 | * @param ready_cb the callback called when the peers are available | ||
831 | * @param cls closure given to the callback | ||
832 | * @return a handle to cancel this request | ||
833 | */ | ||
834 | struct GNUNET_RPS_Request_Handle * | ||
835 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | 672 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, |
836 | uint32_t num_req_peers, | 673 | uint32_t num_req_peers, |
837 | GNUNET_RPS_NotifyReadyCB ready_cb, | 674 | GNUNET_RPS_NotifyReadyCB ready_cb, |
@@ -841,19 +678,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | |||
841 | 678 | ||
842 | rh = new_request_handle (rps_handle, | 679 | rh = new_request_handle (rps_handle, |
843 | num_req_peers, | 680 | num_req_peers, |
844 | RPS_sampler_mod_init (num_req_peers, | ||
845 | GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff | ||
846 | ready_cb, | 681 | ready_cb, |
847 | cls); | 682 | cls); |
848 | RPS_sampler_get_n_rand_peers (rh->sampler, | ||
849 | num_req_peers, | ||
850 | peers_ready_cb, | ||
851 | rh); | ||
852 | 683 | ||
853 | GNUNET_RPS_stream_request (rps_handle, | ||
854 | 0, /* infinite updates */ | ||
855 | collect_peers_cb, | ||
856 | rh); /* cls */ | ||
857 | 684 | ||
858 | return rh; | 685 | return rh; |
859 | } | 686 | } |
@@ -1022,20 +849,21 @@ void | |||
1022 | GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | 849 | GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) |
1023 | { | 850 | { |
1024 | struct GNUNET_RPS_Handle *h; | 851 | struct GNUNET_RPS_Handle *h; |
1025 | struct GNUNET_MQ_Envelope *ev; | ||
1026 | struct GNUNET_RPS_CS_RequestCancelMessage*msg; | ||
1027 | |||
1028 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1029 | "Cancelling request with id %" PRIu32 "\n", | ||
1030 | rh->id); | ||
1031 | 852 | ||
1032 | h = rh->rps_handle; | 853 | h = rh->rps_handle; |
1033 | GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, | 854 | if (NULL != rh->srh) |
1034 | rh->id)); | 855 | { |
1035 | GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id); | 856 | remove_stream_request (rh->srh, |
1036 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); | 857 | h->stream_requests_head, |
1037 | msg->id = htonl (rh->id); | 858 | h->stream_requests_tail); |
1038 | GNUNET_MQ_send (rh->rps_handle->mq, ev); | 859 | } |
860 | if (NULL == h->stream_requests_head) cancel_stream(h); | ||
861 | if (NULL != rh->sampler_rh) | ||
862 | { | ||
863 | RPS_sampler_request_cancel (rh->sampler_rh); | ||
864 | } | ||
865 | RPS_sampler_destroy (rh->sampler); | ||
866 | GNUNET_free (rh); | ||
1039 | } | 867 | } |
1040 | 868 | ||
1041 | 869 | ||
@@ -1048,10 +876,11 @@ void | |||
1048 | GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) | 876 | GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) |
1049 | { | 877 | { |
1050 | GNUNET_MQ_destroy (h->mq); | 878 | GNUNET_MQ_destroy (h->mq); |
1051 | if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers)) | 879 | if (NULL != h->stream_requests_head) |
880 | { | ||
1052 | LOG (GNUNET_ERROR_TYPE_WARNING, | 881 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1053 | "Still waiting for requests\n"); | 882 | "Still waiting for requests\n"); |
1054 | GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers); | 883 | } |
1055 | GNUNET_free (h); | 884 | GNUNET_free (h); |
1056 | } | 885 | } |
1057 | 886 | ||