diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-06-15 15:19:08 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-06-15 15:19:08 +0000 |
commit | e444070fbb20f49bb55afd5d6158a9b42333557f (patch) | |
tree | 7784d9a782e5147601e78a1d1cfaee538b0fb50b | |
parent | 819304af72322c391fc3e0bfd441586af851b8da (diff) | |
download | gnunet-e444070fbb20f49bb55afd5d6158a9b42333557f.tar.gz gnunet-e444070fbb20f49bb55afd5d6158a9b42333557f.zip |
stream misc fixing
-rw-r--r-- | src/stream/stream_api.c | 66 | ||||
-rw-r--r-- | src/stream/test_stream_big.c | 55 |
2 files changed, 83 insertions, 38 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 5d22ff7b3..63e27ea98 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -471,6 +471,22 @@ static unsigned int default_timeout = 10; | |||
471 | 471 | ||
472 | 472 | ||
473 | /** | 473 | /** |
474 | * Function to print the contents of an address location. Used only for debugging | ||
475 | * | ||
476 | * @param ptr the address location; Should be more than 5 bytes long | ||
477 | */ | ||
478 | static void | ||
479 | debug_print_contents (const void *ptr) | ||
480 | { | ||
481 | /* const char *c; */ | ||
482 | |||
483 | /* c = ptr; */ | ||
484 | /* LOG (GNUNET_ERROR_TYPE_DEBUG, */ | ||
485 | /* "--- contents: %u %u %u %u %u\n", c[0], c[1], c[2], c[3], c[4]); */ | ||
486 | } | ||
487 | |||
488 | |||
489 | /** | ||
474 | * Callback function for sending queued message | 490 | * Callback function for sending queued message |
475 | * | 491 | * |
476 | * @param cls closure the socket | 492 | * @param cls closure the socket |
@@ -830,6 +846,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) | |||
830 | "%s: Placing DATA message with sequence %u in send queue\n", | 846 | "%s: Placing DATA message with sequence %u in send queue\n", |
831 | GNUNET_i2s (&socket->other_peer), | 847 | GNUNET_i2s (&socket->other_peer), |
832 | ntohl (io_handle->messages[packet]->sequence_number)); | 848 | ntohl (io_handle->messages[packet]->sequence_number)); |
849 | debug_print_contents(&(io_handle->messages[packet][1])); | ||
833 | copy_and_queue_message (socket, | 850 | copy_and_queue_message (socket, |
834 | &io_handle->messages[packet]->header, | 851 | &io_handle->messages[packet]->header, |
835 | NULL, | 852 | NULL, |
@@ -849,6 +866,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) | |||
849 | "%s: Placing DATA message with sequence %u in send queue\n", | 866 | "%s: Placing DATA message with sequence %u in send queue\n", |
850 | GNUNET_i2s (&socket->other_peer), | 867 | GNUNET_i2s (&socket->other_peer), |
851 | ntohl (io_handle->messages[packet]->sequence_number)); | 868 | ntohl (io_handle->messages[packet]->sequence_number)); |
869 | debug_print_contents(&(io_handle->messages[packet][1])); | ||
852 | copy_and_queue_message (socket, | 870 | copy_and_queue_message (socket, |
853 | &io_handle->messages[packet]->header, | 871 | &io_handle->messages[packet]->header, |
854 | NULL, | 872 | NULL, |
@@ -910,6 +928,7 @@ call_read_processor (void *cls, | |||
910 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 928 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
911 | "%s: Calling read processor\n", | 929 | "%s: Calling read processor\n", |
912 | GNUNET_i2s (&socket->other_peer)); | 930 | GNUNET_i2s (&socket->other_peer)); |
931 | debug_print_contents (socket->receive_buffer + socket->copy_offset); | ||
913 | read_size = | 932 | read_size = |
914 | socket->read_handle->proc (socket->read_handle->proc_cls, | 933 | socket->read_handle->proc (socket->read_handle->proc_cls, |
915 | socket->status, | 934 | socket->status, |
@@ -943,11 +962,12 @@ call_read_processor (void *cls, | |||
943 | GNUNET_i2s (&socket->other_peer), sequence_increase); | 962 | GNUNET_i2s (&socket->other_peer), sequence_increase); |
944 | 963 | ||
945 | /* Shift the data in the receive buffer */ | 964 | /* Shift the data in the receive buffer */ |
946 | memmove (socket->receive_buffer, | 965 | socket->receive_buffer = |
947 | socket->receive_buffer | 966 | memmove (socket->receive_buffer, |
948 | + socket->receive_buffer_boundaries[sequence_increase-1], | 967 | socket->receive_buffer |
949 | socket->receive_buffer_size | 968 | + socket->receive_buffer_boundaries[sequence_increase-1], |
950 | - socket->receive_buffer_boundaries[sequence_increase-1]); | 969 | socket->receive_buffer_size |
970 | - socket->receive_buffer_boundaries[sequence_increase-1]); | ||
951 | /* Shift the bitmap */ | 971 | /* Shift the bitmap */ |
952 | socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; | 972 | socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; |
953 | /* Set read_sequence_number */ | 973 | /* Set read_sequence_number */ |
@@ -963,9 +983,18 @@ call_read_processor (void *cls, | |||
963 | { | 983 | { |
964 | if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) | 984 | if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) |
965 | { | 985 | { |
966 | socket->receive_buffer_boundaries[packet] = | 986 | uint32_t ahead_buffer_boundary; |
967 | socket->receive_buffer_boundaries[packet + sequence_increase] | 987 | |
968 | - offset_increase; | 988 | ahead_buffer_boundary = |
989 | socket->receive_buffer_boundaries[packet + sequence_increase]; | ||
990 | if (0 == ahead_buffer_boundary) | ||
991 | socket->receive_buffer_boundaries[packet] = 0; | ||
992 | else | ||
993 | { | ||
994 | GNUNET_assert (offset_increase < ahead_buffer_boundary); | ||
995 | socket->receive_buffer_boundaries[packet] = | ||
996 | ahead_buffer_boundary - offset_increase; | ||
997 | } | ||
969 | } | 998 | } |
970 | else | 999 | else |
971 | socket->receive_buffer_boundaries[packet] = 0; | 1000 | socket->receive_buffer_boundaries[packet] = 0; |
@@ -1130,6 +1159,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
1130 | 1159 | ||
1131 | /* Copy Data to buffer */ | 1160 | /* Copy Data to buffer */ |
1132 | payload = &msg[1]; | 1161 | payload = &msg[1]; |
1162 | debug_print_contents(payload); | ||
1133 | GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); | 1163 | GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); |
1134 | memcpy (socket->receive_buffer + relative_offset, | 1164 | memcpy (socket->receive_buffer + relative_offset, |
1135 | payload, | 1165 | payload, |
@@ -2418,22 +2448,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, | |||
2418 | } | 2448 | } |
2419 | else /* We have to call the write continuation callback now */ | 2449 | else /* We have to call the write continuation callback now */ |
2420 | { | 2450 | { |
2451 | struct GNUNET_STREAM_IOWriteHandle *write_handle; | ||
2452 | |||
2421 | /* Free the packets */ | 2453 | /* Free the packets */ |
2422 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) | 2454 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) |
2423 | { | 2455 | { |
2424 | GNUNET_free_non_null (socket->write_handle->messages[packet]); | 2456 | GNUNET_free_non_null (socket->write_handle->messages[packet]); |
2425 | } | 2457 | } |
2426 | if (NULL != socket->write_handle->write_cont) | 2458 | write_handle = socket->write_handle; |
2427 | socket->write_handle->write_cont | 2459 | socket->write_handle = NULL; |
2428 | (socket->write_handle->write_cont_cls, | 2460 | if (NULL != write_handle->write_cont) |
2429 | socket->status, | 2461 | write_handle->write_cont (write_handle->write_cont_cls, |
2430 | socket->write_handle->size); | 2462 | socket->status, |
2463 | write_handle->size); | ||
2464 | /* We are done with the write handle - Freeing it */ | ||
2465 | GNUNET_free (write_handle); | ||
2431 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2466 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2432 | "%s: Write completion callback completed\n", | 2467 | "%s: Write completion callback completed\n", |
2433 | GNUNET_i2s (&socket->other_peer)); | 2468 | GNUNET_i2s (&socket->other_peer)); |
2434 | /* We are done with the write handle - Freeing it */ | ||
2435 | GNUNET_free (socket->write_handle); | ||
2436 | socket->write_handle = NULL; | ||
2437 | } | 2469 | } |
2438 | break; | 2470 | break; |
2439 | default: | 2471 | default: |
diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c index b3e6fb20c..001c4f67e 100644 --- a/src/stream/test_stream_big.c +++ b/src/stream/test_stream_big.c | |||
@@ -89,6 +89,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
89 | GNUNET_STREAM_close (peer1.socket); | 89 | GNUNET_STREAM_close (peer1.socket); |
90 | if (NULL != peer2.socket) | 90 | if (NULL != peer2.socket) |
91 | GNUNET_STREAM_close (peer2.socket); | 91 | GNUNET_STREAM_close (peer2.socket); |
92 | if (NULL != peer2_listen_socket) | ||
93 | GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ | ||
92 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); | 94 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); |
93 | if (0 != abort_task) | 95 | if (0 != abort_task) |
94 | { | 96 | { |
@@ -152,7 +154,7 @@ write_completion (void *cls, | |||
152 | { | 154 | { |
153 | peer->io_write_handle = | 155 | peer->io_write_handle = |
154 | GNUNET_STREAM_write (peer->socket, | 156 | GNUNET_STREAM_write (peer->socket, |
155 | (void *) data, | 157 | ((void *) data) + peer->bytes_wrote, |
156 | DATA_SIZE - peer->bytes_wrote, | 158 | DATA_SIZE - peer->bytes_wrote, |
157 | GNUNET_TIME_relative_multiply | 159 | GNUNET_TIME_relative_multiply |
158 | (GNUNET_TIME_UNIT_SECONDS, 5), | 160 | (GNUNET_TIME_UNIT_SECONDS, 5), |
@@ -222,9 +224,17 @@ stream_open_cb (void *cls, | |||
222 | 224 | ||
223 | 225 | ||
224 | /** | 226 | /** |
227 | * Scheduler call back; to be executed when a new stream is connected | ||
228 | * Called from listen connect for peer2 | ||
229 | */ | ||
230 | static void | ||
231 | stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
232 | |||
233 | |||
234 | /** | ||
225 | * Input processor | 235 | * Input processor |
226 | * | 236 | * |
227 | * @param cls the closure from GNUNET_STREAM_write/read | 237 | * @param cls peer2 |
228 | * @param status the status of the stream at the time this function is called | 238 | * @param status the status of the stream at the time this function is called |
229 | * @param data traffic from the other side | 239 | * @param data traffic from the other side |
230 | * @param size the number of bytes available in data read | 240 | * @param size the number of bytes available in data read |
@@ -240,21 +250,23 @@ input_processor (void *cls, | |||
240 | struct PeerData *peer = cls; | 250 | struct PeerData *peer = cls; |
241 | 251 | ||
242 | GNUNET_assert (GNUNET_STREAM_OK == status); | 252 | GNUNET_assert (GNUNET_STREAM_OK == status); |
253 | GNUNET_assert (&peer2 == peer); | ||
243 | GNUNET_assert (size < DATA_SIZE); | 254 | GNUNET_assert (size < DATA_SIZE); |
244 | GNUNET_assert (memcmp (data + peer->bytes_read, | 255 | GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read, |
245 | input_data, | 256 | input_data, size)); |
246 | size)); | ||
247 | peer->bytes_read += size; | 257 | peer->bytes_read += size; |
248 | 258 | ||
249 | if (peer->bytes_read < DATA_SIZE) | 259 | if (peer->bytes_read < DATA_SIZE) |
250 | { | 260 | { |
251 | peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) | 261 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task); |
252 | peer->socket, | 262 | read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); |
253 | GNUNET_TIME_relative_multiply | 263 | /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */ |
254 | (GNUNET_TIME_UNIT_SECONDS, 5), | 264 | /* peer->socket, */ |
255 | &input_processor, | 265 | /* GNUNET_TIME_relative_multiply */ |
256 | cls); | 266 | /* (GNUNET_TIME_UNIT_SECONDS, 5), */ |
257 | GNUNET_assert (NULL != peer->io_read_handle); | 267 | /* &input_processor, */ |
268 | /* cls); */ | ||
269 | /* GNUNET_assert (NULL != peer->io_read_handle); */ | ||
258 | } | 270 | } |
259 | else | 271 | else |
260 | { | 272 | { |
@@ -274,18 +286,17 @@ static void | |||
274 | stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 286 | stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
275 | { | 287 | { |
276 | struct GNUNET_STREAM_Socket *socket = cls; | 288 | struct GNUNET_STREAM_Socket *socket = cls; |
289 | struct PeerData *peer = cls; | ||
277 | 290 | ||
278 | read_task = GNUNET_SCHEDULER_NO_TASK; | 291 | read_task = GNUNET_SCHEDULER_NO_TASK; |
279 | GNUNET_assert (socket == peer2.socket); | 292 | GNUNET_assert (&peer2 == peer); |
280 | peer2.bytes_read = 0; | 293 | peer->io_read_handle = |
281 | GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ | 294 | GNUNET_STREAM_read (peer->socket, |
282 | peer2.io_read_handle = | ||
283 | GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, | ||
284 | GNUNET_TIME_relative_multiply | 295 | GNUNET_TIME_relative_multiply |
285 | (GNUNET_TIME_UNIT_SECONDS, 10), | 296 | (GNUNET_TIME_UNIT_SECONDS, 10), |
286 | &input_processor, | 297 | &input_processor, |
287 | cls); | 298 | peer); |
288 | GNUNET_assert (NULL != peer2.io_read_handle); | 299 | GNUNET_assert (NULL != peer->io_read_handle); |
289 | } | 300 | } |
290 | 301 | ||
291 | 302 | ||
@@ -311,7 +322,8 @@ stream_listen_cb (void *cls, | |||
311 | "Peer connected: %s\n", GNUNET_i2s(initiator)); | 322 | "Peer connected: %s\n", GNUNET_i2s(initiator)); |
312 | 323 | ||
313 | peer2.socket = socket; | 324 | peer2.socket = socket; |
314 | read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, socket); | 325 | peer2.bytes_read = 0; |
326 | read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); | ||
315 | return GNUNET_OK; | 327 | return GNUNET_OK; |
316 | } | 328 | } |
317 | 329 | ||
@@ -347,6 +359,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
347 | GNUNET_assert (NULL != peer1.socket); | 359 | GNUNET_assert (NULL != peer1.socket); |
348 | } | 360 | } |
349 | 361 | ||
362 | |||
350 | /** | 363 | /** |
351 | * Initialize framework and start test | 364 | * Initialize framework and start test |
352 | */ | 365 | */ |
@@ -366,7 +379,7 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
366 | 379 | ||
367 | abort_task = | 380 | abort_task = |
368 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply | 381 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply |
369 | (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort, | 382 | (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, |
370 | NULL); | 383 | NULL); |
371 | 384 | ||
372 | test_task = GNUNET_SCHEDULER_add_now (&test, NULL); | 385 | test_task = GNUNET_SCHEDULER_add_now (&test, NULL); |