aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream_api.c
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-02-12 12:28:31 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-02-12 12:28:31 +0000
commit954517d8f65fa3a228ee66ecf6f7fefbf1967d81 (patch)
tree5e47de0f53811b38b149195ccaf22aadedddfbdb /src/stream/stream_api.c
parentcd896065dc6b0b61913a5af9054865f040afc885 (diff)
downloadgnunet-954517d8f65fa3a228ee66ecf6f7fefbf1967d81.tar.gz
gnunet-954517d8f65fa3a228ee66ecf6f7fefbf1967d81.zip
-added write operation
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r--src/stream/stream_api.c140
1 files changed, 129 insertions, 11 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index faceac297..c25cb6c9b 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -30,8 +30,21 @@
30#include "stream_protocol.h" 30#include "stream_protocol.h"
31 31
32 32
33/**
34 * The maximum packet size of a stream packet
35 */
33#define MAX_PACKET_SIZE 64000 36#define MAX_PACKET_SIZE 64000
34 37
38/**
39 * The maximum payload a data message packet can carry
40 */
41static size_t max_payload_size =
42 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
43
44/**
45 * Receive buffer
46 */
47#define RECEIVE_BUFFER_SIZE 4096000
35 48
36/** 49/**
37 * states in the Protocol 50 * states in the Protocol
@@ -221,6 +234,11 @@ struct GNUNET_STREAM_Socket
221 * Read sequence number. This number's value is determined during handshake 234 * Read sequence number. This number's value is determined during handshake
222 */ 235 */
223 uint32_t read_sequence_number; 236 uint32_t read_sequence_number;
237
238 /**
239 * receiver's available buffer
240 */
241 uint32_t receive_window_available;
224}; 242};
225 243
226 244
@@ -266,7 +284,19 @@ struct GNUNET_STREAM_IOHandle
266 * The bitmap of this IOHandle; Corresponding bit for a message is set when 284 * The bitmap of this IOHandle; Corresponding bit for a message is set when
267 * it has been acknowledged by the receiver 285 * it has been acknowledged by the receiver
268 */ 286 */
269 GNUNET_STREAM_AckBitmap bitmap; 287 GNUNET_STREAM_AckBitmap ack_bitmap;
288
289 /**
290 * receiver's available buffer
291 */
292 uint32_t receive_window_available;
293
294 /**
295 * Number of packets sent before waiting for an ack
296 *
297 * FIXME: Do we need this?
298 */
299 unsigned int sent_packets;
270}; 300};
271 301
272 302
@@ -392,7 +422,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
392 * @param value GNUNET_YES to on, GNUNET_NO to off 422 * @param value GNUNET_YES to on, GNUNET_NO to off
393 */ 423 */
394static void 424static void
395AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, 425ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
396 unsigned int bit, 426 unsigned int bit,
397 int value) 427 int value)
398{ 428{
@@ -411,7 +441,7 @@ AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
411 * @return GNUNET_YES if the bit is set; GNUNET_NO if not 441 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
412 */ 442 */
413static uint8_t 443static uint8_t
414AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, 444ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
415 unsigned int bit) 445 unsigned int bit)
416{ 446{
417 GNUNET_assert (bit < 64); 447 GNUNET_assert (bit < 64);
@@ -419,6 +449,71 @@ AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
419} 449}
420 450
421 451
452
453/**
454 * Function called when Data Message is sent
455 *
456 * @param cls the io_handle corresponding to the Data Message
457 * @param socket the socket which was used
458 */
459static void
460write_data_finish_cb (void *cls,
461 struct GNUNET_STREAM_Socket *socket)
462{
463 struct GNUNET_STREAM_IOHandle *io_handle = cls;
464
465 io_handle->sent_packets++;
466}
467
468
469/**
470 * Writes data using the given socket. The amount of data written is limited by
471 * the receive_window_size
472 *
473 * @param socket the socket to use
474 */
475static void
476write_data (struct GNUNET_STREAM_Socket *socket)
477{
478 struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
479 unsigned int packet;
480 int ack_packet;
481
482 ack_packet = -1;
483 /* Find the last acknowledged packet */
484 for (packet=0; packet < 64; packet++)
485 {
486 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
487 packet))
488 {
489 ack_packet = packet;
490 }
491 }
492 /* Resend packets which weren't ack'ed */
493 for (packet=0; packet < ack_packet; packet++)
494 {
495 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
496 packet))
497 {
498 queue_message (socket,
499 &io_handle->messages[packet]->header,
500 NULL,
501 NULL);
502 }
503 }
504 packet = ack_packet + 1;
505 /* Now send new packets if there is enough buffer space */
506 while (io_handle->receive_window_available -=
507 io_handle->messages[packet]->header.header.size > 0)
508 {
509 queue_message (socket,
510 &io_handle->messages[packet]->header,
511 &write_data_finish_cb,
512 io_handle);
513 }
514}
515
516
422/** 517/**
423 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA 518 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
424 * 519 *
@@ -520,6 +615,7 @@ client_handle_hello_ack (void *cls,
520 { 615 {
521 case STATE_HELLO_WAIT: 616 case STATE_HELLO_WAIT:
522 socket->read_sequence_number = ntohl (ack_msg->sequence_number); 617 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
618 socket->receive_window_available = ntohl (ack_msg->receive_window_size);
523 /* Get the random sequence number */ 619 /* Get the random sequence number */
524 socket->write_sequence_number = 620 socket->write_sequence_number =
525 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); 621 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
@@ -530,6 +626,7 @@ client_handle_hello_ack (void *cls,
530 reply->header.header.type = 626 reply->header.header.type =
531 htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); 627 htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
532 reply->sequence_number = htonl (socket->write_sequence_number); 628 reply->sequence_number = htonl (socket->write_sequence_number);
629 reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
533 queue_message (socket, 630 queue_message (socket,
534 &reply->header, 631 &reply->header,
535 &set_state_established, 632 &set_state_established,
@@ -840,7 +937,9 @@ server_handle_hello_ack (void *cls,
840 GNUNET_assert (socket->tunnel == tunnel); 937 GNUNET_assert (socket->tunnel == tunnel);
841 if (STATE_HELLO_WAIT == socket->state) 938 if (STATE_HELLO_WAIT == socket->state)
842 { 939 {
843 socket->read_sequence_number = ntohs (ack_message->sequence_number); 940 socket->read_sequence_number = ntohl (ack_message->sequence_number);
941 socket->receive_window_available =
942 ntohl (ack_message->receive_window_size);
844 socket->state = STATE_ESTABLISHED; 943 socket->state = STATE_ESTABLISHED;
845 } 944 }
846 else 945 else
@@ -1039,11 +1138,10 @@ server_handle_close_ack (void *cls,
1039/** 1138/**
1040 * Message Handler for mesh 1139 * Message Handler for mesh
1041 * 1140 *
1042 * @param cls closure (set from GNUNET_MESH_connect) 1141 * @param socket the socket through which the ack was received
1043 * @param tunnel connection to the other end 1142 * @param tunnel connection to the other end
1044 * @param tunnel_ctx place to store local state associated with the tunnel
1045 * @param sender who sent the message 1143 * @param sender who sent the message
1046 * @param ack the actual message 1144 * @param ack the acknowledgment message
1047 * @param atsi performance data for the connection 1145 * @param atsi performance data for the connection
1048 * @return GNUNET_OK to keep the connection open, 1146 * @return GNUNET_OK to keep the connection open,
1049 * GNUNET_SYSERR to close it (signal serious error) 1147 * GNUNET_SYSERR to close it (signal serious error)
@@ -1055,6 +1153,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
1055 const struct GNUNET_STREAM_AckMessage *ack, 1153 const struct GNUNET_STREAM_AckMessage *ack,
1056 const struct GNUNET_ATS_Information*atsi) 1154 const struct GNUNET_ATS_Information*atsi)
1057{ 1155{
1156 switch (socket->state)
1157 {
1158 case (STATE_ESTABLISHED):
1159 if (NULL == socket->write_handle)
1160 {
1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162 "Received DATA ACK when write_handle is NULL\n");
1163 return GNUNET_OK;
1164 }
1165
1166 socket->write_handle->ack_bitmap = ntoh64 (ack->bitmap);
1167 socket->write_handle->receive_window_available =
1168 ntohl (ack->receive_window_remaining);
1169 write_data (socket);
1170 break;
1171 default:
1172 break;
1173 }
1058 return GNUNET_OK; 1174 return GNUNET_OK;
1059} 1175}
1060 1176
@@ -1502,7 +1618,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1502 unsigned int packet; 1618 unsigned int packet;
1503 struct GNUNET_STREAM_IOHandle *io_handle; 1619 struct GNUNET_STREAM_IOHandle *io_handle;
1504 struct GNUNET_STREAM_DataMessage *data_msg; 1620 struct GNUNET_STREAM_DataMessage *data_msg;
1505 size_t max_payload_size;
1506 size_t packet_size; 1621 size_t packet_size;
1507 const void *sweep; 1622 const void *sweep;
1508 1623
@@ -1517,8 +1632,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1517 return NULL; 1632 return NULL;
1518 } 1633 }
1519 1634
1520 max_payload_size =
1521 MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
1522 num_needed_packets = ceil (size / max_payload_size); 1635 num_needed_packets = ceil (size / max_payload_size);
1523 if (64 < num_needed_packets) 1636 if (64 < num_needed_packets)
1524 { 1637 {
@@ -1528,8 +1641,9 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1528 } 1641 }
1529 1642
1530 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); 1643 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
1644 io_handle->receive_window_available = socket->receive_window_available;
1531 sweep = data; 1645 sweep = data;
1532 /* Divide the given area into packets for sending */ 1646 /* Divide the given buffer into packets for sending */
1533 for (packet=0; packet < num_needed_packets; packet++) 1647 for (packet=0; packet < num_needed_packets; packet++)
1534 { 1648 {
1535 if ((packet + 1) * max_payload_size < size) 1649 if ((packet + 1) * max_payload_size < size)
@@ -1545,6 +1659,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1545 io_handle->messages[packet]->header.header.size = htons (packet_size); 1659 io_handle->messages[packet]->header.header.size = htons (packet_size);
1546 io_handle->messages[packet]->header.header.type = 1660 io_handle->messages[packet]->header.header.type =
1547 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); 1661 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
1662 io_handle->messages[packet]->sequence_number =
1663 htons (socket->write_sequence_number++);
1548 data_msg = io_handle->messages[packet]; 1664 data_msg = io_handle->messages[packet];
1549 memcpy (&data_msg[1], 1665 memcpy (&data_msg[1],
1550 sweep, 1666 sweep,
@@ -1552,5 +1668,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
1552 sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage); 1668 sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
1553 } 1669 }
1554 1670
1671 write_data (socket);
1672
1555 return io_handle; 1673 return io_handle;
1556} 1674}