aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/Makefile.am6
-rw-r--r--src/rps/gnunet-service-rps.c206
-rw-r--r--src/rps/gnunet-service-rps_sampler.c170
-rw-r--r--src/rps/gnunet-service-rps_sampler.h15
-rw-r--r--src/rps/rps.h16
-rw-r--r--src/rps/rps_api.c25
-rw-r--r--src/rps/test_rps.c606
7 files changed, 716 insertions, 328 deletions
diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am
index b1ebffee4..c07434a0f 100644
--- a/src/rps/Makefile.am
+++ b/src/rps/Makefile.am
@@ -72,7 +72,8 @@ check_PROGRAMS = \
72 test_rps_malicious_2 \ 72 test_rps_malicious_2 \
73 test_rps_malicious_3 \ 73 test_rps_malicious_3 \
74 test_rps_seed_request \ 74 test_rps_seed_request \
75 test_rps_single_req 75 test_rps_single_req \
76 test_rps_req_cancel
76endif 77endif
77 78
78ld_rps_test_lib = \ 79ld_rps_test_lib = \
@@ -106,6 +107,9 @@ test_rps_single_req_LDADD = $(ld_rps_test_lib)
106test_rps_seed_request_SOURCES = $(rps_test_src) 107test_rps_seed_request_SOURCES = $(rps_test_src)
107test_rps_seed_request_LDADD = $(ld_rps_test_lib) 108test_rps_seed_request_LDADD = $(ld_rps_test_lib)
108 109
110test_rps_req_cancel_SOURCES = $(rps_test_src)
111test_rps_req_cancel_LDADD = $(ld_rps_test_lib)
112
109gnunet_rps_profiler_SOURCES = $(rps_test_src) 113gnunet_rps_profiler_SOURCES = $(rps_test_src)
110gnunet_rps_profiler_LDADD = $(ld_rps_test_lib) 114gnunet_rps_profiler_LDADD = $(ld_rps_test_lib)
111 115
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 3c98d79da..8c1a1dc12 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -75,22 +75,73 @@ get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list, unsigned
75***********************************************************************/ 75***********************************************************************/
76 76
77/** 77/**
78 * Closure used to pass the client and the id to the callback
79 * that replies to a client's request
80 */
81struct ReplyCls
82{
83 /**
84 * DLL
85 */
86 struct ReplyCls *next;
87 struct ReplyCls *prev;
88
89 /**
90 * The identifier of the request
91 */
92 uint32_t id;
93
94 /**
95 * The handle to the request
96 */
97 struct RPS_SamplerRequestHandle *req_handle;
98
99 /**
100 * The client handle to send the reply to
101 */
102 struct GNUNET_SERVER_Client *client;
103};
104
105
106/**
78 * Struct used to store the context of a connected client. 107 * Struct used to store the context of a connected client.
79 */ 108 */
80struct ClientContext 109struct ClientContext
81{ 110{
82 /** 111 /**
112 * DLL
113 */
114 struct ClientContext *next;
115 struct ClientContext *prev;
116
117 /**
83 * The message queue to communicate with the client. 118 * The message queue to communicate with the client.
84 */ 119 */
85 struct GNUNET_MQ_Handle *mq; 120 struct GNUNET_MQ_Handle *mq;
121
122 /**
123 * DLL with handles to single requests from the client
124 */
125 struct ReplyCls *rep_cls_head;
126 struct ReplyCls *rep_cls_tail;
86}; 127};
87 128
88/** 129/**
130 * DLL with all clients currently connected to us
131 */
132struct ClientContext *cli_ctx_head;
133struct ClientContext *cli_ctx_tail;
134
135/**
89 * Used to keep track in what lists single peerIDs are. 136 * Used to keep track in what lists single peerIDs are.
90 */ 137 */
91enum PeerFlags 138enum PeerFlags
92{ 139{
140 /**
141 * If we are waiting for a reply from that peer (sent a pull request).
142 */
93 PULL_REPLY_PENDING = 0x01, 143 PULL_REPLY_PENDING = 0x01,
144
94 IN_OTHER_GOSSIP_LIST = 0x02, // unneeded? 145 IN_OTHER_GOSSIP_LIST = 0x02, // unneeded?
95 IN_OWN_SAMPLER_LIST = 0x04, // unneeded? 146 IN_OWN_SAMPLER_LIST = 0x04, // unneeded?
96 IN_OWN_GOSSIP_LIST = 0x08, // unneeded? 147 IN_OWN_GOSSIP_LIST = 0x08, // unneeded?
@@ -365,24 +416,6 @@ static struct GNUNET_TIME_Relative request_rate;
365uint32_t num_hist_update_tasks; 416uint32_t num_hist_update_tasks;
366 417
367 418
368/**
369 * Closure used to pass the client and the id to the callback
370 * that replies to a client's request
371 */
372struct ReplyCls
373{
374 /**
375 * The identifier of the request
376 */
377 uint32_t id;
378
379 /**
380 * The client handle to send the reply to
381 */
382 struct GNUNET_SERVER_Client *client;
383};
384
385
386#ifdef ENABLE_MALICIOUS 419#ifdef ENABLE_MALICIOUS
387/** 420/**
388 * Type of malicious peer 421 * Type of malicious peer
@@ -1234,8 +1267,36 @@ new_peer_id (const struct GNUNET_PeerIdentity *peer_id)
1234 * /Util functions 1267 * /Util functions
1235***********************************************************************/ 1268***********************************************************************/
1236 1269
1270static void
1271destroy_reply_cls (struct ReplyCls *rep_cls)
1272{
1273 struct ClientContext *cli_ctx;
1237 1274
1275 cli_ctx = GNUNET_SERVER_client_get_user_context (rep_cls->client,
1276 struct ClientContext);
1277 GNUNET_assert (NULL != cli_ctx);
1278 GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
1279 cli_ctx->rep_cls_tail,
1280 rep_cls);
1281 GNUNET_free (rep_cls);
1282}
1238 1283
1284static void
1285destroy_cli_ctx (struct ClientContext *cli_ctx)
1286{
1287 GNUNET_assert (NULL != cli_ctx);
1288 if (NULL != cli_ctx->rep_cls_head)
1289 {
1290 LOG (GNUNET_ERROR_TYPE_WARNING,
1291 "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
1292 while (NULL != cli_ctx->rep_cls_head)
1293 destroy_reply_cls (cli_ctx->rep_cls_head);
1294 }
1295 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
1296 cli_ctx_tail,
1297 cli_ctx);
1298 GNUNET_free (cli_ctx);
1299}
1239 1300
1240 1301
1241/** 1302/**
@@ -1316,15 +1377,10 @@ void client_respond (void *cls,
1316 num_peers * sizeof (struct GNUNET_PeerIdentity)); 1377 num_peers * sizeof (struct GNUNET_PeerIdentity));
1317 GNUNET_free (peer_ids); 1378 GNUNET_free (peer_ids);
1318 1379
1319 cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client, struct ClientContext); 1380 cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client,
1320 if (NULL == cli_ctx) { 1381 struct ClientContext);
1321 cli_ctx = GNUNET_new (struct ClientContext); 1382 GNUNET_assert (NULL != cli_ctx);
1322 cli_ctx->mq = GNUNET_MQ_queue_for_server_client (reply_cls->client); 1383 destroy_reply_cls (reply_cls);
1323 GNUNET_SERVER_client_set_user_context (reply_cls->client, cli_ctx);
1324 }
1325
1326 GNUNET_free (reply_cls);
1327
1328 GNUNET_MQ_send (cli_ctx->mq, ev); 1384 GNUNET_MQ_send (cli_ctx->mq, ev);
1329} 1385}
1330 1386
@@ -1346,6 +1402,7 @@ handle_client_request (void *cls,
1346 uint32_t size_needed; 1402 uint32_t size_needed;
1347 struct ReplyCls *reply_cls; 1403 struct ReplyCls *reply_cls;
1348 uint32_t i; 1404 uint32_t i;
1405 struct ClientContext *cli_ctx;
1349 1406
1350 msg = (struct GNUNET_RPS_CS_RequestMessage *) message; 1407 msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
1351 1408
@@ -1371,12 +1428,50 @@ handle_client_request (void *cls,
1371 reply_cls = GNUNET_new (struct ReplyCls); 1428 reply_cls = GNUNET_new (struct ReplyCls);
1372 reply_cls->id = ntohl (msg->id); 1429 reply_cls->id = ntohl (msg->id);
1373 reply_cls->client = client; 1430 reply_cls->client = client;
1431 reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
1432 client_respond,
1433 reply_cls,
1434 num_peers);
1435
1436 cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1437 GNUNET_assert (NULL != cli_ctx);
1438 GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
1439 cli_ctx->rep_cls_tail,
1440 reply_cls);
1441 GNUNET_SERVER_receive_done (client,
1442 GNUNET_OK);
1443}
1374 1444
1375 RPS_sampler_get_n_rand_peers (client_sampler,
1376 client_respond,
1377 reply_cls,
1378 num_peers);
1379 1445
1446/**
1447 * @brief Handle a message that requests the cancellation of a request
1448 *
1449 * @param cls unused
1450 * @param client the client that requests the cancellation
1451 * @param message the message containing the id of the request
1452 */
1453static void
1454handle_client_request_cancel (void *cls,
1455 struct GNUNET_SERVER_Client *client,
1456 const struct GNUNET_MessageHeader *message)
1457{
1458 struct GNUNET_RPS_CS_RequestCancelMessage *msg =
1459 (struct GNUNET_RPS_CS_RequestCancelMessage *) message;
1460 struct ClientContext *cli_ctx;
1461 struct ReplyCls *rep_cls;
1462
1463 cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1464 GNUNET_assert (NULL != cli_ctx->rep_cls_head);
1465 rep_cls = cli_ctx->rep_cls_head;
1466 LOG (GNUNET_ERROR_TYPE_DEBUG,
1467 "Client cancels request with id %lu\n",
1468 ntohl (msg->id));
1469 while ( (NULL != rep_cls->next) &&
1470 (rep_cls->id != ntohl (msg->id)) )
1471 rep_cls = rep_cls->next;
1472 GNUNET_assert (rep_cls->id == ntohl (msg->id));
1473 RPS_sampler_request_cancel (rep_cls->req_handle);
1474 destroy_reply_cls (rep_cls);
1380 GNUNET_SERVER_receive_done (client, 1475 GNUNET_SERVER_receive_done (client,
1381 GNUNET_OK); 1476 GNUNET_OK);
1382} 1477}
@@ -2584,6 +2679,30 @@ shutdown_task (void *cls,
2584 2679
2585 2680
2586/** 2681/**
2682 * @brief Get informed about a connecting client.
2683 *
2684 * @param cls unused
2685 * @param client the client that connects
2686 */
2687static void
2688handle_client_connect (void *cls,
2689 struct GNUNET_SERVER_Client *client)
2690{
2691 struct ClientContext *cli_ctx;
2692
2693 LOG (GNUNET_ERROR_TYPE_DEBUG,
2694 "Client connected\n");
2695 if (NULL == client)
2696 return; /* Server was destroyed before a client connected. Shutting down */
2697 cli_ctx = GNUNET_new (struct ClientContext);
2698 cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
2699 GNUNET_SERVER_client_set_user_context (client, cli_ctx);
2700 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
2701 cli_ctx_tail,
2702 cli_ctx);
2703}
2704
2705/**
2587 * A client disconnected. Remove all of its data structure entries. 2706 * A client disconnected. Remove all of its data structure entries.
2588 * 2707 *
2589 * @param cls closure, NULL 2708 * @param cls closure, NULL
@@ -2591,8 +2710,20 @@ shutdown_task (void *cls,
2591 */ 2710 */
2592static void 2711static void
2593handle_client_disconnect (void *cls, 2712handle_client_disconnect (void *cls,
2594 struct GNUNET_SERVER_Client * client) 2713 struct GNUNET_SERVER_Client *client)
2595{ 2714{
2715 struct ClientContext *cli_ctx;
2716
2717 if (NULL == client)
2718 {/* shutdown task */
2719 while (NULL != cli_ctx_head)
2720 destroy_cli_ctx (cli_ctx_head);
2721 }
2722 else
2723 {
2724 cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
2725 destroy_cli_ctx (cli_ctx);
2726 }
2596} 2727}
2597 2728
2598 2729
@@ -2716,16 +2847,21 @@ cleanup_channel (void *cls,
2716rps_start (struct GNUNET_SERVER_Handle *server) 2847rps_start (struct GNUNET_SERVER_Handle *server)
2717{ 2848{
2718 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 2849 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2719 {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 2850 {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2720 sizeof (struct GNUNET_RPS_CS_RequestMessage)}, 2851 sizeof (struct GNUNET_RPS_CS_RequestMessage)},
2721 {&handle_client_seed, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0}, 2852 {&handle_client_request_cancel, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
2853 sizeof (struct GNUNET_RPS_CS_RequestCancelMessage)},
2854 {&handle_client_seed, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
2722 #ifdef ENABLE_MALICIOUS 2855 #ifdef ENABLE_MALICIOUS
2723 {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0}, 2856 {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
2724 #endif /* ENABLE_MALICIOUS */ 2857 #endif /* ENABLE_MALICIOUS */
2725 {NULL, NULL, 0, 0} 2858 {NULL, NULL, 0, 0}
2726 }; 2859 };
2727 2860
2728 GNUNET_SERVER_add_handlers (server, handlers); 2861 GNUNET_SERVER_add_handlers (server, handlers);
2862 GNUNET_SERVER_connect_notify (server,
2863 &handle_client_connect,
2864 NULL);
2729 GNUNET_SERVER_disconnect_notify (server, 2865 GNUNET_SERVER_disconnect_notify (server,
2730 &handle_client_disconnect, 2866 &handle_client_disconnect,
2731 NULL); 2867 NULL);
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 6857c80ba..b65dd7c47 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -73,16 +73,12 @@ struct GetPeerCls
73 * DLL 73 * DLL
74 */ 74 */
75 struct GetPeerCls *next; 75 struct GetPeerCls *next;
76
77 /**
78 * DLL
79 */
80 struct GetPeerCls *prev; 76 struct GetPeerCls *prev;
81 77
82 /** 78 /**
83 * The sampler this function operates on. 79 * The #RPS_SamplerRequestHandle this single request belongs to.
84 */ 80 */
85 struct RPS_Sampler *sampler; 81 struct RPS_SamplerRequestHandle *req_handle;
86 82
87 /** 83 /**
88 * The task for this function. 84 * The task for this function.
@@ -166,14 +162,10 @@ struct RPS_Sampler
166 RPS_get_peers_type get_peers; 162 RPS_get_peers_type get_peers;
167 163
168 /** 164 /**
169 * Head for the DLL to store the closures to pending requests. 165 * Head and tail for the DLL to store the #RPS_SamplerRequestHandle
170 */ 166 */
171 struct GetPeerCls *gpc_head; 167 struct RPS_SamplerRequestHandle *req_handle_head;
172 168 struct RPS_SamplerRequestHandle *req_handle_tail;
173 /**
174 * Tail for the DLL to store the closures to pending requests.
175 */
176 struct GetPeerCls *gpc_tail;
177 169
178 #ifdef TO_FILE 170 #ifdef TO_FILE
179 /** 171 /**
@@ -186,9 +178,15 @@ struct RPS_Sampler
186/** 178/**
187 * Closure to _get_n_rand_peers_ready_cb() 179 * Closure to _get_n_rand_peers_ready_cb()
188 */ 180 */
189struct NRandPeersReadyCls 181struct RPS_SamplerRequestHandle
190{ 182{
191 /** 183 /**
184 * DLL
185 */
186 struct RPS_SamplerRequestHandle *next;
187 struct RPS_SamplerRequestHandle *prev;
188
189 /**
192 * Number of peers we are waiting for. 190 * Number of peers we are waiting for.
193 */ 191 */
194 uint32_t num_peers; 192 uint32_t num_peers;
@@ -204,6 +202,17 @@ struct NRandPeersReadyCls
204 struct GNUNET_PeerIdentity *ids; 202 struct GNUNET_PeerIdentity *ids;
205 203
206 /** 204 /**
205 * Head and tail for the DLL to store the tasks for single requests
206 */
207 struct GetPeerCls *gpc_head;
208 struct GetPeerCls *gpc_tail;
209
210 /**
211 * Sampler.
212 */
213 struct RPS_Sampler *sampler;
214
215 /**
207 * Callback to be called when all ids are available. 216 * Callback to be called when all ids are available.
208 */ 217 */
209 RPS_sampler_n_rand_peers_ready_cb callback; 218 RPS_sampler_n_rand_peers_ready_cb callback;
@@ -251,23 +260,23 @@ static void
251check_n_peers_ready (void *cls, 260check_n_peers_ready (void *cls,
252 const struct GNUNET_PeerIdentity *id) 261 const struct GNUNET_PeerIdentity *id)
253{ 262{
254 struct NRandPeersReadyCls *n_peers_cls = cls; 263 struct RPS_SamplerRequestHandle *req_handle = cls;
255 264
256 n_peers_cls->cur_num_peers++; 265 req_handle->cur_num_peers++;
257 LOG (GNUNET_ERROR_TYPE_DEBUG, 266 LOG (GNUNET_ERROR_TYPE_DEBUG,
258 "Got %" PRIX32 ". of %" PRIX32 " peers\n", 267 "Got %" PRIX32 ". of %" PRIX32 " peers\n",
259 n_peers_cls->cur_num_peers, n_peers_cls->num_peers); 268 req_handle->cur_num_peers, req_handle->num_peers);
260 269
261 if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) 270 if (req_handle->num_peers == req_handle->cur_num_peers)
262 { /* All peers are ready -- return those to the client */ 271 { /* All peers are ready -- return those to the client */
263 GNUNET_assert (NULL != n_peers_cls->callback); 272 GNUNET_assert (NULL != req_handle->callback);
264 273
265 LOG (GNUNET_ERROR_TYPE_DEBUG, 274 LOG (GNUNET_ERROR_TYPE_DEBUG,
266 "returning %" PRIX32 " peers to the client\n", 275 "returning %" PRIX32 " peers to the client\n",
267 n_peers_cls->num_peers); 276 req_handle->num_peers);
268 n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); 277 req_handle->callback (req_handle->cls, req_handle->ids, req_handle->num_peers);
269 278
270 GNUNET_free (n_peers_cls); 279 RPS_sampler_request_cancel (req_handle);
271 } 280 }
272} 281}
273 282
@@ -420,12 +429,8 @@ RPS_sampler_init (size_t init_size,
420 sampler->file_name); 429 sampler->file_name);
421 #endif /* TO_FILE */ 430 #endif /* TO_FILE */
422 431
423 sampler->sampler_size = 0;
424 sampler->sampler_elements = NULL;
425 sampler->max_round_interval = max_round_interval; 432 sampler->max_round_interval = max_round_interval;
426 sampler->get_peers = sampler_get_rand_peer; 433 sampler->get_peers = sampler_get_rand_peer;
427 sampler->gpc_head = NULL;
428 sampler->gpc_tail = NULL;
429 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); 434 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
430 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); 435 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
431 RPS_sampler_resize (sampler, init_size); 436 RPS_sampler_resize (sampler, init_size);
@@ -530,19 +535,21 @@ sampler_get_rand_peer (void *cls,
530{ 535{
531 struct GetPeerCls *gpc = cls; 536 struct GetPeerCls *gpc = cls;
532 uint32_t r_index; 537 uint32_t r_index;
538 struct RPS_Sampler *sampler;
533 539
534 gpc->get_peer_task = NULL; 540 gpc->get_peer_task = NULL;
535 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 541 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
536 return; 542 return;
543 sampler = gpc->req_handle->sampler;
537 544
538 /**; 545 /**;
539 * Choose the r_index of the peer we want to return 546 * Choose the r_index of the peer we want to return
540 * at random from the interval of the gossip list 547 * at random from the interval of the gossip list
541 */ 548 */
542 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, 549 r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
543 gpc->sampler->sampler_size); 550 sampler->sampler_size);
544 551
545 if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty) 552 if (EMPTY == sampler->sampler_elements[r_index]->is_empty)
546 { 553 {
547 //LOG (GNUNET_ERROR_TYPE_DEBUG, 554 //LOG (GNUNET_ERROR_TYPE_DEBUG,
548 // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); 555 // "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
@@ -552,20 +559,18 @@ sampler_get_rand_peer (void *cls,
552 * Counter? 559 * Counter?
553 */ 560 */
554 gpc->get_peer_task = 561 gpc->get_peer_task =
555 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( 562 GNUNET_SCHEDULER_add_delayed (
556 GNUNET_TIME_UNIT_SECONDS, 0.1), 563 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
557 &sampler_get_rand_peer, 564 &sampler_get_rand_peer,
558 cls); 565 cls);
559 return; 566 return;
560 } 567 }
561 568
562 *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id; 569 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
563 570 gpc->req_handle->gpc_tail,
564 gpc->cont (gpc->cont_cls, gpc->id);
565
566 GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head,
567 gpc->sampler->gpc_tail,
568 gpc); 571 gpc);
572 *gpc->id = sampler->sampler_elements[r_index]->peer_id;
573 gpc->cont (gpc->cont_cls, gpc->id);
569 574
570 GNUNET_free (gpc); 575 GNUNET_free (gpc);
571} 576}
@@ -584,17 +589,19 @@ sampler_mod_get_rand_peer (void *cls,
584 struct GetPeerCls *gpc = cls; 589 struct GetPeerCls *gpc = cls;
585 struct RPS_SamplerElement *s_elem; 590 struct RPS_SamplerElement *s_elem;
586 struct GNUNET_TIME_Relative last_request_diff; 591 struct GNUNET_TIME_Relative last_request_diff;
592 struct RPS_Sampler *sampler;
587 593
588 gpc->get_peer_task = NULL; 594 gpc->get_peer_task = NULL;
589 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 595 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
590 return; 596 return;
597 sampler = gpc->req_handle->sampler;
591 598
592 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); 599 LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
593 600
594 /* Cycle the #client_get_index one step further */ 601 /* Cycle the #client_get_index one step further */
595 client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size; 602 client_get_index = (client_get_index + 1) % sampler->sampler_size;
596 603
597 s_elem = gpc->sampler->sampler_elements[client_get_index]; 604 s_elem = sampler->sampler_elements[client_get_index];
598 *gpc->id = s_elem->peer_id; 605 *gpc->id = s_elem->peer_id;
599 GNUNET_assert (NULL != s_elem); 606 GNUNET_assert (NULL != s_elem);
600 607
@@ -603,7 +610,7 @@ sampler_mod_get_rand_peer (void *cls,
603 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); 610 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n");
604 GNUNET_assert (NULL == gpc->get_peer_task); 611 GNUNET_assert (NULL == gpc->get_peer_task);
605 gpc->get_peer_task = 612 gpc->get_peer_task =
606 GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, 613 GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
607 &sampler_mod_get_rand_peer, 614 &sampler_mod_get_rand_peer,
608 cls); 615 cls);
609 return; 616 return;
@@ -617,7 +624,7 @@ sampler_mod_get_rand_peer (void *cls,
617 GNUNET_TIME_absolute_get ()); 624 GNUNET_TIME_absolute_get ());
618 /* We're not going to give it back now if it was 625 /* We're not going to give it back now if it was
619 * already requested by a client this round */ 626 * already requested by a client this round */
620 if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us) 627 if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
621 { 628 {
622 LOG (GNUNET_ERROR_TYPE_DEBUG, 629 LOG (GNUNET_ERROR_TYPE_DEBUG,
623 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); 630 "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
@@ -629,7 +636,7 @@ sampler_mod_get_rand_peer (void *cls,
629 /* Schedule it one round later */ 636 /* Schedule it one round later */
630 GNUNET_assert (NULL == gpc->get_peer_task); 637 GNUNET_assert (NULL == gpc->get_peer_task);
631 gpc->get_peer_task = 638 gpc->get_peer_task =
632 GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, 639 GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
633 &sampler_mod_get_rand_peer, 640 &sampler_mod_get_rand_peer,
634 cls); 641 cls);
635 return; 642 return;
@@ -639,8 +646,8 @@ sampler_mod_get_rand_peer (void *cls,
639 646
640 s_elem->last_client_request = GNUNET_TIME_absolute_get (); 647 s_elem->last_client_request = GNUNET_TIME_absolute_get ();
641 648
642 GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, 649 GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
643 gpc->sampler->gpc_tail, 650 gpc->req_handle->gpc_tail,
644 gpc); 651 gpc);
645 gpc->cont (gpc->cont_cls, gpc->id); 652 gpc->cont (gpc->cont_cls, gpc->id);
646 GNUNET_free (gpc); 653 GNUNET_free (gpc);
@@ -661,26 +668,30 @@ sampler_mod_get_rand_peer (void *cls,
661 * #GNUNET_NO if used internally 668 * #GNUNET_NO if used internally
662 * @param num_peers the number of peers requested 669 * @param num_peers the number of peers requested
663 */ 670 */
664 void 671struct RPS_SamplerRequestHandle *
665RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, 672RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
666 RPS_sampler_n_rand_peers_ready_cb cb, 673 RPS_sampler_n_rand_peers_ready_cb cb,
667 void *cls, uint32_t num_peers) 674 void *cls, uint32_t num_peers)
668{ 675{
669 GNUNET_assert (0 != sampler->sampler_size); 676 GNUNET_assert (0 != sampler->sampler_size);
670 if (0 == num_peers) 677 if (0 == num_peers)
671 return; 678 return NULL;
672 679
673 // TODO check if we have too much (distinct) sampled peers 680 // TODO check if we have too much (distinct) sampled peers
674 uint32_t i; 681 uint32_t i;
675 struct NRandPeersReadyCls *cb_cls; 682 struct RPS_SamplerRequestHandle *req_handle;
676 struct GetPeerCls *gpc; 683 struct GetPeerCls *gpc;
677 684
678 cb_cls = GNUNET_new (struct NRandPeersReadyCls); 685 req_handle = GNUNET_new (struct RPS_SamplerRequestHandle);
679 cb_cls->num_peers = num_peers; 686 req_handle->num_peers = num_peers;
680 cb_cls->cur_num_peers = 0; 687 req_handle->cur_num_peers = 0;
681 cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); 688 req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
682 cb_cls->callback = cb; 689 req_handle->sampler = sampler;
683 cb_cls->cls = cls; 690 req_handle->callback = cb;
691 req_handle->cls = cls;
692 GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head,
693 sampler->req_handle_tail,
694 req_handle);
684 695
685 LOG (GNUNET_ERROR_TYPE_DEBUG, 696 LOG (GNUNET_ERROR_TYPE_DEBUG,
686 "Scheduling requests for %" PRIu32 " peers\n", num_peers); 697 "Scheduling requests for %" PRIu32 " peers\n", num_peers);
@@ -688,18 +699,43 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
688 for (i = 0 ; i < num_peers ; i++) 699 for (i = 0 ; i < num_peers ; i++)
689 { 700 {
690 gpc = GNUNET_new (struct GetPeerCls); 701 gpc = GNUNET_new (struct GetPeerCls);
691 gpc->sampler = sampler; 702 gpc->req_handle = req_handle;
692 gpc->cont = check_n_peers_ready; 703 gpc->cont = check_n_peers_ready;
693 gpc->cont_cls = cb_cls; 704 gpc->cont_cls = req_handle;
694 gpc->id = &cb_cls->ids[i]; 705 gpc->id = &req_handle->ids[i];
695 706
707 GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
708 req_handle->gpc_tail,
709 gpc);
696 // maybe add a little delay 710 // maybe add a little delay
697 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); 711 gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc);
712 }
713 return req_handle;
714}
698 715
699 GNUNET_CONTAINER_DLL_insert (sampler->gpc_head, 716/**
700 sampler->gpc_tail, 717 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
701 gpc); 718 *
719 * @param req_handle the handle to the request
720 */
721void
722RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
723{
724 struct GetPeerCls *i;
725
726 while (NULL != (i = req_handle->gpc_head) )
727 {
728 GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head,
729 req_handle->gpc_tail,
730 i);
731 if (NULL != i->get_peer_task)
732 GNUNET_SCHEDULER_cancel (i->get_peer_task);
733 GNUNET_free (i);
702 } 734 }
735 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
736 req_handle->sampler->req_handle_tail,
737 req_handle);
738 GNUNET_free (req_handle);
703} 739}
704 740
705 741
@@ -735,17 +771,13 @@ RPS_sampler_count_id (struct RPS_Sampler *sampler,
735 void 771 void
736RPS_sampler_destroy (struct RPS_Sampler *sampler) 772RPS_sampler_destroy (struct RPS_Sampler *sampler)
737{ 773{
738 struct GetPeerCls *i; 774 if (NULL != sampler->req_handle_head)
739
740 for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head)
741 { 775 {
742 GNUNET_CONTAINER_DLL_remove (sampler->gpc_head, 776 LOG (GNUNET_ERROR_TYPE_WARNING,
743 sampler->gpc_tail, 777 "There are still pending requests. Going to remove them.\n");
744 i); 778 while (NULL != sampler->req_handle_head)
745 GNUNET_SCHEDULER_cancel (i->get_peer_task); 779 RPS_sampler_request_cancel (sampler->req_handle_head);
746 GNUNET_free (i);
747 } 780 }
748
749 sampler_empty (sampler); 781 sampler_empty (sampler);
750 GNUNET_free (sampler); 782 GNUNET_free (sampler);
751} 783}
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
index 83705b013..f33e7430c 100644
--- a/src/rps/gnunet-service-rps_sampler.h
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -34,6 +34,11 @@
34 */ 34 */
35struct RPS_Sampler; 35struct RPS_Sampler;
36 36
37/**
38 * A handle to cancel a request.
39 */
40struct RPS_SamplerRequestHandle;
41
37 42
38/** 43/**
39 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. 44 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
@@ -130,11 +135,19 @@ RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
130 * #GNUNET_NO if used internally 135 * #GNUNET_NO if used internally
131 * @param num_peers the number of peers requested 136 * @param num_peers the number of peers requested
132 */ 137 */
133 void 138struct RPS_SamplerRequestHandle *
134RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, 139RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
135 RPS_sampler_n_rand_peers_ready_cb cb, 140 RPS_sampler_n_rand_peers_ready_cb cb,
136 void *cls, uint32_t num_peers); 141 void *cls, uint32_t num_peers);
137 142
143/**
144 * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
145 *
146 * @param req_handle the handle to the request
147 */
148void
149RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
150
138 151
139/** 152/**
140 * Counts how many Samplers currently hold a given PeerID. 153 * Counts how many Samplers currently hold a given PeerID.
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 9a16e7593..44b93e396 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -106,6 +106,22 @@ struct GNUNET_RPS_CS_ReplyMessage
106}; 106};
107 107
108/** 108/**
109 * Message from client to RPS service to cancel request.
110 */
111struct GNUNET_RPS_CS_RequestCancelMessage
112{
113 /**
114 * Header including size and type in NBO
115 */
116 struct GNUNET_MessageHeader header;
117
118 /**
119 * Identifyer of the message.
120 */
121 uint32_t id GNUNET_PACKED;
122};
123
124/**
109 * Message from client to service with seed of peers. 125 * Message from client to service with seed of peers.
110 */ 126 */
111struct GNUNET_RPS_CS_SeedMessage 127struct GNUNET_RPS_CS_SeedMessage
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 854ea25cf..42cec9761 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -339,7 +339,8 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
339GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, 339GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
340 uint32_t type, 340 uint32_t type,
341 uint32_t num_peers, 341 uint32_t num_peers,
342 const struct GNUNET_PeerIdentity *peer_ids) 342 const struct GNUNET_PeerIdentity *peer_ids,
343 const struct GNUNET_PeerIdentity *target_peer)
343{ 344{
344 size_t size_needed; 345 size_t size_needed;
345 uint32_t num_peers_max; 346 uint32_t num_peers_max;
@@ -379,8 +380,8 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
379 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); 380 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
380 msg->type = htonl (type); 381 msg->type = htonl (type);
381 msg->num_peers = htonl (num_peers_max); 382 msg->num_peers = htonl (num_peers_max);
382 if (2 == type 383 if ( (2 == type) ||
383 || 3 == type) 384 (3 == type) )
384 msg->attacked_peer = peer_ids[num_peers]; 385 msg->attacked_peer = peer_ids[num_peers];
385 memcpy (&msg[1], 386 memcpy (&msg[1],
386 tmp_peer_pointer, 387 tmp_peer_pointer,
@@ -400,9 +401,9 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
400 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); 401 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
401 msg->type = htonl (type); 402 msg->type = htonl (type);
402 msg->num_peers = htonl (num_peers); 403 msg->num_peers = htonl (num_peers);
403 if (2 == type 404 if ( (2 == type) ||
404 || 3 == type) 405 (3 == type) )
405 msg->attacked_peer = peer_ids[num_peers]; 406 msg->attacked_peer = *target_peer;
406 memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct GNUNET_PeerIdentity)); 407 memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct GNUNET_PeerIdentity));
407 408
408 GNUNET_MQ_send (h->mq, ev); 409 GNUNET_MQ_send (h->mq, ev);
@@ -418,7 +419,17 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
418 void 419 void
419GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) 420GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
420{ 421{
421 // TODO 422 struct GNUNET_MQ_Envelope *ev;
423 struct GNUNET_RPS_CS_RequestCancelMessage*msg;
424
425 LOG (GNUNET_ERROR_TYPE_DEBUG,
426 "Cancelling request with id %" PRIu32 "\n",
427 rh->id);
428
429 GNUNET_array_append (req_handlers, req_handlers_size, *rh);
430 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL);
431 msg->id = htonl (rh->id);
432 GNUNET_MQ_send (rh->rps_handle->mq, ev);
422} 433}
423 434
424 435
diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c
index 01777bd90..4a4a9ee1a 100644
--- a/src/rps/test_rps.c
+++ b/src/rps/test_rps.c
@@ -46,6 +46,7 @@ uint32_t num_peers;
46//#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 46//#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
47static struct GNUNET_TIME_Relative timeout; 47static struct GNUNET_TIME_Relative timeout;
48 48
49
49/** 50/**
50 * Portion of malicious peers 51 * Portion of malicious peers
51 */ 52 */
@@ -106,6 +107,52 @@ static struct OpListEntry *oplist_tail;
106 107
107 108
108/** 109/**
110 * A pending reply: A request was sent and the reply is pending.
111 */
112struct PendingReply
113{
114 /**
115 * DLL next,prev ptr
116 */
117 struct PendingReply *next;
118 struct PendingReply *prev;
119
120 /**
121 * Handle to the request we are waiting for
122 */
123 struct GNUNET_RPS_Request_Handle *req_handle;
124
125 /**
126 * The peer that requested
127 */
128 struct RPSPeer *rps_peer;
129};
130
131
132/**
133 * A pending request: A request was not made yet but is scheduled for later.
134 */
135struct PendingRequest
136{
137 /**
138 * DLL next,prev ptr
139 */
140 struct PendingRequest *next;
141 struct PendingRequest *prev;
142
143 /**
144 * Handle to the request we are waiting for
145 */
146 struct GNUNET_SCHEDULER_Task *request_task;
147
148 /**
149 * The peer that requested
150 */
151 struct RPSPeer *rps_peer;
152};
153
154
155/**
109 * Information we track for each peer. 156 * Information we track for each peer.
110 */ 157 */
111struct RPSPeer 158struct RPSPeer
@@ -141,6 +188,33 @@ struct RPSPeer
141 int online; 188 int online;
142 189
143 /** 190 /**
191 * Number of Peer IDs to request
192 */
193 unsigned int num_ids_to_request;
194
195 /**
196 * Pending requests DLL
197 */
198 struct PendingRequest *pending_req_head;
199 struct PendingRequest *pending_req_tail;
200
201 /**
202 * Number of pending requests
203 */
204 unsigned int num_pending_reqs;
205
206 /**
207 * Pending replies DLL
208 */
209 struct PendingReply *pending_rep_head;
210 struct PendingReply *pending_rep_tail;
211
212 /**
213 * Number of pending replies
214 */
215 unsigned int num_pending_reps;
216
217 /**
144 * Received PeerIDs 218 * Received PeerIDs
145 */ 219 */
146 struct GNUNET_PeerIdentity *rec_ids; 220 struct GNUNET_PeerIdentity *rec_ids;
@@ -168,6 +242,16 @@ static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
168static struct GNUNET_PeerIdentity *rps_peer_ids; 242static struct GNUNET_PeerIdentity *rps_peer_ids;
169 243
170/** 244/**
245 * ID of the targeted peer.
246 */
247static struct GNUNET_PeerIdentity *target_peer;
248
249/**
250 * ID of the peer that requests for the evaluation.
251 */
252static struct RPSPeer *eval_peer;
253
254/**
171 * Number of online peers. 255 * Number of online peers.
172 */ 256 */
173static unsigned int num_peers_online; 257static unsigned int num_peers_online;
@@ -185,6 +269,11 @@ static struct GNUNET_SCHEDULER_Task *churn_task;
185 269
186 270
187/** 271/**
272 * Called to initialise the given RPSPeer
273 */
274typedef void (*InitPeer) (struct RPSPeer *rps_peer);
275
276/**
188 * Called directly after connecting to the service 277 * Called directly after connecting to the service
189 */ 278 */
190typedef void (*PreTest) (void *cls, struct GNUNET_RPS_Handle *h); 279typedef void (*PreTest) (void *cls, struct GNUNET_RPS_Handle *h);
@@ -224,6 +313,11 @@ struct SingleTestRun
224 char *name; 313 char *name;
225 314
226 /** 315 /**
316 * Called to initialise peer
317 */
318 InitPeer init_peer;
319
320 /**
227 * Called directly after connecting to the service 321 * Called directly after connecting to the service
228 */ 322 */
229 PreTest pre_test; 323 PreTest pre_test;
@@ -398,80 +492,6 @@ make_oplist_entry ()
398 492
399 493
400/** 494/**
401 * Callback to be called when RPS service is started or stopped at peers
402 *
403 * @param cls NULL
404 * @param op the operation handle
405 * @param emsg NULL on success; otherwise an error description
406 */
407static void
408churn_cb (void *cls,
409 struct GNUNET_TESTBED_Operation *op,
410 const char *emsg)
411{
412 // FIXME
413 struct OpListEntry *entry = cls;
414
415 GNUNET_TESTBED_operation_done (entry->op);
416 if (NULL != emsg)
417 {
418 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
419 GNUNET_SCHEDULER_shutdown ();
420 return;
421 }
422 GNUNET_assert (0 != entry->delta);
423
424 num_peers_online += entry->delta;
425
426 if (0 > entry->delta)
427 { /* Peer hopefully just went offline */
428 if (GNUNET_YES != rps_peers[entry->index].online)
429 {
430 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
431 "peer %s was expected to go offline but is still marked as online\n",
432 GNUNET_i2s (rps_peers[entry->index].peer_id));
433 GNUNET_break (0);
434 }
435 else
436 {
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "peer %s probably went offline as expected\n",
439 GNUNET_i2s (rps_peers[entry->index].peer_id));
440 }
441 rps_peers[entry->index].online = GNUNET_NO;
442 }
443
444 else if (0 < entry->delta)
445 { /* Peer hopefully just went online */
446 if (GNUNET_NO != rps_peers[entry->index].online)
447 {
448 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
449 "peer %s was expected to go online but is still marked as offline\n",
450 GNUNET_i2s (rps_peers[entry->index].peer_id));
451 GNUNET_break (0);
452 }
453 else
454 {
455 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456 "peer %s probably went online as expected\n",
457 GNUNET_i2s (rps_peers[entry->index].peer_id));
458 if (NULL != cur_test_run.pre_test)
459 {
460 cur_test_run.pre_test (&rps_peers[entry->index],
461 rps_peers[entry->index].rps_handle);
462 }
463 }
464 rps_peers[entry->index].online = GNUNET_YES;
465 }
466
467 GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
468 GNUNET_free (entry);
469 //if (num_peers_in_round[current_round] == peers_running)
470 // run_round ();
471}
472
473
474/**
475 * Task run on timeout to shut everything down. 495 * Task run on timeout to shut everything down.
476 */ 496 */
477static void 497static void
@@ -543,14 +563,15 @@ info_cb (void *cb_cls,
543 &rps_peer_ids[entry->index], 563 &rps_peer_ids[entry->index],
544 &rps_peers[entry->index], 564 &rps_peers[entry->index],
545 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 565 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
546
547 tofile ("/tmp/rps/peer_ids", 566 tofile ("/tmp/rps/peer_ids",
548 "%u\t%s\n", 567 "%u\t%s\n",
549 entry->index, 568 entry->index,
550 GNUNET_i2s_full (&rps_peer_ids[entry->index])); 569 GNUNET_i2s_full (&rps_peer_ids[entry->index]));
551 570
552 GNUNET_TESTBED_operation_done (entry->op); 571 if (NULL != cur_test_run.init_peer)
572 cur_test_run.init_peer (&rps_peers[entry->index]);
553 573
574 GNUNET_TESTBED_operation_done (entry->op);
554 GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); 575 GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
555 GNUNET_free (entry); 576 GNUNET_free (entry);
556} 577}
@@ -650,7 +671,15 @@ default_eval_cb (void)
650static int 671static int
651no_eval (void) 672no_eval (void)
652{ 673{
653 return 1; 674 return 0;
675}
676
677/**
678 * Initialise given RPSPeer
679 */
680static void default_init_peer (struct RPSPeer *rps_peer)
681{
682 rps_peer->num_ids_to_request = 1;
654} 683}
655 684
656/** 685/**
@@ -665,9 +694,15 @@ default_reply_handle (void *cls,
665 uint64_t n, 694 uint64_t n,
666 const struct GNUNET_PeerIdentity *recv_peers) 695 const struct GNUNET_PeerIdentity *recv_peers)
667{ 696{
668 struct RPSPeer *rps_peer = (struct RPSPeer *) cls; 697 struct RPSPeer *rps_peer;
698 struct PendingReply *pending_rep = (struct PendingReply *) cls;
669 unsigned int i; 699 unsigned int i;
670 700
701 rps_peer = pending_rep->rps_peer;
702 GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
703 rps_peer->pending_rep_tail,
704 pending_rep);
705 rps_peer->num_pending_reps--;
671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672 "[%s] got %" PRIu64 " peers:\n", 707 "[%s] got %" PRIu64 " peers:\n",
673 GNUNET_i2s (rps_peer->peer_id), 708 GNUNET_i2s (rps_peer->peer_id),
@@ -691,20 +726,119 @@ static void
691request_peers (void *cls, 726request_peers (void *cls,
692 const struct GNUNET_SCHEDULER_TaskContext *tc) 727 const struct GNUNET_SCHEDULER_TaskContext *tc)
693{ 728{
694 struct RPSPeer *rps_peer = (struct RPSPeer *) cls; 729 struct RPSPeer *rps_peer;
730 struct PendingRequest *pending_req = (struct PendingRequest *) cls;
731 struct PendingReply *pending_rep;
695 732
696 if (GNUNET_YES == in_shutdown) 733 if (GNUNET_YES == in_shutdown)
697 return; 734 return;
735 rps_peer = pending_req->rps_peer;
736 GNUNET_assert (1 <= rps_peer->num_pending_reqs);
737 GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
738 rps_peer->pending_req_tail,
739 pending_req);
698 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 740 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
699 "Requesting one peer\n"); 741 "Requesting one peer\n");
742 pending_rep = GNUNET_new (struct PendingReply);
743 pending_rep->rps_peer = rps_peer;
744 pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
745 1,
746 cur_test_run.reply_handle,
747 pending_rep);
748 GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
749 rps_peer->pending_rep_tail,
750 pending_rep);
751 rps_peer->num_pending_reps++;
752 rps_peer->num_pending_reqs--;
753}
700 754
701 GNUNET_free (GNUNET_RPS_request_peers (rps_peer->rps_handle, 755static void
702 1, 756cancel_pending_req (struct PendingRequest *pending_req)
703 cur_test_run.reply_handle, 757{
704 rps_peer)); 758 struct RPSPeer *rps_peer;
705 //rps_peer->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, 1, handle_reply, rps_peer); 759
760 rps_peer = pending_req->rps_peer;
761 GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
762 rps_peer->pending_req_tail,
763 pending_req);
764 rps_peer->num_pending_reqs--;
765 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 "Cancelling pending request\n");
767 GNUNET_SCHEDULER_cancel (pending_req->request_task);
768 GNUNET_free (pending_req);
706} 769}
707 770
771static void
772cancel_request (struct PendingReply *pending_rep)
773{
774 struct RPSPeer *rps_peer;
775
776 rps_peer = pending_rep->rps_peer;
777 GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
778 rps_peer->pending_rep_tail,
779 pending_rep);
780 rps_peer->num_pending_reps--;
781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782 "Cancelling request\n");
783 GNUNET_RPS_request_cancel (pending_rep->req_handle);
784 GNUNET_free (pending_rep);
785}
786
787/**
788 * Cancel a request.
789 */
790static void
791cancel_request_cb (void *cls,
792 const struct GNUNET_SCHEDULER_TaskContext *tc)
793{
794 struct PendingReply *pending_rep;
795 struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
796
797 if (GNUNET_YES == in_shutdown)
798 return;
799 pending_rep = rps_peer->pending_rep_head;
800 GNUNET_assert (1 <= rps_peer->num_pending_reps);
801 cancel_request (pending_rep);
802}
803
804
805/**
806 * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
807 * issued, nor replied
808 */
809void
810schedule_missing_requests (struct RPSPeer *rps_peer)
811{
812 unsigned int i;
813 struct PendingRequest *pending_req;
814
815 for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
816 i < rps_peer->num_ids_to_request; i++)
817 {
818 pending_req = GNUNET_new (struct PendingRequest);
819 pending_req->rps_peer = rps_peer;
820 pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
821 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
822 cur_test_run.request_interval * i),
823 request_peers,
824 pending_req);
825 GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
826 rps_peer->pending_req_tail,
827 pending_req);
828 rps_peer->num_pending_reqs++;
829 }
830}
831
832void
833cancel_pending_req_rep (struct RPSPeer *rps_peer)
834{
835 while (NULL != rps_peer->pending_req_head)
836 cancel_pending_req (rps_peer->pending_req_head);
837 GNUNET_assert (0 == rps_peer->num_pending_reqs);
838 while (NULL != rps_peer->pending_rep_head)
839 cancel_request (rps_peer->pending_rep_head);
840 GNUNET_assert (0 == rps_peer->num_pending_reps);
841}
708 842
709/*********************************** 843/***********************************
710 * MALICIOUS 844 * MALICIOUS
@@ -716,8 +850,8 @@ mal_pre (void *cls, struct GNUNET_RPS_Handle *h)
716 uint32_t num_mal_peers; 850 uint32_t num_mal_peers;
717 struct RPSPeer *rps_peer = (struct RPSPeer *) cls; 851 struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
718 852
719 GNUNET_assert (1 >= portion 853 GNUNET_assert ( (1 >= portion) &&
720 && 0 < portion); 854 (0 < portion) );
721 num_mal_peers = round (portion * num_peers); 855 num_mal_peers = round (portion * num_peers);
722 856
723 if (rps_peer->index < num_mal_peers) 857 if (rps_peer->index < num_mal_peers)
@@ -728,7 +862,8 @@ mal_pre (void *cls, struct GNUNET_RPS_Handle *h)
728 GNUNET_i2s (rps_peer->peer_id), 862 GNUNET_i2s (rps_peer->peer_id),
729 num_mal_peers); 863 num_mal_peers);
730 864
731 GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers, rps_peer_ids); 865 GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
866 rps_peer_ids, target_peer);
732 } 867 }
733 #endif /* ENABLE_MALICIOUS */ 868 #endif /* ENABLE_MALICIOUS */
734} 869}
@@ -739,8 +874,8 @@ mal_cb (struct RPSPeer *rps_peer)
739 uint32_t num_mal_peers; 874 uint32_t num_mal_peers;
740 875
741 #ifdef ENABLE_MALICIOUS 876 #ifdef ENABLE_MALICIOUS
742 GNUNET_assert (1 >= portion 877 GNUNET_assert ( (1 >= portion) &&
743 && 0 < portion); 878 (0 < portion) );
744 num_mal_peers = round (portion * num_peers); 879 num_mal_peers = round (portion * num_peers);
745 880
746 if (rps_peer->index >= num_mal_peers) 881 if (rps_peer->index >= num_mal_peers)
@@ -748,8 +883,7 @@ mal_cb (struct RPSPeer *rps_peer)
748 it's not sampling */ 883 it's not sampling */
749 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), 884 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
750 seed_peers, rps_peer); 885 seed_peers, rps_peer);
751 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), 886 schedule_missing_requests (rps_peer);
752 request_peers, rps_peer);
753 } 887 }
754 #endif /* ENABLE_MALICIOUS */ 888 #endif /* ENABLE_MALICIOUS */
755} 889}
@@ -772,8 +906,7 @@ mal_eval (void)
772static void 906static void
773single_req_cb (struct RPSPeer *rps_peer) 907single_req_cb (struct RPSPeer *rps_peer)
774{ 908{
775 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), 909 schedule_missing_requests (rps_peer);
776 request_peers, rps_peer);
777} 910}
778 911
779/*********************************** 912/***********************************
@@ -782,10 +915,7 @@ single_req_cb (struct RPSPeer *rps_peer)
782static void 915static void
783delay_req_cb (struct RPSPeer *rps_peer) 916delay_req_cb (struct RPSPeer *rps_peer)
784{ 917{
785 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), 918 schedule_missing_requests (rps_peer);
786 request_peers, rps_peer);
787 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
788 request_peers, rps_peer);
789} 919}
790 920
791/*********************************** 921/***********************************
@@ -824,8 +954,7 @@ seed_req_cb (struct RPSPeer *rps_peer)
824{ 954{
825 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), 955 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
826 seed_peers, rps_peer); 956 seed_peers, rps_peer);
827 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15), 957 schedule_missing_requests (rps_peer);
828 request_peers, rps_peer);
829} 958}
830 959
831//TODO start big mal 960//TODO start big mal
@@ -836,16 +965,128 @@ seed_req_cb (struct RPSPeer *rps_peer)
836static void 965static void
837req_cancel_cb (struct RPSPeer *rps_peer) 966req_cancel_cb (struct RPSPeer *rps_peer)
838{ 967{
839 // TODO 968 schedule_missing_requests (rps_peer);
969 GNUNET_SCHEDULER_add_delayed (
970 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
971 (cur_test_run.request_interval + 1)),
972 cancel_request_cb, rps_peer);
840} 973}
841 974
842/*********************************** 975/***********************************
843 * PROFILER 976 * PROFILER
844***********************************/ 977***********************************/
978
979/**
980 * Callback to be called when RPS service is started or stopped at peers
981 *
982 * @param cls NULL
983 * @param op the operation handle
984 * @param emsg NULL on success; otherwise an error description
985 */
845static void 986static void
846churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 987churn_cb (void *cls,
988 struct GNUNET_TESTBED_Operation *op,
989 const char *emsg)
990{
991 // FIXME
992 struct OpListEntry *entry = cls;
993
994 GNUNET_TESTBED_operation_done (entry->op);
995 if (NULL != emsg)
996 {
997 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n");
998 GNUNET_SCHEDULER_shutdown ();
999 return;
1000 }
1001 GNUNET_assert (0 != entry->delta);
1002
1003 num_peers_online += entry->delta;
1004
1005 if (0 > entry->delta)
1006 { /* Peer hopefully just went offline */
1007 if (GNUNET_YES != rps_peers[entry->index].online)
1008 {
1009 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1010 "peer %s was expected to go offline but is still marked as online\n",
1011 GNUNET_i2s (rps_peers[entry->index].peer_id));
1012 GNUNET_break (0);
1013 }
1014 else
1015 {
1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017 "peer %s probably went offline as expected\n",
1018 GNUNET_i2s (rps_peers[entry->index].peer_id));
1019 }
1020 rps_peers[entry->index].online = GNUNET_NO;
1021 }
1022
1023 else if (0 < entry->delta)
1024 { /* Peer hopefully just went online */
1025 if (GNUNET_NO != rps_peers[entry->index].online)
1026 {
1027 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1028 "peer %s was expected to go online but is still marked as offline\n",
1029 GNUNET_i2s (rps_peers[entry->index].peer_id));
1030 GNUNET_break (0);
1031 }
1032 else
1033 {
1034 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035 "peer %s probably went online as expected\n",
1036 GNUNET_i2s (rps_peers[entry->index].peer_id));
1037 if (NULL != cur_test_run.pre_test)
1038 {
1039 cur_test_run.pre_test (&rps_peers[entry->index],
1040 rps_peers[entry->index].rps_handle);
1041 schedule_missing_requests (&rps_peers[entry->index]);
1042 }
1043 }
1044 rps_peers[entry->index].online = GNUNET_YES;
1045 }
1046
1047 GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
1048 GNUNET_free (entry);
1049 //if (num_peers_in_round[current_round] == peers_running)
1050 // run_round ();
1051}
1052
1053static void
1054manage_service_wrapper (unsigned int i, unsigned int j, int delta,
1055 double prob_go_on_off)
847{ 1056{
848 struct OpListEntry *entry; 1057 struct OpListEntry *entry;
1058 uint32_t prob;
1059
1060 prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1061 UINT32_MAX);
1062 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1063 "%u. selected peer (%u: %s) is %s.\n",
1064 i,
1065 j,
1066 GNUNET_i2s (rps_peers[j].peer_id),
1067 (delta < 0)? "online" : "offline");
1068 if (prob < prob_go_on_off * UINT32_MAX)
1069 {
1070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071 "%s goes %s\n",
1072 GNUNET_i2s (rps_peers[j].peer_id),
1073 (delta < 0) ? "offline" : "online");
1074
1075 entry = make_oplist_entry ();
1076 entry->delta = delta;
1077 entry->index = j;
1078 entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
1079 testbed_peers[j],
1080 "rps",
1081 &churn_cb,
1082 entry,
1083 (delta < 0) ? 0 : 1);
1084 }
1085}
1086
1087static void
1088churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1089{
849 unsigned int i; 1090 unsigned int i;
850 unsigned int j; 1091 unsigned int j;
851 double portion_online; 1092 double portion_online;
@@ -853,7 +1094,6 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
853 double prob_go_offline; 1094 double prob_go_offline;
854 double portion_go_online; 1095 double portion_go_online;
855 double portion_go_offline; 1096 double portion_go_offline;
856 uint32_t prob;
857 1097
858 /* Compute the probability for an online peer to go offline 1098 /* Compute the probability for an online peer to go offline
859 * this round */ 1099 * this round */
@@ -878,65 +1118,22 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
878 (unsigned int) num_peers); 1118 (unsigned int) num_peers);
879 1119
880 /* Go over 50% randomly chosen peers */ 1120 /* Go over 50% randomly chosen peers */
881 for (i = 0 ; i < .5 * num_peers ; i++) 1121 for (i = 0; i < .5 * num_peers; i++)
882 { 1122 {
883 j = permut[i]; 1123 j = permut[i];
884 1124
885 /* If online, shut down with certain probability */ 1125 /* If online, shut down with certain probability */
886 if (GNUNET_YES == rps_peers[j].online) 1126 if (GNUNET_YES == rps_peers[j].online)
887 { 1127 {
888 prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1128 cancel_pending_req_rep (&rps_peers[j]);
889 UINT32_MAX); 1129 manage_service_wrapper (i, j, -1, prob_go_offline);
890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1130 }
891 "%u. selected peer (%u: %s) is online.\n", 1131
892 i, 1132 /* If offline, restart with certain probability */
893 j, 1133 else if (GNUNET_NO == rps_peers[j].online)
894 GNUNET_i2s (rps_peers[j].peer_id)); 1134 {
895 if (prob < prob_go_offline * UINT32_MAX) 1135 manage_service_wrapper (i, j, 1, 0.66);
896 { 1136 }
897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898 "%s goes offline\n",
899 GNUNET_i2s (rps_peers[j].peer_id));
900
901 entry = make_oplist_entry ();
902 entry->delta = -1;
903 entry->index = j;
904 entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
905 testbed_peers[j],
906 "rps",
907 &churn_cb,
908 entry,
909 0);
910 }
911 }
912
913 /* If offline, restart with certain probability */
914 else if (GNUNET_NO == rps_peers[j].online)
915 {
916 prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
917 UINT32_MAX);
918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919 "%u. selected peer (%u: %s) is offline.\n",
920 i,
921 j,
922 GNUNET_i2s (rps_peers[j].peer_id));
923 if (prob < .66 * UINT32_MAX)
924 {
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926 "%s goes online\n",
927 GNUNET_i2s (rps_peers[j].peer_id));
928
929 entry = make_oplist_entry ();
930 entry->delta = 1;
931 entry->index = j;
932 entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
933 testbed_peers[j],
934 "rps",
935 &churn_cb,
936 entry,
937 1);
938 }
939 }
940 } 1137 }
941 1138
942 GNUNET_free (permut); 1139 GNUNET_free (permut);
@@ -948,21 +1145,13 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
948} 1145}
949 1146
950 1147
951static void 1148/**
952profiler_pre (void *cls, struct GNUNET_RPS_Handle *h) 1149 * Initialise given RPSPeer
1150 */
1151static void profiler_init_peer (struct RPSPeer *rps_peer)
953{ 1152{
954 //churn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1153 if (num_peers - 1 == rps_peer->index)
955 // 10), 1154 rps_peer->num_ids_to_request = cur_test_run.num_requests;
956 // churn, NULL);
957 mal_pre (cls, h);
958
959 /* if (NULL == churn_task)
960 {
961 churn_task = GNUNET_SCHEDULER_add_delayed (
962 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
963 churn,
964 NULL);
965 } */
966} 1155}
967 1156
968 1157
@@ -978,52 +1167,50 @@ profiler_reply_handle (void *cls,
978 uint64_t n, 1167 uint64_t n,
979 const struct GNUNET_PeerIdentity *recv_peers) 1168 const struct GNUNET_PeerIdentity *recv_peers)
980{ 1169{
981 struct RPSPeer *rps_peer = (struct RPSPeer *) cls; 1170 struct RPSPeer *rps_peer;
982 struct RPSPeer *rcv_rps_peer; 1171 struct RPSPeer *rcv_rps_peer;
983 char *file_name; 1172 char *file_name;
984 char *file_name_dh; 1173 char *file_name_dh;
985 unsigned int i; 1174 unsigned int i;
1175 struct PendingReply *pending_rep = (struct PendingReply *) cls;
986 1176
1177 rps_peer = pending_rep->rps_peer;
987 file_name = "/tmp/rps/received_ids"; 1178 file_name = "/tmp/rps/received_ids";
988 file_name_dh = "/tmp/rps/diehard_input"; 1179 file_name_dh = "/tmp/rps/diehard_input";
989
990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991 "[%s] got %" PRIu64 " peers:\n", 1181 "[%s] got %" PRIu64 " peers:\n",
992 GNUNET_i2s (rps_peer->peer_id), 1182 GNUNET_i2s (rps_peer->peer_id),
993 n); 1183 n);
994 1184 for (i = 0; i < n; i++)
995 for (i = 0 ; i < n ; i++)
996 { 1185 {
997 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998 "%u: %s\n", 1187 "%u: %s\n",
999 i, 1188 i,
1000 GNUNET_i2s (&recv_peers[i])); 1189 GNUNET_i2s (&recv_peers[i]));
1001
1002 /* GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]); */
1003 tofile (file_name, 1190 tofile (file_name,
1004 "%s\n", 1191 "%s\n",
1005 GNUNET_i2s_full (&recv_peers[i])); 1192 GNUNET_i2s_full (&recv_peers[i]));
1006
1007 rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]); 1193 rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
1008
1009 tofile (file_name_dh, 1194 tofile (file_name_dh,
1010 "%" PRIu32 "\n", 1195 "%" PRIu32 "\n",
1011 (uint32_t) rcv_rps_peer->index); 1196 (uint32_t) rcv_rps_peer->index);
1012 } 1197 }
1198 /* Find #PendingReply holding the request handle */
1199 GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
1200 rps_peer->pending_rep_tail,
1201 pending_rep);
1202 rps_peer->num_pending_reps--;
1013} 1203}
1014 1204
1015 1205
1016static void 1206static void
1017profiler_cb (struct RPSPeer *rps_peer) 1207profiler_cb (struct RPSPeer *rps_peer)
1018{ 1208{
1019 uint32_t i; 1209 /* Start churn */
1020 1210 if (NULL == churn_task)
1021 /* Churn only at peers that do not request peers for evaluation */
1022 if (NULL == churn_task &&
1023 rps_peer->index != num_peers - 2)
1024 { 1211 {
1025 churn_task = GNUNET_SCHEDULER_add_delayed ( 1212 churn_task = GNUNET_SCHEDULER_add_delayed (
1026 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), 1213 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1027 churn, 1214 churn,
1028 NULL); 1215 NULL);
1029 } 1216 }
@@ -1031,17 +1218,8 @@ profiler_cb (struct RPSPeer *rps_peer)
1031 /* Only request peer ids at one peer. 1218 /* Only request peer ids at one peer.
1032 * (It's the before-last because last one is target of the focussed attack.) 1219 * (It's the before-last because last one is target of the focussed attack.)
1033 */ 1220 */
1034 if (rps_peer->index == num_peers - 2) 1221 if (eval_peer == rps_peer)
1035 { 1222 schedule_missing_requests (rps_peer);
1036 for (i = 0 ; i < cur_test_run.num_requests ; i++)
1037 {
1038 GNUNET_SCHEDULER_add_delayed (
1039 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1040 cur_test_run.request_interval * i),
1041 request_peers,
1042 rps_peer);
1043 }
1044 }
1045} 1223}
1046 1224
1047/** 1225/**
@@ -1126,7 +1304,6 @@ run (void *cls,
1126 1304
1127 testbed_peers = peers; 1305 testbed_peers = peers;
1128 num_peers_online = 0; 1306 num_peers_online = 0;
1129
1130 for (i = 0 ; i < num_peers ; i++) 1307 for (i = 0 ; i < num_peers ; i++)
1131 { 1308 {
1132 entry = make_oplist_entry (); 1309 entry = make_oplist_entry ();
@@ -1137,16 +1314,6 @@ run (void *cls,
1137 entry); 1314 entry);
1138 } 1315 }
1139 1316
1140
1141 // This seems not to work
1142 //if (NULL != strstr (cur_test_run.name, "profiler"))
1143 //{
1144 // churn_task = GNUNET_SCHEDULER_add_delayed (
1145 // GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
1146 // churn,
1147 // NULL);
1148 //}
1149
1150 GNUNET_assert (num_peers == n_peers); 1317 GNUNET_assert (num_peers == n_peers);
1151 for (i = 0 ; i < n_peers ; i++) 1318 for (i = 0 ; i < n_peers ; i++)
1152 { 1319 {
@@ -1161,6 +1328,9 @@ run (void *cls,
1161 &rps_disconnect_adapter, 1328 &rps_disconnect_adapter,
1162 &rps_peers[i]); 1329 &rps_peers[i]);
1163 } 1330 }
1331
1332 if (NULL != churn_task)
1333 GNUNET_SCHEDULER_cancel (churn_task);
1164 GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_task, NULL); 1334 GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_task, NULL);
1165} 1335}
1166 1336
@@ -1177,13 +1347,13 @@ main (int argc, char *argv[])
1177{ 1347{
1178 int ret_value; 1348 int ret_value;
1179 1349
1350 num_peers = 5;
1180 cur_test_run.name = "test-rps-default"; 1351 cur_test_run.name = "test-rps-default";
1352 cur_test_run.init_peer = default_init_peer;
1181 cur_test_run.pre_test = NULL; 1353 cur_test_run.pre_test = NULL;
1182 cur_test_run.reply_handle = default_reply_handle; 1354 cur_test_run.reply_handle = default_reply_handle;
1183 cur_test_run.eval_cb = default_eval_cb; 1355 cur_test_run.eval_cb = default_eval_cb;
1184 churn_task = NULL; 1356 churn_task = NULL;
1185
1186 num_peers = 5;
1187 timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); 1357 timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
1188 1358
1189 if (strstr (argv[0], "malicious") != NULL) 1359 if (strstr (argv[0], "malicious") != NULL)
@@ -1259,31 +1429,39 @@ main (int argc, char *argv[])
1259 { 1429 {
1260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n"); 1430 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
1261 cur_test_run.name = "test-rps-req-cancel"; 1431 cur_test_run.name = "test-rps-req-cancel";
1432 num_peers = 1;
1262 cur_test_run.main_test = req_cancel_cb; 1433 cur_test_run.main_test = req_cancel_cb;
1434 cur_test_run.eval_cb = no_eval;
1263 } 1435 }
1264 1436
1265 else if (strstr (argv[0], "profiler") != NULL) 1437 else if (strstr (argv[0], "profiler") != NULL)
1266 { 1438 {
1267 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n"); 1439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
1268 cur_test_run.name = "test-rps-profiler"; 1440 cur_test_run.name = "test-rps-profiler";
1441 num_peers = 10;
1269 mal_type = 3; 1442 mal_type = 3;
1270 cur_test_run.pre_test = profiler_pre; 1443 cur_test_run.init_peer = profiler_init_peer;
1444 cur_test_run.pre_test = mal_pre;
1271 cur_test_run.main_test = profiler_cb; 1445 cur_test_run.main_test = profiler_cb;
1272 cur_test_run.reply_handle = profiler_reply_handle; 1446 cur_test_run.reply_handle = profiler_reply_handle;
1273 cur_test_run.eval_cb = profiler_eval; 1447 cur_test_run.eval_cb = profiler_eval;
1274 cur_test_run.request_interval = 2; 1448 cur_test_run.request_interval = 2;
1275 cur_test_run.num_requests = 50; 1449 cur_test_run.num_requests = 5;
1276 1450 timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90);
1277 num_peers = 50;
1278 1451
1452 /* 'Clean' directory */
1279 (void) GNUNET_DISK_directory_remove ("/tmp/rps/"); 1453 (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
1280 GNUNET_DISK_directory_create ("/tmp/rps/"); 1454 GNUNET_DISK_directory_create ("/tmp/rps/");
1281 timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90);
1282 } 1455 }
1283 1456
1284 rps_peers = GNUNET_new_array (num_peers, struct RPSPeer); 1457 rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
1285 rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1286 peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO); 1458 peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
1459 rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1460 if ( (2 == mal_type) ||
1461 (3 == mal_type))
1462 target_peer = &rps_peer_ids[num_peers - 2];
1463 if (profiler_eval == cur_test_run.eval_cb)
1464 eval_peer = &rps_peers[num_peers - 1];
1287 1465
1288 ok = 1; 1466 ok = 1;
1289 (void) GNUNET_TESTBED_test_run (cur_test_run.name, 1467 (void) GNUNET_TESTBED_test_run (cur_test_run.name,
@@ -1293,11 +1471,9 @@ main (int argc, char *argv[])
1293 &run, NULL); 1471 &run, NULL);
1294 1472
1295 ret_value = cur_test_run.eval_cb(); 1473 ret_value = cur_test_run.eval_cb();
1296
1297 GNUNET_free (rps_peers ); 1474 GNUNET_free (rps_peers );
1298 GNUNET_free (rps_peer_ids); 1475 GNUNET_free (rps_peer_ids);
1299 GNUNET_CONTAINER_multipeermap_destroy (peer_map); 1476 GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1300
1301 return ret_value; 1477 return ret_value;
1302} 1478}
1303 1479