aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-09-12 10:04:42 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-09-12 10:04:42 +0000
commit946dc546692beb127009d403857df8dec2781ae7 (patch)
tree1f59988253aaa94711ab4c9e5e6d8a926e6a8ffa /src/stream
parente5d1a97cd8c30a54cadd158a81f72bc1f2166c4f (diff)
downloadgnunet-946dc546692beb127009d403857df8dec2781ae7.tar.gz
gnunet-946dc546692beb127009d403857df8dec2781ae7.zip
stream performance - implemented 1 hop throughput calculation
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/perf_stream_api.c245
-rw-r--r--src/stream/test_stream_big.c15
2 files changed, 236 insertions, 24 deletions
diff --git a/src/stream/perf_stream_api.c b/src/stream/perf_stream_api.c
index 584e60e6a..faf827ae2 100644
--- a/src/stream/perf_stream_api.c
+++ b/src/stream/perf_stream_api.c
@@ -24,6 +24,9 @@
24 * @author Sree Harsha Totakura 24 * @author Sree Harsha Totakura
25 */ 25 */
26 26
27#define LOG(kind, ...) \
28 GNUNET_log (kind, __VA_ARGS__);
29
27/****************************************************************************************/ 30/****************************************************************************************/
28/* Test is setup into the following major steps: */ 31/* Test is setup into the following major steps: */
29/* 1. Measurements over loopback (1 hop). i.e. we use only one peer and open */ 32/* 1. Measurements over loopback (1 hop). i.e. we use only one peer and open */
@@ -88,6 +91,7 @@ enum TestStep
88 TEST_STEP_3_HOP 91 TEST_STEP_3_HOP
89}; 92};
90 93
94
91/** 95/**
92 * Structure for holding peer's sockets and IO Handles 96 * Structure for holding peer's sockets and IO Handles
93 */ 97 */
@@ -138,6 +142,11 @@ struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
138const struct GNUNET_CONFIGURATION_Handle *config; 142const struct GNUNET_CONFIGURATION_Handle *config;
139 143
140/** 144/**
145 * Handle for the progress meter
146 */
147static struct ProgressMeter *meter;
148
149/**
141 * Placeholder for peer data 150 * Placeholder for peer data
142 */ 151 */
143static struct PeerData peer_data[3]; 152static struct PeerData peer_data[3];
@@ -148,6 +157,26 @@ static struct PeerData peer_data[3];
148static GNUNET_SCHEDULER_TaskIdentifier abort_task; 157static GNUNET_SCHEDULER_TaskIdentifier abort_task;
149 158
150/** 159/**
160 * Task ID for write task
161 */
162static GNUNET_SCHEDULER_TaskIdentifier write_task;
163
164/**
165 * Task ID for read task
166 */
167static GNUNET_SCHEDULER_TaskIdentifier read_task;
168
169/**
170 * Absolute time when profiling starts
171 */
172static struct GNUNET_TIME_Absolute prof_start_time;
173
174/**
175 * Test time taken for sending the data
176 */
177static struct GNUNET_TIME_Relative prof_time;
178
179/**
151 * Random data block. Should generate data first 180 * Random data block. Should generate data first
152 */ 181 */
153static uint32_t data[DATA_SIZE / 4]; /* 64KB array */ 182static uint32_t data[DATA_SIZE / 4]; /* 64KB array */
@@ -171,7 +200,12 @@ static enum TestStep test_step;
171/** 200/**
172 * Index for choosing payload size 201 * Index for choosing payload size
173 */ 202 */
174unsigned int payload_size_index; 203static unsigned int payload_size_index;
204
205/**
206 * Testing result of a major test
207 */
208static int result;
175 209
176/** 210/**
177 * Create a meter to keep track of the progress of some task. 211 * Create a meter to keep track of the progress of some task.
@@ -277,12 +311,157 @@ free_meter (struct ProgressMeter *meter)
277 311
278 312
279/** 313/**
314 * Shutdown nicely
315 */
316static void
317do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
318{
319 GNUNET_STREAM_close (peer_data[1].socket);
320 if (NULL != peer_data[2].socket)
321 GNUNET_STREAM_close (peer_data[2].socket);
322 if (NULL != peer2_listen_socket)
323 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
324 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
325 GNUNET_SCHEDULER_cancel (abort_task);
326 if (GNUNET_SCHEDULER_NO_TASK != write_task)
327 GNUNET_SCHEDULER_cancel (write_task);
328 GNUNET_SCHEDULER_shutdown (); /* Shutdown this testcase */
329}
330
331
332/**
280 * Something went wrong and timed out. Kill everything and set error flag 333 * Something went wrong and timed out. Kill everything and set error flag
281 */ 334 */
282static void 335static void
283do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 336do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
284{ 337{
285 GNUNET_break (0); 338 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
339 if (GNUNET_SCHEDULER_NO_TASK != read_task)
340 GNUNET_SCHEDULER_cancel (read_task);
341 result = GNUNET_SYSERR;
342 abort_task = GNUNET_SCHEDULER_NO_TASK;
343 do_shutdown (cls, tc);
344}
345
346
347/**
348 * The write completion function; called upon writing some data to stream or
349 * upon error
350 *
351 * @param cls the closure from GNUNET_STREAM_write/read
352 * @param status the status of the stream at the time this function is called
353 * @param size the number of bytes read or written
354 */
355static void
356write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size)
357{
358 struct PeerData *pdata = cls;
359
360 GNUNET_assert (GNUNET_STREAM_OK == status);
361 GNUNET_assert (size <= DATA_SIZE);
362 pdata->bytes_wrote += size;
363 if (pdata->bytes_wrote < DATA_SIZE) /* Have more data to send */
364 {
365 pdata->io_write_handle =
366 GNUNET_STREAM_write (pdata->socket,
367 ((void *) data) + pdata->bytes_wrote,
368 sizeof (data) - pdata->bytes_wrote,
369 GNUNET_TIME_UNIT_FOREVER_REL, &write_completion,
370 pdata);
371 GNUNET_assert (NULL != pdata->io_write_handle);
372 }
373 else
374 {
375 prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time);
376 result = GNUNET_OK;
377 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
378 }
379 for (;size > 0; size--)
380 update_meter (meter);
381}
382
383
384/**
385 * Task for calling STREAM_write with a chunk of random data
386 *
387 * @param cls the peer data entity
388 * @param tc the task context
389 */
390static void
391stream_write_task (void *cls,
392 const struct GNUNET_SCHEDULER_TaskContext *tc)
393{
394 struct PeerData *pdata = cls;
395
396 write_task = GNUNET_SCHEDULER_NO_TASK;
397 prof_start_time = GNUNET_TIME_absolute_get ();
398 pdata->bytes_wrote = 0;
399 pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
400 sizeof (data),
401 GNUNET_TIME_UNIT_FOREVER_REL,
402 &write_completion, pdata);
403 GNUNET_assert (NULL != pdata->io_write_handle);
404}
405
406
407/**
408 * Scheduler call back; to be executed when a new stream is connected
409 * Called from listen connect for peer2
410 */
411static void
412stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
413
414
415/**
416 * Input processor
417 *
418 * @param cls peer2
419 * @param status the status of the stream at the time this function is called
420 * @param data traffic from the other side
421 * @param size the number of bytes available in data read
422 * @return number of bytes of processed from 'data' (any data remaining should be
423 * given to the next time the read processor is called).
424 */
425static size_t
426input_processor (void *cls, enum GNUNET_STREAM_Status status,
427 const void *input_data, size_t size)
428{
429 struct PeerData *pdata = cls;
430
431 GNUNET_assert (GNUNET_STREAM_OK == status);
432 GNUNET_assert (size < DATA_SIZE);
433 GNUNET_assert (0 == memcmp (((void *)data ) + pdata->bytes_read,
434 input_data, size));
435 pdata->bytes_read += size;
436
437 if (pdata->bytes_read < DATA_SIZE)
438 {
439 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
440 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
441 }
442 else
443 {
444 /* Peer2 has completed reading*/
445 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
446 }
447 return size;
448}
449
450
451/**
452 * Scheduler call back; to be executed when a new stream is connected
453 * Called from listen connect for peer2
454 */
455static void
456stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
457{
458 struct PeerData *pdata = cls;
459
460 read_task = GNUNET_SCHEDULER_NO_TASK;
461 pdata->io_read_handle =
462 GNUNET_STREAM_read (pdata->socket, GNUNET_TIME_UNIT_FOREVER_REL,
463 &input_processor, pdata);
464 GNUNET_assert (NULL != pdata->io_read_handle);
286} 465}
287 466
288 467
@@ -300,18 +479,48 @@ static int
300stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, 479stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket,
301 const struct GNUNET_PeerIdentity *initiator) 480 const struct GNUNET_PeerIdentity *initiator)
302{ 481{
303 GNUNET_break (0); 482 struct PeerData *pdata = cls;
483
484 GNUNET_assert (NULL != socket);
485 GNUNET_assert (pdata == &peer_data[2]);
486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n",
487 GNUNET_i2s(initiator));
488 pdata->socket = socket;
489 pdata->bytes_read = 0;
490 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
304 return GNUNET_OK; 491 return GNUNET_OK;
305} 492}
306 493
307 494
308/** 495/**
496 * Function executed after stream has been established
497 *
498 * @param cls the closure from GNUNET_STREAM_open
499 * @param socket socket to use to communicate with the other side (read/write)
500 */
501static void
502stream_open_cb (void *cls,
503 struct GNUNET_STREAM_Socket *socket)
504{
505 struct PeerData *pdata = cls;
506
507 GNUNET_assert (socket == pdata->socket);
508 write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, pdata);
509}
510
511
512/**
309 * Listen success callback; connects a peer to stream as client 513 * Listen success callback; connects a peer to stream as client
310 */ 514 */
311static void 515static void
312stream_connect (void) 516stream_connect (void)
313{ 517{
314 GNUNET_break (0); 518 peer_data[1].socket =
519 GNUNET_STREAM_open (config, &peer_data[2].self, 10, &stream_open_cb,
520 &peer_data[1],
521 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, 500,
522 GNUNET_STREAM_OPTION_END);
523 GNUNET_assert (NULL != peer_data[1].socket);
315} 524}
316 525
317 526
@@ -332,8 +541,8 @@ run (void *cls,
332 GNUNET_TESTING_peer_get_identity (peer, &self); 541 GNUNET_TESTING_peer_get_identity (peer, &self);
333 config = cfg; 542 config = cfg;
334 peer2_listen_socket = 543 peer2_listen_socket =
335 GNUNET_STREAM_listen (config, 10, /* App port */ &stream_listen_cb, NULL, 544 GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[2],
336 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS, 545 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
337 &stream_connect, GNUNET_STREAM_OPTION_END); 546 &stream_connect, GNUNET_STREAM_OPTION_END);
338 GNUNET_assert (NULL != peer2_listen_socket); 547 GNUNET_assert (NULL != peer2_listen_socket);
339 peer_data[1].self = self; 548 peer_data[1].self = self;
@@ -351,6 +560,10 @@ run (void *cls,
351int main (int argc, char **argv) 560int main (int argc, char **argv)
352{ 561{
353 char *pmsg; 562 char *pmsg;
563 char *test_name = "perf_stream_api";
564 char *cfg_file = "test_stream_local.conf";
565 double throughput;
566 double prof_time_sec;
354 unsigned int count; 567 unsigned int count;
355 int ret; 568 int ret;
356 569
@@ -368,15 +581,19 @@ int main (int argc, char **argv)
368 payload_size_index < (sizeof (payload_size) / sizeof (uint16_t)); 581 payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
369 payload_size_index++) 582 payload_size_index++)
370 { 583 {
371 GNUNET_asprintf (&pmsg, "Testing over loopback with payload size %hu\n", 584 GNUNET_asprintf (&pmsg, "\nTesting over loopback with payload size %hu\n",
372 payload_size[payload_size_index]); 585 payload_size[payload_size_index]);
373 meter = create_meter ((sizeof (data) / 4), pmsg, GNUNET_YES); 586 meter = create_meter (sizeof (data), pmsg, GNUNET_YES);
374 GNUNET_free (pmsg); 587 GNUNET_free (pmsg);
375 ret = GNUNET_TESTING_peer_run ("test_stream_big", "test_stream_local.conf", 588 result = GNUNET_SYSERR;
376 &run, NULL); 589 ret = GNUNET_TESTING_peer_run (test_name, cfg_file, &run, NULL);
377 free_meter (meter); 590 free_meter (meter);
378 if (0 != ret) 591 if ((0 != ret) || (GNUNET_OK != result))
379 break; 592 goto return_fail;
593 prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000));
594 throughput = (((float) sizeof (data)) / prof_time_sec);
595 //PRINTF ("Profiling time %llu ms = %.2f sec\n", prof_time.rel_value, prof_time_sec);
596 PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00);
380 } 597 }
381 test_step = TEST_STEP_2_HOP; 598 test_step = TEST_STEP_2_HOP;
382 for (payload_size_index = 0; 599 for (payload_size_index = 0;
@@ -393,4 +610,8 @@ int main (int argc, char **argv)
393 /* Initialize testbed here */ 610 /* Initialize testbed here */
394 } 611 }
395 return ret; 612 return ret;
613
614 return_fail:
615 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test failed\n");
616 return 1;
396} 617}
diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c
index ec1cd07af..35302182a 100644
--- a/src/stream/test_stream_big.c
+++ b/src/stream/test_stream_big.c
@@ -77,7 +77,6 @@ static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
77static const struct GNUNET_CONFIGURATION_Handle *config; 77static const struct GNUNET_CONFIGURATION_Handle *config;
78 78
79static GNUNET_SCHEDULER_TaskIdentifier abort_task; 79static GNUNET_SCHEDULER_TaskIdentifier abort_task;
80static GNUNET_SCHEDULER_TaskIdentifier test_task;
81static GNUNET_SCHEDULER_TaskIdentifier read_task; 80static GNUNET_SCHEDULER_TaskIdentifier read_task;
82static GNUNET_SCHEDULER_TaskIdentifier write_task; 81static GNUNET_SCHEDULER_TaskIdentifier write_task;
83 82
@@ -97,10 +96,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
97 if (NULL != peer2_listen_socket) 96 if (NULL != peer2_listen_socket)
98 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ 97 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
99 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); 98 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
100 if (0 != abort_task) 99 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
101 {
102 GNUNET_SCHEDULER_cancel (abort_task); 100 GNUNET_SCHEDULER_cancel (abort_task);
103 }
104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); 101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
105 GNUNET_SCHEDULER_shutdown (); 102 GNUNET_SCHEDULER_shutdown ();
106} 103}
@@ -113,16 +110,10 @@ static void
113do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 110do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
114{ 111{
115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); 112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
116 if (0 != test_task) 113 if (GNUNET_SCHEDULER_NO_TASK != read_task)
117 {
118 GNUNET_SCHEDULER_cancel (test_task);
119 }
120 if (0 != read_task)
121 {
122 GNUNET_SCHEDULER_cancel (read_task); 114 GNUNET_SCHEDULER_cancel (read_task);
123 }
124 result = GNUNET_SYSERR; 115 result = GNUNET_SYSERR;
125 abort_task = 0; 116 abort_task = GNUNET_SCHEDULER_NO_TASK;
126 do_shutdown (cls, tc); 117 do_shutdown (cls, tc);
127} 118}
128 119