aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-02-26 22:32:23 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-02-26 22:32:23 +0000
commitb231b33e0a4da703c962a9a3e02ca684d27dcc75 (patch)
tree279da0b7c5f15cacea0850c4e1d71e30baa766c7
parent9cd78fb24282746e988cf768430082ca2e17a82c (diff)
downloadgnunet-b231b33e0a4da703c962a9a3e02ca684d27dcc75.tar.gz
gnunet-b231b33e0a4da703c962a9a3e02ca684d27dcc75.zip
-copy buffer and STREAM_read(incomplete)
-rw-r--r--src/stream/stream_api.c189
1 files changed, 154 insertions, 35 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 0a8782c61..41bae2da5 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -219,12 +219,12 @@ struct GNUNET_STREAM_Socket
219 /** 219 /**
220 * The write IO_handle associated with this socket 220 * The write IO_handle associated with this socket
221 */ 221 */
222 struct GNUNET_STREAM_IOHandle *write_handle; 222 struct GNUNET_STREAM_IOWriteHandle *write_handle;
223 223
224 /** 224 /**
225 * The read IO_handle associated with this socket 225 * The read IO_handle associated with this socket
226 */ 226 */
227 struct GNUNET_STREAM_IOHandle *read_handle; 227 struct GNUNET_STREAM_IOReadHandle *read_handle;
228 228
229 /** 229 /**
230 * Buffer for storing received messages 230 * Buffer for storing received messages
@@ -232,6 +232,11 @@ struct GNUNET_STREAM_Socket
232 void *receive_buffer; 232 void *receive_buffer;
233 233
234 /** 234 /**
235 * Copy buffer pointer; Used during read operations
236 */
237 void *copy_buffer;
238
239 /**
235 * The state of the protocol associated with this socket 240 * The state of the protocol associated with this socket
236 */ 241 */
237 enum State state; 242 enum State state;
@@ -269,6 +274,11 @@ struct GNUNET_STREAM_Socket
269 uint32_t receive_buffer_size; 274 uint32_t receive_buffer_size;
270 275
271 /** 276 /**
277 * The receiver buffer boundaries
278 */
279 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
280
281 /**
272 * receiver's available buffer after the last acknowledged packet 282 * receiver's available buffer after the last acknowledged packet
273 */ 283 */
274 uint32_t receive_window_available; 284 uint32_t receive_window_available;
@@ -282,6 +292,16 @@ struct GNUNET_STREAM_Socket
282 * The offset after which we are expecting data 292 * The offset after which we are expecting data
283 */ 293 */
284 uint32_t read_offset; 294 uint32_t read_offset;
295
296 /**
297 * The size of the copy buffer
298 */
299 uint32_t copy_buffer_size;
300
301 /**
302 * The read offset of copy buffer
303 */
304 uint32_t copy_buffer_read_offset;
285}; 305};
286 306
287 307
@@ -314,9 +334,9 @@ struct GNUNET_STREAM_ListenSocket
314 334
315 335
316/** 336/**
317 * The IO Handle 337 * The IO Write Handle
318 */ 338 */
319struct GNUNET_STREAM_IOHandle 339struct GNUNET_STREAM_IOWriteHandle
320{ 340{
321 /** 341 /**
322 * The packet_buffers associated with this Handle 342 * The packet_buffers associated with this Handle
@@ -330,11 +350,6 @@ struct GNUNET_STREAM_IOHandle
330 GNUNET_STREAM_AckBitmap ack_bitmap; 350 GNUNET_STREAM_AckBitmap ack_bitmap;
331 351
332 /** 352 /**
333 * receiver's available buffer
334 */
335 uint32_t receive_window_available;
336
337 /**
338 * Number of packets sent before waiting for an ack 353 * Number of packets sent before waiting for an ack
339 * 354 *
340 * FIXME: Do we need this? 355 * FIXME: Do we need this?
@@ -344,6 +359,23 @@ struct GNUNET_STREAM_IOHandle
344 359
345 360
346/** 361/**
362 * The IO Read Handle
363 */
364struct GNUNET_STREAM_IOReadHandle
365{
366 /**
367 * Callback for the read processor
368 */
369 GNUNET_STREAM_DataProcessor proc;
370
371 /**
372 * The closure pointer for the read processor callback
373 */
374 void *proc_cls;
375};
376
377
378/**
347 * Default value in seconds for various timeouts 379 * Default value in seconds for various timeouts
348 */ 380 */
349static unsigned int default_timeout = 300; 381static unsigned int default_timeout = 300;
@@ -511,7 +543,8 @@ ack_task (void *cls,
511 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); 543 ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
512 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap); 544 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
513 ack_msg->base_sequence_number = htonl (socket->read_sequence_number); 545 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
514 ack_msg->receive_window_remaining = htonl (socket->receive_window_available); 546 ack_msg->receive_window_remaining =
547 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
515 548
516 /* Request MESH for sending ACK */ 549 /* Request MESH for sending ACK */
517 GNUNET_MESH_notify_transmit_ready (socket->tunnel, 550 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
@@ -574,7 +607,7 @@ static void
574write_data_finish_cb (void *cls, 607write_data_finish_cb (void *cls,
575 struct GNUNET_STREAM_Socket *socket) 608 struct GNUNET_STREAM_Socket *socket)
576{ 609{
577 struct GNUNET_STREAM_IOHandle *io_handle = cls; 610 struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
578 611
579 io_handle->sent_packets++; 612 io_handle->sent_packets++;
580} 613}
@@ -589,7 +622,7 @@ write_data_finish_cb (void *cls,
589static void 622static void
590write_data (struct GNUNET_STREAM_Socket *socket) 623write_data (struct GNUNET_STREAM_Socket *socket)
591{ 624{
592 struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle; 625 struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
593 unsigned int packet; 626 unsigned int packet;
594 int ack_packet; 627 int ack_packet;
595 628
@@ -618,9 +651,9 @@ write_data (struct GNUNET_STREAM_Socket *socket)
618 packet = ack_packet + 1; 651 packet = ack_packet + 1;
619 /* Now send new packets if there is enough buffer space */ 652 /* Now send new packets if there is enough buffer space */
620 while ( (NULL != io_handle->messages[packet]) && 653 while ( (NULL != io_handle->messages[packet]) &&
621 (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) 654 (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
622 { 655 {
623 io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); 656 socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
624 queue_message (socket, 657 queue_message (socket,
625 &io_handle->messages[packet]->header, 658 &io_handle->messages[packet]->header,
626 &write_data_finish_cb, 659 &write_data_finish_cb,
@@ -651,6 +684,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
651 const void *payload; 684 const void *payload;
652 uint32_t bytes_needed; 685 uint32_t bytes_needed;
653 uint32_t relative_offset; 686 uint32_t relative_offset;
687 uint32_t relative_sequence_number;
654 uint16_t size; 688 uint16_t size;
655 689
656 size = htons (msg->header.header.size); 690 size = htons (msg->header.header.size);
@@ -666,9 +700,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
666 case STATE_TRANSMIT_CLOSED: 700 case STATE_TRANSMIT_CLOSED:
667 case STATE_TRANSMIT_CLOSE_WAIT: 701 case STATE_TRANSMIT_CLOSE_WAIT:
668 702
669 /* check if the message's sequence number is greater than the one we are 703 /* check if the message's sequence number is in the range we are
670 expecting */ 704 expecting */
671 if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64) 705 relative_sequence_number =
706 ntohl (msg->sequence_number) - socket->read_sequence_number;
707 if ( relative_sequence_number > 64)
672 { 708 {
673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 709 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674 "Ignoring received message with sequence number %d", 710 "Ignoring received message with sequence number %d",
@@ -688,8 +724,6 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
688 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, 724 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
689 bytes_needed); 725 bytes_needed);
690 socket->receive_buffer_size = bytes_needed; 726 socket->receive_buffer_size = bytes_needed;
691 socket->receive_window_available =
692 RECEIVE_BUFFER_SIZE - socket->receive_buffer_size;
693 } 727 }
694 else 728 else
695 { 729 {
@@ -700,16 +734,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
700 } 734 }
701 } 735 }
702 736
703 /* Copy Data to buffer and send acknowledgement for this packet */ 737 /* Copy Data to buffer */
704 payload = &msg[1]; 738 payload = &msg[1];
739 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
705 memcpy (socket->receive_buffer + relative_offset, 740 memcpy (socket->receive_buffer + relative_offset,
706 payload, 741 payload,
707 size); 742 size);
743 socket->receive_buffer_boundaries[relative_sequence_number] =
744 relative_offset + size;
708 745
709 /* Modify the ACK bitmap */ 746 /* Modify the ACK bitmap */
710 ackbitmap_modify_bit (&socket->ack_bitmap, 747 ackbitmap_modify_bit (&socket->ack_bitmap,
711 ntohl (msg->sequence_number) - 748 relative_sequence_number,
712 socket->read_sequence_number,
713 GNUNET_YES); 749 GNUNET_YES);
714 750
715 /* Start ACK sending task if one is not already present */ 751 /* Start ACK sending task if one is not already present */
@@ -1427,7 +1463,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
1427 } 1463 }
1428 1464
1429 socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); 1465 socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
1430 socket->write_handle->receive_window_available = 1466 socket->receive_window_available =
1431 ntohl (ack->receive_window_remaining); 1467 ntohl (ack->receive_window_remaining);
1432 write_data (socket); 1468 write_data (socket);
1433 break; 1469 break;
@@ -1617,6 +1653,53 @@ mesh_peer_disconnect_callback (void *cls,
1617} 1653}
1618 1654
1619 1655
1656/**
1657 * Task for calling the read processor
1658 *
1659 * @param cls the socket
1660 */
1661static void
1662call_read_processor_task (void *cls,
1663 const struct GNUNET_SCHEDULER_TaskContext *tc)
1664{
1665 struct GNUNET_STREAM_Socket *socket = cls;
1666 size_t read_size;
1667 size_t valid_read_size;
1668
1669 if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
1670
1671 GNUNET_assert (NULL != socket->read_handle);
1672 GNUNET_assert (NULL != socket->read_handle->proc);
1673 GNUNET_assert (NULL != socket->copy_buffer);
1674 GNUNET_assert (0 != socket->copy_buffer_size);
1675
1676 valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
1677 GNUNET_assert (0 != valid_read_size);
1678
1679 read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
1680 socket->status,
1681 socket->copy_buffer
1682 + socket->copy_buffer_read_offset,
1683 valid_read_size);
1684
1685 GNUNET_assert (read_size <= valid_read_size);
1686 socket->copy_buffer_read_offset += read_size;
1687
1688 /* Free the copy buffer once it has been read entirely */
1689 if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
1690 {
1691 GNUNET_free (socket->copy_buffer);
1692 socket->copy_buffer = NULL;
1693 socket->copy_buffer_size = 0;
1694 socket->copy_buffer_read_offset = 0;
1695 }
1696
1697 /* Free the read handle */
1698 GNUNET_free (socket->read_handle);
1699 socket->read_handle = NULL;
1700}
1701
1702
1620/*****************/ 1703/*****************/
1621/* API functions */ 1704/* API functions */
1622/*****************/ 1705/*****************/
@@ -1878,7 +1961,7 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
1878 * @param write_cont_cls the closure 1961 * @param write_cont_cls the closure
1879 * @return handle to cancel the operation 1962 * @return handle to cancel the operation
1880 */ 1963 */
1881struct GNUNET_STREAM_IOHandle * 1964struct GNUNET_STREAM_IOWriteHandle *
1882GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, 1965GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1883 const void *data, 1966 const void *data,
1884 size_t size, 1967 size_t size,
@@ -1888,7 +1971,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1888{ 1971{
1889 unsigned int num_needed_packets; 1972 unsigned int num_needed_packets;
1890 unsigned int packet; 1973 unsigned int packet;
1891 struct GNUNET_STREAM_IOHandle *io_handle; 1974 struct GNUNET_STREAM_IOWriteHandle *io_handle;
1892 uint32_t packet_size; 1975 uint32_t packet_size;
1893 uint32_t payload_size; 1976 uint32_t payload_size;
1894 struct GNUNET_STREAM_DataMessage *data_msg; 1977 struct GNUNET_STREAM_DataMessage *data_msg;
@@ -1912,8 +1995,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1912 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) 1995 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
1913 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; 1996 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
1914 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; 1997 num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
1915 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); 1998 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
1916 io_handle->receive_window_available = socket->receive_window_available;
1917 sweep = data; 1999 sweep = data;
1918 /* Divide the given buffer into packets for sending */ 2000 /* Divide the given buffer into packets for sending */
1919 for (packet=0; packet < num_needed_packets; packet++) 2001 for (packet=0; packet < num_needed_packets; packet++)
@@ -1966,22 +2048,59 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1966 * @param proc_cls the closure for proc 2048 * @param proc_cls the closure for proc
1967 * @return handle to cancel the operation 2049 * @return handle to cancel the operation
1968 */ 2050 */
1969struct GNUNET_STREAM_IOHandle * 2051struct GNUNET_STREAM_IOReadHandle *
1970GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket, 2052GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
1971 struct GNUNET_TIME_Relative timeout, 2053 struct GNUNET_TIME_Relative timeout,
1972 GNUNET_STREAM_DataProcessor proc, 2054 GNUNET_STREAM_DataProcessor proc,
1973 void *proc_cls) 2055 void *proc_cls)
1974{ 2056{
2057 unsigned int packet;
2058 struct GNUNET_STREAM_IOReadHandle *read_handle;
2059
2060 /* Return NULL if there is already a read handle; the user has to cancel that
2061 first before continuing or has to wait until it is completed */
2062 if (NULL != socket->read_handle) return NULL;
2063
2064 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
2065 read_handle->proc = proc;
2066 socket->read_handle = read_handle;
2067
2068 /* if previous copy buffer is still not read call the data processor on it */
2069 if (NULL != socket->copy_buffer)
2070 {
2071 GNUNET_SCHEDULER_add_now (&call_read_processor_task,
2072 socket);
2073 }
2074
1975 /* Check the bitmap for any holes */ 2075 /* Check the bitmap for any holes */
2076 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2077 {
2078 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
2079 packet))
2080 break;
2081 }
1976 2082
1977 /* Deem the data from the starting of the bitmap upto a hole as available 2083 if (0 == packet) /* The first packet is still missing */
1978 data */ 2084 {
2085 /* We can't do anything until it arrives */
2086 }
2087 else
2088 {
2089 /* Copy data to copy buffer */
2090 socket->copy_buffer =
2091 GNUNET_malloc (socket->receive_buffer_boundaries[packet-1]);
2092
2093 /* Shift the bitmap */
2094 socket->ack_bitmap << packet;
1979 2095
1980 /* Create an IO handle */ 2096 /* Set read_sequence_number */
2097 socket->read_sequence_number += packet;
1981 2098
1982 /* Call the Data processor with this available data */ 2099 /* Set read_offset */
1983 2100 socket->read_offset += packet;
1984 /* Update the read_sequence_number to the first hole in the bitmap */ 2101
2102 /* FIXME: Fix relative calucations in receive buffer management */
2103 }
1985 2104
1986 /* Shift the bitmap so that the first hole is now at the start */ 2105 return read_handle;
1987} 2106}