diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-07-29 20:40:55 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-07-29 20:40:55 +0000 |
commit | 12f69de19d27c2f962e4c0fe8480591e0e0ac6cf (patch) | |
tree | c68e543bbf6e053490c1b7e0ee1caba31fad1973 /src/stream | |
parent | af106ded69593d7f4676f32da6e0058cf1577ce2 (diff) | |
download | gnunet-12f69de19d27c2f962e4c0fe8480591e0e0ac6cf.tar.gz gnunet-12f69de19d27c2f962e4c0fe8480591e0e0ac6cf.zip |
fixes
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/stream_api.c | 147 |
1 files changed, 41 insertions, 106 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 49227ccec..4e5401c56 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -226,16 +226,6 @@ struct GNUNET_STREAM_Socket | |||
226 | struct GNUNET_MESH_TransmitHandle *transmit_handle; | 226 | struct GNUNET_MESH_TransmitHandle *transmit_handle; |
227 | 227 | ||
228 | /** | 228 | /** |
229 | * The current act transmit handle (if a pending ack transmit request exists) | ||
230 | */ | ||
231 | struct GNUNET_MESH_TransmitHandle *ack_transmit_handle; | ||
232 | |||
233 | /** | ||
234 | * Pointer to the current ack message using in ack_task | ||
235 | */ | ||
236 | struct GNUNET_STREAM_AckMessage *ack_msg; | ||
237 | |||
238 | /** | ||
239 | * The current message associated with the transmit handle | 229 | * The current message associated with the transmit handle |
240 | */ | 230 | */ |
241 | struct MessageQueue *queue_head; | 231 | struct MessageQueue *queue_head; |
@@ -629,19 +619,21 @@ send_message_notify (void *cls, size_t size, void *buf) | |||
629 | * @param message the message to be sent | 619 | * @param message the message to be sent |
630 | * @param finish_cb the callback to be called when the message is sent | 620 | * @param finish_cb the callback to be called when the message is sent |
631 | * @param finish_cb_cls the closure for the callback | 621 | * @param finish_cb_cls the closure for the callback |
622 | * @param urgent set to GNUNET_YES to add the message to the beginning of the | ||
623 | * queue; GNUNET_NO to add at the tail | ||
632 | */ | 624 | */ |
633 | static void | 625 | static void |
634 | queue_message (struct GNUNET_STREAM_Socket *socket, | 626 | queue_message (struct GNUNET_STREAM_Socket *socket, |
635 | struct GNUNET_STREAM_MessageHeader *message, | 627 | struct GNUNET_STREAM_MessageHeader *message, |
636 | SendFinishCallback finish_cb, | 628 | SendFinishCallback finish_cb, |
637 | void *finish_cb_cls) | 629 | void *finish_cb_cls, |
630 | int urgent) | ||
638 | { | 631 | { |
639 | struct MessageQueue *queue_entity; | 632 | struct MessageQueue *queue_entity; |
640 | 633 | ||
641 | GNUNET_assert | 634 | GNUNET_assert |
642 | ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) | 635 | ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) |
643 | && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); | 636 | && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); |
644 | |||
645 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 637 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
646 | "%s: Queueing message of type %d and size %d\n", | 638 | "%s: Queueing message of type %d and size %d\n", |
647 | GNUNET_i2s (&socket->other_peer), | 639 | GNUNET_i2s (&socket->other_peer), |
@@ -652,9 +644,20 @@ queue_message (struct GNUNET_STREAM_Socket *socket, | |||
652 | queue_entity->message = message; | 644 | queue_entity->message = message; |
653 | queue_entity->finish_cb = finish_cb; | 645 | queue_entity->finish_cb = finish_cb; |
654 | queue_entity->finish_cb_cls = finish_cb_cls; | 646 | queue_entity->finish_cb_cls = finish_cb_cls; |
655 | GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head, | 647 | if (GNUNET_YES == urgent) |
656 | socket->queue_tail, | 648 | { |
657 | queue_entity); | 649 | GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail, |
650 | queue_entity); | ||
651 | if (NULL != socket->transmit_handle) | ||
652 | { | ||
653 | GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); | ||
654 | socket->transmit_handle = NULL; | ||
655 | } | ||
656 | } | ||
657 | else | ||
658 | GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head, | ||
659 | socket->queue_tail, | ||
660 | queue_entity); | ||
658 | if (NULL == socket->transmit_handle) | 661 | if (NULL == socket->transmit_handle) |
659 | { | 662 | { |
660 | socket->retries = 0; | 663 | socket->retries = 0; |
@@ -691,38 +694,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, | |||
691 | size = ntohs (message->header.size); | 694 | size = ntohs (message->header.size); |
692 | msg_copy = GNUNET_malloc (size); | 695 | msg_copy = GNUNET_malloc (size); |
693 | memcpy (msg_copy, message, size); | 696 | memcpy (msg_copy, message, size); |
694 | queue_message (socket, msg_copy, finish_cb, finish_cb_cls); | 697 | queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO); |
695 | } | ||
696 | |||
697 | |||
698 | /** | ||
699 | * Callback function for sending ack message | ||
700 | * | ||
701 | * @param cls closure the ACK message created in ack_task | ||
702 | * @param size number of bytes available in buffer | ||
703 | * @param buf where the callee should write the message | ||
704 | * @return number of bytes written to buf | ||
705 | */ | ||
706 | static size_t | ||
707 | send_ack_notify (void *cls, size_t size, void *buf) | ||
708 | { | ||
709 | struct GNUNET_STREAM_Socket *socket = cls; | ||
710 | |||
711 | if (0 == size) | ||
712 | { | ||
713 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
714 | "%s called with size 0\n", __func__); | ||
715 | return 0; | ||
716 | } | ||
717 | GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); | ||
718 | |||
719 | size = ntohs (socket->ack_msg->header.header.size); | ||
720 | memcpy (buf, socket->ack_msg, size); | ||
721 | |||
722 | GNUNET_free (socket->ack_msg); | ||
723 | socket->ack_msg = NULL; | ||
724 | socket->ack_transmit_handle = NULL; | ||
725 | return size; | ||
726 | } | 698 | } |
727 | 699 | ||
728 | 700 | ||
@@ -785,16 +757,8 @@ ack_task (void *cls, | |||
785 | ack_msg->base_sequence_number = htonl (socket->read_sequence_number); | 757 | ack_msg->base_sequence_number = htonl (socket->read_sequence_number); |
786 | ack_msg->receive_window_remaining = | 758 | ack_msg->receive_window_remaining = |
787 | htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); | 759 | htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); |
788 | socket->ack_msg = ack_msg; | 760 | /* Queue up ACK for immediate sending */ |
789 | /* Request MESH for sending ACK */ | 761 | queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES); |
790 | socket->ack_transmit_handle = | ||
791 | GNUNET_MESH_notify_transmit_ready (socket->tunnel, | ||
792 | GNUNET_NO, /* Corking */ | ||
793 | socket->retransmit_timeout, | ||
794 | &socket->other_peer, | ||
795 | ntohs (ack_msg->header.header.size), | ||
796 | &send_ack_notify, | ||
797 | socket); | ||
798 | } | 762 | } |
799 | 763 | ||
800 | 764 | ||
@@ -834,7 +798,7 @@ close_msg_retransmission_task (void *cls, | |||
834 | GNUNET_SCHEDULER_NO_TASK; | 798 | GNUNET_SCHEDULER_NO_TASK; |
835 | return; | 799 | return; |
836 | } | 800 | } |
837 | queue_message (socket, msg, NULL, NULL); | 801 | queue_message (socket, msg, NULL, NULL, GNUNET_NO); |
838 | shutdown_handle->close_msg_retransmission_task_id = | 802 | shutdown_handle->close_msg_retransmission_task_id = |
839 | GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, | 803 | GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, |
840 | &close_msg_retransmission_task, | 804 | &close_msg_retransmission_task, |
@@ -1512,11 +1476,12 @@ control_retransmission_task (void *cls, | |||
1512 | break; | 1476 | break; |
1513 | case STATE_HELLO_WAIT: | 1477 | case STATE_HELLO_WAIT: |
1514 | if (NULL == socket->lsocket) /* We are client */ | 1478 | if (NULL == socket->lsocket) /* We are client */ |
1515 | queue_message (socket, generate_hello (), NULL, NULL); | 1479 | queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO); |
1516 | else | 1480 | else |
1517 | queue_message (socket, | 1481 | queue_message (socket, |
1518 | (struct GNUNET_STREAM_MessageHeader *) | 1482 | (struct GNUNET_STREAM_MessageHeader *) |
1519 | generate_hello_ack (socket, GNUNET_NO), NULL, NULL); | 1483 | generate_hello_ack (socket, GNUNET_NO), NULL, NULL, |
1484 | GNUNET_NO); | ||
1520 | socket->control_retransmission_task_id = | 1485 | socket->control_retransmission_task_id = |
1521 | GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, | 1486 | GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, |
1522 | &control_retransmission_task, socket); | 1487 | &control_retransmission_task, socket); |
@@ -1525,7 +1490,8 @@ control_retransmission_task (void *cls, | |||
1525 | if (NULL == socket->lsocket) | 1490 | if (NULL == socket->lsocket) |
1526 | queue_message (socket, | 1491 | queue_message (socket, |
1527 | (struct GNUNET_STREAM_MessageHeader *) | 1492 | (struct GNUNET_STREAM_MessageHeader *) |
1528 | generate_hello_ack (socket, GNUNET_NO), NULL, NULL); | 1493 | generate_hello_ack (socket, GNUNET_NO), NULL, NULL, |
1494 | GNUNET_NO); | ||
1529 | else | 1495 | else |
1530 | GNUNET_break (0); | 1496 | GNUNET_break (0); |
1531 | default: | 1497 | default: |
@@ -1584,10 +1550,8 @@ client_handle_hello_ack (void *cls, | |||
1584 | (unsigned int) socket->read_sequence_number); | 1550 | (unsigned int) socket->read_sequence_number); |
1585 | socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); | 1551 | socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); |
1586 | reply = generate_hello_ack (socket, GNUNET_YES); | 1552 | reply = generate_hello_ack (socket, GNUNET_YES); |
1587 | queue_message (socket, | 1553 | queue_message (socket, &reply->header, &set_state_established, |
1588 | &reply->header, | 1554 | NULL, GNUNET_NO); |
1589 | &set_state_established, | ||
1590 | NULL); | ||
1591 | return GNUNET_OK; | 1555 | return GNUNET_OK; |
1592 | case STATE_ESTABLISHED: | 1556 | case STATE_ESTABLISHED: |
1593 | // call statistics (# ACKs ignored++) | 1557 | // call statistics (# ACKs ignored++) |
@@ -1663,7 +1627,7 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket, | |||
1663 | reply->header.type = | 1627 | reply->header.type = |
1664 | htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); | 1628 | htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); |
1665 | reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); | 1629 | reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); |
1666 | queue_message (socket, reply, NULL, NULL); | 1630 | queue_message (socket, reply, NULL, NULL, GNUNET_NO); |
1667 | break; | 1631 | break; |
1668 | 1632 | ||
1669 | default: | 1633 | default: |
@@ -1914,11 +1878,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, | |||
1914 | htons (sizeof (struct GNUNET_STREAM_MessageHeader)); | 1878 | htons (sizeof (struct GNUNET_STREAM_MessageHeader)); |
1915 | receive_close_ack->header.type = | 1879 | receive_close_ack->header.type = |
1916 | htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); | 1880 | htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); |
1917 | queue_message (socket, | 1881 | queue_message (socket, receive_close_ack, &set_state_closed, |
1918 | receive_close_ack, | 1882 | NULL, GNUNET_NO); |
1919 | &set_state_closed, | ||
1920 | NULL); | ||
1921 | |||
1922 | /* FIXME: Handle the case where write handle is present; the write operation | 1883 | /* FIXME: Handle the case where write handle is present; the write operation |
1923 | should be deemed as finised and the write continuation callback | 1884 | should be deemed as finised and the write continuation callback |
1924 | has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ | 1885 | has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ |
@@ -2029,10 +1990,7 @@ handle_close (struct GNUNET_STREAM_Socket *socket, | |||
2029 | close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); | 1990 | close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); |
2030 | close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); | 1991 | close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); |
2031 | close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); | 1992 | close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); |
2032 | queue_message (socket, | 1993 | queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO); |
2033 | close_ack, | ||
2034 | &set_state_closed, | ||
2035 | NULL); | ||
2036 | if (socket->state == STATE_CLOSED) | 1994 | if (socket->state == STATE_CLOSED) |
2037 | return GNUNET_OK; | 1995 | return GNUNET_OK; |
2038 | 1996 | ||
@@ -2177,7 +2135,8 @@ server_handle_hello (void *cls, | |||
2177 | { | 2135 | { |
2178 | case STATE_INIT: | 2136 | case STATE_INIT: |
2179 | reply = generate_hello_ack (socket, GNUNET_YES); | 2137 | reply = generate_hello_ack (socket, GNUNET_YES); |
2180 | queue_message (socket, &reply->header, &set_state_hello_wait, NULL); | 2138 | queue_message (socket, &reply->header, &set_state_hello_wait, NULL, |
2139 | GNUNET_NO); | ||
2181 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == | 2140 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == |
2182 | socket->control_retransmission_task_id); | 2141 | socket->control_retransmission_task_id); |
2183 | socket->control_retransmission_task_id = | 2142 | socket->control_retransmission_task_id = |
@@ -2753,10 +2712,7 @@ mesh_peer_connect_callback (void *cls, | |||
2753 | socket->state = STATE_INIT; | 2712 | socket->state = STATE_INIT; |
2754 | /* Send HELLO message */ | 2713 | /* Send HELLO message */ |
2755 | message = generate_hello (); | 2714 | message = generate_hello (); |
2756 | queue_message (socket, | 2715 | queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO); |
2757 | message, | ||
2758 | &set_state_hello_wait, | ||
2759 | NULL); | ||
2760 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == | 2716 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == |
2761 | socket->control_retransmission_task_id); | 2717 | socket->control_retransmission_task_id); |
2762 | socket->control_retransmission_task_id = | 2718 | socket->control_retransmission_task_id = |
@@ -2873,13 +2829,6 @@ tunnel_cleaner (void *cls, | |||
2873 | GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); | 2829 | GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); |
2874 | socket->transmit_handle = NULL; | 2830 | socket->transmit_handle = NULL; |
2875 | } | 2831 | } |
2876 | if (NULL != socket->ack_transmit_handle) | ||
2877 | { | ||
2878 | GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); | ||
2879 | GNUNET_free (socket->ack_msg); | ||
2880 | socket->ack_msg = NULL; | ||
2881 | socket->ack_transmit_handle = NULL; | ||
2882 | } | ||
2883 | /* Stop Tasks using socket->tunnel */ | 2832 | /* Stop Tasks using socket->tunnel */ |
2884 | if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) | 2833 | if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) |
2885 | { | 2834 | { |
@@ -3096,10 +3045,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, | |||
3096 | "Existing read handle should be cancelled before shutting" | 3045 | "Existing read handle should be cancelled before shutting" |
3097 | " down reading\n"); | 3046 | " down reading\n"); |
3098 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); | 3047 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); |
3099 | queue_message (socket, | 3048 | queue_message (socket, msg, &set_state_receive_close_wait, NULL, |
3100 | msg, | 3049 | GNUNET_NO); |
3101 | &set_state_receive_close_wait, | ||
3102 | NULL); | ||
3103 | break; | 3050 | break; |
3104 | case SHUT_WR: | 3051 | case SHUT_WR: |
3105 | handle->operation = SHUT_WR; | 3052 | handle->operation = SHUT_WR; |
@@ -3108,10 +3055,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, | |||
3108 | "Existing write handle should be cancelled before shutting" | 3055 | "Existing write handle should be cancelled before shutting" |
3109 | " down writing\n"); | 3056 | " down writing\n"); |
3110 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); | 3057 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); |
3111 | queue_message (socket, | 3058 | queue_message (socket, msg, &set_state_transmit_close_wait, NULL, |
3112 | msg, | 3059 | GNUNET_NO); |
3113 | &set_state_transmit_close_wait, | ||
3114 | NULL); | ||
3115 | break; | 3060 | break; |
3116 | case SHUT_RDWR: | 3061 | case SHUT_RDWR: |
3117 | handle->operation = SHUT_RDWR; | 3062 | handle->operation = SHUT_RDWR; |
@@ -3124,10 +3069,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, | |||
3124 | "Existing read handle should be cancelled before shutting" | 3069 | "Existing read handle should be cancelled before shutting" |
3125 | " down reading\n"); | 3070 | " down reading\n"); |
3126 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); | 3071 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); |
3127 | queue_message (socket, | 3072 | queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO); |
3128 | msg, | ||
3129 | &set_state_close_wait, | ||
3130 | NULL); | ||
3131 | break; | 3073 | break; |
3132 | default: | 3074 | default: |
3133 | LOG (GNUNET_ERROR_TYPE_WARNING, | 3075 | LOG (GNUNET_ERROR_TYPE_WARNING, |
@@ -3206,13 +3148,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) | |||
3206 | GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); | 3148 | GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); |
3207 | socket->transmit_handle = NULL; | 3149 | socket->transmit_handle = NULL; |
3208 | } | 3150 | } |
3209 | if (NULL != socket->ack_transmit_handle) | ||
3210 | { | ||
3211 | GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); | ||
3212 | GNUNET_free (socket->ack_msg); | ||
3213 | socket->ack_msg = NULL; | ||
3214 | socket->ack_transmit_handle = NULL; | ||
3215 | } | ||
3216 | /* Clear existing message queue */ | 3151 | /* Clear existing message queue */ |
3217 | while (NULL != (head = socket->queue_head)) { | 3152 | while (NULL != (head = socket->queue_head)) { |
3218 | GNUNET_CONTAINER_DLL_remove (socket->queue_head, | 3153 | GNUNET_CONTAINER_DLL_remove (socket->queue_head, |