aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-07-29 20:40:55 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-07-29 20:40:55 +0000
commit12f69de19d27c2f962e4c0fe8480591e0e0ac6cf (patch)
treec68e543bbf6e053490c1b7e0ee1caba31fad1973 /src/stream
parentaf106ded69593d7f4676f32da6e0058cf1577ce2 (diff)
downloadgnunet-12f69de19d27c2f962e4c0fe8480591e0e0ac6cf.tar.gz
gnunet-12f69de19d27c2f962e4c0fe8480591e0e0ac6cf.zip
fixes
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c147
1 files changed, 41 insertions, 106 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 49227ccec..4e5401c56 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -226,16 +226,6 @@ struct GNUNET_STREAM_Socket
226 struct GNUNET_MESH_TransmitHandle *transmit_handle; 226 struct GNUNET_MESH_TransmitHandle *transmit_handle;
227 227
228 /** 228 /**
229 * The current act transmit handle (if a pending ack transmit request exists)
230 */
231 struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
232
233 /**
234 * Pointer to the current ack message using in ack_task
235 */
236 struct GNUNET_STREAM_AckMessage *ack_msg;
237
238 /**
239 * The current message associated with the transmit handle 229 * The current message associated with the transmit handle
240 */ 230 */
241 struct MessageQueue *queue_head; 231 struct MessageQueue *queue_head;
@@ -629,19 +619,21 @@ send_message_notify (void *cls, size_t size, void *buf)
629 * @param message the message to be sent 619 * @param message the message to be sent
630 * @param finish_cb the callback to be called when the message is sent 620 * @param finish_cb the callback to be called when the message is sent
631 * @param finish_cb_cls the closure for the callback 621 * @param finish_cb_cls the closure for the callback
622 * @param urgent set to GNUNET_YES to add the message to the beginning of the
623 * queue; GNUNET_NO to add at the tail
632 */ 624 */
633static void 625static void
634queue_message (struct GNUNET_STREAM_Socket *socket, 626queue_message (struct GNUNET_STREAM_Socket *socket,
635 struct GNUNET_STREAM_MessageHeader *message, 627 struct GNUNET_STREAM_MessageHeader *message,
636 SendFinishCallback finish_cb, 628 SendFinishCallback finish_cb,
637 void *finish_cb_cls) 629 void *finish_cb_cls,
630 int urgent)
638{ 631{
639 struct MessageQueue *queue_entity; 632 struct MessageQueue *queue_entity;
640 633
641 GNUNET_assert 634 GNUNET_assert
642 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) 635 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
643 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); 636 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
644
645 LOG (GNUNET_ERROR_TYPE_DEBUG, 637 LOG (GNUNET_ERROR_TYPE_DEBUG,
646 "%s: Queueing message of type %d and size %d\n", 638 "%s: Queueing message of type %d and size %d\n",
647 GNUNET_i2s (&socket->other_peer), 639 GNUNET_i2s (&socket->other_peer),
@@ -652,9 +644,20 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
652 queue_entity->message = message; 644 queue_entity->message = message;
653 queue_entity->finish_cb = finish_cb; 645 queue_entity->finish_cb = finish_cb;
654 queue_entity->finish_cb_cls = finish_cb_cls; 646 queue_entity->finish_cb_cls = finish_cb_cls;
655 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head, 647 if (GNUNET_YES == urgent)
656 socket->queue_tail, 648 {
657 queue_entity); 649 GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
650 queue_entity);
651 if (NULL != socket->transmit_handle)
652 {
653 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
654 socket->transmit_handle = NULL;
655 }
656 }
657 else
658 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
659 socket->queue_tail,
660 queue_entity);
658 if (NULL == socket->transmit_handle) 661 if (NULL == socket->transmit_handle)
659 { 662 {
660 socket->retries = 0; 663 socket->retries = 0;
@@ -691,38 +694,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
691 size = ntohs (message->header.size); 694 size = ntohs (message->header.size);
692 msg_copy = GNUNET_malloc (size); 695 msg_copy = GNUNET_malloc (size);
693 memcpy (msg_copy, message, size); 696 memcpy (msg_copy, message, size);
694 queue_message (socket, msg_copy, finish_cb, finish_cb_cls); 697 queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
695}
696
697
698/**
699 * Callback function for sending ack message
700 *
701 * @param cls closure the ACK message created in ack_task
702 * @param size number of bytes available in buffer
703 * @param buf where the callee should write the message
704 * @return number of bytes written to buf
705 */
706static size_t
707send_ack_notify (void *cls, size_t size, void *buf)
708{
709 struct GNUNET_STREAM_Socket *socket = cls;
710
711 if (0 == size)
712 {
713 LOG (GNUNET_ERROR_TYPE_DEBUG,
714 "%s called with size 0\n", __func__);
715 return 0;
716 }
717 GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
718
719 size = ntohs (socket->ack_msg->header.header.size);
720 memcpy (buf, socket->ack_msg, size);
721
722 GNUNET_free (socket->ack_msg);
723 socket->ack_msg = NULL;
724 socket->ack_transmit_handle = NULL;
725 return size;
726} 698}
727 699
728 700
@@ -785,16 +757,8 @@ ack_task (void *cls,
785 ack_msg->base_sequence_number = htonl (socket->read_sequence_number); 757 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
786 ack_msg->receive_window_remaining = 758 ack_msg->receive_window_remaining =
787 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); 759 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
788 socket->ack_msg = ack_msg; 760 /* Queue up ACK for immediate sending */
789 /* Request MESH for sending ACK */ 761 queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
790 socket->ack_transmit_handle =
791 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
792 GNUNET_NO, /* Corking */
793 socket->retransmit_timeout,
794 &socket->other_peer,
795 ntohs (ack_msg->header.header.size),
796 &send_ack_notify,
797 socket);
798} 762}
799 763
800 764
@@ -834,7 +798,7 @@ close_msg_retransmission_task (void *cls,
834 GNUNET_SCHEDULER_NO_TASK; 798 GNUNET_SCHEDULER_NO_TASK;
835 return; 799 return;
836 } 800 }
837 queue_message (socket, msg, NULL, NULL); 801 queue_message (socket, msg, NULL, NULL, GNUNET_NO);
838 shutdown_handle->close_msg_retransmission_task_id = 802 shutdown_handle->close_msg_retransmission_task_id =
839 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, 803 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
840 &close_msg_retransmission_task, 804 &close_msg_retransmission_task,
@@ -1512,11 +1476,12 @@ control_retransmission_task (void *cls,
1512 break; 1476 break;
1513 case STATE_HELLO_WAIT: 1477 case STATE_HELLO_WAIT:
1514 if (NULL == socket->lsocket) /* We are client */ 1478 if (NULL == socket->lsocket) /* We are client */
1515 queue_message (socket, generate_hello (), NULL, NULL); 1479 queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
1516 else 1480 else
1517 queue_message (socket, 1481 queue_message (socket,
1518 (struct GNUNET_STREAM_MessageHeader *) 1482 (struct GNUNET_STREAM_MessageHeader *)
1519 generate_hello_ack (socket, GNUNET_NO), NULL, NULL); 1483 generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1484 GNUNET_NO);
1520 socket->control_retransmission_task_id = 1485 socket->control_retransmission_task_id =
1521 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, 1486 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1522 &control_retransmission_task, socket); 1487 &control_retransmission_task, socket);
@@ -1525,7 +1490,8 @@ control_retransmission_task (void *cls,
1525 if (NULL == socket->lsocket) 1490 if (NULL == socket->lsocket)
1526 queue_message (socket, 1491 queue_message (socket,
1527 (struct GNUNET_STREAM_MessageHeader *) 1492 (struct GNUNET_STREAM_MessageHeader *)
1528 generate_hello_ack (socket, GNUNET_NO), NULL, NULL); 1493 generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1494 GNUNET_NO);
1529 else 1495 else
1530 GNUNET_break (0); 1496 GNUNET_break (0);
1531 default: 1497 default:
@@ -1584,10 +1550,8 @@ client_handle_hello_ack (void *cls,
1584 (unsigned int) socket->read_sequence_number); 1550 (unsigned int) socket->read_sequence_number);
1585 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); 1551 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1586 reply = generate_hello_ack (socket, GNUNET_YES); 1552 reply = generate_hello_ack (socket, GNUNET_YES);
1587 queue_message (socket, 1553 queue_message (socket, &reply->header, &set_state_established,
1588 &reply->header, 1554 NULL, GNUNET_NO);
1589 &set_state_established,
1590 NULL);
1591 return GNUNET_OK; 1555 return GNUNET_OK;
1592 case STATE_ESTABLISHED: 1556 case STATE_ESTABLISHED:
1593 // call statistics (# ACKs ignored++) 1557 // call statistics (# ACKs ignored++)
@@ -1663,7 +1627,7 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1663 reply->header.type = 1627 reply->header.type =
1664 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); 1628 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1665 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1629 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1666 queue_message (socket, reply, NULL, NULL); 1630 queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1667 break; 1631 break;
1668 1632
1669 default: 1633 default:
@@ -1914,11 +1878,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1914 htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1878 htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1915 receive_close_ack->header.type = 1879 receive_close_ack->header.type =
1916 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); 1880 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1917 queue_message (socket, 1881 queue_message (socket, receive_close_ack, &set_state_closed,
1918 receive_close_ack, 1882 NULL, GNUNET_NO);
1919 &set_state_closed,
1920 NULL);
1921
1922 /* FIXME: Handle the case where write handle is present; the write operation 1883 /* FIXME: Handle the case where write handle is present; the write operation
1923 should be deemed as finised and the write continuation callback 1884 should be deemed as finised and the write continuation callback
1924 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ 1885 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
@@ -2029,10 +1990,7 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
2029 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 1990 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2030 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1991 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2031 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); 1992 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2032 queue_message (socket, 1993 queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2033 close_ack,
2034 &set_state_closed,
2035 NULL);
2036 if (socket->state == STATE_CLOSED) 1994 if (socket->state == STATE_CLOSED)
2037 return GNUNET_OK; 1995 return GNUNET_OK;
2038 1996
@@ -2177,7 +2135,8 @@ server_handle_hello (void *cls,
2177 { 2135 {
2178 case STATE_INIT: 2136 case STATE_INIT:
2179 reply = generate_hello_ack (socket, GNUNET_YES); 2137 reply = generate_hello_ack (socket, GNUNET_YES);
2180 queue_message (socket, &reply->header, &set_state_hello_wait, NULL); 2138 queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2139 GNUNET_NO);
2181 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == 2140 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2182 socket->control_retransmission_task_id); 2141 socket->control_retransmission_task_id);
2183 socket->control_retransmission_task_id = 2142 socket->control_retransmission_task_id =
@@ -2753,10 +2712,7 @@ mesh_peer_connect_callback (void *cls,
2753 socket->state = STATE_INIT; 2712 socket->state = STATE_INIT;
2754 /* Send HELLO message */ 2713 /* Send HELLO message */
2755 message = generate_hello (); 2714 message = generate_hello ();
2756 queue_message (socket, 2715 queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2757 message,
2758 &set_state_hello_wait,
2759 NULL);
2760 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == 2716 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2761 socket->control_retransmission_task_id); 2717 socket->control_retransmission_task_id);
2762 socket->control_retransmission_task_id = 2718 socket->control_retransmission_task_id =
@@ -2873,13 +2829,6 @@ tunnel_cleaner (void *cls,
2873 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); 2829 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2874 socket->transmit_handle = NULL; 2830 socket->transmit_handle = NULL;
2875 } 2831 }
2876 if (NULL != socket->ack_transmit_handle)
2877 {
2878 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2879 GNUNET_free (socket->ack_msg);
2880 socket->ack_msg = NULL;
2881 socket->ack_transmit_handle = NULL;
2882 }
2883 /* Stop Tasks using socket->tunnel */ 2832 /* Stop Tasks using socket->tunnel */
2884 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) 2833 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2885 { 2834 {
@@ -3096,10 +3045,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3096 "Existing read handle should be cancelled before shutting" 3045 "Existing read handle should be cancelled before shutting"
3097 " down reading\n"); 3046 " down reading\n");
3098 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); 3047 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3099 queue_message (socket, 3048 queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3100 msg, 3049 GNUNET_NO);
3101 &set_state_receive_close_wait,
3102 NULL);
3103 break; 3050 break;
3104 case SHUT_WR: 3051 case SHUT_WR:
3105 handle->operation = SHUT_WR; 3052 handle->operation = SHUT_WR;
@@ -3108,10 +3055,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3108 "Existing write handle should be cancelled before shutting" 3055 "Existing write handle should be cancelled before shutting"
3109 " down writing\n"); 3056 " down writing\n");
3110 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); 3057 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3111 queue_message (socket, 3058 queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3112 msg, 3059 GNUNET_NO);
3113 &set_state_transmit_close_wait,
3114 NULL);
3115 break; 3060 break;
3116 case SHUT_RDWR: 3061 case SHUT_RDWR:
3117 handle->operation = SHUT_RDWR; 3062 handle->operation = SHUT_RDWR;
@@ -3124,10 +3069,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3124 "Existing read handle should be cancelled before shutting" 3069 "Existing read handle should be cancelled before shutting"
3125 " down reading\n"); 3070 " down reading\n");
3126 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); 3071 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3127 queue_message (socket, 3072 queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3128 msg,
3129 &set_state_close_wait,
3130 NULL);
3131 break; 3073 break;
3132 default: 3074 default:
3133 LOG (GNUNET_ERROR_TYPE_WARNING, 3075 LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -3206,13 +3148,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3206 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); 3148 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3207 socket->transmit_handle = NULL; 3149 socket->transmit_handle = NULL;
3208 } 3150 }
3209 if (NULL != socket->ack_transmit_handle)
3210 {
3211 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
3212 GNUNET_free (socket->ack_msg);
3213 socket->ack_msg = NULL;
3214 socket->ack_transmit_handle = NULL;
3215 }
3216 /* Clear existing message queue */ 3151 /* Clear existing message queue */
3217 while (NULL != (head = socket->queue_head)) { 3152 while (NULL != (head = socket->queue_head)) {
3218 GNUNET_CONTAINER_DLL_remove (socket->queue_head, 3153 GNUNET_CONTAINER_DLL_remove (socket->queue_head,