diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-02-26 22:32:23 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-02-26 22:32:23 +0000 |
commit | b231b33e0a4da703c962a9a3e02ca684d27dcc75 (patch) | |
tree | 279da0b7c5f15cacea0850c4e1d71e30baa766c7 /src | |
parent | 9cd78fb24282746e988cf768430082ca2e17a82c (diff) | |
download | gnunet-b231b33e0a4da703c962a9a3e02ca684d27dcc75.tar.gz gnunet-b231b33e0a4da703c962a9a3e02ca684d27dcc75.zip |
-copy buffer and STREAM_read(incomplete)
Diffstat (limited to 'src')
-rw-r--r-- | src/stream/stream_api.c | 189 |
1 files changed, 154 insertions, 35 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 0a8782c61..41bae2da5 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -219,12 +219,12 @@ struct GNUNET_STREAM_Socket | |||
219 | /** | 219 | /** |
220 | * The write IO_handle associated with this socket | 220 | * The write IO_handle associated with this socket |
221 | */ | 221 | */ |
222 | struct GNUNET_STREAM_IOHandle *write_handle; | 222 | struct GNUNET_STREAM_IOWriteHandle *write_handle; |
223 | 223 | ||
224 | /** | 224 | /** |
225 | * The read IO_handle associated with this socket | 225 | * The read IO_handle associated with this socket |
226 | */ | 226 | */ |
227 | struct GNUNET_STREAM_IOHandle *read_handle; | 227 | struct GNUNET_STREAM_IOReadHandle *read_handle; |
228 | 228 | ||
229 | /** | 229 | /** |
230 | * Buffer for storing received messages | 230 | * Buffer for storing received messages |
@@ -232,6 +232,11 @@ struct GNUNET_STREAM_Socket | |||
232 | void *receive_buffer; | 232 | void *receive_buffer; |
233 | 233 | ||
234 | /** | 234 | /** |
235 | * Copy buffer pointer; Used during read operations | ||
236 | */ | ||
237 | void *copy_buffer; | ||
238 | |||
239 | /** | ||
235 | * The state of the protocol associated with this socket | 240 | * The state of the protocol associated with this socket |
236 | */ | 241 | */ |
237 | enum State state; | 242 | enum State state; |
@@ -269,6 +274,11 @@ struct GNUNET_STREAM_Socket | |||
269 | uint32_t receive_buffer_size; | 274 | uint32_t receive_buffer_size; |
270 | 275 | ||
271 | /** | 276 | /** |
277 | * The receiver buffer boundaries | ||
278 | */ | ||
279 | uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; | ||
280 | |||
281 | /** | ||
272 | * receiver's available buffer after the last acknowledged packet | 282 | * receiver's available buffer after the last acknowledged packet |
273 | */ | 283 | */ |
274 | uint32_t receive_window_available; | 284 | uint32_t receive_window_available; |
@@ -282,6 +292,16 @@ struct GNUNET_STREAM_Socket | |||
282 | * The offset after which we are expecting data | 292 | * The offset after which we are expecting data |
283 | */ | 293 | */ |
284 | uint32_t read_offset; | 294 | uint32_t read_offset; |
295 | |||
296 | /** | ||
297 | * The size of the copy buffer | ||
298 | */ | ||
299 | uint32_t copy_buffer_size; | ||
300 | |||
301 | /** | ||
302 | * The read offset of copy buffer | ||
303 | */ | ||
304 | uint32_t copy_buffer_read_offset; | ||
285 | }; | 305 | }; |
286 | 306 | ||
287 | 307 | ||
@@ -314,9 +334,9 @@ struct GNUNET_STREAM_ListenSocket | |||
314 | 334 | ||
315 | 335 | ||
316 | /** | 336 | /** |
317 | * The IO Handle | 337 | * The IO Write Handle |
318 | */ | 338 | */ |
319 | struct GNUNET_STREAM_IOHandle | 339 | struct GNUNET_STREAM_IOWriteHandle |
320 | { | 340 | { |
321 | /** | 341 | /** |
322 | * The packet_buffers associated with this Handle | 342 | * The packet_buffers associated with this Handle |
@@ -330,11 +350,6 @@ struct GNUNET_STREAM_IOHandle | |||
330 | GNUNET_STREAM_AckBitmap ack_bitmap; | 350 | GNUNET_STREAM_AckBitmap ack_bitmap; |
331 | 351 | ||
332 | /** | 352 | /** |
333 | * receiver's available buffer | ||
334 | */ | ||
335 | uint32_t receive_window_available; | ||
336 | |||
337 | /** | ||
338 | * Number of packets sent before waiting for an ack | 353 | * Number of packets sent before waiting for an ack |
339 | * | 354 | * |
340 | * FIXME: Do we need this? | 355 | * FIXME: Do we need this? |
@@ -344,6 +359,23 @@ struct GNUNET_STREAM_IOHandle | |||
344 | 359 | ||
345 | 360 | ||
346 | /** | 361 | /** |
362 | * The IO Read Handle | ||
363 | */ | ||
364 | struct GNUNET_STREAM_IOReadHandle | ||
365 | { | ||
366 | /** | ||
367 | * Callback for the read processor | ||
368 | */ | ||
369 | GNUNET_STREAM_DataProcessor proc; | ||
370 | |||
371 | /** | ||
372 | * The closure pointer for the read processor callback | ||
373 | */ | ||
374 | void *proc_cls; | ||
375 | }; | ||
376 | |||
377 | |||
378 | /** | ||
347 | * Default value in seconds for various timeouts | 379 | * Default value in seconds for various timeouts |
348 | */ | 380 | */ |
349 | static unsigned int default_timeout = 300; | 381 | static unsigned int default_timeout = 300; |
@@ -511,7 +543,8 @@ ack_task (void *cls, | |||
511 | ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); | 543 | ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); |
512 | ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap); | 544 | ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap); |
513 | ack_msg->base_sequence_number = htonl (socket->read_sequence_number); | 545 | ack_msg->base_sequence_number = htonl (socket->read_sequence_number); |
514 | ack_msg->receive_window_remaining = htonl (socket->receive_window_available); | 546 | ack_msg->receive_window_remaining = |
547 | htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); | ||
515 | 548 | ||
516 | /* Request MESH for sending ACK */ | 549 | /* Request MESH for sending ACK */ |
517 | GNUNET_MESH_notify_transmit_ready (socket->tunnel, | 550 | GNUNET_MESH_notify_transmit_ready (socket->tunnel, |
@@ -574,7 +607,7 @@ static void | |||
574 | write_data_finish_cb (void *cls, | 607 | write_data_finish_cb (void *cls, |
575 | struct GNUNET_STREAM_Socket *socket) | 608 | struct GNUNET_STREAM_Socket *socket) |
576 | { | 609 | { |
577 | struct GNUNET_STREAM_IOHandle *io_handle = cls; | 610 | struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; |
578 | 611 | ||
579 | io_handle->sent_packets++; | 612 | io_handle->sent_packets++; |
580 | } | 613 | } |
@@ -589,7 +622,7 @@ write_data_finish_cb (void *cls, | |||
589 | static void | 622 | static void |
590 | write_data (struct GNUNET_STREAM_Socket *socket) | 623 | write_data (struct GNUNET_STREAM_Socket *socket) |
591 | { | 624 | { |
592 | struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle; | 625 | struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle; |
593 | unsigned int packet; | 626 | unsigned int packet; |
594 | int ack_packet; | 627 | int ack_packet; |
595 | 628 | ||
@@ -618,9 +651,9 @@ write_data (struct GNUNET_STREAM_Socket *socket) | |||
618 | packet = ack_packet + 1; | 651 | packet = ack_packet + 1; |
619 | /* Now send new packets if there is enough buffer space */ | 652 | /* Now send new packets if there is enough buffer space */ |
620 | while ( (NULL != io_handle->messages[packet]) && | 653 | while ( (NULL != io_handle->messages[packet]) && |
621 | (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) | 654 | (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) |
622 | { | 655 | { |
623 | io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); | 656 | socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); |
624 | queue_message (socket, | 657 | queue_message (socket, |
625 | &io_handle->messages[packet]->header, | 658 | &io_handle->messages[packet]->header, |
626 | &write_data_finish_cb, | 659 | &write_data_finish_cb, |
@@ -651,6 +684,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
651 | const void *payload; | 684 | const void *payload; |
652 | uint32_t bytes_needed; | 685 | uint32_t bytes_needed; |
653 | uint32_t relative_offset; | 686 | uint32_t relative_offset; |
687 | uint32_t relative_sequence_number; | ||
654 | uint16_t size; | 688 | uint16_t size; |
655 | 689 | ||
656 | size = htons (msg->header.header.size); | 690 | size = htons (msg->header.header.size); |
@@ -666,9 +700,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
666 | case STATE_TRANSMIT_CLOSED: | 700 | case STATE_TRANSMIT_CLOSED: |
667 | case STATE_TRANSMIT_CLOSE_WAIT: | 701 | case STATE_TRANSMIT_CLOSE_WAIT: |
668 | 702 | ||
669 | /* check if the message's sequence number is greater than the one we are | 703 | /* check if the message's sequence number is in the range we are |
670 | expecting */ | 704 | expecting */ |
671 | if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64) | 705 | relative_sequence_number = |
706 | ntohl (msg->sequence_number) - socket->read_sequence_number; | ||
707 | if ( relative_sequence_number > 64) | ||
672 | { | 708 | { |
673 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 709 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
674 | "Ignoring received message with sequence number %d", | 710 | "Ignoring received message with sequence number %d", |
@@ -688,8 +724,6 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
688 | socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, | 724 | socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, |
689 | bytes_needed); | 725 | bytes_needed); |
690 | socket->receive_buffer_size = bytes_needed; | 726 | socket->receive_buffer_size = bytes_needed; |
691 | socket->receive_window_available = | ||
692 | RECEIVE_BUFFER_SIZE - socket->receive_buffer_size; | ||
693 | } | 727 | } |
694 | else | 728 | else |
695 | { | 729 | { |
@@ -700,16 +734,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
700 | } | 734 | } |
701 | } | 735 | } |
702 | 736 | ||
703 | /* Copy Data to buffer and send acknowledgement for this packet */ | 737 | /* Copy Data to buffer */ |
704 | payload = &msg[1]; | 738 | payload = &msg[1]; |
739 | GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); | ||
705 | memcpy (socket->receive_buffer + relative_offset, | 740 | memcpy (socket->receive_buffer + relative_offset, |
706 | payload, | 741 | payload, |
707 | size); | 742 | size); |
743 | socket->receive_buffer_boundaries[relative_sequence_number] = | ||
744 | relative_offset + size; | ||
708 | 745 | ||
709 | /* Modify the ACK bitmap */ | 746 | /* Modify the ACK bitmap */ |
710 | ackbitmap_modify_bit (&socket->ack_bitmap, | 747 | ackbitmap_modify_bit (&socket->ack_bitmap, |
711 | ntohl (msg->sequence_number) - | 748 | relative_sequence_number, |
712 | socket->read_sequence_number, | ||
713 | GNUNET_YES); | 749 | GNUNET_YES); |
714 | 750 | ||
715 | /* Start ACK sending task if one is not already present */ | 751 | /* Start ACK sending task if one is not already present */ |
@@ -1427,7 +1463,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, | |||
1427 | } | 1463 | } |
1428 | 1464 | ||
1429 | socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); | 1465 | socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); |
1430 | socket->write_handle->receive_window_available = | 1466 | socket->receive_window_available = |
1431 | ntohl (ack->receive_window_remaining); | 1467 | ntohl (ack->receive_window_remaining); |
1432 | write_data (socket); | 1468 | write_data (socket); |
1433 | break; | 1469 | break; |
@@ -1617,6 +1653,53 @@ mesh_peer_disconnect_callback (void *cls, | |||
1617 | } | 1653 | } |
1618 | 1654 | ||
1619 | 1655 | ||
1656 | /** | ||
1657 | * Task for calling the read processor | ||
1658 | * | ||
1659 | * @param cls the socket | ||
1660 | */ | ||
1661 | static void | ||
1662 | call_read_processor_task (void *cls, | ||
1663 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1664 | { | ||
1665 | struct GNUNET_STREAM_Socket *socket = cls; | ||
1666 | size_t read_size; | ||
1667 | size_t valid_read_size; | ||
1668 | |||
1669 | if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; | ||
1670 | |||
1671 | GNUNET_assert (NULL != socket->read_handle); | ||
1672 | GNUNET_assert (NULL != socket->read_handle->proc); | ||
1673 | GNUNET_assert (NULL != socket->copy_buffer); | ||
1674 | GNUNET_assert (0 != socket->copy_buffer_size); | ||
1675 | |||
1676 | valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset; | ||
1677 | GNUNET_assert (0 != valid_read_size); | ||
1678 | |||
1679 | read_size = socket->read_handle->proc (socket->read_handle->proc_cls, | ||
1680 | socket->status, | ||
1681 | socket->copy_buffer | ||
1682 | + socket->copy_buffer_read_offset, | ||
1683 | valid_read_size); | ||
1684 | |||
1685 | GNUNET_assert (read_size <= valid_read_size); | ||
1686 | socket->copy_buffer_read_offset += read_size; | ||
1687 | |||
1688 | /* Free the copy buffer once it has been read entirely */ | ||
1689 | if (socket->copy_buffer_read_offset == socket->copy_buffer_size) | ||
1690 | { | ||
1691 | GNUNET_free (socket->copy_buffer); | ||
1692 | socket->copy_buffer = NULL; | ||
1693 | socket->copy_buffer_size = 0; | ||
1694 | socket->copy_buffer_read_offset = 0; | ||
1695 | } | ||
1696 | |||
1697 | /* Free the read handle */ | ||
1698 | GNUNET_free (socket->read_handle); | ||
1699 | socket->read_handle = NULL; | ||
1700 | } | ||
1701 | |||
1702 | |||
1620 | /*****************/ | 1703 | /*****************/ |
1621 | /* API functions */ | 1704 | /* API functions */ |
1622 | /*****************/ | 1705 | /*****************/ |
@@ -1878,7 +1961,7 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) | |||
1878 | * @param write_cont_cls the closure | 1961 | * @param write_cont_cls the closure |
1879 | * @return handle to cancel the operation | 1962 | * @return handle to cancel the operation |
1880 | */ | 1963 | */ |
1881 | struct GNUNET_STREAM_IOHandle * | 1964 | struct GNUNET_STREAM_IOWriteHandle * |
1882 | GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | 1965 | GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, |
1883 | const void *data, | 1966 | const void *data, |
1884 | size_t size, | 1967 | size_t size, |
@@ -1888,7 +1971,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1888 | { | 1971 | { |
1889 | unsigned int num_needed_packets; | 1972 | unsigned int num_needed_packets; |
1890 | unsigned int packet; | 1973 | unsigned int packet; |
1891 | struct GNUNET_STREAM_IOHandle *io_handle; | 1974 | struct GNUNET_STREAM_IOWriteHandle *io_handle; |
1892 | uint32_t packet_size; | 1975 | uint32_t packet_size; |
1893 | uint32_t payload_size; | 1976 | uint32_t payload_size; |
1894 | struct GNUNET_STREAM_DataMessage *data_msg; | 1977 | struct GNUNET_STREAM_DataMessage *data_msg; |
@@ -1912,8 +1995,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1912 | if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) | 1995 | if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) |
1913 | size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; | 1996 | size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; |
1914 | num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; | 1997 | num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; |
1915 | io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); | 1998 | io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); |
1916 | io_handle->receive_window_available = socket->receive_window_available; | ||
1917 | sweep = data; | 1999 | sweep = data; |
1918 | /* Divide the given buffer into packets for sending */ | 2000 | /* Divide the given buffer into packets for sending */ |
1919 | for (packet=0; packet < num_needed_packets; packet++) | 2001 | for (packet=0; packet < num_needed_packets; packet++) |
@@ -1966,22 +2048,59 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1966 | * @param proc_cls the closure for proc | 2048 | * @param proc_cls the closure for proc |
1967 | * @return handle to cancel the operation | 2049 | * @return handle to cancel the operation |
1968 | */ | 2050 | */ |
1969 | struct GNUNET_STREAM_IOHandle * | 2051 | struct GNUNET_STREAM_IOReadHandle * |
1970 | GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket, | 2052 | GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, |
1971 | struct GNUNET_TIME_Relative timeout, | 2053 | struct GNUNET_TIME_Relative timeout, |
1972 | GNUNET_STREAM_DataProcessor proc, | 2054 | GNUNET_STREAM_DataProcessor proc, |
1973 | void *proc_cls) | 2055 | void *proc_cls) |
1974 | { | 2056 | { |
2057 | unsigned int packet; | ||
2058 | struct GNUNET_STREAM_IOReadHandle *read_handle; | ||
2059 | |||
2060 | /* Return NULL if there is already a read handle; the user has to cancel that | ||
2061 | first before continuing or has to wait until it is completed */ | ||
2062 | if (NULL != socket->read_handle) return NULL; | ||
2063 | |||
2064 | read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); | ||
2065 | read_handle->proc = proc; | ||
2066 | socket->read_handle = read_handle; | ||
2067 | |||
2068 | /* if previous copy buffer is still not read call the data processor on it */ | ||
2069 | if (NULL != socket->copy_buffer) | ||
2070 | { | ||
2071 | GNUNET_SCHEDULER_add_now (&call_read_processor_task, | ||
2072 | socket); | ||
2073 | } | ||
2074 | |||
1975 | /* Check the bitmap for any holes */ | 2075 | /* Check the bitmap for any holes */ |
2076 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) | ||
2077 | { | ||
2078 | if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, | ||
2079 | packet)) | ||
2080 | break; | ||
2081 | } | ||
1976 | 2082 | ||
1977 | /* Deem the data from the starting of the bitmap upto a hole as available | 2083 | if (0 == packet) /* The first packet is still missing */ |
1978 | data */ | 2084 | { |
2085 | /* We can't do anything until it arrives */ | ||
2086 | } | ||
2087 | else | ||
2088 | { | ||
2089 | /* Copy data to copy buffer */ | ||
2090 | socket->copy_buffer = | ||
2091 | GNUNET_malloc (socket->receive_buffer_boundaries[packet-1]); | ||
2092 | |||
2093 | /* Shift the bitmap */ | ||
2094 | socket->ack_bitmap << packet; | ||
1979 | 2095 | ||
1980 | /* Create an IO handle */ | 2096 | /* Set read_sequence_number */ |
2097 | socket->read_sequence_number += packet; | ||
1981 | 2098 | ||
1982 | /* Call the Data processor with this available data */ | 2099 | /* Set read_offset */ |
1983 | 2100 | socket->read_offset += packet; | |
1984 | /* Update the read_sequence_number to the first hole in the bitmap */ | 2101 | |
2102 | /* FIXME: Fix relative calucations in receive buffer management */ | ||
2103 | } | ||
1985 | 2104 | ||
1986 | /* Shift the bitmap so that the first hole is now at the start */ | 2105 | return read_handle; |
1987 | } | 2106 | } |