diff options
Diffstat (limited to 'src/rps/gnunet-service-rps.c')
-rw-r--r-- | src/rps/gnunet-service-rps.c | 278 |
1 files changed, 198 insertions, 80 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 3e30041e8..5b78bb4a8 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -1769,6 +1769,12 @@ struct ClientContext | |||
1769 | int64_t view_updates_left; | 1769 | int64_t view_updates_left; |
1770 | 1770 | ||
1771 | /** | 1771 | /** |
1772 | * @brief How many peers from the biased | ||
1773 | * stream this client expects to receive. | ||
1774 | */ | ||
1775 | int64_t stream_peers_left; | ||
1776 | |||
1777 | /** | ||
1772 | * The client handle to send the reply to | 1778 | * The client handle to send the reply to |
1773 | */ | 1779 | */ |
1774 | struct GNUNET_SERVICE_Client *client; | 1780 | struct GNUNET_SERVICE_Client *client; |
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer) | |||
2174 | return ret; | 2180 | return ret; |
2175 | } | 2181 | } |
2176 | 2182 | ||
2183 | |||
2184 | /** | ||
2185 | * @brief Send view to client | ||
2186 | * | ||
2187 | * @param cli_ctx the context of the client | ||
2188 | * @param view_array the peerids of the view as array (can be empty) | ||
2189 | * @param view_size the size of the view array (can be 0) | ||
2190 | */ | ||
2191 | void | ||
2192 | send_view (const struct ClientContext *cli_ctx, | ||
2193 | const struct GNUNET_PeerIdentity *view_array, | ||
2194 | uint64_t view_size) | ||
2195 | { | ||
2196 | struct GNUNET_MQ_Envelope *ev; | ||
2197 | struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; | ||
2198 | |||
2199 | if (NULL == view_array) | ||
2200 | { | ||
2201 | view_size = View_size (); | ||
2202 | view_array = View_get_as_array(); | ||
2203 | } | ||
2204 | |||
2205 | ev = GNUNET_MQ_msg_extra (out_msg, | ||
2206 | view_size * sizeof (struct GNUNET_PeerIdentity), | ||
2207 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); | ||
2208 | out_msg->num_peers = htonl (view_size); | ||
2209 | |||
2210 | GNUNET_memcpy (&out_msg[1], | ||
2211 | view_array, | ||
2212 | view_size * sizeof (struct GNUNET_PeerIdentity)); | ||
2213 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2214 | } | ||
2215 | |||
2216 | |||
2217 | /** | ||
2218 | * @brief Send peer from biased stream to client. | ||
2219 | * | ||
2220 | * @param cli_ctx the context of the client | ||
2221 | * @param view_array the peerids of the view as array (can be empty) | ||
2222 | * @param view_size the size of the view array (can be 0) | ||
2223 | */ | ||
2224 | void | ||
2225 | send_stream_peer (const struct ClientContext *cli_ctx, | ||
2226 | const struct GNUNET_PeerIdentity *peer) | ||
2227 | { | ||
2228 | struct GNUNET_MQ_Envelope *ev; | ||
2229 | struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; | ||
2230 | |||
2231 | GNUNET_assert (NULL != peer); | ||
2232 | |||
2233 | ev = GNUNET_MQ_msg (out_msg, | ||
2234 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY); | ||
2235 | |||
2236 | GNUNET_memcpy (&out_msg->peer, | ||
2237 | peer, | ||
2238 | sizeof (struct GNUNET_PeerIdentity)); | ||
2239 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2240 | } | ||
2241 | |||
2242 | |||
2177 | /** | 2243 | /** |
2178 | * @brief sends updates to clients that are interested | 2244 | * @brief sends updates to clients that are interested |
2179 | */ | 2245 | */ |
2180 | static void | 2246 | static void |
2181 | clients_notify_view_update (void); | 2247 | clients_notify_view_update (void) |
2248 | { | ||
2249 | struct ClientContext *cli_ctx_iter; | ||
2250 | uint64_t num_peers; | ||
2251 | const struct GNUNET_PeerIdentity *view_array; | ||
2252 | |||
2253 | num_peers = View_size (); | ||
2254 | view_array = View_get_as_array(); | ||
2255 | /* check size of view is small enough */ | ||
2256 | if (GNUNET_MAX_MESSAGE_SIZE < num_peers) | ||
2257 | { | ||
2258 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2259 | "View is too big to send\n"); | ||
2260 | return; | ||
2261 | } | ||
2262 | |||
2263 | for (cli_ctx_iter = cli_ctx_head; | ||
2264 | NULL != cli_ctx_iter; | ||
2265 | cli_ctx_iter = cli_ctx_head->next) | ||
2266 | { | ||
2267 | if (1 < cli_ctx_iter->view_updates_left) | ||
2268 | { | ||
2269 | /* Client wants to receive limited amount of updates */ | ||
2270 | cli_ctx_iter->view_updates_left -= 1; | ||
2271 | } else if (1 == cli_ctx_iter->view_updates_left) | ||
2272 | { | ||
2273 | /* Last update of view for client */ | ||
2274 | cli_ctx_iter->view_updates_left = -1; | ||
2275 | } else if (0 > cli_ctx_iter->view_updates_left) { | ||
2276 | /* Client is not interested in updates */ | ||
2277 | continue; | ||
2278 | } | ||
2279 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2280 | |||
2281 | /* send view */ | ||
2282 | send_view (cli_ctx_iter, view_array, num_peers); | ||
2283 | } | ||
2284 | } | ||
2285 | |||
2286 | |||
2287 | /** | ||
2288 | * @brief sends updates to clients that are interested | ||
2289 | */ | ||
2290 | static void | ||
2291 | clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer) | ||
2292 | //enum StreamPeerSource) | ||
2293 | { | ||
2294 | struct ClientContext *cli_ctx_iter; | ||
2295 | |||
2296 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2297 | "Got peer (%s) from biased stream - update all clients\n", | ||
2298 | GNUNET_i2s (peer)); | ||
2299 | |||
2300 | /* check size of view is small enough */ | ||
2301 | for (cli_ctx_iter = cli_ctx_head; | ||
2302 | NULL != cli_ctx_iter; | ||
2303 | cli_ctx_iter = cli_ctx_head->next) | ||
2304 | { | ||
2305 | if (1 < cli_ctx_iter->stream_peers_left) | ||
2306 | { | ||
2307 | /* Client wants to receive limited amount of updates */ | ||
2308 | cli_ctx_iter->stream_peers_left -= 1; | ||
2309 | } else if (1 == cli_ctx_iter->stream_peers_left) | ||
2310 | { | ||
2311 | /* Last update of view for client */ | ||
2312 | cli_ctx_iter->stream_peers_left = -1; | ||
2313 | } else if (0 > cli_ctx_iter->stream_peers_left) { | ||
2314 | /* Client is not interested in updates */ | ||
2315 | continue; | ||
2316 | } | ||
2317 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2318 | |||
2319 | /* send view */ | ||
2320 | send_stream_peer (cli_ctx_iter, peer); | ||
2321 | } | ||
2322 | } | ||
2182 | 2323 | ||
2183 | /** | 2324 | /** |
2184 | * Put random peer from sampler into the view as history update. | 2325 | * Put random peer from sampler into the view as history update. |
@@ -2193,7 +2334,12 @@ hist_update (void *cls, | |||
2193 | 2334 | ||
2194 | for (i = 0; i < num_peers; i++) | 2335 | for (i = 0; i < num_peers; i++) |
2195 | { | 2336 | { |
2196 | (void) insert_in_view (&ids[i]); | 2337 | int inserted; |
2338 | inserted = insert_in_view (&ids[i]); | ||
2339 | if (GNUNET_OK == inserted) | ||
2340 | { | ||
2341 | clients_notify_stream_peer (&ids[i]); | ||
2342 | } | ||
2197 | to_file (file_name_view_log, | 2343 | to_file (file_name_view_log, |
2198 | "+%s\t(hist)", | 2344 | "+%s\t(hist)", |
2199 | GNUNET_i2s_full (ids)); | 2345 | GNUNET_i2s_full (ids)); |
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls, | |||
2398 | const struct GNUNET_PeerIdentity *peer) | 2544 | const struct GNUNET_PeerIdentity *peer) |
2399 | { | 2545 | { |
2400 | (void) cls; | 2546 | (void) cls; |
2401 | (void) insert_in_view (peer); | 2547 | int inserted; |
2548 | |||
2549 | inserted = insert_in_view (peer); | ||
2550 | if (GNUNET_OK == inserted) | ||
2551 | { | ||
2552 | clients_notify_stream_peer (peer); | ||
2553 | } | ||
2402 | } | 2554 | } |
2403 | 2555 | ||
2404 | 2556 | ||
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls, | |||
2860 | GNUNET_SERVICE_client_continue (cli_ctx->client); | 3012 | GNUNET_SERVICE_client_continue (cli_ctx->client); |
2861 | } | 3013 | } |
2862 | 3014 | ||
2863 | /** | ||
2864 | * @brief Send view to client | ||
2865 | * | ||
2866 | * @param cli_ctx the context of the client | ||
2867 | * @param view_array the peerids of the view as array (can be empty) | ||
2868 | * @param view_size the size of the view array (can be 0) | ||
2869 | */ | ||
2870 | void | ||
2871 | send_view (const struct ClientContext *cli_ctx, | ||
2872 | const struct GNUNET_PeerIdentity *view_array, | ||
2873 | uint64_t view_size) | ||
2874 | { | ||
2875 | struct GNUNET_MQ_Envelope *ev; | ||
2876 | struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; | ||
2877 | |||
2878 | if (NULL == view_array) | ||
2879 | { | ||
2880 | view_size = View_size (); | ||
2881 | view_array = View_get_as_array(); | ||
2882 | } | ||
2883 | |||
2884 | ev = GNUNET_MQ_msg_extra (out_msg, | ||
2885 | view_size * sizeof (struct GNUNET_PeerIdentity), | ||
2886 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY); | ||
2887 | out_msg->num_peers = htonl (view_size); | ||
2888 | |||
2889 | GNUNET_memcpy (&out_msg[1], | ||
2890 | view_array, | ||
2891 | view_size * sizeof (struct GNUNET_PeerIdentity)); | ||
2892 | GNUNET_MQ_send (cli_ctx->mq, ev); | ||
2893 | } | ||
2894 | 3015 | ||
2895 | /** | 3016 | /** |
2896 | * @brief sends updates to clients that are interested | 3017 | * Handle RPS request from the client. |
3018 | * | ||
3019 | * @param cls closure | ||
3020 | * @param message the actual message | ||
2897 | */ | 3021 | */ |
2898 | static void | 3022 | static void |
2899 | clients_notify_view_update (void) | 3023 | handle_client_view_request (void *cls, |
3024 | const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) | ||
2900 | { | 3025 | { |
2901 | struct ClientContext *cli_ctx_iter; | 3026 | struct ClientContext *cli_ctx = cls; |
2902 | uint64_t num_peers; | 3027 | uint64_t num_updates; |
2903 | const struct GNUNET_PeerIdentity *view_array; | ||
2904 | 3028 | ||
2905 | num_peers = View_size (); | 3029 | num_updates = ntohl (msg->num_updates); |
2906 | view_array = View_get_as_array(); | ||
2907 | /* check size of view is small enough */ | ||
2908 | if (GNUNET_MAX_MESSAGE_SIZE < num_peers) | ||
2909 | { | ||
2910 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2911 | "View is too big to send\n"); | ||
2912 | return; | ||
2913 | } | ||
2914 | 3030 | ||
2915 | for (cli_ctx_iter = cli_ctx_head; | 3031 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2916 | NULL != cli_ctx_iter; | 3032 | "Client requested %" PRIu64 " updates of view.\n", |
2917 | cli_ctx_iter = cli_ctx_head->next) | 3033 | num_updates); |
2918 | { | ||
2919 | if (1 < cli_ctx_iter->view_updates_left) | ||
2920 | { | ||
2921 | /* Client wants to receive limited amount of updates */ | ||
2922 | cli_ctx_iter->view_updates_left -= 1; | ||
2923 | } else if (1 == cli_ctx_iter->view_updates_left) | ||
2924 | { | ||
2925 | /* Last update of view for client */ | ||
2926 | cli_ctx_iter->view_updates_left = -1; | ||
2927 | } else if (0 > cli_ctx_iter->view_updates_left) { | ||
2928 | /* Client is not interested in updates */ | ||
2929 | continue; | ||
2930 | } | ||
2931 | /* else _updates_left == 0 - infinite amount of updates */ | ||
2932 | 3034 | ||
2933 | /* send view */ | 3035 | GNUNET_assert (NULL != cli_ctx); |
2934 | send_view (cli_ctx_iter, view_array, num_peers); | 3036 | cli_ctx->view_updates_left = num_updates; |
2935 | } | 3037 | send_view (cli_ctx, NULL, 0); |
3038 | GNUNET_SERVICE_client_continue (cli_ctx->client); | ||
2936 | } | 3039 | } |
2937 | 3040 | ||
2938 | 3041 | ||
2939 | /** | 3042 | /** |
2940 | * Handle RPS request from the client. | 3043 | * Handle RPS request for biased stream from the client. |
2941 | * | 3044 | * |
2942 | * @param cls closure | 3045 | * @param cls closure |
2943 | * @param message the actual message | 3046 | * @param message the actual message |
2944 | */ | 3047 | */ |
2945 | static void | 3048 | static void |
2946 | handle_client_view_request (void *cls, | 3049 | handle_client_stream_request (void *cls, |
2947 | const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) | 3050 | const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) |
2948 | { | 3051 | { |
2949 | struct ClientContext *cli_ctx = cls; | 3052 | struct ClientContext *cli_ctx = cls; |
2950 | uint64_t num_updates; | 3053 | uint64_t num_peers; |
2951 | 3054 | ||
2952 | num_updates = ntohl (msg->num_updates); | 3055 | num_peers = ntohl (msg->num_peers); |
2953 | 3056 | ||
2954 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3057 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2955 | "Client requested %" PRIu64 " updates of view.\n", | 3058 | "Client requested %" PRIu64 " peers from biased stream.\n", |
2956 | num_updates); | 3059 | num_peers); |
2957 | 3060 | ||
2958 | GNUNET_assert (NULL != cli_ctx); | 3061 | GNUNET_assert (NULL != cli_ctx); |
2959 | cli_ctx->view_updates_left = num_updates; | 3062 | cli_ctx->stream_peers_left = num_peers; |
2960 | send_view (cli_ctx, NULL, 0); | ||
2961 | GNUNET_SERVICE_client_continue (cli_ctx->client); | 3063 | GNUNET_SERVICE_client_continue (cli_ctx->client); |
2962 | } | 3064 | } |
2963 | 3065 | ||
@@ -3727,8 +3829,14 @@ do_round (void *cls) | |||
3727 | CustomPeerMap_size (push_map)); | 3829 | CustomPeerMap_size (push_map)); |
3728 | for (i = 0; i < first_border; i++) | 3830 | for (i = 0; i < first_border; i++) |
3729 | { | 3831 | { |
3730 | (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map, | 3832 | int inserted; |
3731 | permut[i])); | 3833 | inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map, |
3834 | permut[i])); | ||
3835 | if (GNUNET_OK == inserted) | ||
3836 | { | ||
3837 | clients_notify_stream_peer ( | ||
3838 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | ||
3839 | } | ||
3732 | to_file (file_name_view_log, | 3840 | to_file (file_name_view_log, |
3733 | "+%s\t(push list)", | 3841 | "+%s\t(push list)", |
3734 | GNUNET_i2s_full (&view_array[i])); | 3842 | GNUNET_i2s_full (&view_array[i])); |
@@ -3742,8 +3850,14 @@ do_round (void *cls) | |||
3742 | CustomPeerMap_size (pull_map)); | 3850 | CustomPeerMap_size (pull_map)); |
3743 | for (i = first_border; i < second_border; i++) | 3851 | for (i = first_border; i < second_border; i++) |
3744 | { | 3852 | { |
3745 | (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, | 3853 | int inserted; |
3854 | inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, | ||
3746 | permut[i - first_border])); | 3855 | permut[i - first_border])); |
3856 | if (GNUNET_OK == inserted) | ||
3857 | { | ||
3858 | clients_notify_stream_peer ( | ||
3859 | CustomPeerMap_get_peer_by_index (push_map, permut[i])); | ||
3860 | } | ||
3747 | to_file (file_name_view_log, | 3861 | to_file (file_name_view_log, |
3748 | "+%s\t(pull list)", | 3862 | "+%s\t(pull list)", |
3749 | GNUNET_i2s_full (&view_array[i])); | 3863 | GNUNET_i2s_full (&view_array[i])); |
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN | |||
4296 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, | 4410 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, |
4297 | struct GNUNET_RPS_CS_DEBUG_ViewRequest, | 4411 | struct GNUNET_RPS_CS_DEBUG_ViewRequest, |
4298 | NULL), | 4412 | NULL), |
4413 | GNUNET_MQ_hd_fixed_size (client_stream_request, | ||
4414 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST, | ||
4415 | struct GNUNET_RPS_CS_DEBUG_StreamRequest, | ||
4416 | NULL), | ||
4299 | GNUNET_MQ_handler_end()); | 4417 | GNUNET_MQ_handler_end()); |
4300 | 4418 | ||
4301 | /* end of gnunet-service-rps.c */ | 4419 | /* end of gnunet-service-rps.c */ |