diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-02-12 12:28:31 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-02-12 12:28:31 +0000 |
commit | 954517d8f65fa3a228ee66ecf6f7fefbf1967d81 (patch) | |
tree | 5e47de0f53811b38b149195ccaf22aadedddfbdb /src/stream/stream_api.c | |
parent | cd896065dc6b0b61913a5af9054865f040afc885 (diff) | |
download | gnunet-954517d8f65fa3a228ee66ecf6f7fefbf1967d81.tar.gz gnunet-954517d8f65fa3a228ee66ecf6f7fefbf1967d81.zip |
-added write operation
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r-- | src/stream/stream_api.c | 140 |
1 files changed, 129 insertions, 11 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index faceac297..c25cb6c9b 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -30,8 +30,21 @@ | |||
30 | #include "stream_protocol.h" | 30 | #include "stream_protocol.h" |
31 | 31 | ||
32 | 32 | ||
33 | /** | ||
34 | * The maximum packet size of a stream packet | ||
35 | */ | ||
33 | #define MAX_PACKET_SIZE 64000 | 36 | #define MAX_PACKET_SIZE 64000 |
34 | 37 | ||
38 | /** | ||
39 | * The maximum payload a data message packet can carry | ||
40 | */ | ||
41 | static size_t max_payload_size = | ||
42 | MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); | ||
43 | |||
44 | /** | ||
45 | * Receive buffer | ||
46 | */ | ||
47 | #define RECEIVE_BUFFER_SIZE 4096000 | ||
35 | 48 | ||
36 | /** | 49 | /** |
37 | * states in the Protocol | 50 | * states in the Protocol |
@@ -221,6 +234,11 @@ struct GNUNET_STREAM_Socket | |||
221 | * Read sequence number. This number's value is determined during handshake | 234 | * Read sequence number. This number's value is determined during handshake |
222 | */ | 235 | */ |
223 | uint32_t read_sequence_number; | 236 | uint32_t read_sequence_number; |
237 | |||
238 | /** | ||
239 | * receiver's available buffer | ||
240 | */ | ||
241 | uint32_t receive_window_available; | ||
224 | }; | 242 | }; |
225 | 243 | ||
226 | 244 | ||
@@ -266,7 +284,19 @@ struct GNUNET_STREAM_IOHandle | |||
266 | * The bitmap of this IOHandle; Corresponding bit for a message is set when | 284 | * The bitmap of this IOHandle; Corresponding bit for a message is set when |
267 | * it has been acknowledged by the receiver | 285 | * it has been acknowledged by the receiver |
268 | */ | 286 | */ |
269 | GNUNET_STREAM_AckBitmap bitmap; | 287 | GNUNET_STREAM_AckBitmap ack_bitmap; |
288 | |||
289 | /** | ||
290 | * receiver's available buffer | ||
291 | */ | ||
292 | uint32_t receive_window_available; | ||
293 | |||
294 | /** | ||
295 | * Number of packets sent before waiting for an ack | ||
296 | * | ||
297 | * FIXME: Do we need this? | ||
298 | */ | ||
299 | unsigned int sent_packets; | ||
270 | }; | 300 | }; |
271 | 301 | ||
272 | 302 | ||
@@ -392,7 +422,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket, | |||
392 | * @param value GNUNET_YES to on, GNUNET_NO to off | 422 | * @param value GNUNET_YES to on, GNUNET_NO to off |
393 | */ | 423 | */ |
394 | static void | 424 | static void |
395 | AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, | 425 | ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, |
396 | unsigned int bit, | 426 | unsigned int bit, |
397 | int value) | 427 | int value) |
398 | { | 428 | { |
@@ -411,7 +441,7 @@ AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, | |||
411 | * @return GNUNET_YES if the bit is set; GNUNET_NO if not | 441 | * @return GNUNET_YES if the bit is set; GNUNET_NO if not |
412 | */ | 442 | */ |
413 | static uint8_t | 443 | static uint8_t |
414 | AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, | 444 | ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, |
415 | unsigned int bit) | 445 | unsigned int bit) |
416 | { | 446 | { |
417 | GNUNET_assert (bit < 64); | 447 | GNUNET_assert (bit < 64); |
@@ -419,6 +449,71 @@ AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, | |||
419 | } | 449 | } |
420 | 450 | ||
421 | 451 | ||
452 | |||
453 | /** | ||
454 | * Function called when Data Message is sent | ||
455 | * | ||
456 | * @param cls the io_handle corresponding to the Data Message | ||
457 | * @param socket the socket which was used | ||
458 | */ | ||
459 | static void | ||
460 | write_data_finish_cb (void *cls, | ||
461 | struct GNUNET_STREAM_Socket *socket) | ||
462 | { | ||
463 | struct GNUNET_STREAM_IOHandle *io_handle = cls; | ||
464 | |||
465 | io_handle->sent_packets++; | ||
466 | } | ||
467 | |||
468 | |||
469 | /** | ||
470 | * Writes data using the given socket. The amount of data written is limited by | ||
471 | * the receive_window_size | ||
472 | * | ||
473 | * @param socket the socket to use | ||
474 | */ | ||
475 | static void | ||
476 | write_data (struct GNUNET_STREAM_Socket *socket) | ||
477 | { | ||
478 | struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle; | ||
479 | unsigned int packet; | ||
480 | int ack_packet; | ||
481 | |||
482 | ack_packet = -1; | ||
483 | /* Find the last acknowledged packet */ | ||
484 | for (packet=0; packet < 64; packet++) | ||
485 | { | ||
486 | if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, | ||
487 | packet)) | ||
488 | { | ||
489 | ack_packet = packet; | ||
490 | } | ||
491 | } | ||
492 | /* Resend packets which weren't ack'ed */ | ||
493 | for (packet=0; packet < ack_packet; packet++) | ||
494 | { | ||
495 | if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, | ||
496 | packet)) | ||
497 | { | ||
498 | queue_message (socket, | ||
499 | &io_handle->messages[packet]->header, | ||
500 | NULL, | ||
501 | NULL); | ||
502 | } | ||
503 | } | ||
504 | packet = ack_packet + 1; | ||
505 | /* Now send new packets if there is enough buffer space */ | ||
506 | while (io_handle->receive_window_available -= | ||
507 | io_handle->messages[packet]->header.header.size > 0) | ||
508 | { | ||
509 | queue_message (socket, | ||
510 | &io_handle->messages[packet]->header, | ||
511 | &write_data_finish_cb, | ||
512 | io_handle); | ||
513 | } | ||
514 | } | ||
515 | |||
516 | |||
422 | /** | 517 | /** |
423 | * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA | 518 | * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA |
424 | * | 519 | * |
@@ -520,6 +615,7 @@ client_handle_hello_ack (void *cls, | |||
520 | { | 615 | { |
521 | case STATE_HELLO_WAIT: | 616 | case STATE_HELLO_WAIT: |
522 | socket->read_sequence_number = ntohl (ack_msg->sequence_number); | 617 | socket->read_sequence_number = ntohl (ack_msg->sequence_number); |
618 | socket->receive_window_available = ntohl (ack_msg->receive_window_size); | ||
523 | /* Get the random sequence number */ | 619 | /* Get the random sequence number */ |
524 | socket->write_sequence_number = | 620 | socket->write_sequence_number = |
525 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); | 621 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); |
@@ -530,6 +626,7 @@ client_handle_hello_ack (void *cls, | |||
530 | reply->header.header.type = | 626 | reply->header.header.type = |
531 | htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); | 627 | htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); |
532 | reply->sequence_number = htonl (socket->write_sequence_number); | 628 | reply->sequence_number = htonl (socket->write_sequence_number); |
629 | reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE); | ||
533 | queue_message (socket, | 630 | queue_message (socket, |
534 | &reply->header, | 631 | &reply->header, |
535 | &set_state_established, | 632 | &set_state_established, |
@@ -840,7 +937,9 @@ server_handle_hello_ack (void *cls, | |||
840 | GNUNET_assert (socket->tunnel == tunnel); | 937 | GNUNET_assert (socket->tunnel == tunnel); |
841 | if (STATE_HELLO_WAIT == socket->state) | 938 | if (STATE_HELLO_WAIT == socket->state) |
842 | { | 939 | { |
843 | socket->read_sequence_number = ntohs (ack_message->sequence_number); | 940 | socket->read_sequence_number = ntohl (ack_message->sequence_number); |
941 | socket->receive_window_available = | ||
942 | ntohl (ack_message->receive_window_size); | ||
844 | socket->state = STATE_ESTABLISHED; | 943 | socket->state = STATE_ESTABLISHED; |
845 | } | 944 | } |
846 | else | 945 | else |
@@ -1039,11 +1138,10 @@ server_handle_close_ack (void *cls, | |||
1039 | /** | 1138 | /** |
1040 | * Message Handler for mesh | 1139 | * Message Handler for mesh |
1041 | * | 1140 | * |
1042 | * @param cls closure (set from GNUNET_MESH_connect) | 1141 | * @param socket the socket through which the ack was received |
1043 | * @param tunnel connection to the other end | 1142 | * @param tunnel connection to the other end |
1044 | * @param tunnel_ctx place to store local state associated with the tunnel | ||
1045 | * @param sender who sent the message | 1143 | * @param sender who sent the message |
1046 | * @param ack the actual message | 1144 | * @param ack the acknowledgment message |
1047 | * @param atsi performance data for the connection | 1145 | * @param atsi performance data for the connection |
1048 | * @return GNUNET_OK to keep the connection open, | 1146 | * @return GNUNET_OK to keep the connection open, |
1049 | * GNUNET_SYSERR to close it (signal serious error) | 1147 | * GNUNET_SYSERR to close it (signal serious error) |
@@ -1055,6 +1153,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, | |||
1055 | const struct GNUNET_STREAM_AckMessage *ack, | 1153 | const struct GNUNET_STREAM_AckMessage *ack, |
1056 | const struct GNUNET_ATS_Information*atsi) | 1154 | const struct GNUNET_ATS_Information*atsi) |
1057 | { | 1155 | { |
1156 | switch (socket->state) | ||
1157 | { | ||
1158 | case (STATE_ESTABLISHED): | ||
1159 | if (NULL == socket->write_handle) | ||
1160 | { | ||
1161 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1162 | "Received DATA ACK when write_handle is NULL\n"); | ||
1163 | return GNUNET_OK; | ||
1164 | } | ||
1165 | |||
1166 | socket->write_handle->ack_bitmap = ntoh64 (ack->bitmap); | ||
1167 | socket->write_handle->receive_window_available = | ||
1168 | ntohl (ack->receive_window_remaining); | ||
1169 | write_data (socket); | ||
1170 | break; | ||
1171 | default: | ||
1172 | break; | ||
1173 | } | ||
1058 | return GNUNET_OK; | 1174 | return GNUNET_OK; |
1059 | } | 1175 | } |
1060 | 1176 | ||
@@ -1502,7 +1618,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1502 | unsigned int packet; | 1618 | unsigned int packet; |
1503 | struct GNUNET_STREAM_IOHandle *io_handle; | 1619 | struct GNUNET_STREAM_IOHandle *io_handle; |
1504 | struct GNUNET_STREAM_DataMessage *data_msg; | 1620 | struct GNUNET_STREAM_DataMessage *data_msg; |
1505 | size_t max_payload_size; | ||
1506 | size_t packet_size; | 1621 | size_t packet_size; |
1507 | const void *sweep; | 1622 | const void *sweep; |
1508 | 1623 | ||
@@ -1517,8 +1632,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1517 | return NULL; | 1632 | return NULL; |
1518 | } | 1633 | } |
1519 | 1634 | ||
1520 | max_payload_size = | ||
1521 | MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); | ||
1522 | num_needed_packets = ceil (size / max_payload_size); | 1635 | num_needed_packets = ceil (size / max_payload_size); |
1523 | if (64 < num_needed_packets) | 1636 | if (64 < num_needed_packets) |
1524 | { | 1637 | { |
@@ -1528,8 +1641,9 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1528 | } | 1641 | } |
1529 | 1642 | ||
1530 | io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); | 1643 | io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle)); |
1644 | io_handle->receive_window_available = socket->receive_window_available; | ||
1531 | sweep = data; | 1645 | sweep = data; |
1532 | /* Divide the given area into packets for sending */ | 1646 | /* Divide the given buffer into packets for sending */ |
1533 | for (packet=0; packet < num_needed_packets; packet++) | 1647 | for (packet=0; packet < num_needed_packets; packet++) |
1534 | { | 1648 | { |
1535 | if ((packet + 1) * max_payload_size < size) | 1649 | if ((packet + 1) * max_payload_size < size) |
@@ -1545,6 +1659,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1545 | io_handle->messages[packet]->header.header.size = htons (packet_size); | 1659 | io_handle->messages[packet]->header.header.size = htons (packet_size); |
1546 | io_handle->messages[packet]->header.header.type = | 1660 | io_handle->messages[packet]->header.header.type = |
1547 | htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); | 1661 | htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); |
1662 | io_handle->messages[packet]->sequence_number = | ||
1663 | htons (socket->write_sequence_number++); | ||
1548 | data_msg = io_handle->messages[packet]; | 1664 | data_msg = io_handle->messages[packet]; |
1549 | memcpy (&data_msg[1], | 1665 | memcpy (&data_msg[1], |
1550 | sweep, | 1666 | sweep, |
@@ -1552,5 +1668,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
1552 | sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage); | 1668 | sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage); |
1553 | } | 1669 | } |
1554 | 1670 | ||
1671 | write_data (socket); | ||
1672 | |||
1555 | return io_handle; | 1673 | return io_handle; |
1556 | } | 1674 | } |