aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_stream_lib.h22
-rw-r--r--src/stream/README12
-rw-r--r--src/stream/stream_api.c95
-rw-r--r--src/stream/test_stream_local.c16
4 files changed, 91 insertions, 54 deletions
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h
index 78b11d1c5..5a2f9b2e4 100644
--- a/src/include/gnunet_stream_lib.h
+++ b/src/include/gnunet_stream_lib.h
@@ -217,13 +217,17 @@ struct GNUNET_STREAM_IOWriteHandle;
217struct GNUNET_STREAM_IOReadHandle; 217struct GNUNET_STREAM_IOReadHandle;
218 218
219/** 219/**
220 * Tries to write the given data to the stream 220 * Tries to write the given data to the stream. The maximum size of data that
221 * can be written as part of a write operation is (64 * (64000 - sizeof (struct
222 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
223 * violation, however only the said number of maximum bytes will be written.
221 * 224 *
222 * @param socket the socket representing a stream 225 * @param socket the socket representing a stream
223 * @param data the data buffer from where the data is written into the stream 226 * @param data the data buffer from where the data is written into the stream
224 * @param size the number of bytes to be written from the data buffer 227 * @param size the number of bytes to be written from the data buffer
225 * @param timeout the timeout period 228 * @param timeout the timeout period
226 * @param write_cont the function to call upon writing some bytes into the stream 229 * @param write_cont the function to call upon writing some bytes into the
230 * stream
227 * @param write_cont_cls the closure 231 * @param write_cont_cls the closure
228 * @return handle to cancel the operation; NULL if a previous write is pending 232 * @return handle to cancel the operation; NULL if a previous write is pending
229 */ 233 */
@@ -270,7 +274,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
270 274
271 275
272/** 276/**
273 * Cancel pending write operation. 277 * Cancels pending write operation. Also cancels packet retransmissions which
278 * may have resulted otherwise.
279 *
280 * CAUTION: Normally a write operation is considered successful if the data
281 * given to it is sent and acknowledged by the receiver. As data is divided
282 * into packets, it is possible that not all packets are received by the
283 * receiver. Any missing packets are then retransmitted till the receiver
284 * acknowledges all packets or until a timeout . During this scenario if the
285 * write operation is cancelled all such retransmissions are also
286 * cancelled. This may leave the receiver's receive buffer incompletely filled
287 * as some missing packets are never retransmitted. So this operation should be
288 * used before shutting down transmission from our side or before closing the
289 * socket.
274 * 290 *
275 * @param ioh handle to operation to cancel 291 * @param ioh handle to operation to cancel
276 */ 292 */
diff --git a/src/stream/README b/src/stream/README
index 9b550b09b..977ca2d49 100644
--- a/src/stream/README
+++ b/src/stream/README
@@ -1,11 +1,11 @@
1The aim of the stream library is to provide stream connections between peers in 1Stream library provides stream connections between peers in GNUnet. This is a
2GNUnet. This is a convenience library which hides the complexity of dividing 2convenience library which hides the complexity of dividing data stream into
3data stream into packets, transmitting them and retransmitting them in case of 3packets, transmitting them and retransmitting them in case of communication
4errors. 4errors.
5 5
6This library's API are similar to unix PIPE API. The user is expected to open a 6This library's API are similar to unix PIPE API. The user is expected to open a
7stream to a listening target peer. Once the stream is established, the user can 7stream to a listening target peer. Once the stream is established, the user can
8use it as a pipe. Any data written into the stream will be readable by the 8use it as a pipe. Any data written into the stream at one peer will be readable
9target peer. 9by the other peer and vice versa.
10 10
11This library uses mesh API for establishing streams between peers. \ No newline at end of file 11This library uses mesh API for establishing tunnels between peers. \ No newline at end of file
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 336ba0204..0749d8c5f 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -25,6 +25,8 @@
25 * 25 *
26 * Decrement PEER intern count during socket close and listen close to free the 26 * Decrement PEER intern count during socket close and listen close to free the
27 * memory used for PEER interning 27 * memory used for PEER interning
28 *
29 * Add code for write io timeout
28 **/ 30 **/
29 31
30/** 32/**
@@ -32,6 +34,8 @@
32 * @brief Implementation of the stream library 34 * @brief Implementation of the stream library
33 * @author Sree Harsha Totakura 35 * @author Sree Harsha Totakura
34 */ 36 */
37
38
35#include "platform.h" 39#include "platform.h"
36#include "gnunet_common.h" 40#include "gnunet_common.h"
37#include "gnunet_crypto_lib.h" 41#include "gnunet_crypto_lib.h"
@@ -46,15 +50,15 @@
46#define MAX_PACKET_SIZE 64000 50#define MAX_PACKET_SIZE 64000
47 51
48/** 52/**
49 * The maximum payload a data message packet can carry 53 * Receive buffer
50 */ 54 */
51static size_t max_payload_size = 55#define RECEIVE_BUFFER_SIZE 4096000
52 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
53 56
54/** 57/**
55 * Receive buffer 58 * The maximum payload a data message packet can carry
56 */ 59 */
57#define RECEIVE_BUFFER_SIZE 4096000 60static size_t max_payload_size =
61 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
58 62
59/** 63/**
60 * states in the Protocol 64 * states in the Protocol
@@ -239,7 +243,7 @@ struct GNUNET_STREAM_Socket
239 /** 243 /**
240 * Task identifier for the read io timeout task 244 * Task identifier for the read io timeout task
241 */ 245 */
242 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; 246 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
243 247
244 /** 248 /**
245 * Task identifier for retransmission task after timeout 249 * Task identifier for retransmission task after timeout
@@ -374,6 +378,11 @@ struct GNUNET_STREAM_ListenSocket
374struct GNUNET_STREAM_IOWriteHandle 378struct GNUNET_STREAM_IOWriteHandle
375{ 379{
376 /** 380 /**
381 * The socket to which this write handle is associated
382 */
383 struct GNUNET_STREAM_Socket *socket;
384
385 /**
377 * The packet_buffers associated with this Handle 386 * The packet_buffers associated with this Handle
378 */ 387 */
379 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; 388 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
@@ -398,13 +407,6 @@ struct GNUNET_STREAM_IOWriteHandle
398 * Number of bytes in this write handle 407 * Number of bytes in this write handle
399 */ 408 */
400 size_t size; 409 size_t size;
401
402 /**
403 * Number of packets sent before waiting for an ack
404 *
405 * FIXME: Do we need this?
406 */
407 unsigned int sent_packets;
408}; 410};
409 411
410 412
@@ -717,23 +719,6 @@ ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
717} 719}
718 720
719 721
720
721/**
722 * Function called when Data Message is sent
723 *
724 * @param cls the io_handle corresponding to the Data Message
725 * @param socket the socket which was used
726 */
727static void
728write_data_finish_cb (void *cls,
729 struct GNUNET_STREAM_Socket *socket)
730{
731 struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
732
733 io_handle->sent_packets++;
734}
735
736
737/** 722/**
738 * Writes data using the given socket. The amount of data written is limited by 723 * Writes data using the given socket. The amount of data written is limited by
739 * the receiver_window_size 724 * the receiver_window_size
@@ -788,15 +773,15 @@ write_data (struct GNUNET_STREAM_Socket *socket)
788 ntohl (io_handle->messages[packet]->sequence_number)); 773 ntohl (io_handle->messages[packet]->sequence_number));
789 copy_and_queue_message (socket, 774 copy_and_queue_message (socket,
790 &io_handle->messages[packet]->header, 775 &io_handle->messages[packet]->header,
791 &write_data_finish_cb, 776 NULL,
792 io_handle); 777 NULL);
793 packet++; 778 packet++;
794 } 779 }
795 780
796 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) 781 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
797 socket->retransmission_timeout_task_id = 782 socket->retransmission_timeout_task_id =
798 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 783 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
799 (GNUNET_TIME_UNIT_SECONDS, 5), 784 (GNUNET_TIME_UNIT_SECONDS, 8),
800 &retransmission_timeout_task, 785 &retransmission_timeout_task,
801 socket); 786 socket);
802} 787}
@@ -810,7 +795,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
810 */ 795 */
811static void 796static void
812call_read_processor (void *cls, 797call_read_processor (void *cls,
813 const struct GNUNET_SCHEDULER_TaskContext *tc) 798 const struct GNUNET_SCHEDULER_TaskContext *tc)
814{ 799{
815 struct GNUNET_STREAM_Socket *socket = cls; 800 struct GNUNET_STREAM_Socket *socket = cls;
816 size_t read_size; 801 size_t read_size;
@@ -842,8 +827,8 @@ call_read_processor (void *cls,
842 GNUNET_assert (0 != valid_read_size); 827 GNUNET_assert (0 != valid_read_size);
843 828
844 /* Cancel the read_io_timeout_task */ 829 /* Cancel the read_io_timeout_task */
845 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task); 830 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
846 socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; 831 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
847 832
848 /* Call the data processor */ 833 /* Call the data processor */
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 834 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -891,7 +876,8 @@ call_read_processor (void *cls,
891 memmove (socket->receive_buffer, 876 memmove (socket->receive_buffer,
892 socket->receive_buffer 877 socket->receive_buffer
893 + socket->receive_buffer_boundaries[sequence_increase-1], 878 + socket->receive_buffer_boundaries[sequence_increase-1],
894 socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); 879 socket->receive_buffer_size
880 - socket->receive_buffer_boundaries[sequence_increase-1]);
895 881
896 /* Shift the bitmap */ 882 /* Shift the bitmap */
897 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; 883 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
@@ -936,7 +922,7 @@ read_io_timeout (void *cls,
936 GNUNET_STREAM_DataProcessor proc; 922 GNUNET_STREAM_DataProcessor proc;
937 void *proc_cls; 923 void *proc_cls;
938 924
939 socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; 925 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
940 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 926 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
941 { 927 {
942 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 928 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1129,6 +1115,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1129 return GNUNET_YES; 1115 return GNUNET_YES;
1130} 1116}
1131 1117
1118
1132/** 1119/**
1133 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA 1120 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1134 * 1121 *
@@ -2388,6 +2375,9 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2388{ 2375{
2389 struct MessageQueue *head; 2376 struct MessageQueue *head;
2390 2377
2378 GNUNET_break (NULL == socket->read_handle);
2379 GNUNET_break (NULL == socket->write_handle);
2380
2391 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 2381 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2392 { 2382 {
2393 /* socket closed with read task pending!? */ 2383 /* socket closed with read task pending!? */
@@ -2548,6 +2538,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
2548 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; 2538 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
2549 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; 2539 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
2550 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); 2540 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
2541 io_handle->socket = socket;
2551 io_handle->write_cont = write_cont; 2542 io_handle->write_cont = write_cont;
2552 io_handle->write_cont_cls = write_cont_cls; 2543 io_handle->write_cont_cls = write_cont_cls;
2553 io_handle->size = size; 2544 io_handle->size = size;
@@ -2642,9 +2633,10 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2642 } 2633 }
2643 2634
2644 /* Setup the read timeout task */ 2635 /* Setup the read timeout task */
2645 socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, 2636 socket->read_io_timeout_task_id =
2646 &read_io_timeout, 2637 GNUNET_SCHEDULER_add_delayed (timeout,
2647 socket); 2638 &read_io_timeout,
2639 socket);
2648 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2649 "%x: %s() END\n", 2641 "%x: %s() END\n",
2650 socket->our_id, 2642 socket->our_id,
@@ -2661,7 +2653,26 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
2661void 2653void
2662GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) 2654GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
2663{ 2655{
2664 /* FIXME: Should cancel the write retransmission task */ 2656 struct GNUNET_STREAM_Socket *socket = ioh->socket;
2657 unsigned int packet;
2658
2659 GNUNET_assert (NULL != socket->write_handle);
2660 GNUNET_assert (socket->write_handle == ioh);
2661
2662 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2663 {
2664 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2665 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2666 }
2667
2668 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2669 {
2670 if (NULL == ioh->messages[packet]) break;
2671 GNUNET_free (ioh->messages[packet]);
2672 }
2673
2674 GNUNET_free (socket->write_handle);
2675 socket->write_handle = NULL;
2665 return; 2676 return;
2666} 2677}
2667 2678
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c
index 535ee62a2..c3fcc6492 100644
--- a/src/stream/test_stream_local.c
+++ b/src/stream/test_stream_local.c
@@ -101,6 +101,9 @@ static GNUNET_SCHEDULER_TaskIdentifier read_task;
101static char *data = "ABCD"; 101static char *data = "ABCD";
102static int result; 102static int result;
103 103
104static int writing_success;
105static int reading_success;
106
104 107
105/** 108/**
106 * Check whether peers successfully shut down. 109 * Check whether peers successfully shut down.
@@ -197,9 +200,8 @@ write_completion (void *cls,
197 enum GNUNET_STREAM_Status status, 200 enum GNUNET_STREAM_Status status,
198 size_t size) 201 size_t size)
199{ 202{
200 struct PeerData *peer; 203 struct PeerData *peer = cls;
201 204
202 peer = (struct PeerData *) cls;
203 GNUNET_assert (GNUNET_STREAM_OK == status); 205 GNUNET_assert (GNUNET_STREAM_OK == status);
204 GNUNET_assert (size <= strlen (data)); 206 GNUNET_assert (size <= strlen (data));
205 peer->bytes_wrote += size; 207 peer->bytes_wrote += size;
@@ -232,6 +234,12 @@ write_completion (void *cls,
232 cls); 234 cls);
233 GNUNET_assert (NULL!=peer->io_read_handle); 235 GNUNET_assert (NULL!=peer->io_read_handle);
234 } 236 }
237 else
238 {
239 writing_success = GNUNET_YES;
240 if (GNUNET_YES == reading_success)
241 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
242 }
235 } 243 }
236} 244}
237 245
@@ -335,7 +343,9 @@ input_processor (void *cls,
335 } 343 }
336 else /* Peer1 has completed reading. End of tests */ 344 else /* Peer1 has completed reading. End of tests */
337 { 345 {
338 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); 346 reading_success = GNUNET_YES;
347 if (GNUNET_YES == writing_success)
348 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
339 } 349 }
340 } 350 }
341 return size; 351 return size;