aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-03-21 12:32:55 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-03-21 12:32:55 +0000
commite83f713b6cd9795ca576402682448f0c54879331 (patch)
tree68a7e57efba44380bf6ec350da75e335ec857427 /src
parent3343e64659d6e8ff5ed6a74faac7226e55fc1c55 (diff)
downloadgnunet-e83f713b6cd9795ca576402682448f0c54879331.tar.gz
gnunet-e83f713b6cd9795ca576402682448f0c54879331.zip
fixed read timeout problem and added ack sending incase of ignored data messages
Diffstat (limited to 'src')
-rw-r--r--src/stream/stream_api.c62
-rw-r--r--src/stream/test_stream_local.c23
2 files changed, 75 insertions, 10 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 1547f0228..2b9363c68 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -845,12 +845,19 @@ call_read_processor (void *cls,
845 socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; 845 socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
846 846
847 /* Call the data processor */ 847 /* Call the data processor */
848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849 "%x: Calling read processor\n",
850 socket->our_id);
848 read_size = 851 read_size =
849 socket->read_handle->proc (socket->read_handle->proc_cls, 852 socket->read_handle->proc (socket->read_handle->proc_cls,
850 socket->status, 853 socket->status,
851 socket->receive_buffer + socket->copy_offset, 854 socket->receive_buffer + socket->copy_offset,
852 valid_read_size); 855 valid_read_size);
853 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857 "%x: Read processor read %d bytes\n",
858 socket->our_id,
859 read_size);
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854 "%x: Read processor completed successfully\n", 861 "%x: Read processor completed successfully\n",
855 socket->our_id); 862 socket->our_id);
856 863
@@ -917,17 +924,29 @@ read_io_timeout (void *cls,
917 const struct GNUNET_SCHEDULER_TaskContext *tc) 924 const struct GNUNET_SCHEDULER_TaskContext *tc)
918{ 925{
919 struct GNUNET_STREAM_Socket *socket = cls; 926 struct GNUNET_STREAM_Socket *socket = cls;
927 GNUNET_STREAM_DataProcessor proc;
928 void *proc_cls;
920 929
921 socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; 930 socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
922 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 931 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
923 { 932 {
933 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934 "%x: Read task timedout - Cancelling it\n",
935 socket->our_id);
924 GNUNET_SCHEDULER_cancel (socket->read_task_id); 936 GNUNET_SCHEDULER_cancel (socket->read_task_id);
925 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; 937 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
926 } 938 }
927 GNUNET_assert (NULL != socket->read_handle); 939 GNUNET_assert (NULL != socket->read_handle);
928 940 proc = socket->read_handle->proc;
941 proc_cls = socket->read_handle->proc_cls;
942
929 GNUNET_free (socket->read_handle); 943 GNUNET_free (socket->read_handle);
930 socket->read_handle = NULL; 944 socket->read_handle = NULL;
945 /* Call the read processor to signal timeout */
946 proc (proc_cls,
947 GNUNET_STREAM_TIMEOUT,
948 NULL,
949 0);
931} 950}
932 951
933 952
@@ -986,9 +1005,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
986 "%x: Ignoring received message with sequence number %u\n", 1005 "%x: Ignoring received message with sequence number %u\n",
987 socket->our_id, 1006 socket->our_id,
988 ntohl (msg->sequence_number)); 1007 ntohl (msg->sequence_number));
1008 /* Start ACK sending task if one is not already present */
1009 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1010 {
1011 socket->ack_task_id =
1012 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1013 (msg->ack_deadline),
1014 &ack_task,
1015 socket);
1016 }
989 return GNUNET_YES; 1017 return GNUNET_YES;
990 } 1018 }
991 1019
992 /* Check if we have already seen this message */ 1020 /* Check if we have already seen this message */
993 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 1021 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
994 relative_sequence_number)) 1022 relative_sequence_number))
@@ -998,6 +1026,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
998 "number %u\n", 1026 "number %u\n",
999 socket->our_id, 1027 socket->our_id,
1000 ntohl (msg->sequence_number)); 1028 ntohl (msg->sequence_number));
1029 /* Start ACK sending task if one is not already present */
1030 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1031 {
1032 socket->ack_task_id =
1033 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1034 (msg->ack_deadline),
1035 &ack_task,
1036 socket);
1037 }
1001 return GNUNET_YES; 1038 return GNUNET_YES;
1002 } 1039 }
1003 1040
@@ -1063,6 +1100,10 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1063 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 1100 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1064 0))) 1101 0)))
1065 { 1102 {
1103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1104 "%x: Scheduling read processor\n",
1105 socket->our_id);
1106
1066 socket->read_task_id = 1107 socket->read_task_id =
1067 GNUNET_SCHEDULER_add_now (&call_read_processor, 1108 GNUNET_SCHEDULER_add_now (&call_read_processor,
1068 socket); 1109 socket);
@@ -1864,12 +1905,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
1864 socket->our_id); 1905 socket->our_id);
1865 return GNUNET_OK; 1906 return GNUNET_OK;
1866 } 1907 }
1867 1908 /* FIXME: increment in the base sequence number is breaking current flow
1909 */
1868 if (!((socket->write_sequence_number 1910 if (!((socket->write_sequence_number
1869 - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) 1911 - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
1870 { 1912 {
1871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1872 "%x: Received DATA_ACK with unexpected base sequence", 1914 "%x: Received DATA_ACK with unexpected base sequence "
1873 "number\n", 1915 "number\n",
1874 socket->our_id); 1916 socket->our_id);
1875 return GNUNET_OK; 1917 return GNUNET_OK;
@@ -2532,14 +2574,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2532 struct GNUNET_STREAM_IOReadHandle *read_handle; 2574 struct GNUNET_STREAM_IOReadHandle *read_handle;
2533 2575
2534 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2576 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2535 "%s()\n", __func__); 2577 "%x: %s()\n",
2578 socket->our_id,
2579 __func__);
2536 2580
2537 /* Return NULL if there is already a read handle; the user has to cancel that 2581 /* Return NULL if there is already a read handle; the user has to cancel that
2538 first before continuing or has to wait until it is completed */ 2582 first before continuing or has to wait until it is completed */
2539 if (NULL != socket->read_handle) return NULL; 2583 if (NULL != socket->read_handle) return NULL;
2540 2584
2585 GNUNET_assert (NULL != proc);
2586
2541 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); 2587 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2542 read_handle->proc = proc; 2588 read_handle->proc = proc;
2589 read_handle->proc_cls = proc_cls;
2543 socket->read_handle = read_handle; 2590 socket->read_handle = read_handle;
2544 2591
2545 /* Check if we have a packet at bitmap 0 */ 2592 /* Check if we have a packet at bitmap 0 */
@@ -2556,7 +2603,9 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2556 &read_io_timeout, 2603 &read_io_timeout,
2557 socket); 2604 socket);
2558 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2559 "%s() END\n", __func__); 2606 "%x: %s() END\n",
2607 socket->our_id,
2608 __func__);
2560 return read_handle; 2609 return read_handle;
2561} 2610}
2562 2611
@@ -2569,6 +2618,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2569void 2618void
2570GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) 2619GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2571{ 2620{
2621 /* FIXME: Should cancel the write retransmission task */
2572 return; 2622 return;
2573} 2623}
2574 2624
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c
index 9a6c13da6..535ee62a2 100644
--- a/src/stream/test_stream_local.c
+++ b/src/stream/test_stream_local.c
@@ -287,11 +287,26 @@ input_processor (void *cls,
287 287
288 peer = (struct PeerData *) cls; 288 peer = (struct PeerData *) cls;
289 289
290 if (GNUNET_STREAM_TIMEOUT == status)
291 {
292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
293 "Read operation timedout - reading again!\n");
294 GNUNET_assert (0 == size);
295 peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
296 peer->socket,
297 GNUNET_TIME_relative_multiply
298 (GNUNET_TIME_UNIT_SECONDS, 5),
299 &input_processor,
300 cls);
301 GNUNET_assert (NULL != peer->io_read_handle);
302 return 0;
303 }
304
290 GNUNET_assert (GNUNET_STREAM_OK == status); 305 GNUNET_assert (GNUNET_STREAM_OK == status);
291 GNUNET_assert (size < strlen (data)); 306 GNUNET_assert (size <= strlen (data));
292 GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, 307 GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
293 (const char *) input_data, 308 (const char *) input_data,
294 size)); 309 size));
295 peer->bytes_read += size; 310 peer->bytes_read += size;
296 311
297 if (peer->bytes_read < strlen (data)) 312 if (peer->bytes_read < strlen (data))