aboutsummaryrefslogtreecommitdiff
path: root/src/rps/gnunet-service-rps.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/gnunet-service-rps.c')
-rw-r--r--src/rps/gnunet-service-rps.c278
1 files changed, 198 insertions, 80 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 3e30041e8..5b78bb4a8 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -1769,6 +1769,12 @@ struct ClientContext
1769 int64_t view_updates_left; 1769 int64_t view_updates_left;
1770 1770
1771 /** 1771 /**
1772 * @brief How many peers from the biased
1773 * stream this client expects to receive.
1774 */
1775 int64_t stream_peers_left;
1776
1777 /**
1772 * The client handle to send the reply to 1778 * The client handle to send the reply to
1773 */ 1779 */
1774 struct GNUNET_SERVICE_Client *client; 1780 struct GNUNET_SERVICE_Client *client;
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
2174 return ret; 2180 return ret;
2175} 2181}
2176 2182
2183
2184/**
2185 * @brief Send view to client
2186 *
2187 * @param cli_ctx the context of the client
2188 * @param view_array the peerids of the view as array (can be empty)
2189 * @param view_size the size of the view array (can be 0)
2190 */
2191void
2192send_view (const struct ClientContext *cli_ctx,
2193 const struct GNUNET_PeerIdentity *view_array,
2194 uint64_t view_size)
2195{
2196 struct GNUNET_MQ_Envelope *ev;
2197 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2198
2199 if (NULL == view_array)
2200 {
2201 view_size = View_size ();
2202 view_array = View_get_as_array();
2203 }
2204
2205 ev = GNUNET_MQ_msg_extra (out_msg,
2206 view_size * sizeof (struct GNUNET_PeerIdentity),
2207 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2208 out_msg->num_peers = htonl (view_size);
2209
2210 GNUNET_memcpy (&out_msg[1],
2211 view_array,
2212 view_size * sizeof (struct GNUNET_PeerIdentity));
2213 GNUNET_MQ_send (cli_ctx->mq, ev);
2214}
2215
2216
2217/**
2218 * @brief Send peer from biased stream to client.
2219 *
2220 * @param cli_ctx the context of the client
2221 * @param view_array the peerids of the view as array (can be empty)
2222 * @param view_size the size of the view array (can be 0)
2223 */
2224void
2225send_stream_peer (const struct ClientContext *cli_ctx,
2226 const struct GNUNET_PeerIdentity *peer)
2227{
2228 struct GNUNET_MQ_Envelope *ev;
2229 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2230
2231 GNUNET_assert (NULL != peer);
2232
2233 ev = GNUNET_MQ_msg (out_msg,
2234 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2235
2236 GNUNET_memcpy (&out_msg->peer,
2237 peer,
2238 sizeof (struct GNUNET_PeerIdentity));
2239 GNUNET_MQ_send (cli_ctx->mq, ev);
2240}
2241
2242
2177/** 2243/**
2178 * @brief sends updates to clients that are interested 2244 * @brief sends updates to clients that are interested
2179 */ 2245 */
2180static void 2246static void
2181clients_notify_view_update (void); 2247clients_notify_view_update (void)
2248{
2249 struct ClientContext *cli_ctx_iter;
2250 uint64_t num_peers;
2251 const struct GNUNET_PeerIdentity *view_array;
2252
2253 num_peers = View_size ();
2254 view_array = View_get_as_array();
2255 /* check size of view is small enough */
2256 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2257 {
2258 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2259 "View is too big to send\n");
2260 return;
2261 }
2262
2263 for (cli_ctx_iter = cli_ctx_head;
2264 NULL != cli_ctx_iter;
2265 cli_ctx_iter = cli_ctx_head->next)
2266 {
2267 if (1 < cli_ctx_iter->view_updates_left)
2268 {
2269 /* Client wants to receive limited amount of updates */
2270 cli_ctx_iter->view_updates_left -= 1;
2271 } else if (1 == cli_ctx_iter->view_updates_left)
2272 {
2273 /* Last update of view for client */
2274 cli_ctx_iter->view_updates_left = -1;
2275 } else if (0 > cli_ctx_iter->view_updates_left) {
2276 /* Client is not interested in updates */
2277 continue;
2278 }
2279 /* else _updates_left == 0 - infinite amount of updates */
2280
2281 /* send view */
2282 send_view (cli_ctx_iter, view_array, num_peers);
2283 }
2284}
2285
2286
2287/**
2288 * @brief sends updates to clients that are interested
2289 */
2290static void
2291clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
2292 //enum StreamPeerSource)
2293{
2294 struct ClientContext *cli_ctx_iter;
2295
2296 LOG (GNUNET_ERROR_TYPE_DEBUG,
2297 "Got peer (%s) from biased stream - update all clients\n",
2298 GNUNET_i2s (peer));
2299
2300 /* check size of view is small enough */
2301 for (cli_ctx_iter = cli_ctx_head;
2302 NULL != cli_ctx_iter;
2303 cli_ctx_iter = cli_ctx_head->next)
2304 {
2305 if (1 < cli_ctx_iter->stream_peers_left)
2306 {
2307 /* Client wants to receive limited amount of updates */
2308 cli_ctx_iter->stream_peers_left -= 1;
2309 } else if (1 == cli_ctx_iter->stream_peers_left)
2310 {
2311 /* Last update of view for client */
2312 cli_ctx_iter->stream_peers_left = -1;
2313 } else if (0 > cli_ctx_iter->stream_peers_left) {
2314 /* Client is not interested in updates */
2315 continue;
2316 }
2317 /* else _updates_left == 0 - infinite amount of updates */
2318
2319 /* send view */
2320 send_stream_peer (cli_ctx_iter, peer);
2321 }
2322}
2182 2323
2183/** 2324/**
2184 * Put random peer from sampler into the view as history update. 2325 * Put random peer from sampler into the view as history update.
@@ -2193,7 +2334,12 @@ hist_update (void *cls,
2193 2334
2194 for (i = 0; i < num_peers; i++) 2335 for (i = 0; i < num_peers; i++)
2195 { 2336 {
2196 (void) insert_in_view (&ids[i]); 2337 int inserted;
2338 inserted = insert_in_view (&ids[i]);
2339 if (GNUNET_OK == inserted)
2340 {
2341 clients_notify_stream_peer (&ids[i]);
2342 }
2197 to_file (file_name_view_log, 2343 to_file (file_name_view_log,
2198 "+%s\t(hist)", 2344 "+%s\t(hist)",
2199 GNUNET_i2s_full (ids)); 2345 GNUNET_i2s_full (ids));
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls,
2398 const struct GNUNET_PeerIdentity *peer) 2544 const struct GNUNET_PeerIdentity *peer)
2399{ 2545{
2400 (void) cls; 2546 (void) cls;
2401 (void) insert_in_view (peer); 2547 int inserted;
2548
2549 inserted = insert_in_view (peer);
2550 if (GNUNET_OK == inserted)
2551 {
2552 clients_notify_stream_peer (peer);
2553 }
2402} 2554}
2403 2555
2404 2556
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls,
2860 GNUNET_SERVICE_client_continue (cli_ctx->client); 3012 GNUNET_SERVICE_client_continue (cli_ctx->client);
2861} 3013}
2862 3014
2863/**
2864 * @brief Send view to client
2865 *
2866 * @param cli_ctx the context of the client
2867 * @param view_array the peerids of the view as array (can be empty)
2868 * @param view_size the size of the view array (can be 0)
2869 */
2870void
2871send_view (const struct ClientContext *cli_ctx,
2872 const struct GNUNET_PeerIdentity *view_array,
2873 uint64_t view_size)
2874{
2875 struct GNUNET_MQ_Envelope *ev;
2876 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2877
2878 if (NULL == view_array)
2879 {
2880 view_size = View_size ();
2881 view_array = View_get_as_array();
2882 }
2883
2884 ev = GNUNET_MQ_msg_extra (out_msg,
2885 view_size * sizeof (struct GNUNET_PeerIdentity),
2886 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2887 out_msg->num_peers = htonl (view_size);
2888
2889 GNUNET_memcpy (&out_msg[1],
2890 view_array,
2891 view_size * sizeof (struct GNUNET_PeerIdentity));
2892 GNUNET_MQ_send (cli_ctx->mq, ev);
2893}
2894 3015
2895/** 3016/**
2896 * @brief sends updates to clients that are interested 3017 * Handle RPS request from the client.
3018 *
3019 * @param cls closure
3020 * @param message the actual message
2897 */ 3021 */
2898static void 3022static void
2899clients_notify_view_update (void) 3023handle_client_view_request (void *cls,
3024 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2900{ 3025{
2901 struct ClientContext *cli_ctx_iter; 3026 struct ClientContext *cli_ctx = cls;
2902 uint64_t num_peers; 3027 uint64_t num_updates;
2903 const struct GNUNET_PeerIdentity *view_array;
2904 3028
2905 num_peers = View_size (); 3029 num_updates = ntohl (msg->num_updates);
2906 view_array = View_get_as_array();
2907 /* check size of view is small enough */
2908 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2909 {
2910 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2911 "View is too big to send\n");
2912 return;
2913 }
2914 3030
2915 for (cli_ctx_iter = cli_ctx_head; 3031 LOG (GNUNET_ERROR_TYPE_DEBUG,
2916 NULL != cli_ctx_iter; 3032 "Client requested %" PRIu64 " updates of view.\n",
2917 cli_ctx_iter = cli_ctx_head->next) 3033 num_updates);
2918 {
2919 if (1 < cli_ctx_iter->view_updates_left)
2920 {
2921 /* Client wants to receive limited amount of updates */
2922 cli_ctx_iter->view_updates_left -= 1;
2923 } else if (1 == cli_ctx_iter->view_updates_left)
2924 {
2925 /* Last update of view for client */
2926 cli_ctx_iter->view_updates_left = -1;
2927 } else if (0 > cli_ctx_iter->view_updates_left) {
2928 /* Client is not interested in updates */
2929 continue;
2930 }
2931 /* else _updates_left == 0 - infinite amount of updates */
2932 3034
2933 /* send view */ 3035 GNUNET_assert (NULL != cli_ctx);
2934 send_view (cli_ctx_iter, view_array, num_peers); 3036 cli_ctx->view_updates_left = num_updates;
2935 } 3037 send_view (cli_ctx, NULL, 0);
3038 GNUNET_SERVICE_client_continue (cli_ctx->client);
2936} 3039}
2937 3040
2938 3041
2939/** 3042/**
2940 * Handle RPS request from the client. 3043 * Handle RPS request for biased stream from the client.
2941 * 3044 *
2942 * @param cls closure 3045 * @param cls closure
2943 * @param message the actual message 3046 * @param message the actual message
2944 */ 3047 */
2945static void 3048static void
2946handle_client_view_request (void *cls, 3049handle_client_stream_request (void *cls,
2947 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg) 3050 const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
2948{ 3051{
2949 struct ClientContext *cli_ctx = cls; 3052 struct ClientContext *cli_ctx = cls;
2950 uint64_t num_updates; 3053 uint64_t num_peers;
2951 3054
2952 num_updates = ntohl (msg->num_updates); 3055 num_peers = ntohl (msg->num_peers);
2953 3056
2954 LOG (GNUNET_ERROR_TYPE_DEBUG, 3057 LOG (GNUNET_ERROR_TYPE_DEBUG,
2955 "Client requested %" PRIu64 " updates of view.\n", 3058 "Client requested %" PRIu64 " peers from biased stream.\n",
2956 num_updates); 3059 num_peers);
2957 3060
2958 GNUNET_assert (NULL != cli_ctx); 3061 GNUNET_assert (NULL != cli_ctx);
2959 cli_ctx->view_updates_left = num_updates; 3062 cli_ctx->stream_peers_left = num_peers;
2960 send_view (cli_ctx, NULL, 0);
2961 GNUNET_SERVICE_client_continue (cli_ctx->client); 3063 GNUNET_SERVICE_client_continue (cli_ctx->client);
2962} 3064}
2963 3065
@@ -3727,8 +3829,14 @@ do_round (void *cls)
3727 CustomPeerMap_size (push_map)); 3829 CustomPeerMap_size (push_map));
3728 for (i = 0; i < first_border; i++) 3830 for (i = 0; i < first_border; i++)
3729 { 3831 {
3730 (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map, 3832 int inserted;
3731 permut[i])); 3833 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3834 permut[i]));
3835 if (GNUNET_OK == inserted)
3836 {
3837 clients_notify_stream_peer (
3838 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3839 }
3732 to_file (file_name_view_log, 3840 to_file (file_name_view_log,
3733 "+%s\t(push list)", 3841 "+%s\t(push list)",
3734 GNUNET_i2s_full (&view_array[i])); 3842 GNUNET_i2s_full (&view_array[i]));
@@ -3742,8 +3850,14 @@ do_round (void *cls)
3742 CustomPeerMap_size (pull_map)); 3850 CustomPeerMap_size (pull_map));
3743 for (i = first_border; i < second_border; i++) 3851 for (i = first_border; i < second_border; i++)
3744 { 3852 {
3745 (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map, 3853 int inserted;
3854 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3746 permut[i - first_border])); 3855 permut[i - first_border]));
3856 if (GNUNET_OK == inserted)
3857 {
3858 clients_notify_stream_peer (
3859 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3860 }
3747 to_file (file_name_view_log, 3861 to_file (file_name_view_log,
3748 "+%s\t(pull list)", 3862 "+%s\t(pull list)",
3749 GNUNET_i2s_full (&view_array[i])); 3863 GNUNET_i2s_full (&view_array[i]));
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN
4296 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST, 4410 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4297 struct GNUNET_RPS_CS_DEBUG_ViewRequest, 4411 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4298 NULL), 4412 NULL),
4413 GNUNET_MQ_hd_fixed_size (client_stream_request,
4414 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4415 struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4416 NULL),
4299 GNUNET_MQ_handler_end()); 4417 GNUNET_MQ_handler_end());
4300 4418
4301/* end of gnunet-service-rps.c */ 4419/* end of gnunet-service-rps.c */