aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h18
-rw-r--r--src/rps/gnunet-service-rps.c283
-rw-r--r--src/rps/rps.h60
-rw-r--r--src/rps/rps_api.c331
4 files changed, 82 insertions, 610 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 56e049608..7a84bda0c 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2619,31 +2619,17 @@ extern "C"
2619 2619
2620/* Client-Service Messages */ 2620/* Client-Service Messages */
2621 2621
2622/**
2623 * RPS CS REQUEST Message for the Client to request (a) random peer(s)
2624 */
2625#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 954
2626
2627/**
2628 * RPS CS REPLY Message for the Server to send (a) random peer(s)
2629 */
2630#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 955
2631
2632/**
2633 * RPS CS REQUEST CANCEL Message for the Client to cancel a request
2634 */
2635#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 956
2636 2622
2637/** 2623/**
2638 * RPS CS SEED Message for the Client to seed peers into rps 2624 * RPS CS SEED Message for the Client to seed peers into rps
2639 */ 2625 */
2640#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 957 2626#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 954
2641 2627
2642#ifdef ENABLE_MALICIOUS 2628#ifdef ENABLE_MALICIOUS
2643/** 2629/**
2644 * Turn RPS service malicious 2630 * Turn RPS service malicious
2645 */ 2631 */
2646#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 958 2632#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 955
2647 2633
2648#endif /* ENABLE_MALICIOUS */ 2634#endif /* ENABLE_MALICIOUS */
2649 2635
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 862514264..e3714bdba 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -1805,11 +1805,6 @@ struct ClientContext *cli_ctx_tail;
1805static struct RPS_Sampler *prot_sampler; 1805static struct RPS_Sampler *prot_sampler;
1806 1806
1807/** 1807/**
1808 * Sampler used for the clients.
1809 */
1810static struct RPS_Sampler *client_sampler;
1811
1812/**
1813 * Name to log view to 1808 * Name to log view to
1814 */ 1809 */
1815static const char *file_name_view_log; 1810static const char *file_name_view_log;
@@ -1832,12 +1827,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1832#endif /* TO_FILE */ 1827#endif /* TO_FILE */
1833 1828
1834/** 1829/**
1835 * The size of sampler we need to be able to satisfy the client's need
1836 * of random peers.
1837 */
1838static unsigned int sampler_size_client_need;
1839
1840/**
1841 * The size of sampler we need to be able to satisfy the Brahms protocol's 1830 * The size of sampler we need to be able to satisfy the Brahms protocol's
1842 * need of random peers. 1831 * need of random peers.
1843 * 1832 *
@@ -1923,38 +1912,6 @@ static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1923 */ 1912 */
1924static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle; 1913static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1925 1914
1926/**
1927 * Request counter.
1928 *
1929 * Counts how many requets clients already issued.
1930 * Only needed in the beginning to check how many of the 64 deltas
1931 * we already have
1932 */
1933static unsigned int req_counter;
1934
1935/**
1936 * Time of the last request we received.
1937 *
1938 * Used to compute the expected request rate.
1939 */
1940static struct GNUNET_TIME_Absolute last_request;
1941
1942/**
1943 * Size of #request_deltas.
1944 */
1945#define REQUEST_DELTAS_SIZE 64
1946static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
1947
1948/**
1949 * Last 64 deltas between requests
1950 */
1951static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
1952
1953/**
1954 * The prediction of the rate of requests
1955 */
1956static struct GNUNET_TIME_Relative request_rate;
1957
1958 1915
1959#ifdef ENABLE_MALICIOUS 1916#ifdef ENABLE_MALICIOUS
1960/** 1917/**
@@ -2108,38 +2065,6 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2108 2065
2109 2066
2110/** 2067/**
2111 * Sum all time relatives of an array.
2112 */
2113static struct GNUNET_TIME_Relative
2114T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2115 uint32_t arr_size)
2116{
2117 struct GNUNET_TIME_Relative sum;
2118 uint32_t i;
2119
2120 sum = GNUNET_TIME_UNIT_ZERO;
2121 for ( i = 0 ; i < arr_size ; i++ )
2122 {
2123 sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2124 }
2125 return sum;
2126}
2127
2128
2129/**
2130 * Compute the average of given time relatives.
2131 */
2132static struct GNUNET_TIME_Relative
2133T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2134 uint32_t arr_size)
2135{
2136 return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2137 arr_size),
2138 arr_size);
2139}
2140
2141
2142/**
2143 * Insert PeerID in #view 2068 * Insert PeerID in #view
2144 * 2069 *
2145 * Called once we know a peer is live. 2070 * Called once we know a peer is live.
@@ -2369,69 +2294,6 @@ resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2369 2294
2370 2295
2371/** 2296/**
2372 * Wrapper around #RPS_sampler_resize() resizing the client sampler
2373 */
2374static void
2375client_resize_wrapper ()
2376{
2377 uint32_t bigger_size;
2378
2379 // TODO statistics
2380
2381 bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2382
2383 // TODO respect the min, max
2384 resize_wrapper (client_sampler, bigger_size);
2385 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2386 bigger_size);
2387}
2388
2389
2390/**
2391 * Estimate request rate
2392 *
2393 * Called every time we receive a request from the client.
2394 */
2395static void
2396est_request_rate()
2397{
2398 struct GNUNET_TIME_Relative max_round_duration;
2399
2400 if (request_deltas_size > req_counter)
2401 req_counter++;
2402 if ( 1 < req_counter)
2403 {
2404 /* Shift last request deltas to the right */
2405 memmove (&request_deltas[1],
2406 request_deltas,
2407 (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2408
2409 /* Add current delta to beginning */
2410 request_deltas[0] =
2411 GNUNET_TIME_absolute_get_difference (last_request,
2412 GNUNET_TIME_absolute_get ());
2413 request_rate = T_relative_avg (request_deltas, req_counter);
2414 request_rate = (request_rate.rel_value_us < 1) ?
2415 GNUNET_TIME_relative_get_unit_ () : request_rate;
2416
2417 /* Compute the duration a round will maximally take */
2418 max_round_duration =
2419 GNUNET_TIME_relative_add (round_interval,
2420 GNUNET_TIME_relative_divide (round_interval, 2));
2421
2422 /* Set the estimated size the sampler has to have to
2423 * satisfy the current client request rate */
2424 sampler_size_client_need =
2425 max_round_duration.rel_value_us / request_rate.rel_value_us;
2426
2427 /* Resize the sampler */
2428 client_resize_wrapper ();
2429 }
2430 last_request = GNUNET_TIME_absolute_get ();
2431}
2432
2433
2434/**
2435 * Add all peers in @a peer_array to @a peer_map used as set. 2297 * Add all peers in @a peer_array to @a peer_map used as set.
2436 * 2298 *
2437 * @param peer_array array containing the peers 2299 * @param peer_array array containing the peers
@@ -2565,7 +2427,6 @@ insert_in_sampler (void *cls,
2565 "Updating samplers with peer %s from insert_in_sampler()\n", 2427 "Updating samplers with peer %s from insert_in_sampler()\n",
2566 GNUNET_i2s (peer)); 2428 GNUNET_i2s (peer));
2567 RPS_sampler_update (prot_sampler, peer); 2429 RPS_sampler_update (prot_sampler, peer);
2568 RPS_sampler_update (client_sampler, peer);
2569 if (0 < RPS_sampler_count_id (prot_sampler, peer)) 2430 if (0 < RPS_sampler_count_id (prot_sampler, peer))
2570 { 2431 {
2571 /* Make sure we 'know' about this peer */ 2432 /* Make sure we 'know' about this peer */
@@ -2658,7 +2519,6 @@ remove_peer (const struct GNUNET_PeerIdentity *peer)
2658 CustomPeerMap_remove_peer (pull_map, peer); 2519 CustomPeerMap_remove_peer (pull_map, peer);
2659 CustomPeerMap_remove_peer (push_map, peer); 2520 CustomPeerMap_remove_peer (push_map, peer);
2660 RPS_sampler_reinitialise_by_value (prot_sampler, peer); 2521 RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2661 RPS_sampler_reinitialise_by_value (client_sampler, peer);
2662 destroy_peer (get_peer_ctx (peer)); 2522 destroy_peer (get_peer_ctx (peer));
2663} 2523}
2664 2524
@@ -2691,7 +2551,6 @@ clean_peer (const struct GNUNET_PeerIdentity *peer)
2691 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && 2551 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2692 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && 2552 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2693 (0 == RPS_sampler_count_id (prot_sampler, peer)) && 2553 (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
2694 (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2695 (GNUNET_NO != check_removable (peer)) ) 2554 (GNUNET_NO != check_removable (peer)) )
2696 { /* We can safely remove this peer */ 2555 { /* We can safely remove this peer */
2697 LOG (GNUNET_ERROR_TYPE_DEBUG, 2556 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2813,143 +2672,11 @@ nse_callback (void *cls,
2813 2672
2814 /* If the NSE has changed adapt the lists accordingly */ 2673 /* If the NSE has changed adapt the lists accordingly */
2815 resize_wrapper (prot_sampler, sampler_size_est_need); 2674 resize_wrapper (prot_sampler, sampler_size_est_need);
2816 client_resize_wrapper ();
2817 View_change_len (view_size_est_need); 2675 View_change_len (view_size_est_need);
2818} 2676}
2819 2677
2820 2678
2821/** 2679/**
2822 * Callback called once the requested PeerIDs are ready.
2823 *
2824 * Sends those to the requesting client.
2825 */
2826static void
2827client_respond (const struct GNUNET_PeerIdentity *peer_ids,
2828 uint32_t num_peers,
2829 void *cls)
2830{
2831 struct ReplyCls *reply_cls = cls;
2832 uint32_t i;
2833 struct GNUNET_MQ_Envelope *ev;
2834 struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2835 uint32_t size_needed;
2836 struct ClientContext *cli_ctx;
2837
2838 GNUNET_assert (NULL != reply_cls);
2839 LOG (GNUNET_ERROR_TYPE_DEBUG,
2840 "sampler returned %" PRIu32 " peers:\n",
2841 num_peers);
2842 for (i = 0; i < num_peers; i++)
2843 {
2844 LOG (GNUNET_ERROR_TYPE_DEBUG,
2845 " %" PRIu32 ": %s\n",
2846 i,
2847 GNUNET_i2s (&peer_ids[i]));
2848 }
2849
2850 size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2851 num_peers * sizeof (struct GNUNET_PeerIdentity);
2852
2853 GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2854
2855 ev = GNUNET_MQ_msg_extra (out_msg,
2856 num_peers * sizeof (struct GNUNET_PeerIdentity),
2857 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2858 out_msg->num_peers = htonl (num_peers);
2859 out_msg->id = htonl (reply_cls->id);
2860
2861 GNUNET_memcpy (&out_msg[1],
2862 peer_ids,
2863 num_peers * sizeof (struct GNUNET_PeerIdentity));
2864
2865 cli_ctx = reply_cls->cli_ctx;
2866 GNUNET_assert (NULL != cli_ctx);
2867 reply_cls->req_handle = NULL;
2868 destroy_reply_cls (reply_cls);
2869 GNUNET_MQ_send (cli_ctx->mq, ev);
2870}
2871
2872
2873/**
2874 * Handle RPS request from the client.
2875 *
2876 * @param cls closure
2877 * @param message the actual message
2878 */
2879static void
2880handle_client_request (void *cls,
2881 const struct GNUNET_RPS_CS_RequestMessage *msg)
2882{
2883 struct ClientContext *cli_ctx = cls;
2884 uint32_t num_peers;
2885 uint32_t size_needed;
2886 struct ReplyCls *reply_cls;
2887 uint32_t i;
2888
2889 num_peers = ntohl (msg->num_peers);
2890 size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2891 num_peers * sizeof (struct GNUNET_PeerIdentity);
2892
2893 if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2894 {
2895 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2896 "Message received from client has size larger than expected\n");
2897 GNUNET_SERVICE_client_drop (cli_ctx->client);
2898 return;
2899 }
2900
2901 for (i = 0 ; i < num_peers ; i++)
2902 est_request_rate();
2903
2904 LOG (GNUNET_ERROR_TYPE_DEBUG,
2905 "Client requested %" PRIu32 " random peer(s).\n",
2906 num_peers);
2907
2908 reply_cls = GNUNET_new (struct ReplyCls);
2909 reply_cls->id = ntohl (msg->id);
2910 reply_cls->cli_ctx = cli_ctx;
2911 reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2912 num_peers,
2913 client_respond,
2914 reply_cls);
2915
2916 GNUNET_assert (NULL != cli_ctx);
2917 GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2918 cli_ctx->rep_cls_tail,
2919 reply_cls);
2920 GNUNET_SERVICE_client_continue (cli_ctx->client);
2921}
2922
2923
2924/**
2925 * @brief Handle a message that requests the cancellation of a request
2926 *
2927 * @param cls unused
2928 * @param message the message containing the id of the request
2929 */
2930static void
2931handle_client_request_cancel (void *cls,
2932 const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2933{
2934 struct ClientContext *cli_ctx = cls;
2935 struct ReplyCls *rep_cls;
2936
2937 GNUNET_assert (NULL != cli_ctx);
2938 GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2939 rep_cls = cli_ctx->rep_cls_head;
2940 LOG (GNUNET_ERROR_TYPE_DEBUG,
2941 "Client cancels request with id %" PRIu32 "\n",
2942 ntohl (msg->id));
2943 while ( (NULL != rep_cls->next) &&
2944 (rep_cls->id != ntohl (msg->id)) )
2945 rep_cls = rep_cls->next;
2946 GNUNET_assert (rep_cls->id == ntohl (msg->id));
2947 destroy_reply_cls (rep_cls);
2948 GNUNET_SERVICE_client_continue (cli_ctx->client);
2949}
2950
2951
2952/**
2953 * @brief This function is called, when the client seeds peers. 2680 * @brief This function is called, when the client seeds peers.
2954 * It verifies that @a msg is well-formed. 2681 * It verifies that @a msg is well-formed.
2955 * 2682 *
@@ -4102,7 +3829,6 @@ shutdown_task (void *cls)
4102 3829
4103 GNUNET_NSE_disconnect (nse); 3830 GNUNET_NSE_disconnect (nse);
4104 RPS_sampler_destroy (prot_sampler); 3831 RPS_sampler_destroy (prot_sampler);
4105 RPS_sampler_destroy (client_sampler);
4106 GNUNET_CADET_close_port (cadet_port); 3832 GNUNET_CADET_close_port (cadet_port);
4107 GNUNET_CADET_disconnect (cadet_handle); 3833 GNUNET_CADET_disconnect (cadet_handle);
4108 cadet_handle = NULL; 3834 cadet_handle = NULL;
@@ -4345,7 +4071,6 @@ run (void *cls,
4345 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); 4071 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4346 4072
4347 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval); 4073 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval);
4348 client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4349 4074
4350 /* Initialise push and pull maps */ 4075 /* Initialise push and pull maps */
4351 push_map = CustomPeerMap_create (4); 4076 push_map = CustomPeerMap_create (4);
@@ -4385,14 +4110,6 @@ GNUNET_SERVICE_MAIN
4385 &client_connect_cb, 4110 &client_connect_cb,
4386 &client_disconnect_cb, 4111 &client_disconnect_cb,
4387 NULL, 4112 NULL,
4388 GNUNET_MQ_hd_fixed_size (client_request,
4389 GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4390 struct GNUNET_RPS_CS_RequestMessage,
4391 NULL),
4392 GNUNET_MQ_hd_fixed_size (client_request_cancel,
4393 GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4394 struct GNUNET_RPS_CS_RequestCancelMessage,
4395 NULL),
4396 GNUNET_MQ_hd_var_size (client_seed, 4113 GNUNET_MQ_hd_var_size (client_seed,
4397 GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 4114 GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4398 struct GNUNET_RPS_CS_SeedMessage, 4115 struct GNUNET_RPS_CS_SeedMessage,
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 9e4487f88..915524f88 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -60,66 +60,6 @@ struct GNUNET_RPS_P2P_PullReplyMessage
60***********************************************************************/ 60***********************************************************************/
61 61
62/** 62/**
63 * Message from client to RPS service to request random peer(s).
64 */
65struct GNUNET_RPS_CS_RequestMessage
66{
67 /**
68 * Header including size and type in NBO
69 */
70 struct GNUNET_MessageHeader header;
71
72 /**
73 * Identifyer of the message.
74 */
75 uint32_t id GNUNET_PACKED;
76
77 /**
78 * Number of random peer requested
79 */
80 uint32_t num_peers GNUNET_PACKED;
81};
82
83/**
84 * Message from RPS service to client to reply with random peer(s).
85 */
86struct GNUNET_RPS_CS_ReplyMessage
87{
88 /**
89 * Type is #GNUNET_MESSAGE_TYPE_RPS_CS_REPLY.
90 */
91 struct GNUNET_MessageHeader header;
92
93 /**
94 * Identifyer of the message.
95 */
96 uint32_t id GNUNET_PACKED;
97
98 /**
99 * Number of random peer replied
100 */
101 uint32_t num_peers GNUNET_PACKED;
102
103 /* Followed by num_peers * GNUNET_PeerIdentity */
104};
105
106/**
107 * Message from client to RPS service to cancel request.
108 */
109struct GNUNET_RPS_CS_RequestCancelMessage
110{
111 /**
112 * Header including size and type in NBO
113 */
114 struct GNUNET_MessageHeader header;
115
116 /**
117 * Identifyer of the message.
118 */
119 uint32_t id GNUNET_PACKED;
120};
121
122/**
123 * Message from client to service with seed of peers. 63 * Message from client to service with seed of peers.
124 */ 64 */
125struct GNUNET_RPS_CS_SeedMessage 65struct GNUNET_RPS_CS_SeedMessage
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index e4f4db506..ee65c2a82 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -84,16 +84,6 @@ struct GNUNET_RPS_Handle
84 struct GNUNET_MQ_Handle *mq; 84 struct GNUNET_MQ_Handle *mq;
85 85
86 /** 86 /**
87 * Array of Request_Handles.
88 */
89 struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers;
90
91 /**
92 * The id of the last request.
93 */
94 uint32_t current_request_id;
95
96 /**
97 * @brief Callback called on each update of the view 87 * @brief Callback called on each update of the view
98 */ 88 */
99 GNUNET_RPS_NotifyReadyCB view_update_cb; 89 GNUNET_RPS_NotifyReadyCB view_update_cb;
@@ -131,11 +121,6 @@ struct GNUNET_RPS_Request_Handle
131 struct GNUNET_RPS_Handle *rps_handle; 121 struct GNUNET_RPS_Handle *rps_handle;
132 122
133 /** 123 /**
134 * The id of the request.
135 */
136 uint32_t id;
137
138 /**
139 * The number of requested peers. 124 * The number of requested peers.
140 */ 125 */
141 uint32_t num_requests; 126 uint32_t num_requests;
@@ -146,6 +131,17 @@ struct GNUNET_RPS_Request_Handle
146 struct RPS_Sampler *sampler; 131 struct RPS_Sampler *sampler;
147 132
148 /** 133 /**
134 * @brief Request handle of the request to the sampler - needed to cancel the request
135 */
136 struct RPS_SamplerRequestHandle *sampler_rh;
137
138 /**
139 * @brief Request handle of the request of the biased stream of peers -
140 * needed to cancel the request
141 */
142 struct GNUNET_RPS_StreamRequestHandle *srh;
143
144 /**
149 * The callback to be called when we receive an answer. 145 * The callback to be called when we receive an answer.
150 */ 146 */
151 GNUNET_RPS_NotifyReadyCB ready_cb; 147 GNUNET_RPS_NotifyReadyCB ready_cb;
@@ -233,160 +229,86 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
233 229
234 230
235/** 231/**
236 * @brief Create new request handle 232 * @brief Called once the sampler has collected all requested peers.
237 *
238 * @param rps_handle Handle to the service
239 * @param num_requests Number of requests
240 * @param ready_cb Callback
241 * @param cls Closure
242 * 233 *
243 * @return The newly created request handle 234 * Calls the callback provided by the client with the corresponding cls.
244 */
245static struct GNUNET_RPS_Request_Handle *
246new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
247 uint64_t num_requests,
248 struct RPS_Sampler *sampler,
249 GNUNET_RPS_NotifyReadyCB ready_cb,
250 void *cls)
251{
252 struct GNUNET_RPS_Request_Handle *rh;
253
254 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
255 rh->rps_handle = rps_handle;
256 rh->id = rps_handle->current_request_id++;
257 rh->num_requests = num_requests;
258 rh->sampler = sampler;
259 rh->ready_cb = ready_cb;
260 rh->ready_cb_cls = cls;
261 GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
262 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
263
264 return rh;
265}
266
267
268/**
269 * @brief Send a request to the service.
270 * 235 *
271 * @param h rps handle 236 * @param peers The array of @a num_peers that has been returned.
272 * @param id id of the request 237 * @param num_peers The number of peers that have been returned
273 * @param num_req_peers number of peers 238 * @param cls The #GNUNET_RPS_Request_Handle
274 */ 239 */
275void 240void
276send_request (const struct GNUNET_RPS_Handle *h, 241peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
277 uint32_t id, 242 uint32_t num_peers,
278 uint32_t num_req_peers) 243 void *cls)
279{ 244{
280 struct GNUNET_MQ_Envelope *ev; 245 struct GNUNET_RPS_Request_Handle *rh = cls;
281 struct GNUNET_RPS_CS_RequestMessage *msg;
282 246
283 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST); 247 rh->ready_cb (rh->ready_cb_cls,
284 msg->num_peers = htonl (num_req_peers); 248 num_peers,
285 msg->id = htonl (id); 249 peers);
286 GNUNET_MQ_send (h->mq, ev); 250 // TODO cleanup, sampler, rh, cancel stuff
251 // TODO screw this function. We can give the cb,cls directly to the sampler.
287} 252}
288 253
289/**
290 * @brief Iterator function over pending requests
291 *
292 * Implements #GNUNET_CONTAINER_HashMapIterator32
293 *
294 * @param cls rps handle
295 * @param key id of the request
296 * @param value request handle
297 *
298 * @return GNUNET_YES to continue iteration
299 */
300int
301resend_requests_iterator (void *cls, uint32_t key, void *value)
302{
303 const struct GNUNET_RPS_Handle *h = cls;
304 const struct GNUNET_RPS_Request_Handle *req_handle = value;
305 (void) key;
306
307 send_request (h, req_handle->id, req_handle->num_requests);
308 return GNUNET_YES; /* continue iterating */
309}
310 254
311/** 255/**
312 * @brief Resend all pending requests 256 * @brief Callback to collect the peers from the biased stream and put those
313 * 257 * into the sampler.
314 * This is used to resend all pending requests after the client
315 * reconnected to the service, because the service cancels all
316 * pending requests after reconnection.
317 * 258 *
318 * @param h rps handle 259 * @param cls The #GNUNET_RPS_Request_Handle
260 * @param num_peers The number of peer that have been returned
261 * @param peers The array of @a num_peers that have been returned
319 */ 262 */
320void 263void
321resend_requests (struct GNUNET_RPS_Handle *h) 264collect_peers_cb (void *cls,
322{ 265 uint64_t num_peers,
323 GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers, 266 const struct GNUNET_PeerIdentity *peers)
324 resend_requests_iterator,
325 h);
326}
327
328
329/**
330 * This function is called, when the service replies to our request.
331 * It verifies that @a msg is well-formed.
332 *
333 * @param cls the closure
334 * @param msg the message
335 * @return #GNUNET_OK if @a msg is well-formed
336 */
337static int
338check_reply (void *cls,
339 const struct GNUNET_RPS_CS_ReplyMessage *msg)
340{ 267{
341 uint16_t msize = ntohs (msg->header.size); 268 struct GNUNET_RPS_Request_Handle *rh = cls;
342 uint32_t num_peers = ntohl (msg->num_peers);
343 (void) cls;
344 269
345 msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage); 270 for (uint64_t i = 0; i < num_peers; i++)
346 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
347 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
348 { 271 {
349 GNUNET_break (0); 272 RPS_sampler_update (rh->sampler, &peers[i]);
350 return GNUNET_SYSERR;
351 } 273 }
352 return GNUNET_OK;
353} 274}
354 275
355 276
356/** 277/**
357 * This function is called, when the service replies to our request. 278 * @brief Create new request handle
358 * It calls the callback the caller gave us with the provided closure
359 * and disconnects afterwards.
360 * 279 *
361 * @param cls the closure 280 * @param rps_handle Handle to the service
362 * @param msg the message 281 * @param num_requests Number of requests
282 * @param ready_cb Callback
283 * @param cls Closure
284 *
285 * @return The newly created request handle
363 */ 286 */
364static void 287static struct GNUNET_RPS_Request_Handle *
365handle_reply (void *cls, 288new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
366 const struct GNUNET_RPS_CS_ReplyMessage *msg) 289 uint64_t num_requests,
290 GNUNET_RPS_NotifyReadyCB ready_cb,
291 void *cls)
367{ 292{
368 struct GNUNET_RPS_Handle *h = cls;
369 struct GNUNET_PeerIdentity *peers;
370 struct GNUNET_RPS_Request_Handle *rh; 293 struct GNUNET_RPS_Request_Handle *rh;
371 uint32_t id;
372 294
373 /* Give the peers back */ 295 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
374 id = ntohl (msg->id); 296 rh->rps_handle = rps_handle;
375 LOG (GNUNET_ERROR_TYPE_DEBUG, 297 rh->num_requests = num_requests;
376 "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n", 298 rh->sampler = RPS_sampler_mod_init (num_requests,
377 ntohl (msg->num_peers), 299 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
378 id); 300 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
301 num_requests,
302 peers_ready_cb,
303 rh);
304 rh->srh = GNUNET_RPS_stream_request (rps_handle,
305 0, /* infinite updates */
306 collect_peers_cb,
307 rh); /* cls */
308 rh->ready_cb = ready_cb;
309 rh->ready_cb_cls = cls;
379 310
380 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 311 return rh;
381 GNUNET_assert (GNUNET_YES ==
382 GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id));
383 rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id);
384 GNUNET_assert (NULL != rh);
385 GNUNET_assert (rh->num_requests == ntohl (msg->num_peers));
386 GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id);
387 rh->ready_cb (rh->ready_cb_cls,
388 ntohl (msg->num_peers),
389 peers);
390} 312}
391 313
392 314
@@ -683,7 +605,6 @@ mq_error_handler (void *cls,
683 reconnect (h); 605 reconnect (h);
684 /* Resend all pending request as the service destroyed its knowledge 606 /* Resend all pending request as the service destroyed its knowledge
685 * about them */ 607 * about them */
686 resend_requests (h);
687} 608}
688 609
689 610
@@ -694,10 +615,6 @@ static void
694reconnect (struct GNUNET_RPS_Handle *h) 615reconnect (struct GNUNET_RPS_Handle *h)
695{ 616{
696 struct GNUNET_MQ_MessageHandler mq_handlers[] = { 617 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
697 GNUNET_MQ_hd_var_size (reply,
698 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY,
699 struct GNUNET_RPS_CS_ReplyMessage,
700 h),
701 GNUNET_MQ_hd_var_size (view_update, 618 GNUNET_MQ_hd_var_size (view_update,
702 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 619 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
703 struct GNUNET_RPS_CS_DEBUG_ViewReply, 620 struct GNUNET_RPS_CS_DEBUG_ViewReply,
@@ -731,7 +648,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
731 struct GNUNET_RPS_Handle *h; 648 struct GNUNET_RPS_Handle *h;
732 649
733 h = GNUNET_new (struct GNUNET_RPS_Handle); 650 h = GNUNET_new (struct GNUNET_RPS_Handle);
734 h->current_request_id = 0;
735 h->cfg = cfg; 651 h->cfg = cfg;
736 reconnect (h); 652 reconnect (h);
737 if (NULL == h->mq) 653 if (NULL == h->mq)
@@ -739,7 +655,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
739 GNUNET_free (h); 655 GNUNET_free (h);
740 return NULL; 656 return NULL;
741 } 657 }
742 h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4);
743 return h; 658 return h;
744} 659}
745 660
@@ -754,84 +669,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
754 * @return a handle to cancel this request 669 * @return a handle to cancel this request
755 */ 670 */
756struct GNUNET_RPS_Request_Handle * 671struct GNUNET_RPS_Request_Handle *
757GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle,
758 uint32_t num_req_peers,
759 GNUNET_RPS_NotifyReadyCB ready_cb,
760 void *cls)
761{
762 struct GNUNET_RPS_Request_Handle *rh;
763
764 rh = new_request_handle (rps_handle,
765 num_req_peers,
766 NULL, /* no sampler needed */
767 ready_cb,
768 cls);
769
770 LOG (GNUNET_ERROR_TYPE_DEBUG,
771 "Requesting %" PRIu32 " peers with id %" PRIu32 "\n",
772 num_req_peers,
773 rh->id);
774
775 send_request (rps_handle, rh->id, num_req_peers);
776 return rh;
777}
778
779
780/**
781 * @brief Callback to collect the peers from the biased stream and put those
782 * into the sampler.
783 *
784 * @param cls The #GNUNET_RPS_Request_Handle
785 * @param num_peers The number of peer that have been returned
786 * @param peers The array of @a num_peers that have been returned
787 */
788void
789collect_peers_cb (void *cls,
790 uint64_t num_peers,
791 const struct GNUNET_PeerIdentity *peers)
792{
793 struct GNUNET_RPS_Request_Handle *rh = cls;
794
795 for (uint64_t i = 0; i < num_peers; i++)
796 {
797 RPS_sampler_update (rh->sampler, &peers[i]);
798 }
799}
800
801
802/**
803 * @brief Called once the sampler has collected all requested peers.
804 *
805 * Calls the callback provided by the client with the corresponding cls.
806 *
807 * @param peers The array of @a num_peers that has been returned.
808 * @param num_peers The number of peers that have been returned
809 * @param cls The #GNUNET_RPS_Request_Handle
810 */
811void
812peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
813 uint32_t num_peers,
814 void *cls)
815{
816 struct GNUNET_RPS_Request_Handle *rh = cls;
817
818 rh->ready_cb (rh->ready_cb_cls,
819 num_peers,
820 peers);
821 // TODO cleanup, sampler, rh, cancel stuff
822 // TODO screw this function. We can give the cb,cls directly to the sampler.
823}
824
825/**
826 * Request n random peers.
827 *
828 * @param rps_handle handle to the rps service
829 * @param num_req_peers number of peers we want to receive
830 * @param ready_cb the callback called when the peers are available
831 * @param cls closure given to the callback
832 * @return a handle to cancel this request
833 */
834struct GNUNET_RPS_Request_Handle *
835GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, 672GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
836 uint32_t num_req_peers, 673 uint32_t num_req_peers,
837 GNUNET_RPS_NotifyReadyCB ready_cb, 674 GNUNET_RPS_NotifyReadyCB ready_cb,
@@ -841,19 +678,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
841 678
842 rh = new_request_handle (rps_handle, 679 rh = new_request_handle (rps_handle,
843 num_req_peers, 680 num_req_peers,
844 RPS_sampler_mod_init (num_req_peers,
845 GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff
846 ready_cb, 681 ready_cb,
847 cls); 682 cls);
848 RPS_sampler_get_n_rand_peers (rh->sampler,
849 num_req_peers,
850 peers_ready_cb,
851 rh);
852 683
853 GNUNET_RPS_stream_request (rps_handle,
854 0, /* infinite updates */
855 collect_peers_cb,
856 rh); /* cls */
857 684
858 return rh; 685 return rh;
859} 686}
@@ -1022,20 +849,21 @@ void
1022GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) 849GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1023{ 850{
1024 struct GNUNET_RPS_Handle *h; 851 struct GNUNET_RPS_Handle *h;
1025 struct GNUNET_MQ_Envelope *ev;
1026 struct GNUNET_RPS_CS_RequestCancelMessage*msg;
1027
1028 LOG (GNUNET_ERROR_TYPE_DEBUG,
1029 "Cancelling request with id %" PRIu32 "\n",
1030 rh->id);
1031 852
1032 h = rh->rps_handle; 853 h = rh->rps_handle;
1033 GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, 854 if (NULL != rh->srh)
1034 rh->id)); 855 {
1035 GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id); 856 remove_stream_request (rh->srh,
1036 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); 857 h->stream_requests_head,
1037 msg->id = htonl (rh->id); 858 h->stream_requests_tail);
1038 GNUNET_MQ_send (rh->rps_handle->mq, ev); 859 }
860 if (NULL == h->stream_requests_head) cancel_stream(h);
861 if (NULL != rh->sampler_rh)
862 {
863 RPS_sampler_request_cancel (rh->sampler_rh);
864 }
865 RPS_sampler_destroy (rh->sampler);
866 GNUNET_free (rh);
1039} 867}
1040 868
1041 869
@@ -1048,10 +876,11 @@ void
1048GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) 876GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1049{ 877{
1050 GNUNET_MQ_destroy (h->mq); 878 GNUNET_MQ_destroy (h->mq);
1051 if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers)) 879 if (NULL != h->stream_requests_head)
880 {
1052 LOG (GNUNET_ERROR_TYPE_WARNING, 881 LOG (GNUNET_ERROR_TYPE_WARNING,
1053 "Still waiting for requests\n"); 882 "Still waiting for requests\n");
1054 GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers); 883 }
1055 GNUNET_free (h); 884 GNUNET_free (h);
1056} 885}
1057 886