diff options
-rw-r--r-- | src/include/gnunet_stream_lib.h | 22 | ||||
-rw-r--r-- | src/stream/README | 12 | ||||
-rw-r--r-- | src/stream/stream_api.c | 95 | ||||
-rw-r--r-- | src/stream/test_stream_local.c | 16 |
4 files changed, 91 insertions, 54 deletions
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index 78b11d1c5..5a2f9b2e4 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h | |||
@@ -217,13 +217,17 @@ struct GNUNET_STREAM_IOWriteHandle; | |||
217 | struct GNUNET_STREAM_IOReadHandle; | 217 | struct GNUNET_STREAM_IOReadHandle; |
218 | 218 | ||
219 | /** | 219 | /** |
220 | * Tries to write the given data to the stream | 220 | * Tries to write the given data to the stream. The maximum size of data that |
221 | * can be written as part of a write operation is (64 * (64000 - sizeof (struct | ||
222 | * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API | ||
223 | * violation, however only the said number of maximum bytes will be written. | ||
221 | * | 224 | * |
222 | * @param socket the socket representing a stream | 225 | * @param socket the socket representing a stream |
223 | * @param data the data buffer from where the data is written into the stream | 226 | * @param data the data buffer from where the data is written into the stream |
224 | * @param size the number of bytes to be written from the data buffer | 227 | * @param size the number of bytes to be written from the data buffer |
225 | * @param timeout the timeout period | 228 | * @param timeout the timeout period |
226 | * @param write_cont the function to call upon writing some bytes into the stream | 229 | * @param write_cont the function to call upon writing some bytes into the |
230 | * stream | ||
227 | * @param write_cont_cls the closure | 231 | * @param write_cont_cls the closure |
228 | * @return handle to cancel the operation; NULL if a previous write is pending | 232 | * @return handle to cancel the operation; NULL if a previous write is pending |
229 | */ | 233 | */ |
@@ -270,7 +274,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
270 | 274 | ||
271 | 275 | ||
272 | /** | 276 | /** |
273 | * Cancel pending write operation. | 277 | * Cancels pending write operation. Also cancels packet retransmissions which |
278 | * may have resulted otherwise. | ||
279 | * | ||
280 | * CAUTION: Normally a write operation is considered successful if the data | ||
281 | * given to it is sent and acknowledged by the receiver. As data is divided | ||
282 | * into packets, it is possible that not all packets are received by the | ||
283 | * receiver. Any missing packets are then retransmitted till the receiver | ||
284 | * acknowledges all packets or until a timeout . During this scenario if the | ||
285 | * write operation is cancelled all such retransmissions are also | ||
286 | * cancelled. This may leave the receiver's receive buffer incompletely filled | ||
287 | * as some missing packets are never retransmitted. So this operation should be | ||
288 | * used before shutting down transmission from our side or before closing the | ||
289 | * socket. | ||
274 | * | 290 | * |
275 | * @param ioh handle to operation to cancel | 291 | * @param ioh handle to operation to cancel |
276 | */ | 292 | */ |
diff --git a/src/stream/README b/src/stream/README index 9b550b09b..977ca2d49 100644 --- a/src/stream/README +++ b/src/stream/README | |||
@@ -1,11 +1,11 @@ | |||
1 | The aim of the stream library is to provide stream connections between peers in | 1 | Stream library provides stream connections between peers in GNUnet. This is a |
2 | GNUnet. This is a convenience library which hides the complexity of dividing | 2 | convenience library which hides the complexity of dividing data stream into |
3 | data stream into packets, transmitting them and retransmitting them in case of | 3 | packets, transmitting them and retransmitting them in case of communication |
4 | errors. | 4 | errors. |
5 | 5 | ||
6 | This library's API are similar to unix PIPE API. The user is expected to open a | 6 | This library's API are similar to unix PIPE API. The user is expected to open a |
7 | stream to a listening target peer. Once the stream is established, the user can | 7 | stream to a listening target peer. Once the stream is established, the user can |
8 | use it as a pipe. Any data written into the stream will be readable by the | 8 | use it as a pipe. Any data written into the stream at one peer will be readable |
9 | target peer. | 9 | by the other peer and vice versa. |
10 | 10 | ||
11 | This library uses mesh API for establishing streams between peers. \ No newline at end of file | 11 | This library uses mesh API for establishing tunnels between peers. \ No newline at end of file |
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 336ba0204..0749d8c5f 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -25,6 +25,8 @@ | |||
25 | * | 25 | * |
26 | * Decrement PEER intern count during socket close and listen close to free the | 26 | * Decrement PEER intern count during socket close and listen close to free the |
27 | * memory used for PEER interning | 27 | * memory used for PEER interning |
28 | * | ||
29 | * Add code for write io timeout | ||
28 | **/ | 30 | **/ |
29 | 31 | ||
30 | /** | 32 | /** |
@@ -32,6 +34,8 @@ | |||
32 | * @brief Implementation of the stream library | 34 | * @brief Implementation of the stream library |
33 | * @author Sree Harsha Totakura | 35 | * @author Sree Harsha Totakura |
34 | */ | 36 | */ |
37 | |||
38 | |||
35 | #include "platform.h" | 39 | #include "platform.h" |
36 | #include "gnunet_common.h" | 40 | #include "gnunet_common.h" |
37 | #include "gnunet_crypto_lib.h" | 41 | #include "gnunet_crypto_lib.h" |
@@ -46,15 +50,15 @@ | |||
46 | #define MAX_PACKET_SIZE 64000 | 50 | #define MAX_PACKET_SIZE 64000 |
47 | 51 | ||
48 | /** | 52 | /** |
49 | * The maximum payload a data message packet can carry | 53 | * Receive buffer |
50 | */ | 54 | */ |
51 | static size_t max_payload_size = | 55 | #define RECEIVE_BUFFER_SIZE 4096000 |
52 | MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); | ||
53 | 56 | ||
54 | /** | 57 | /** |
55 | * Receive buffer | 58 | * The maximum payload a data message packet can carry |
56 | */ | 59 | */ |
57 | #define RECEIVE_BUFFER_SIZE 4096000 | 60 | static size_t max_payload_size = |
61 | MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); | ||
58 | 62 | ||
59 | /** | 63 | /** |
60 | * states in the Protocol | 64 | * states in the Protocol |
@@ -239,7 +243,7 @@ struct GNUNET_STREAM_Socket | |||
239 | /** | 243 | /** |
240 | * Task identifier for the read io timeout task | 244 | * Task identifier for the read io timeout task |
241 | */ | 245 | */ |
242 | GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; | 246 | GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; |
243 | 247 | ||
244 | /** | 248 | /** |
245 | * Task identifier for retransmission task after timeout | 249 | * Task identifier for retransmission task after timeout |
@@ -374,6 +378,11 @@ struct GNUNET_STREAM_ListenSocket | |||
374 | struct GNUNET_STREAM_IOWriteHandle | 378 | struct GNUNET_STREAM_IOWriteHandle |
375 | { | 379 | { |
376 | /** | 380 | /** |
381 | * The socket to which this write handle is associated | ||
382 | */ | ||
383 | struct GNUNET_STREAM_Socket *socket; | ||
384 | |||
385 | /** | ||
377 | * The packet_buffers associated with this Handle | 386 | * The packet_buffers associated with this Handle |
378 | */ | 387 | */ |
379 | struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; | 388 | struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; |
@@ -398,13 +407,6 @@ struct GNUNET_STREAM_IOWriteHandle | |||
398 | * Number of bytes in this write handle | 407 | * Number of bytes in this write handle |
399 | */ | 408 | */ |
400 | size_t size; | 409 | size_t size; |
401 | |||
402 | /** | ||
403 | * Number of packets sent before waiting for an ack | ||
404 | * | ||
405 | * FIXME: Do we need this? | ||
406 | */ | ||
407 | unsigned int sent_packets; | ||
408 | }; | 410 | }; |
409 | 411 | ||
410 | 412 | ||
@@ -717,23 +719,6 @@ ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, | |||
717 | } | 719 | } |
718 | 720 | ||
719 | 721 | ||
720 | |||
721 | /** | ||
722 | * Function called when Data Message is sent | ||
723 | * | ||
724 | * @param cls the io_handle corresponding to the Data Message | ||
725 | * @param socket the socket which was used | ||
726 | */ | ||
727 | static void | ||
728 | write_data_finish_cb (void *cls, | ||
729 | struct GNUNET_STREAM_Socket *socket) | ||
730 | { | ||
731 | struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; | ||
732 | |||
733 | io_handle->sent_packets++; | ||
734 | } | ||
735 | |||
736 | |||
737 | /** | 722 | /** |
738 | * Writes data using the given socket. The amount of data written is limited by | 723 | * Writes data using the given socket. The amount of data written is limited by |
739 | * the receiver_window_size | 724 | * the receiver_window_size |
@@ -788,15 +773,15 @@ write_data (struct GNUNET_STREAM_Socket *socket) | |||
788 | ntohl (io_handle->messages[packet]->sequence_number)); | 773 | ntohl (io_handle->messages[packet]->sequence_number)); |
789 | copy_and_queue_message (socket, | 774 | copy_and_queue_message (socket, |
790 | &io_handle->messages[packet]->header, | 775 | &io_handle->messages[packet]->header, |
791 | &write_data_finish_cb, | 776 | NULL, |
792 | io_handle); | 777 | NULL); |
793 | packet++; | 778 | packet++; |
794 | } | 779 | } |
795 | 780 | ||
796 | if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) | 781 | if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) |
797 | socket->retransmission_timeout_task_id = | 782 | socket->retransmission_timeout_task_id = |
798 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply | 783 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply |
799 | (GNUNET_TIME_UNIT_SECONDS, 5), | 784 | (GNUNET_TIME_UNIT_SECONDS, 8), |
800 | &retransmission_timeout_task, | 785 | &retransmission_timeout_task, |
801 | socket); | 786 | socket); |
802 | } | 787 | } |
@@ -810,7 +795,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) | |||
810 | */ | 795 | */ |
811 | static void | 796 | static void |
812 | call_read_processor (void *cls, | 797 | call_read_processor (void *cls, |
813 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 798 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
814 | { | 799 | { |
815 | struct GNUNET_STREAM_Socket *socket = cls; | 800 | struct GNUNET_STREAM_Socket *socket = cls; |
816 | size_t read_size; | 801 | size_t read_size; |
@@ -842,8 +827,8 @@ call_read_processor (void *cls, | |||
842 | GNUNET_assert (0 != valid_read_size); | 827 | GNUNET_assert (0 != valid_read_size); |
843 | 828 | ||
844 | /* Cancel the read_io_timeout_task */ | 829 | /* Cancel the read_io_timeout_task */ |
845 | GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task); | 830 | GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); |
846 | socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; | 831 | socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; |
847 | 832 | ||
848 | /* Call the data processor */ | 833 | /* Call the data processor */ |
849 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 834 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -891,7 +876,8 @@ call_read_processor (void *cls, | |||
891 | memmove (socket->receive_buffer, | 876 | memmove (socket->receive_buffer, |
892 | socket->receive_buffer | 877 | socket->receive_buffer |
893 | + socket->receive_buffer_boundaries[sequence_increase-1], | 878 | + socket->receive_buffer_boundaries[sequence_increase-1], |
894 | socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); | 879 | socket->receive_buffer_size |
880 | - socket->receive_buffer_boundaries[sequence_increase-1]); | ||
895 | 881 | ||
896 | /* Shift the bitmap */ | 882 | /* Shift the bitmap */ |
897 | socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; | 883 | socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; |
@@ -936,7 +922,7 @@ read_io_timeout (void *cls, | |||
936 | GNUNET_STREAM_DataProcessor proc; | 922 | GNUNET_STREAM_DataProcessor proc; |
937 | void *proc_cls; | 923 | void *proc_cls; |
938 | 924 | ||
939 | socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; | 925 | socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; |
940 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) | 926 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) |
941 | { | 927 | { |
942 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 928 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1129,6 +1115,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
1129 | return GNUNET_YES; | 1115 | return GNUNET_YES; |
1130 | } | 1116 | } |
1131 | 1117 | ||
1118 | |||
1132 | /** | 1119 | /** |
1133 | * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA | 1120 | * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA |
1134 | * | 1121 | * |
@@ -2388,6 +2375,9 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) | |||
2388 | { | 2375 | { |
2389 | struct MessageQueue *head; | 2376 | struct MessageQueue *head; |
2390 | 2377 | ||
2378 | GNUNET_break (NULL == socket->read_handle); | ||
2379 | GNUNET_break (NULL == socket->write_handle); | ||
2380 | |||
2391 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) | 2381 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) |
2392 | { | 2382 | { |
2393 | /* socket closed with read task pending!? */ | 2383 | /* socket closed with read task pending!? */ |
@@ -2548,6 +2538,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
2548 | size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; | 2538 | size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; |
2549 | num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; | 2539 | num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; |
2550 | io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); | 2540 | io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); |
2541 | io_handle->socket = socket; | ||
2551 | io_handle->write_cont = write_cont; | 2542 | io_handle->write_cont = write_cont; |
2552 | io_handle->write_cont_cls = write_cont_cls; | 2543 | io_handle->write_cont_cls = write_cont_cls; |
2553 | io_handle->size = size; | 2544 | io_handle->size = size; |
@@ -2642,9 +2633,10 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
2642 | } | 2633 | } |
2643 | 2634 | ||
2644 | /* Setup the read timeout task */ | 2635 | /* Setup the read timeout task */ |
2645 | socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, | 2636 | socket->read_io_timeout_task_id = |
2646 | &read_io_timeout, | 2637 | GNUNET_SCHEDULER_add_delayed (timeout, |
2647 | socket); | 2638 | &read_io_timeout, |
2639 | socket); | ||
2648 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2640 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2649 | "%x: %s() END\n", | 2641 | "%x: %s() END\n", |
2650 | socket->our_id, | 2642 | socket->our_id, |
@@ -2661,7 +2653,26 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
2661 | void | 2653 | void |
2662 | GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) | 2654 | GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) |
2663 | { | 2655 | { |
2664 | /* FIXME: Should cancel the write retransmission task */ | 2656 | struct GNUNET_STREAM_Socket *socket = ioh->socket; |
2657 | unsigned int packet; | ||
2658 | |||
2659 | GNUNET_assert (NULL != socket->write_handle); | ||
2660 | GNUNET_assert (socket->write_handle == ioh); | ||
2661 | |||
2662 | if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) | ||
2663 | { | ||
2664 | GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); | ||
2665 | socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; | ||
2666 | } | ||
2667 | |||
2668 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) | ||
2669 | { | ||
2670 | if (NULL == ioh->messages[packet]) break; | ||
2671 | GNUNET_free (ioh->messages[packet]); | ||
2672 | } | ||
2673 | |||
2674 | GNUNET_free (socket->write_handle); | ||
2675 | socket->write_handle = NULL; | ||
2665 | return; | 2676 | return; |
2666 | } | 2677 | } |
2667 | 2678 | ||
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 535ee62a2..c3fcc6492 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c | |||
@@ -101,6 +101,9 @@ static GNUNET_SCHEDULER_TaskIdentifier read_task; | |||
101 | static char *data = "ABCD"; | 101 | static char *data = "ABCD"; |
102 | static int result; | 102 | static int result; |
103 | 103 | ||
104 | static int writing_success; | ||
105 | static int reading_success; | ||
106 | |||
104 | 107 | ||
105 | /** | 108 | /** |
106 | * Check whether peers successfully shut down. | 109 | * Check whether peers successfully shut down. |
@@ -197,9 +200,8 @@ write_completion (void *cls, | |||
197 | enum GNUNET_STREAM_Status status, | 200 | enum GNUNET_STREAM_Status status, |
198 | size_t size) | 201 | size_t size) |
199 | { | 202 | { |
200 | struct PeerData *peer; | 203 | struct PeerData *peer = cls; |
201 | 204 | ||
202 | peer = (struct PeerData *) cls; | ||
203 | GNUNET_assert (GNUNET_STREAM_OK == status); | 205 | GNUNET_assert (GNUNET_STREAM_OK == status); |
204 | GNUNET_assert (size <= strlen (data)); | 206 | GNUNET_assert (size <= strlen (data)); |
205 | peer->bytes_wrote += size; | 207 | peer->bytes_wrote += size; |
@@ -232,6 +234,12 @@ write_completion (void *cls, | |||
232 | cls); | 234 | cls); |
233 | GNUNET_assert (NULL!=peer->io_read_handle); | 235 | GNUNET_assert (NULL!=peer->io_read_handle); |
234 | } | 236 | } |
237 | else | ||
238 | { | ||
239 | writing_success = GNUNET_YES; | ||
240 | if (GNUNET_YES == reading_success) | ||
241 | GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); | ||
242 | } | ||
235 | } | 243 | } |
236 | } | 244 | } |
237 | 245 | ||
@@ -335,7 +343,9 @@ input_processor (void *cls, | |||
335 | } | 343 | } |
336 | else /* Peer1 has completed reading. End of tests */ | 344 | else /* Peer1 has completed reading. End of tests */ |
337 | { | 345 | { |
338 | GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); | 346 | reading_success = GNUNET_YES; |
347 | if (GNUNET_YES == writing_success) | ||
348 | GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); | ||
339 | } | 349 | } |
340 | } | 350 | } |
341 | return size; | 351 | return size; |