diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-03-21 12:32:55 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-03-21 12:32:55 +0000 |
commit | e83f713b6cd9795ca576402682448f0c54879331 (patch) | |
tree | 68a7e57efba44380bf6ec350da75e335ec857427 /src | |
parent | 3343e64659d6e8ff5ed6a74faac7226e55fc1c55 (diff) | |
download | gnunet-e83f713b6cd9795ca576402682448f0c54879331.tar.gz gnunet-e83f713b6cd9795ca576402682448f0c54879331.zip |
fixed read timeout problem and added ack sending incase of ignored data messages
Diffstat (limited to 'src')
-rw-r--r-- | src/stream/stream_api.c | 62 | ||||
-rw-r--r-- | src/stream/test_stream_local.c | 23 |
2 files changed, 75 insertions, 10 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 1547f0228..2b9363c68 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -845,12 +845,19 @@ call_read_processor (void *cls, | |||
845 | socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; | 845 | socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; |
846 | 846 | ||
847 | /* Call the data processor */ | 847 | /* Call the data processor */ |
848 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
849 | "%x: Calling read processor\n", | ||
850 | socket->our_id); | ||
848 | read_size = | 851 | read_size = |
849 | socket->read_handle->proc (socket->read_handle->proc_cls, | 852 | socket->read_handle->proc (socket->read_handle->proc_cls, |
850 | socket->status, | 853 | socket->status, |
851 | socket->receive_buffer + socket->copy_offset, | 854 | socket->receive_buffer + socket->copy_offset, |
852 | valid_read_size); | 855 | valid_read_size); |
853 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 856 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
857 | "%x: Read processor read %d bytes\n", | ||
858 | socket->our_id, | ||
859 | read_size); | ||
860 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
854 | "%x: Read processor completed successfully\n", | 861 | "%x: Read processor completed successfully\n", |
855 | socket->our_id); | 862 | socket->our_id); |
856 | 863 | ||
@@ -917,17 +924,29 @@ read_io_timeout (void *cls, | |||
917 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 924 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
918 | { | 925 | { |
919 | struct GNUNET_STREAM_Socket *socket = cls; | 926 | struct GNUNET_STREAM_Socket *socket = cls; |
927 | GNUNET_STREAM_DataProcessor proc; | ||
928 | void *proc_cls; | ||
920 | 929 | ||
921 | socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; | 930 | socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; |
922 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) | 931 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) |
923 | { | 932 | { |
933 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
934 | "%x: Read task timedout - Cancelling it\n", | ||
935 | socket->our_id); | ||
924 | GNUNET_SCHEDULER_cancel (socket->read_task_id); | 936 | GNUNET_SCHEDULER_cancel (socket->read_task_id); |
925 | socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; | 937 | socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; |
926 | } | 938 | } |
927 | GNUNET_assert (NULL != socket->read_handle); | 939 | GNUNET_assert (NULL != socket->read_handle); |
928 | 940 | proc = socket->read_handle->proc; | |
941 | proc_cls = socket->read_handle->proc_cls; | ||
942 | |||
929 | GNUNET_free (socket->read_handle); | 943 | GNUNET_free (socket->read_handle); |
930 | socket->read_handle = NULL; | 944 | socket->read_handle = NULL; |
945 | /* Call the read processor to signal timeout */ | ||
946 | proc (proc_cls, | ||
947 | GNUNET_STREAM_TIMEOUT, | ||
948 | NULL, | ||
949 | 0); | ||
931 | } | 950 | } |
932 | 951 | ||
933 | 952 | ||
@@ -986,9 +1005,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
986 | "%x: Ignoring received message with sequence number %u\n", | 1005 | "%x: Ignoring received message with sequence number %u\n", |
987 | socket->our_id, | 1006 | socket->our_id, |
988 | ntohl (msg->sequence_number)); | 1007 | ntohl (msg->sequence_number)); |
1008 | /* Start ACK sending task if one is not already present */ | ||
1009 | if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) | ||
1010 | { | ||
1011 | socket->ack_task_id = | ||
1012 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh | ||
1013 | (msg->ack_deadline), | ||
1014 | &ack_task, | ||
1015 | socket); | ||
1016 | } | ||
989 | return GNUNET_YES; | 1017 | return GNUNET_YES; |
990 | } | 1018 | } |
991 | 1019 | ||
992 | /* Check if we have already seen this message */ | 1020 | /* Check if we have already seen this message */ |
993 | if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, | 1021 | if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, |
994 | relative_sequence_number)) | 1022 | relative_sequence_number)) |
@@ -998,6 +1026,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
998 | "number %u\n", | 1026 | "number %u\n", |
999 | socket->our_id, | 1027 | socket->our_id, |
1000 | ntohl (msg->sequence_number)); | 1028 | ntohl (msg->sequence_number)); |
1029 | /* Start ACK sending task if one is not already present */ | ||
1030 | if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) | ||
1031 | { | ||
1032 | socket->ack_task_id = | ||
1033 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh | ||
1034 | (msg->ack_deadline), | ||
1035 | &ack_task, | ||
1036 | socket); | ||
1037 | } | ||
1001 | return GNUNET_YES; | 1038 | return GNUNET_YES; |
1002 | } | 1039 | } |
1003 | 1040 | ||
@@ -1063,6 +1100,10 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
1063 | && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, | 1100 | && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, |
1064 | 0))) | 1101 | 0))) |
1065 | { | 1102 | { |
1103 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1104 | "%x: Scheduling read processor\n", | ||
1105 | socket->our_id); | ||
1106 | |||
1066 | socket->read_task_id = | 1107 | socket->read_task_id = |
1067 | GNUNET_SCHEDULER_add_now (&call_read_processor, | 1108 | GNUNET_SCHEDULER_add_now (&call_read_processor, |
1068 | socket); | 1109 | socket); |
@@ -1864,12 +1905,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, | |||
1864 | socket->our_id); | 1905 | socket->our_id); |
1865 | return GNUNET_OK; | 1906 | return GNUNET_OK; |
1866 | } | 1907 | } |
1867 | 1908 | /* FIXME: increment in the base sequence number is breaking current flow | |
1909 | */ | ||
1868 | if (!((socket->write_sequence_number | 1910 | if (!((socket->write_sequence_number |
1869 | - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) | 1911 | - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) |
1870 | { | 1912 | { |
1871 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1913 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1872 | "%x: Received DATA_ACK with unexpected base sequence", | 1914 | "%x: Received DATA_ACK with unexpected base sequence " |
1873 | "number\n", | 1915 | "number\n", |
1874 | socket->our_id); | 1916 | socket->our_id); |
1875 | return GNUNET_OK; | 1917 | return GNUNET_OK; |
@@ -2532,14 +2574,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
2532 | struct GNUNET_STREAM_IOReadHandle *read_handle; | 2574 | struct GNUNET_STREAM_IOReadHandle *read_handle; |
2533 | 2575 | ||
2534 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2576 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2535 | "%s()\n", __func__); | 2577 | "%x: %s()\n", |
2578 | socket->our_id, | ||
2579 | __func__); | ||
2536 | 2580 | ||
2537 | /* Return NULL if there is already a read handle; the user has to cancel that | 2581 | /* Return NULL if there is already a read handle; the user has to cancel that |
2538 | first before continuing or has to wait until it is completed */ | 2582 | first before continuing or has to wait until it is completed */ |
2539 | if (NULL != socket->read_handle) return NULL; | 2583 | if (NULL != socket->read_handle) return NULL; |
2540 | 2584 | ||
2585 | GNUNET_assert (NULL != proc); | ||
2586 | |||
2541 | read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); | 2587 | read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); |
2542 | read_handle->proc = proc; | 2588 | read_handle->proc = proc; |
2589 | read_handle->proc_cls = proc_cls; | ||
2543 | socket->read_handle = read_handle; | 2590 | socket->read_handle = read_handle; |
2544 | 2591 | ||
2545 | /* Check if we have a packet at bitmap 0 */ | 2592 | /* Check if we have a packet at bitmap 0 */ |
@@ -2556,7 +2603,9 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
2556 | &read_io_timeout, | 2603 | &read_io_timeout, |
2557 | socket); | 2604 | socket); |
2558 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2605 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2559 | "%s() END\n", __func__); | 2606 | "%x: %s() END\n", |
2607 | socket->our_id, | ||
2608 | __func__); | ||
2560 | return read_handle; | 2609 | return read_handle; |
2561 | } | 2610 | } |
2562 | 2611 | ||
@@ -2569,6 +2618,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
2569 | void | 2618 | void |
2570 | GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) | 2619 | GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) |
2571 | { | 2620 | { |
2621 | /* FIXME: Should cancel the write retransmission task */ | ||
2572 | return; | 2622 | return; |
2573 | } | 2623 | } |
2574 | 2624 | ||
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 9a6c13da6..535ee62a2 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c | |||
@@ -287,11 +287,26 @@ input_processor (void *cls, | |||
287 | 287 | ||
288 | peer = (struct PeerData *) cls; | 288 | peer = (struct PeerData *) cls; |
289 | 289 | ||
290 | if (GNUNET_STREAM_TIMEOUT == status) | ||
291 | { | ||
292 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
293 | "Read operation timedout - reading again!\n"); | ||
294 | GNUNET_assert (0 == size); | ||
295 | peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) | ||
296 | peer->socket, | ||
297 | GNUNET_TIME_relative_multiply | ||
298 | (GNUNET_TIME_UNIT_SECONDS, 5), | ||
299 | &input_processor, | ||
300 | cls); | ||
301 | GNUNET_assert (NULL != peer->io_read_handle); | ||
302 | return 0; | ||
303 | } | ||
304 | |||
290 | GNUNET_assert (GNUNET_STREAM_OK == status); | 305 | GNUNET_assert (GNUNET_STREAM_OK == status); |
291 | GNUNET_assert (size < strlen (data)); | 306 | GNUNET_assert (size <= strlen (data)); |
292 | GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, | 307 | GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, |
293 | (const char *) input_data, | 308 | (const char *) input_data, |
294 | size)); | 309 | size)); |
295 | peer->bytes_read += size; | 310 | peer->bytes_read += size; |
296 | 311 | ||
297 | if (peer->bytes_read < strlen (data)) | 312 | if (peer->bytes_read < strlen (data)) |