aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-05-11 08:18:06 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-05-11 08:18:06 +0000
commitb1bf8f35ca652ab6371760d7e38d62a1e1bdee9b (patch)
tree885c4c19539afd575626df77f43e6365c42bcbc6
parent2c12d7cc5ffcd5fe9a4fb651e75fe38cd10d7121 (diff)
downloadgnunet-b1bf8f35ca652ab6371760d7e38d62a1e1bdee9b.tar.gz
gnunet-b1bf8f35ca652ab6371760d7e38d62a1e1bdee9b.zip
logging and indentation
-rw-r--r--src/stream/stream_api.c1544
1 files changed, 773 insertions, 771 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 3bf3c7863..e8ba24966 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -45,6 +45,8 @@
45#include "gnunet_testing_lib.h" 45#include "gnunet_testing_lib.h"
46#include "stream_protocol.h" 46#include "stream_protocol.h"
47 47
48#define LOG(kind,...) \
49 GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
48 50
49/** 51/**
50 * The maximum packet size of a stream packet 52 * The maximum packet size of a stream packet
@@ -494,31 +496,31 @@ send_message_notify (void *cls, size_t size, void *buf)
494 return 0; /* just to be safe */ 496 return 0; /* just to be safe */
495 GNUNET_PEER_resolve (socket->other_peer, &target); 497 GNUNET_PEER_resolve (socket->other_peer, &target);
496 if (0 == size) /* request timed out */ 498 if (0 == size) /* request timed out */
497 { 499 {
498 socket->retries++; 500 socket->retries++;
499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 501 LOG (GNUNET_ERROR_TYPE_DEBUG,
500 "Message sending timed out. Retry %d \n", 502 "Message sending timed out. Retry %d \n",
501 socket->retries); 503 socket->retries);
502 socket->transmit_handle = 504 socket->transmit_handle =
503 GNUNET_MESH_notify_transmit_ready (socket->tunnel, 505 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
504 0, /* Corking */ 506 0, /* Corking */
505 1, /* Priority */ 507 1, /* Priority */
506 /* FIXME: exponential backoff */ 508 /* FIXME: exponential backoff */
507 socket->retransmit_timeout, 509 socket->retransmit_timeout,
508 &target, 510 &target,
509 ntohs (head->message->header.size), 511 ntohs (head->message->header.size),
510 &send_message_notify, 512 &send_message_notify,
511 socket); 513 socket);
512 return 0; 514 return 0;
513 } 515 }
514 516
515 ret = ntohs (head->message->header.size); 517 ret = ntohs (head->message->header.size);
516 GNUNET_assert (size >= ret); 518 GNUNET_assert (size >= ret);
517 memcpy (buf, head->message, ret); 519 memcpy (buf, head->message, ret);
518 if (NULL != head->finish_cb) 520 if (NULL != head->finish_cb)
519 { 521 {
520 head->finish_cb (head->finish_cb_cls, socket); 522 head->finish_cb (head->finish_cb_cls, socket);
521 } 523 }
522 GNUNET_CONTAINER_DLL_remove (socket->queue_head, 524 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
523 socket->queue_tail, 525 socket->queue_tail,
524 head); 526 head);
@@ -526,19 +528,19 @@ send_message_notify (void *cls, size_t size, void *buf)
526 GNUNET_free (head); 528 GNUNET_free (head);
527 head = socket->queue_head; 529 head = socket->queue_head;
528 if (NULL != head) /* more pending messages to send */ 530 if (NULL != head) /* more pending messages to send */
529 { 531 {
530 socket->retries = 0; 532 socket->retries = 0;
531 socket->transmit_handle = 533 socket->transmit_handle =
532 GNUNET_MESH_notify_transmit_ready (socket->tunnel, 534 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
533 0, /* Corking */ 535 0, /* Corking */
534 1, /* Priority */ 536 1, /* Priority */
535 /* FIXME: exponential backoff */ 537 /* FIXME: exponential backoff */
536 socket->retransmit_timeout, 538 socket->retransmit_timeout,
537 &target, 539 &target,
538 ntohs (head->message->header.size), 540 ntohs (head->message->header.size),
539 &send_message_notify, 541 &send_message_notify,
540 socket); 542 socket);
541 } 543 }
542 return ret; 544 return ret;
543} 545}
544 546
@@ -564,10 +566,10 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
564 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) 566 ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
565 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); 567 && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
566 568
567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 569 LOG (GNUNET_ERROR_TYPE_DEBUG,
568 "Queueing message of type %d and size %d\n", 570 "Queueing message of type %d and size %d\n",
569 ntohs (message->header.type), 571 ntohs (message->header.type),
570 ntohs (message->header.size)); 572 ntohs (message->header.size));
571 GNUNET_assert (NULL != message); 573 GNUNET_assert (NULL != message);
572 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); 574 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
573 queue_entity->message = message; 575 queue_entity->message = message;
@@ -632,11 +634,11 @@ send_ack_notify (void *cls, size_t size, void *buf)
632 struct GNUNET_STREAM_Socket *socket = cls; 634 struct GNUNET_STREAM_Socket *socket = cls;
633 635
634 if (0 == size) 636 if (0 == size)
635 { 637 {
636 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 638 LOG (GNUNET_ERROR_TYPE_DEBUG,
637 "%s called with size 0\n", __func__); 639 "%s called with size 0\n", __func__);
638 return 0; 640 return 0;
639 } 641 }
640 GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); 642 GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
641 643
642 size = ntohs (socket->ack_msg->header.header.size); 644 size = ntohs (socket->ack_msg->header.header.size);
@@ -673,8 +675,8 @@ retransmission_timeout_task (void *cls,
673 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) 675 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
674 return; 676 return;
675 677
676 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 678 LOG (GNUNET_ERROR_TYPE_DEBUG,
677 "Retransmitting DATA...\n"); 679 "Retransmitting DATA...\n");
678 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 680 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
679 write_data (socket); 681 write_data (socket);
680} 682}
@@ -695,9 +697,9 @@ ack_task (void *cls,
695 struct GNUNET_PeerIdentity target; 697 struct GNUNET_PeerIdentity target;
696 698
697 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) 699 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
698 { 700 {
699 return; 701 return;
700 } 702 }
701 703
702 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; 704 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
703 705
@@ -746,22 +748,22 @@ close_msg_retransmission_task (void *cls,
746 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 748 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
747 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 749 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
748 switch (shutdown_handle->operation) 750 switch (shutdown_handle->operation)
749 { 751 {
750 case SHUT_RDWR: 752 case SHUT_RDWR:
751 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); 753 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
752 break; 754 break;
753 case SHUT_RD: 755 case SHUT_RD:
754 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); 756 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
755 break; 757 break;
756 case SHUT_WR: 758 case SHUT_WR:
757 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); 759 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
758 break; 760 break;
759 default: 761 default:
760 GNUNET_free (msg); 762 GNUNET_free (msg);
761 shutdown_handle->close_msg_retransmission_task_id = 763 shutdown_handle->close_msg_retransmission_task_id =
762 GNUNET_SCHEDULER_NO_TASK; 764 GNUNET_SCHEDULER_NO_TASK;
763 return; 765 return;
764 } 766 }
765 queue_message (socket, msg, NULL, NULL); 767 queue_message (socket, msg, NULL, NULL);
766 shutdown_handle->close_msg_retransmission_task_id = 768 shutdown_handle->close_msg_retransmission_task_id =
767 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, 769 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
@@ -822,46 +824,46 @@ write_data (struct GNUNET_STREAM_Socket *socket)
822 ack_packet = -1; 824 ack_packet = -1;
823 /* Find the last acknowledged packet */ 825 /* Find the last acknowledged packet */
824 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 826 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
825 { 827 {
826 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, 828 if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
827 packet)) 829 packet))
828 ack_packet = packet; 830 ack_packet = packet;
829 else if (NULL == io_handle->messages[packet]) 831 else if (NULL == io_handle->messages[packet])
830 break; 832 break;
831 } 833 }
832 /* Resend packets which weren't ack'ed */ 834 /* Resend packets which weren't ack'ed */
833 for (packet=0; packet < ack_packet; packet++) 835 for (packet=0; packet < ack_packet; packet++)
836 {
837 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
838 packet))
834 { 839 {
835 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, 840 LOG (GNUNET_ERROR_TYPE_DEBUG,
836 packet)) 841 "Placing DATA message with sequence %u in send queue\n",
837 { 842 ntohl (io_handle->messages[packet]->sequence_number));
838 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 843
839 "Placing DATA message with sequence %u in send queue\n", 844 copy_and_queue_message (socket,
840 ntohl (io_handle->messages[packet]->sequence_number)); 845 &io_handle->messages[packet]->header,
841 846 NULL,
842 copy_and_queue_message (socket, 847 NULL);
843 &io_handle->messages[packet]->header,
844 NULL,
845 NULL);
846 }
847 } 848 }
849 }
848 packet = ack_packet + 1; 850 packet = ack_packet + 1;
849 /* Now send new packets if there is enough buffer space */ 851 /* Now send new packets if there is enough buffer space */
850 while ( (NULL != io_handle->messages[packet]) && 852 while ( (NULL != io_handle->messages[packet]) &&
851 (socket->receiver_window_available 853 (socket->receiver_window_available
852 >= ntohs (io_handle->messages[packet]->header.header.size)) ) 854 >= ntohs (io_handle->messages[packet]->header.header.size)) )
853 { 855 {
854 socket->receiver_window_available -= 856 socket->receiver_window_available -=
855 ntohs (io_handle->messages[packet]->header.header.size); 857 ntohs (io_handle->messages[packet]->header.header.size);
856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 858 LOG (GNUNET_ERROR_TYPE_DEBUG,
857 "Placing DATA message with sequence %u in send queue\n", 859 "Placing DATA message with sequence %u in send queue\n",
858 ntohl (io_handle->messages[packet]->sequence_number)); 860 ntohl (io_handle->messages[packet]->sequence_number));
859 copy_and_queue_message (socket, 861 copy_and_queue_message (socket,
860 &io_handle->messages[packet]->header, 862 &io_handle->messages[packet]->header,
861 NULL, 863 NULL,
862 NULL); 864 NULL);
863 packet++; 865 packet++;
864 } 866 }
865 867
866 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) 868 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
867 socket->retransmission_timeout_task_id = 869 socket->retransmission_timeout_task_id =
@@ -901,11 +903,11 @@ call_read_processor (void *cls,
901 903
902 /* Check the bitmap for any holes */ 904 /* Check the bitmap for any holes */
903 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 905 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
904 { 906 {
905 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, 907 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
906 packet)) 908 packet))
907 break; 909 break;
908 } 910 }
909 /* We only call read processor if we have the first packet */ 911 /* We only call read processor if we have the first packet */
910 GNUNET_assert (0 < packet); 912 GNUNET_assert (0 < packet);
911 913
@@ -919,18 +921,18 @@ call_read_processor (void *cls,
919 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 921 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
920 922
921 /* Call the data processor */ 923 /* Call the data processor */
922 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 924 LOG (GNUNET_ERROR_TYPE_DEBUG,
923 "Calling read processor\n"); 925 "Calling read processor\n");
924 read_size = 926 read_size =
925 socket->read_handle->proc (socket->read_handle->proc_cls, 927 socket->read_handle->proc (socket->read_handle->proc_cls,
926 socket->status, 928 socket->status,
927 socket->receive_buffer + socket->copy_offset, 929 socket->receive_buffer + socket->copy_offset,
928 valid_read_size); 930 valid_read_size);
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 931 LOG (GNUNET_ERROR_TYPE_DEBUG,
930 "Read processor read %d bytes\n", 932 "Read processor read %d bytes\n",
931 read_size); 933 read_size);
932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 934 LOG (GNUNET_ERROR_TYPE_DEBUG,
933 "Read processor completed successfully\n"); 935 "Read processor completed successfully\n");
934 936
935 /* Free the read handle */ 937 /* Free the read handle */
936 GNUNET_free (socket->read_handle); 938 GNUNET_free (socket->read_handle);
@@ -941,20 +943,20 @@ call_read_processor (void *cls,
941 943
942 /* Determine upto which packet we can remove from the buffer */ 944 /* Determine upto which packet we can remove from the buffer */
943 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 945 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
944 { 946 {
945 if (socket->copy_offset == socket->receive_buffer_boundaries[packet]) 947 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
946 { packet++; break; } 948 { packet++; break; }
947 if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) 949 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
948 break; 950 break;
949 } 951 }
950 952
951 /* If no packets can be removed we can't move the buffer */ 953 /* If no packets can be removed we can't move the buffer */
952 if (0 == packet) return; 954 if (0 == packet) return;
953 955
954 sequence_increase = packet; 956 sequence_increase = packet;
955 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 957 LOG (GNUNET_ERROR_TYPE_DEBUG,
956 "Sequence increase after read processor completion: %u\n", 958 "Sequence increase after read processor completion: %u\n",
957 sequence_increase); 959 sequence_increase);
958 960
959 /* Shift the data in the receive buffer */ 961 /* Shift the data in the receive buffer */
960 memmove (socket->receive_buffer, 962 memmove (socket->receive_buffer,
@@ -979,16 +981,16 @@ call_read_processor (void *cls,
979 981
980 /* Fix relative boundaries */ 982 /* Fix relative boundaries */
981 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 983 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
984 {
985 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
982 { 986 {
983 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) 987 socket->receive_buffer_boundaries[packet] =
984 { 988 socket->receive_buffer_boundaries[packet + sequence_increase]
985 socket->receive_buffer_boundaries[packet] = 989 - offset_increase;
986 socket->receive_buffer_boundaries[packet + sequence_increase]
987 - offset_increase;
988 }
989 else
990 socket->receive_buffer_boundaries[packet] = 0;
991 } 990 }
991 else
992 socket->receive_buffer_boundaries[packet] = 0;
993 }
992} 994}
993 995
994 996
@@ -1000,7 +1002,7 @@ call_read_processor (void *cls,
1000 */ 1002 */
1001static void 1003static void
1002read_io_timeout (void *cls, 1004read_io_timeout (void *cls,
1003 const struct GNUNET_SCHEDULER_TaskContext *tc) 1005 const struct GNUNET_SCHEDULER_TaskContext *tc)
1004{ 1006{
1005 struct GNUNET_STREAM_Socket *socket = cls; 1007 struct GNUNET_STREAM_Socket *socket = cls;
1006 GNUNET_STREAM_DataProcessor proc; 1008 GNUNET_STREAM_DataProcessor proc;
@@ -1009,8 +1011,8 @@ read_io_timeout (void *cls,
1009 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 1011 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1010 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 1012 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1011 { 1013 {
1012 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1014 LOG (GNUNET_ERROR_TYPE_DEBUG,
1013 "Read task timedout - Cancelling it\n"); 1015 "Read task timedout - Cancelling it\n");
1014 GNUNET_SCHEDULER_cancel (socket->read_task_id); 1016 GNUNET_SCHEDULER_cancel (socket->read_task_id);
1015 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; 1017 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1016 } 1018 }
@@ -1054,138 +1056,138 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1054 1056
1055 size = htons (msg->header.header.size); 1057 size = htons (msg->header.header.size);
1056 if (size < sizeof (struct GNUNET_STREAM_DataMessage)) 1058 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1057 { 1059 {
1058 GNUNET_break_op (0); 1060 GNUNET_break_op (0);
1059 return GNUNET_SYSERR; 1061 return GNUNET_SYSERR;
1060 } 1062 }
1061 1063
1062 if (GNUNET_PEER_search (sender) != socket->other_peer) 1064 if (GNUNET_PEER_search (sender) != socket->other_peer)
1065 {
1066 LOG (GNUNET_ERROR_TYPE_DEBUG,
1067 "Received DATA from non-confirming peer\n");
1068 return GNUNET_YES;
1069 }
1070
1071 switch (socket->state)
1072 {
1073 case STATE_ESTABLISHED:
1074 case STATE_TRANSMIT_CLOSED:
1075 case STATE_TRANSMIT_CLOSE_WAIT:
1076
1077 /* check if the message's sequence number is in the range we are
1078 expecting */
1079 relative_sequence_number =
1080 ntohl (msg->sequence_number) - socket->read_sequence_number;
1081 if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1082 {
1083 LOG (GNUNET_ERROR_TYPE_DEBUG,
1084 "Ignoring received message with sequence number %u\n",
1085 ntohl (msg->sequence_number));
1086 /* Start ACK sending task if one is not already present */
1087 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1088 {
1089 socket->ack_task_id =
1090 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1091 (msg->ack_deadline),
1092 &ack_task,
1093 socket);
1094 }
1095 return GNUNET_YES;
1096 }
1097
1098 /* Check if we have already seen this message */
1099 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1100 relative_sequence_number))
1063 { 1101 {
1064 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1102 LOG (GNUNET_ERROR_TYPE_DEBUG,
1065 "Received DATA from non-confirming peer\n"); 1103 "Ignoring already received message with sequence "
1104 "number %u\n",
1105 ntohl (msg->sequence_number));
1106 /* Start ACK sending task if one is not already present */
1107 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1108 {
1109 socket->ack_task_id =
1110 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1111 (msg->ack_deadline),
1112 &ack_task,
1113 socket);
1114 }
1066 return GNUNET_YES; 1115 return GNUNET_YES;
1067 } 1116 }
1068 1117
1069 switch (socket->state) 1118 LOG (GNUNET_ERROR_TYPE_DEBUG,
1119 "Receiving DATA with sequence number: %u and size: %d from %x\n",
1120 ntohl (msg->sequence_number),
1121 ntohs (msg->header.header.size),
1122 socket->other_peer);
1123
1124 /* Check if we have to allocate the buffer */
1125 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1126 relative_offset = ntohl (msg->offset) - socket->read_offset;
1127 bytes_needed = relative_offset + size;
1128 if (bytes_needed > socket->receive_buffer_size)
1070 { 1129 {
1071 case STATE_ESTABLISHED: 1130 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1072 case STATE_TRANSMIT_CLOSED: 1131 {
1073 case STATE_TRANSMIT_CLOSE_WAIT: 1132 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1074 1133 bytes_needed);
1075 /* check if the message's sequence number is in the range we are 1134 socket->receive_buffer_size = bytes_needed;
1076 expecting */ 1135 }
1077 relative_sequence_number = 1136 else
1078 ntohl (msg->sequence_number) - socket->read_sequence_number; 1137 {
1079 if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) 1138 LOG (GNUNET_ERROR_TYPE_DEBUG,
1080 { 1139 "Cannot accommodate packet %d as buffer is full\n",
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1140 ntohl (msg->sequence_number));
1082 "Ignoring received message with sequence number %u\n", 1141 return GNUNET_YES;
1083 ntohl (msg->sequence_number)); 1142 }
1084 /* Start ACK sending task if one is not already present */ 1143 }
1085 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1086 {
1087 socket->ack_task_id =
1088 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1089 (msg->ack_deadline),
1090 &ack_task,
1091 socket);
1092 }
1093 return GNUNET_YES;
1094 }
1095
1096 /* Check if we have already seen this message */
1097 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1098 relative_sequence_number))
1099 {
1100 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101 "Ignoring already received message with sequence "
1102 "number %u\n",
1103 ntohl (msg->sequence_number));
1104 /* Start ACK sending task if one is not already present */
1105 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1106 {
1107 socket->ack_task_id =
1108 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1109 (msg->ack_deadline),
1110 &ack_task,
1111 socket);
1112 }
1113 return GNUNET_YES;
1114 }
1115
1116 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1117 "Receiving DATA with sequence number: %u and size: %d from %x\n",
1118 ntohl (msg->sequence_number),
1119 ntohs (msg->header.header.size),
1120 socket->other_peer);
1121
1122 /* Check if we have to allocate the buffer */
1123 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1124 relative_offset = ntohl (msg->offset) - socket->read_offset;
1125 bytes_needed = relative_offset + size;
1126 if (bytes_needed > socket->receive_buffer_size)
1127 {
1128 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1129 {
1130 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1131 bytes_needed);
1132 socket->receive_buffer_size = bytes_needed;
1133 }
1134 else
1135 {
1136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "Cannot accommodate packet %d as buffer is full\n",
1138 ntohl (msg->sequence_number));
1139 return GNUNET_YES;
1140 }
1141 }
1142 1144
1143 /* Copy Data to buffer */ 1145 /* Copy Data to buffer */
1144 payload = &msg[1]; 1146 payload = &msg[1];
1145 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); 1147 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1146 memcpy (socket->receive_buffer + relative_offset, 1148 memcpy (socket->receive_buffer + relative_offset,
1147 payload, 1149 payload,
1148 size); 1150 size);
1149 socket->receive_buffer_boundaries[relative_sequence_number] = 1151 socket->receive_buffer_boundaries[relative_sequence_number] =
1150 relative_offset + size; 1152 relative_offset + size;
1151 1153
1152 /* Modify the ACK bitmap */ 1154 /* Modify the ACK bitmap */
1153 ackbitmap_modify_bit (&socket->ack_bitmap, 1155 ackbitmap_modify_bit (&socket->ack_bitmap,
1154 relative_sequence_number, 1156 relative_sequence_number,
1155 GNUNET_YES); 1157 GNUNET_YES);
1156 1158
1157 /* Start ACK sending task if one is not already present */ 1159 /* Start ACK sending task if one is not already present */
1158 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) 1160 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1159 { 1161 {
1160 socket->ack_task_id = 1162 socket->ack_task_id =
1161 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 1163 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1162 (msg->ack_deadline), 1164 (msg->ack_deadline),
1163 &ack_task, 1165 &ack_task,
1164 socket);
1165 }
1166
1167 if ((NULL != socket->read_handle) /* A read handle is waiting */
1168 /* There is no current read task */
1169 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1170 /* We have the first packet */
1171 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1172 0)))
1173 {
1174 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175 "Scheduling read processor\n");
1176
1177 socket->read_task_id =
1178 GNUNET_SCHEDULER_add_now (&call_read_processor,
1179 socket); 1166 socket);
1180 } 1167 }
1181
1182 break;
1183 1168
1184 default: 1169 if ((NULL != socket->read_handle) /* A read handle is waiting */
1185 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1170 /* There is no current read task */
1186 "Received data message when it cannot be handled\n"); 1171 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1187 break; 1172 /* We have the first packet */
1173 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
1174 0)))
1175 {
1176 LOG (GNUNET_ERROR_TYPE_DEBUG,
1177 "Scheduling read processor\n");
1178
1179 socket->read_task_id =
1180 GNUNET_SCHEDULER_add_now (&call_read_processor,
1181 socket);
1188 } 1182 }
1183
1184 break;
1185
1186 default:
1187 LOG (GNUNET_ERROR_TYPE_DEBUG,
1188 "Received data message when it cannot be handled\n");
1189 break;
1190 }
1189 return GNUNET_YES; 1191 return GNUNET_YES;
1190} 1192}
1191 1193
@@ -1204,11 +1206,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1204 */ 1206 */
1205static int 1207static int
1206client_handle_data (void *cls, 1208client_handle_data (void *cls,
1207 struct GNUNET_MESH_Tunnel *tunnel, 1209 struct GNUNET_MESH_Tunnel *tunnel,
1208 void **tunnel_ctx, 1210 void **tunnel_ctx,
1209 const struct GNUNET_PeerIdentity *sender, 1211 const struct GNUNET_PeerIdentity *sender,
1210 const struct GNUNET_MessageHeader *message, 1212 const struct GNUNET_MessageHeader *message,
1211 const struct GNUNET_ATS_Information*atsi) 1213 const struct GNUNET_ATS_Information*atsi)
1212{ 1214{
1213 struct GNUNET_STREAM_Socket *socket = cls; 1215 struct GNUNET_STREAM_Socket *socket = cls;
1214 1216
@@ -1232,28 +1234,28 @@ set_state_established (void *cls,
1232{ 1234{
1233 struct GNUNET_PeerIdentity initiator_pid; 1235 struct GNUNET_PeerIdentity initiator_pid;
1234 1236
1235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1237 LOG (GNUNET_ERROR_TYPE_DEBUG,
1236 "Attaining ESTABLISHED state\n"); 1238 "Attaining ESTABLISHED state\n");
1237 socket->write_offset = 0; 1239 socket->write_offset = 0;
1238 socket->read_offset = 0; 1240 socket->read_offset = 0;
1239 socket->state = STATE_ESTABLISHED; 1241 socket->state = STATE_ESTABLISHED;
1240 /* FIXME: What if listen_cb is NULL */ 1242 /* FIXME: What if listen_cb is NULL */
1241 if (NULL != socket->lsocket) 1243 if (NULL != socket->lsocket)
1244 {
1245 GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
1246 LOG (GNUNET_ERROR_TYPE_DEBUG,
1247 "Calling listen callback\n");
1248 if (GNUNET_SYSERR ==
1249 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1250 socket,
1251 &initiator_pid))
1242 { 1252 {
1243 GNUNET_PEER_resolve (socket->other_peer, &initiator_pid); 1253 socket->state = STATE_CLOSED;
1244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1254 /* FIXME: We should close in a decent way */
1245 "Calling listen callback\n"); 1255 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1246 if (GNUNET_SYSERR == 1256 GNUNET_free (socket);
1247 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1248 socket,
1249 &initiator_pid))
1250 {
1251 socket->state = STATE_CLOSED;
1252 /* FIXME: We should close in a decent way */
1253 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1254 GNUNET_free (socket);
1255 }
1256 } 1257 }
1258 }
1257 else if (socket->open_cb) 1259 else if (socket->open_cb)
1258 socket->open_cb (socket->open_cls, socket); 1260 socket->open_cb (socket->open_cls, socket);
1259} 1261}
@@ -1270,8 +1272,8 @@ set_state_hello_wait (void *cls,
1270 struct GNUNET_STREAM_Socket *socket) 1272 struct GNUNET_STREAM_Socket *socket)
1271{ 1273{
1272 GNUNET_assert (STATE_INIT == socket->state); 1274 GNUNET_assert (STATE_INIT == socket->state);
1273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1275 LOG (GNUNET_ERROR_TYPE_DEBUG,
1274 "Attaining HELLO_WAIT state\n"); 1276 "Attaining HELLO_WAIT state\n");
1275 socket->state = STATE_HELLO_WAIT; 1277 socket->state = STATE_HELLO_WAIT;
1276} 1278}
1277 1279
@@ -1286,8 +1288,8 @@ static void
1286set_state_close_wait (void *cls, 1288set_state_close_wait (void *cls,
1287 struct GNUNET_STREAM_Socket *socket) 1289 struct GNUNET_STREAM_Socket *socket)
1288{ 1290{
1289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1291 LOG (GNUNET_ERROR_TYPE_DEBUG,
1290 "Attaing CLOSE_WAIT state\n"); 1292 "Attaing CLOSE_WAIT state\n");
1291 socket->state = STATE_CLOSE_WAIT; 1293 socket->state = STATE_CLOSE_WAIT;
1292 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ 1294 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1293 socket->receive_buffer = NULL; 1295 socket->receive_buffer = NULL;
@@ -1305,8 +1307,8 @@ static void
1305set_state_receive_close_wait (void *cls, 1307set_state_receive_close_wait (void *cls,
1306 struct GNUNET_STREAM_Socket *socket) 1308 struct GNUNET_STREAM_Socket *socket)
1307{ 1309{
1308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1310 LOG (GNUNET_ERROR_TYPE_DEBUG,
1309 "Attaing RECEIVE_CLOSE_WAIT state\n"); 1311 "Attaing RECEIVE_CLOSE_WAIT state\n");
1310 socket->state = STATE_RECEIVE_CLOSE_WAIT; 1312 socket->state = STATE_RECEIVE_CLOSE_WAIT;
1311 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ 1313 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1312 socket->receive_buffer = NULL; 1314 socket->receive_buffer = NULL;
@@ -1324,8 +1326,8 @@ static void
1324set_state_transmit_close_wait (void *cls, 1326set_state_transmit_close_wait (void *cls,
1325 struct GNUNET_STREAM_Socket *socket) 1327 struct GNUNET_STREAM_Socket *socket)
1326{ 1328{
1327 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1329 LOG (GNUNET_ERROR_TYPE_DEBUG,
1328 "Attaining TRANSMIT_CLOSE_WAIT state\n"); 1330 "Attaining TRANSMIT_CLOSE_WAIT state\n");
1329 socket->state = STATE_TRANSMIT_CLOSE_WAIT; 1331 socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1330} 1332}
1331 1333
@@ -1358,9 +1360,9 @@ generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
1358 /* Get the random sequence number */ 1360 /* Get the random sequence number */
1359 socket->write_sequence_number = 1361 socket->write_sequence_number =
1360 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); 1362 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1361 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1363 LOG (GNUNET_ERROR_TYPE_DEBUG,
1362 "Generated write sequence number %u\n", 1364 "Generated write sequence number %u\n",
1363 (unsigned int) socket->write_sequence_number); 1365 (unsigned int) socket->write_sequence_number);
1364 1366
1365 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); 1367 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1366 msg->header.header.size = 1368 msg->header.header.size =
@@ -1398,24 +1400,24 @@ client_handle_hello_ack (void *cls,
1398 struct GNUNET_STREAM_HelloAckMessage *reply; 1400 struct GNUNET_STREAM_HelloAckMessage *reply;
1399 1401
1400 if (GNUNET_PEER_search (sender) != socket->other_peer) 1402 if (GNUNET_PEER_search (sender) != socket->other_peer)
1401 { 1403 {
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1404 LOG (GNUNET_ERROR_TYPE_DEBUG,
1403 "Received HELLO_ACK from non-confirming peer\n"); 1405 "Received HELLO_ACK from non-confirming peer\n");
1404 return GNUNET_YES; 1406 return GNUNET_YES;
1405 } 1407 }
1406 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; 1408 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1409 LOG (GNUNET_ERROR_TYPE_DEBUG,
1408 "Received HELLO_ACK from %x\n", 1410 "Received HELLO_ACK from %x\n",
1409 socket->other_peer); 1411 socket->other_peer);
1410 1412
1411 GNUNET_assert (socket->tunnel == tunnel); 1413 GNUNET_assert (socket->tunnel == tunnel);
1412 switch (socket->state) 1414 switch (socket->state)
1413 { 1415 {
1414 case STATE_HELLO_WAIT: 1416 case STATE_HELLO_WAIT:
1415 socket->read_sequence_number = ntohl (ack_msg->sequence_number); 1417 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1416 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1418 LOG (GNUNET_ERROR_TYPE_DEBUG,
1417 "Read sequence number %u\n", 1419 "Read sequence number %u\n",
1418 (unsigned int) socket->read_sequence_number); 1420 (unsigned int) socket->read_sequence_number);
1419 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); 1421 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1420 reply = generate_hello_ack_msg (socket); 1422 reply = generate_hello_ack_msg (socket);
1421 queue_message (socket, 1423 queue_message (socket,
@@ -1429,10 +1431,10 @@ client_handle_hello_ack (void *cls,
1429 return GNUNET_OK; 1431 return GNUNET_OK;
1430 case STATE_INIT: 1432 case STATE_INIT:
1431 default: 1433 default:
1432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1434 LOG (GNUNET_ERROR_TYPE_DEBUG,
1433 "Server %x sent HELLO_ACK when in state %d\n", 1435 "Server %x sent HELLO_ACK when in state %d\n",
1434 socket->other_peer, 1436 socket->other_peer,
1435 socket->state); 1437 socket->state);
1436 socket->state = STATE_CLOSED; // introduce STATE_ERROR? 1438 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1437 return GNUNET_SYSERR; 1439 return GNUNET_SYSERR;
1438 } 1440 }
@@ -1487,22 +1489,22 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1487 struct GNUNET_STREAM_MessageHeader *reply; 1489 struct GNUNET_STREAM_MessageHeader *reply;
1488 1490
1489 switch (socket->state) 1491 switch (socket->state)
1490 { 1492 {
1491 case STATE_ESTABLISHED: 1493 case STATE_ESTABLISHED:
1492 socket->state = STATE_RECEIVE_CLOSED; 1494 socket->state = STATE_RECEIVE_CLOSED;
1493 1495
1494 /* Send TRANSMIT_CLOSE_ACK */ 1496 /* Send TRANSMIT_CLOSE_ACK */
1495 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 1497 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1496 reply->header.type = 1498 reply->header.type =
1497 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); 1499 htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1498 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1500 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1499 queue_message (socket, reply, NULL, NULL); 1501 queue_message (socket, reply, NULL, NULL);
1500 break; 1502 break;
1501 1503
1502 default: 1504 default:
1503 /* FIXME: Call statistics? */ 1505 /* FIXME: Call statistics? */
1504 break; 1506 break;
1505 } 1507 }
1506 return GNUNET_YES; 1508 return GNUNET_YES;
1507} 1509}
1508 1510
@@ -1561,86 +1563,86 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1561 1563
1562 shutdown_handle = socket->shutdown_handle; 1564 shutdown_handle = socket->shutdown_handle;
1563 if (NULL == shutdown_handle) 1565 if (NULL == shutdown_handle)
1566 {
1567 LOG (GNUNET_ERROR_TYPE_DEBUG,
1568 "Received *CLOSE_ACK when shutdown handle is NULL\n");
1569 return GNUNET_OK;
1570 }
1571
1572 switch (operation)
1573 {
1574 case SHUT_RDWR:
1575 switch (socket->state)
1564 { 1576 {
1565 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1577 case STATE_CLOSE_WAIT:
1566 "Received *CLOSE_ACK when shutdown handle is NULL\n"); 1578 if (SHUT_RDWR != shutdown_handle->operation)
1579 {
1580 LOG (GNUNET_ERROR_TYPE_DEBUG,
1581 "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
1582 return GNUNET_OK;
1583 }
1584
1585 LOG (GNUNET_ERROR_TYPE_DEBUG,
1586 "Received CLOSE_ACK from %x\n",
1587 socket->other_peer);
1588 socket->state = STATE_CLOSED;
1589 break;
1590 default:
1591 LOG (GNUNET_ERROR_TYPE_DEBUG,
1592 "Received CLOSE_ACK when in it not expected\n");
1567 return GNUNET_OK; 1593 return GNUNET_OK;
1568 } 1594 }
1595 break;
1569 1596
1570 switch (operation) 1597 case SHUT_RD:
1598 switch (socket->state)
1571 { 1599 {
1572 case SHUT_RDWR: 1600 case STATE_RECEIVE_CLOSE_WAIT:
1573 switch (socket->state) 1601 if (SHUT_RD != shutdown_handle->operation)
1574 { 1602 {
1575 case STATE_CLOSE_WAIT: 1603 LOG (GNUNET_ERROR_TYPE_DEBUG,
1576 if (SHUT_RDWR != shutdown_handle->operation) 1604 "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
1577 { 1605 return GNUNET_OK;
1578 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1606 }
1579 "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n"); 1607
1580 return GNUNET_OK; 1608 LOG (GNUNET_ERROR_TYPE_DEBUG,
1581 } 1609 "Received RECEIVE_CLOSE_ACK from %x\n",
1582 1610 socket->other_peer);
1583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1611 socket->state = STATE_RECEIVE_CLOSED;
1584 "Received CLOSE_ACK from %x\n",
1585 socket->other_peer);
1586 socket->state = STATE_CLOSED;
1587 break;
1588 default:
1589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590 "Received CLOSE_ACK when in it not expected\n");
1591 return GNUNET_OK;
1592 }
1593 break; 1612 break;
1613 default:
1614 LOG (GNUNET_ERROR_TYPE_DEBUG,
1615 "Received RECEIVE_CLOSE_ACK when in it not expected\n");
1616 return GNUNET_OK;
1617 }
1594 1618
1595 case SHUT_RD: 1619 break;
1596 switch (socket->state) 1620 case SHUT_WR:
1597 { 1621 switch (socket->state)
1598 case STATE_RECEIVE_CLOSE_WAIT: 1622 {
1599 if (SHUT_RD != shutdown_handle->operation) 1623 case STATE_TRANSMIT_CLOSE_WAIT:
1600 { 1624 if (SHUT_WR != shutdown_handle->operation)
1601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1625 {
1602 "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n"); 1626 LOG (GNUNET_ERROR_TYPE_DEBUG,
1603 return GNUNET_OK; 1627 "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
1604 } 1628 return GNUNET_OK;
1605 1629 }
1606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1607 "Received RECEIVE_CLOSE_ACK from %x\n",
1608 socket->other_peer);
1609 socket->state = STATE_RECEIVE_CLOSED;
1610 break;
1611 default:
1612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613 "Received RECEIVE_CLOSE_ACK when in it not expected\n");
1614 return GNUNET_OK;
1615 }
1616 1630
1617 break; 1631 LOG (GNUNET_ERROR_TYPE_DEBUG,
1618 case SHUT_WR: 1632 "Received TRANSMIT_CLOSE_ACK from %x\n",
1619 switch (socket->state) 1633 socket->other_peer);
1620 { 1634 socket->state = STATE_TRANSMIT_CLOSED;
1621 case STATE_TRANSMIT_CLOSE_WAIT:
1622 if (SHUT_WR != shutdown_handle->operation)
1623 {
1624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625 "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
1626 return GNUNET_OK;
1627 }
1628
1629 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1630 "Received TRANSMIT_CLOSE_ACK from %x\n",
1631 socket->other_peer);
1632 socket->state = STATE_TRANSMIT_CLOSED;
1633 break;
1634 default:
1635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636 "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
1637
1638 return GNUNET_OK;
1639 }
1640 break; 1635 break;
1641 default: 1636 default:
1642 GNUNET_assert (0); 1637 LOG (GNUNET_ERROR_TYPE_DEBUG,
1638 "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
1639
1640 return GNUNET_OK;
1643 } 1641 }
1642 break;
1643 default:
1644 GNUNET_assert (0);
1645 }
1644 1646
1645 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ 1647 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1646 shutdown_handle->completion_cb(shutdown_handle->completion_cls, 1648 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
@@ -1649,12 +1651,12 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1649 socket->shutdown_handle = NULL; 1651 socket->shutdown_handle = NULL;
1650 if (GNUNET_SCHEDULER_NO_TASK 1652 if (GNUNET_SCHEDULER_NO_TASK
1651 != shutdown_handle->close_msg_retransmission_task_id) 1653 != shutdown_handle->close_msg_retransmission_task_id)
1652 { 1654 {
1653 GNUNET_SCHEDULER_cancel 1655 GNUNET_SCHEDULER_cancel
1654 (shutdown_handle->close_msg_retransmission_task_id); 1656 (shutdown_handle->close_msg_retransmission_task_id);
1655 shutdown_handle->close_msg_retransmission_task_id = 1657 shutdown_handle->close_msg_retransmission_task_id =
1656 GNUNET_SCHEDULER_NO_TASK; 1658 GNUNET_SCHEDULER_NO_TASK;
1657 } 1659 }
1658 return GNUNET_OK; 1660 return GNUNET_OK;
1659} 1661}
1660 1662
@@ -1712,20 +1714,20 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1712 struct GNUNET_STREAM_MessageHeader *receive_close_ack; 1714 struct GNUNET_STREAM_MessageHeader *receive_close_ack;
1713 1715
1714 switch (socket->state) 1716 switch (socket->state)
1715 { 1717 {
1716 case STATE_INIT: 1718 case STATE_INIT:
1717 case STATE_LISTEN: 1719 case STATE_LISTEN:
1718 case STATE_HELLO_WAIT: 1720 case STATE_HELLO_WAIT:
1719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1721 LOG (GNUNET_ERROR_TYPE_DEBUG,
1720 "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); 1722 "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1721 return GNUNET_OK; 1723 return GNUNET_OK;
1722 default: 1724 default:
1723 break; 1725 break;
1724 } 1726 }
1725 1727
1726 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1728 LOG (GNUNET_ERROR_TYPE_DEBUG,
1727 "Received RECEIVE_CLOSE from %x\n", 1729 "Received RECEIVE_CLOSE from %x\n",
1728 socket->other_peer); 1730 socket->other_peer);
1729 receive_close_ack = 1731 receive_close_ack =
1730 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 1732 GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1731 receive_close_ack->header.size = 1733 receive_close_ack->header.size =
@@ -1738,8 +1740,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1738 NULL); 1740 NULL);
1739 1741
1740 /* FIXME: Handle the case where write handle is present; the write operation 1742 /* FIXME: Handle the case where write handle is present; the write operation
1741 should be deemed as finised and the write continuation callback 1743 should be deemed as finised and the write continuation callback
1742 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ 1744 has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
1743 return GNUNET_OK; 1745 return GNUNET_OK;
1744} 1746}
1745 1747
@@ -1828,20 +1830,20 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
1828 struct GNUNET_STREAM_MessageHeader *close_ack; 1830 struct GNUNET_STREAM_MessageHeader *close_ack;
1829 1831
1830 switch (socket->state) 1832 switch (socket->state)
1831 { 1833 {
1832 case STATE_INIT: 1834 case STATE_INIT:
1833 case STATE_LISTEN: 1835 case STATE_LISTEN:
1834 case STATE_HELLO_WAIT: 1836 case STATE_HELLO_WAIT:
1835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1837 LOG (GNUNET_ERROR_TYPE_DEBUG,
1836 "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); 1838 "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
1837 return GNUNET_OK; 1839 return GNUNET_OK;
1838 default: 1840 default:
1839 break; 1841 break;
1840 } 1842 }
1841 1843
1842 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1844 LOG (GNUNET_ERROR_TYPE_DEBUG,
1843 "Received CLOSE from %x\n", 1845 "Received CLOSE from %x\n",
1844 socket->other_peer); 1846 socket->other_peer);
1845 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 1847 close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1846 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1848 close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1847 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); 1849 close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
@@ -1978,34 +1980,34 @@ server_handle_hello (void *cls,
1978 struct GNUNET_STREAM_HelloAckMessage *reply; 1980 struct GNUNET_STREAM_HelloAckMessage *reply;
1979 1981
1980 if (GNUNET_PEER_search (sender) != socket->other_peer) 1982 if (GNUNET_PEER_search (sender) != socket->other_peer)
1981 { 1983 {
1982 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1984 LOG (GNUNET_ERROR_TYPE_DEBUG,
1983 "Received HELLO from non-confirming peer\n"); 1985 "Received HELLO from non-confirming peer\n");
1984 return GNUNET_YES; 1986 return GNUNET_YES;
1985 } 1987 }
1986 1988
1987 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 1989 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
1988 ntohs (message->type)); 1990 ntohs (message->type));
1989 GNUNET_assert (socket->tunnel == tunnel); 1991 GNUNET_assert (socket->tunnel == tunnel);
1990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1992 LOG (GNUNET_ERROR_TYPE_DEBUG,
1991 "Received HELLO from %x\n", 1993 "Received HELLO from %x\n",
1992 socket->other_peer); 1994 socket->other_peer);
1993 1995
1994 if (STATE_INIT == socket->state) 1996 if (STATE_INIT == socket->state)
1995 { 1997 {
1996 reply = generate_hello_ack_msg (socket); 1998 reply = generate_hello_ack_msg (socket);
1997 queue_message (socket, 1999 queue_message (socket,
1998 &reply->header, 2000 &reply->header,
1999 &set_state_hello_wait, 2001 &set_state_hello_wait,
2000 NULL); 2002 NULL);
2001 } 2003 }
2002 else 2004 else
2003 { 2005 {
2004 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2006 LOG (GNUNET_ERROR_TYPE_DEBUG,
2005 "Client sent HELLO when in state %d\n", socket->state); 2007 "Client sent HELLO when in state %d\n", socket->state);
2006 /* FIXME: Send RESET? */ 2008 /* FIXME: Send RESET? */
2007 2009
2008 } 2010 }
2009 return GNUNET_OK; 2011 return GNUNET_OK;
2010} 2012}
2011 2013
@@ -2038,26 +2040,26 @@ server_handle_hello_ack (void *cls,
2038 GNUNET_assert (socket->tunnel == tunnel); 2040 GNUNET_assert (socket->tunnel == tunnel);
2039 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; 2041 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2040 if (STATE_HELLO_WAIT == socket->state) 2042 if (STATE_HELLO_WAIT == socket->state)
2041 { 2043 {
2042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2044 LOG (GNUNET_ERROR_TYPE_DEBUG,
2043 "Received HELLO_ACK from %x\n", 2045 "Received HELLO_ACK from %x\n",
2044 socket->other_peer); 2046 socket->other_peer);
2045 socket->read_sequence_number = ntohl (ack_message->sequence_number); 2047 socket->read_sequence_number = ntohl (ack_message->sequence_number);
2046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2048 LOG (GNUNET_ERROR_TYPE_DEBUG,
2047 "Read sequence number %u\n", 2049 "Read sequence number %u\n",
2048 (unsigned int) socket->read_sequence_number); 2050 (unsigned int) socket->read_sequence_number);
2049 socket->receiver_window_available = 2051 socket->receiver_window_available =
2050 ntohl (ack_message->receiver_window_size); 2052 ntohl (ack_message->receiver_window_size);
2051 /* Attain ESTABLISHED state */ 2053 /* Attain ESTABLISHED state */
2052 set_state_established (NULL, socket); 2054 set_state_established (NULL, socket);
2053 } 2055 }
2054 else 2056 else
2055 { 2057 {
2056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2058 LOG (GNUNET_ERROR_TYPE_DEBUG,
2057 "Client sent HELLO_ACK when in state %d\n", socket->state); 2059 "Client sent HELLO_ACK when in state %d\n", socket->state);
2058 /* FIXME: Send RESET? */ 2060 /* FIXME: Send RESET? */
2059 2061
2060 } 2062 }
2061 return GNUNET_OK; 2063 return GNUNET_OK;
2062} 2064}
2063 2065
@@ -2299,113 +2301,113 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2299 2301
2300 2302
2301 if (GNUNET_PEER_search (sender) != socket->other_peer) 2303 if (GNUNET_PEER_search (sender) != socket->other_peer)
2302 { 2304 {
2303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2305 LOG (GNUNET_ERROR_TYPE_DEBUG,
2304 "Received ACK from non-confirming peer\n"); 2306 "Received ACK from non-confirming peer\n");
2305 return GNUNET_YES; 2307 return GNUNET_YES;
2306 } 2308 }
2307 2309
2308 switch (socket->state) 2310 switch (socket->state)
2311 {
2312 case (STATE_ESTABLISHED):
2313 case (STATE_RECEIVE_CLOSED):
2314 case (STATE_RECEIVE_CLOSE_WAIT):
2315 if (NULL == socket->write_handle)
2316 {
2317 LOG (GNUNET_ERROR_TYPE_DEBUG,
2318 "Received DATA_ACK when write_handle is NULL\n");
2319 return GNUNET_OK;
2320 }
2321 /* FIXME: increment in the base sequence number is breaking current flow
2322 */
2323 if (!((socket->write_sequence_number
2324 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2309 { 2325 {
2310 case (STATE_ESTABLISHED): 2326 LOG (GNUNET_ERROR_TYPE_DEBUG,
2311 case (STATE_RECEIVE_CLOSED): 2327 "Received DATA_ACK with unexpected base sequence number\n");
2312 case (STATE_RECEIVE_CLOSE_WAIT): 2328 LOG (GNUNET_ERROR_TYPE_DEBUG,
2313 if (NULL == socket->write_handle) 2329 "Current write sequence: %u; Ack's base sequence: %u\n",
2314 { 2330 socket->write_sequence_number,
2315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2331 ntohl (ack->base_sequence_number));
2316 "Received DATA_ACK when write_handle is NULL\n"); 2332 return GNUNET_OK;
2317 return GNUNET_OK; 2333 }
2318 } 2334 /* FIXME: include the case when write_handle is cancelled - ignore the
2319 /* FIXME: increment in the base sequence number is breaking current flow 2335 acks */
2320 */ 2336
2321 if (!((socket->write_sequence_number 2337 LOG (GNUNET_ERROR_TYPE_DEBUG,
2322 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) 2338 "Received DATA_ACK from %x\n",
2323 { 2339 socket->other_peer);
2324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325 "Received DATA_ACK with unexpected base sequence number\n");
2326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2327 "Current write sequence: %u; Ack's base sequence: %u\n",
2328 socket->write_sequence_number,
2329 ntohl (ack->base_sequence_number));
2330 return GNUNET_OK;
2331 }
2332 /* FIXME: include the case when write_handle is cancelled - ignore the
2333 acks */
2334
2335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2336 "Received DATA_ACK from %x\n",
2337 socket->other_peer);
2338 2340
2339 /* Cancel the retransmission task */ 2341 /* Cancel the retransmission task */
2340 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) 2342 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2341 { 2343 {
2342 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); 2344 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2343 socket->retransmission_timeout_task_id = 2345 socket->retransmission_timeout_task_id =
2344 GNUNET_SCHEDULER_NO_TASK; 2346 GNUNET_SCHEDULER_NO_TASK;
2345 } 2347 }
2346 2348
2347 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 2349 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2348 { 2350 {
2349 if (NULL == socket->write_handle->messages[packet]) break; 2351 if (NULL == socket->write_handle->messages[packet]) break;
2350 if (ntohl (ack->base_sequence_number) 2352 if (ntohl (ack->base_sequence_number)
2351 >= ntohl (socket->write_handle->messages[packet]->sequence_number)) 2353 >= ntohl (socket->write_handle->messages[packet]->sequence_number))
2352 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, 2354 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2353 packet, 2355 packet,
2354 GNUNET_YES); 2356 GNUNET_YES);
2355 else 2357 else
2356 if (GNUNET_YES == 2358 if (GNUNET_YES ==
2357 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap, 2359 ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
2358 ntohl (socket->write_handle->messages[packet]->sequence_number) 2360 ntohl (socket->write_handle->messages[packet]->sequence_number)
2359 - ntohl (ack->base_sequence_number))) 2361 - ntohl (ack->base_sequence_number)))
2360 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, 2362 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2361 packet, 2363 packet,
2362 GNUNET_YES); 2364 GNUNET_YES);
2363 } 2365 }
2364 2366
2365 /* Update the receive window remaining 2367 /* Update the receive window remaining
2366 FIXME : Should update with the value from a data ack with greater 2368 FIXME : Should update with the value from a data ack with greater
2367 sequence number */ 2369 sequence number */
2368 socket->receiver_window_available = 2370 socket->receiver_window_available =
2369 ntohl (ack->receive_window_remaining); 2371 ntohl (ack->receive_window_remaining);
2370 2372
2371 /* Check if we have received all acknowledgements */ 2373 /* Check if we have received all acknowledgements */
2372 need_retransmission = GNUNET_NO; 2374 need_retransmission = GNUNET_NO;
2375 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2376 {
2377 if (NULL == socket->write_handle->messages[packet]) break;
2378 if (GNUNET_YES != ackbitmap_is_bit_set
2379 (&socket->write_handle->ack_bitmap,packet))
2380 {
2381 need_retransmission = GNUNET_YES;
2382 break;
2383 }
2384 }
2385 if (GNUNET_YES == need_retransmission)
2386 {
2387 write_data (socket);
2388 }
2389 else /* We have to call the write continuation callback now */
2390 {
2391 /* Free the packets */
2373 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 2392 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2374 { 2393 {
2375 if (NULL == socket->write_handle->messages[packet]) break; 2394 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2376 if (GNUNET_YES != ackbitmap_is_bit_set 2395 }
2377 (&socket->write_handle->ack_bitmap,packet)) 2396 if (NULL != socket->write_handle->write_cont)
2378 { 2397 socket->write_handle->write_cont
2379 need_retransmission = GNUNET_YES; 2398 (socket->write_handle->write_cont_cls,
2380 break; 2399 socket->status,
2381 } 2400 socket->write_handle->size);
2382 } 2401 LOG (GNUNET_ERROR_TYPE_DEBUG,
2383 if (GNUNET_YES == need_retransmission) 2402 "Write completion callback completed\n");
2384 { 2403 /* We are done with the write handle - Freeing it */
2385 write_data (socket); 2404 GNUNET_free (socket->write_handle);
2386 } 2405 socket->write_handle = NULL;
2387 else /* We have to call the write continuation callback now */
2388 {
2389 /* Free the packets */
2390 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2391 {
2392 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2393 }
2394 if (NULL != socket->write_handle->write_cont)
2395 socket->write_handle->write_cont
2396 (socket->write_handle->write_cont_cls,
2397 socket->status,
2398 socket->write_handle->size);
2399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400 "Write completion callback completed\n");
2401 /* We are done with the write handle - Freeing it */
2402 GNUNET_free (socket->write_handle);
2403 socket->write_handle = NULL;
2404 }
2405 break;
2406 default:
2407 break;
2408 } 2406 }
2407 break;
2408 default:
2409 break;
2410 }
2409 return GNUNET_OK; 2411 return GNUNET_OK;
2410} 2412}
2411 2413
@@ -2541,15 +2543,15 @@ mesh_peer_connect_callback (void *cls,
2541 connected_peer = GNUNET_PEER_search (peer); 2543 connected_peer = GNUNET_PEER_search (peer);
2542 2544
2543 if (connected_peer != socket->other_peer) 2545 if (connected_peer != socket->other_peer)
2544 { 2546 {
2545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2547 LOG (GNUNET_ERROR_TYPE_DEBUG,
2546 "A peer which is not our target has connected to our tunnel\n"); 2548 "A peer which is not our target has connected to our tunnel\n");
2547 return; 2549 return;
2548 } 2550 }
2549 2551
2550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2552 LOG (GNUNET_ERROR_TYPE_DEBUG,
2551 "Target peer %x connected\n", 2553 "Target peer %x connected\n",
2552 connected_peer); 2554 connected_peer);
2553 2555
2554 /* Set state to INIT */ 2556 /* Set state to INIT */
2555 socket->state = STATE_INIT; 2557 socket->state = STATE_INIT;
@@ -2565,10 +2567,10 @@ mesh_peer_connect_callback (void *cls,
2565 2567
2566 /* Call open callback */ 2568 /* Call open callback */
2567 if (NULL == socket->open_cb) 2569 if (NULL == socket->open_cb)
2568 { 2570 {
2569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2571 LOG (GNUNET_ERROR_TYPE_DEBUG,
2570 "STREAM_open callback is NULL\n"); 2572 "STREAM_open callback is NULL\n");
2571 } 2573 }
2572} 2574}
2573 2575
2574 2576
@@ -2585,9 +2587,9 @@ mesh_peer_disconnect_callback (void *cls,
2585 struct GNUNET_STREAM_Socket *socket=cls; 2587 struct GNUNET_STREAM_Socket *socket=cls;
2586 2588
2587 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */ 2589 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2590 LOG (GNUNET_ERROR_TYPE_DEBUG,
2589 "Other peer %x disconnected\n", 2591 "Other peer %x disconnected\n",
2590 socket->other_peer); 2592 socket->other_peer);
2591} 2593}
2592 2594
2593 2595
@@ -2619,9 +2621,9 @@ new_tunnel_notify (void *cls,
2619 socket->session_id = 0; /* FIXME */ 2621 socket->session_id = 0; /* FIXME */
2620 socket->state = STATE_INIT; 2622 socket->state = STATE_INIT;
2621 socket->lsocket = lsocket; 2623 socket->lsocket = lsocket;
2622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2624 LOG (GNUNET_ERROR_TYPE_DEBUG,
2623 "Peer %x initiated tunnel to us\n", 2625 "Peer %x initiated tunnel to us\n",
2624 socket->other_peer); 2626 socket->other_peer);
2625 2627
2626 /* FIXME: Copy MESH handle from lsocket to socket */ 2628 /* FIXME: Copy MESH handle from lsocket to socket */
2627 2629
@@ -2652,36 +2654,36 @@ tunnel_cleaner (void *cls,
2652 return; 2654 return;
2653 2655
2654 GNUNET_break_op(0); 2656 GNUNET_break_op(0);
2655 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2657 LOG (GNUNET_ERROR_TYPE_DEBUG,
2656 "Peer %x has terminated connection abruptly\n", 2658 "Peer %x has terminated connection abruptly\n",
2657 socket->other_peer); 2659 socket->other_peer);
2658 2660
2659 socket->status = GNUNET_STREAM_SHUTDOWN; 2661 socket->status = GNUNET_STREAM_SHUTDOWN;
2660 2662
2661 /* Clear Transmit handles */ 2663 /* Clear Transmit handles */
2662 if (NULL != socket->transmit_handle) 2664 if (NULL != socket->transmit_handle)
2663 { 2665 {
2664 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); 2666 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2665 socket->transmit_handle = NULL; 2667 socket->transmit_handle = NULL;
2666 } 2668 }
2667 if (NULL != socket->ack_transmit_handle) 2669 if (NULL != socket->ack_transmit_handle)
2668 { 2670 {
2669 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); 2671 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2670 GNUNET_free (socket->ack_msg); 2672 GNUNET_free (socket->ack_msg);
2671 socket->ack_msg = NULL; 2673 socket->ack_msg = NULL;
2672 socket->ack_transmit_handle = NULL; 2674 socket->ack_transmit_handle = NULL;
2673 } 2675 }
2674 /* Stop Tasks using socket->tunnel */ 2676 /* Stop Tasks using socket->tunnel */
2675 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) 2677 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2676 { 2678 {
2677 GNUNET_SCHEDULER_cancel (socket->ack_task_id); 2679 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2678 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; 2680 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2679 } 2681 }
2680 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) 2682 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
2681 { 2683 {
2682 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); 2684 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
2683 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 2685 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
2684 } 2686 }
2685 /* FIXME: Cancel all other tasks using socket->tunnel */ 2687 /* FIXME: Cancel all other tasks using socket->tunnel */
2686 socket->tunnel = NULL; 2688 socket->tunnel = NULL;
2687} 2689}
@@ -2718,8 +2720,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2718 GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; 2720 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2719 va_list vargs; /* Variable arguments */ 2721 va_list vargs; /* Variable arguments */
2720 2722
2721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2723 LOG (GNUNET_ERROR_TYPE_DEBUG,
2722 "%s\n", __func__); 2724 "%s\n", __func__);
2723 2725
2724 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); 2726 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2725 socket->other_peer = GNUNET_PEER_intern (target); 2727 socket->other_peer = GNUNET_PEER_intern (target);
@@ -2733,15 +2735,15 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2733 do { 2735 do {
2734 option = va_arg (vargs, enum GNUNET_STREAM_Option); 2736 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2735 switch (option) 2737 switch (option)
2736 { 2738 {
2737 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: 2739 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2738 /* Expect struct GNUNET_TIME_Relative */ 2740 /* Expect struct GNUNET_TIME_Relative */
2739 socket->retransmit_timeout = va_arg (vargs, 2741 socket->retransmit_timeout = va_arg (vargs,
2740 struct GNUNET_TIME_Relative); 2742 struct GNUNET_TIME_Relative);
2741 break; 2743 break;
2742 case GNUNET_STREAM_OPTION_END: 2744 case GNUNET_STREAM_OPTION_END:
2743 break; 2745 break;
2744 } 2746 }
2745 } while (GNUNET_STREAM_OPTION_END != option); 2747 } while (GNUNET_STREAM_OPTION_END != option);
2746 va_end (vargs); /* End of variable args parsing */ 2748 va_end (vargs); /* End of variable args parsing */
2747 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ 2749 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
@@ -2752,14 +2754,14 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2752 client_message_handlers, 2754 client_message_handlers,
2753 ports); /* We don't get inbound tunnels */ 2755 ports); /* We don't get inbound tunnels */
2754 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ 2756 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
2755 { 2757 {
2756 GNUNET_free (socket); 2758 GNUNET_free (socket);
2757 return NULL; 2759 return NULL;
2758 } 2760 }
2759 2761
2760 /* Now create the mesh tunnel to target */ 2762 /* Now create the mesh tunnel to target */
2761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2763 LOG (GNUNET_ERROR_TYPE_DEBUG,
2762 "Creating MESH Tunnel\n"); 2764 "Creating MESH Tunnel\n");
2763 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, 2765 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2764 NULL, /* Tunnel context */ 2766 NULL, /* Tunnel context */
2765 &mesh_peer_connect_callback, 2767 &mesh_peer_connect_callback,
@@ -2769,8 +2771,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2769 GNUNET_MESH_peer_request_connect_add (socket->tunnel, 2771 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2770 target); 2772 target);
2771 2773
2772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2774 LOG (GNUNET_ERROR_TYPE_DEBUG,
2773 "%s() END\n", __func__); 2775 "%s() END\n", __func__);
2774 return socket; 2776 return socket;
2775} 2777}
2776 2778
@@ -2805,55 +2807,55 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
2805 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 2807 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
2806 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 2808 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2807 switch (operation) 2809 switch (operation)
2808 { 2810 {
2809 case SHUT_RD: 2811 case SHUT_RD:
2810 handle->operation = SHUT_RD; 2812 handle->operation = SHUT_RD;
2811 if (NULL != socket->read_handle) 2813 if (NULL != socket->read_handle)
2812 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2814 LOG (GNUNET_ERROR_TYPE_WARNING,
2813 "Existing read handle should be cancelled before shutting" 2815 "Existing read handle should be cancelled before shutting"
2814 " down reading\n"); 2816 " down reading\n");
2815 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); 2817 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
2816 queue_message (socket, 2818 queue_message (socket,
2817 msg, 2819 msg,
2818 &set_state_receive_close_wait, 2820 &set_state_receive_close_wait,
2819 NULL); 2821 NULL);
2820 break; 2822 break;
2821 case SHUT_WR: 2823 case SHUT_WR:
2822 handle->operation = SHUT_WR; 2824 handle->operation = SHUT_WR;
2823 if (NULL != socket->write_handle) 2825 if (NULL != socket->write_handle)
2824 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2826 LOG (GNUNET_ERROR_TYPE_WARNING,
2825 "Existing write handle should be cancelled before shutting" 2827 "Existing write handle should be cancelled before shutting"
2826 " down writing\n"); 2828 " down writing\n");
2827 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); 2829 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
2828 queue_message (socket, 2830 queue_message (socket,
2829 msg, 2831 msg,
2830 &set_state_transmit_close_wait, 2832 &set_state_transmit_close_wait,
2831 NULL); 2833 NULL);
2832 break; 2834 break;
2833 case SHUT_RDWR: 2835 case SHUT_RDWR:
2834 handle->operation = SHUT_RDWR; 2836 handle->operation = SHUT_RDWR;
2835 if (NULL != socket->write_handle) 2837 if (NULL != socket->write_handle)
2836 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2838 LOG (GNUNET_ERROR_TYPE_WARNING,
2837 "Existing write handle should be cancelled before shutting" 2839 "Existing write handle should be cancelled before shutting"
2838 " down writing\n"); 2840 " down writing\n");
2839 if (NULL != socket->read_handle) 2841 if (NULL != socket->read_handle)
2840 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2842 LOG (GNUNET_ERROR_TYPE_WARNING,
2841 "Existing read handle should be cancelled before shutting" 2843 "Existing read handle should be cancelled before shutting"
2842 " down reading\n"); 2844 " down reading\n");
2843 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); 2845 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
2844 queue_message (socket, 2846 queue_message (socket,
2845 msg, 2847 msg,
2846 &set_state_close_wait, 2848 &set_state_close_wait,
2847 NULL); 2849 NULL);
2848 break; 2850 break;
2849 default: 2851 default:
2850 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2852 LOG (GNUNET_ERROR_TYPE_WARNING,
2851 "GNUNET_STREAM_shutdown called with invalid value for " 2853 "GNUNET_STREAM_shutdown called with invalid value for "
2852 "parameter operation -- Ignoring\n"); 2854 "parameter operation -- Ignoring\n");
2853 GNUNET_free (msg); 2855 GNUNET_free (msg);
2854 GNUNET_free (handle); 2856 GNUNET_free (handle);
2855 return NULL; 2857 return NULL;
2856 } 2858 }
2857 handle->close_msg_retransmission_task_id = 2859 handle->close_msg_retransmission_task_id =
2858 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, 2860 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2859 &close_msg_retransmission_task, 2861 &close_msg_retransmission_task,
@@ -2891,33 +2893,33 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2891 GNUNET_break (NULL == socket->write_handle); 2893 GNUNET_break (NULL == socket->write_handle);
2892 2894
2893 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 2895 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
2894 { 2896 {
2895 /* socket closed with read task pending!? */ 2897 /* socket closed with read task pending!? */
2896 GNUNET_break (0); 2898 GNUNET_break (0);
2897 GNUNET_SCHEDULER_cancel (socket->read_task_id); 2899 GNUNET_SCHEDULER_cancel (socket->read_task_id);
2898 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; 2900 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
2899 } 2901 }
2900 2902
2901 /* Terminate the ack'ing tasks if they are still present */ 2903 /* Terminate the ack'ing tasks if they are still present */
2902 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) 2904 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
2903 { 2905 {
2904 GNUNET_SCHEDULER_cancel (socket->ack_task_id); 2906 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2905 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; 2907 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2906 } 2908 }
2907 2909
2908 /* Clear Transmit handles */ 2910 /* Clear Transmit handles */
2909 if (NULL != socket->transmit_handle) 2911 if (NULL != socket->transmit_handle)
2910 { 2912 {
2911 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); 2913 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2912 socket->transmit_handle = NULL; 2914 socket->transmit_handle = NULL;
2913 } 2915 }
2914 if (NULL != socket->ack_transmit_handle) 2916 if (NULL != socket->ack_transmit_handle)
2915 { 2917 {
2916 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); 2918 GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
2917 GNUNET_free (socket->ack_msg); 2919 GNUNET_free (socket->ack_msg);
2918 socket->ack_msg = NULL; 2920 socket->ack_msg = NULL;
2919 socket->ack_transmit_handle = NULL; 2921 socket->ack_transmit_handle = NULL;
2920 } 2922 }
2921 2923
2922 /* Clear existing message queue */ 2924 /* Clear existing message queue */
2923 while (NULL != (head = socket->queue_head)) { 2925 while (NULL != (head = socket->queue_head)) {
@@ -2930,23 +2932,23 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
2930 2932
2931 /* Close associated tunnel */ 2933 /* Close associated tunnel */
2932 if (NULL != socket->tunnel) 2934 if (NULL != socket->tunnel)
2933 { 2935 {
2934 GNUNET_MESH_tunnel_destroy (socket->tunnel); 2936 GNUNET_MESH_tunnel_destroy (socket->tunnel);
2935 socket->tunnel = NULL; 2937 socket->tunnel = NULL;
2936 } 2938 }
2937 2939
2938 /* Close mesh connection */ 2940 /* Close mesh connection */
2939 if (NULL != socket->mesh && NULL == socket->lsocket) 2941 if (NULL != socket->mesh && NULL == socket->lsocket)
2940 { 2942 {
2941 GNUNET_MESH_disconnect (socket->mesh); 2943 GNUNET_MESH_disconnect (socket->mesh);
2942 socket->mesh = NULL; 2944 socket->mesh = NULL;
2943 } 2945 }
2944 2946
2945 /* Release receive buffer */ 2947 /* Release receive buffer */
2946 if (NULL != socket->receive_buffer) 2948 if (NULL != socket->receive_buffer)
2947 { 2949 {
2948 GNUNET_free (socket->receive_buffer); 2950 GNUNET_free (socket->receive_buffer);
2949 } 2951 }
2950 2952
2951 GNUNET_free (socket); 2953 GNUNET_free (socket);
2952} 2954}
@@ -3039,8 +3041,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3039 const void *sweep; 3041 const void *sweep;
3040 struct GNUNET_TIME_Relative ack_deadline; 3042 struct GNUNET_TIME_Relative ack_deadline;
3041 3043
3042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3044 LOG (GNUNET_ERROR_TYPE_DEBUG,
3043 "%s\n", __func__); 3045 "%s\n", __func__);
3044 3046
3045 /* Return NULL if there is already a write request pending */ 3047 /* Return NULL if there is already a write request pending */
3046 if (NULL != socket->write_handle) 3048 if (NULL != socket->write_handle)
@@ -3050,30 +3052,30 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3050 } 3052 }
3051 3053
3052 switch (socket->state) 3054 switch (socket->state)
3053 { 3055 {
3054 case STATE_TRANSMIT_CLOSED: 3056 case STATE_TRANSMIT_CLOSED:
3055 case STATE_TRANSMIT_CLOSE_WAIT: 3057 case STATE_TRANSMIT_CLOSE_WAIT:
3056 case STATE_CLOSED: 3058 case STATE_CLOSED:
3057 case STATE_CLOSE_WAIT: 3059 case STATE_CLOSE_WAIT:
3058 if (NULL != write_cont) 3060 if (NULL != write_cont)
3059 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); 3061 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3062 LOG (GNUNET_ERROR_TYPE_DEBUG,
3061 "%s() END\n", __func__); 3063 "%s() END\n", __func__);
3062 return NULL; 3064 return NULL;
3063 case STATE_INIT: 3065 case STATE_INIT:
3064 case STATE_LISTEN: 3066 case STATE_LISTEN:
3065 case STATE_HELLO_WAIT: 3067 case STATE_HELLO_WAIT:
3066 if (NULL != write_cont) 3068 if (NULL != write_cont)
3067 /* FIXME: GNUNET_STREAM_SYSERR?? */ 3069 /* FIXME: GNUNET_STREAM_SYSERR?? */
3068 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); 3070 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3069 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3071 LOG (GNUNET_ERROR_TYPE_DEBUG,
3070 "%s() END\n", __func__); 3072 "%s() END\n", __func__);
3071 return NULL; 3073 return NULL;
3072 case STATE_ESTABLISHED: 3074 case STATE_ESTABLISHED:
3073 case STATE_RECEIVE_CLOSED: 3075 case STATE_RECEIVE_CLOSED:
3074 case STATE_RECEIVE_CLOSE_WAIT: 3076 case STATE_RECEIVE_CLOSE_WAIT:
3075 break; 3077 break;
3076 } 3078 }
3077 3079
3078 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) 3080 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
3079 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; 3081 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
@@ -3089,43 +3091,43 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3089 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); 3091 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3090 /* Divide the given buffer into packets for sending */ 3092 /* Divide the given buffer into packets for sending */
3091 for (packet=0; packet < num_needed_packets; packet++) 3093 for (packet=0; packet < num_needed_packets; packet++)
3094 {
3095 if ((packet + 1) * max_payload_size < size)
3092 { 3096 {
3093 if ((packet + 1) * max_payload_size < size) 3097 payload_size = max_payload_size;
3094 { 3098 packet_size = MAX_PACKET_SIZE;
3095 payload_size = max_payload_size;
3096 packet_size = MAX_PACKET_SIZE;
3097 }
3098 else
3099 {
3100 payload_size = size - packet * max_payload_size;
3101 packet_size = payload_size + sizeof (struct
3102 GNUNET_STREAM_DataMessage);
3103 }
3104 io_handle->messages[packet] = GNUNET_malloc (packet_size);
3105 io_handle->messages[packet]->header.header.size = htons (packet_size);
3106 io_handle->messages[packet]->header.header.type =
3107 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3108 io_handle->messages[packet]->sequence_number =
3109 htonl (socket->write_sequence_number++);
3110 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3111
3112 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3113 determined from RTT */
3114 io_handle->messages[packet]->ack_deadline =
3115 GNUNET_TIME_relative_hton (ack_deadline);
3116 data_msg = io_handle->messages[packet];
3117 /* Copy data from given buffer to the packet */
3118 memcpy (&data_msg[1],
3119 sweep,
3120 payload_size);
3121 sweep += payload_size;
3122 socket->write_offset += payload_size;
3123 } 3099 }
3100 else
3101 {
3102 payload_size = size - packet * max_payload_size;
3103 packet_size = payload_size + sizeof (struct
3104 GNUNET_STREAM_DataMessage);
3105 }
3106 io_handle->messages[packet] = GNUNET_malloc (packet_size);
3107 io_handle->messages[packet]->header.header.size = htons (packet_size);
3108 io_handle->messages[packet]->header.header.type =
3109 htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3110 io_handle->messages[packet]->sequence_number =
3111 htonl (socket->write_sequence_number++);
3112 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3113
3114 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3115 determined from RTT */
3116 io_handle->messages[packet]->ack_deadline =
3117 GNUNET_TIME_relative_hton (ack_deadline);
3118 data_msg = io_handle->messages[packet];
3119 /* Copy data from given buffer to the packet */
3120 memcpy (&data_msg[1],
3121 sweep,
3122 payload_size);
3123 sweep += payload_size;
3124 socket->write_offset += payload_size;
3125 }
3124 socket->write_handle = io_handle; 3126 socket->write_handle = io_handle;
3125 write_data (socket); 3127 write_data (socket);
3126 3128
3127 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3129 LOG (GNUNET_ERROR_TYPE_DEBUG,
3128 "%s() END\n", __func__); 3130 "%s() END\n", __func__);
3129 3131
3130 return io_handle; 3132 return io_handle;
3131} 3133}
@@ -3152,30 +3154,30 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3152{ 3154{
3153 struct GNUNET_STREAM_IOReadHandle *read_handle; 3155 struct GNUNET_STREAM_IOReadHandle *read_handle;
3154 3156
3155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3157 LOG (GNUNET_ERROR_TYPE_DEBUG,
3156 "%s()\n", 3158 "%s()\n",
3157 __func__); 3159 __func__);
3158 3160
3159 /* Return NULL if there is already a read handle; the user has to cancel that 3161 /* Return NULL if there is already a read handle; the user has to cancel that
3160 first before continuing or has to wait until it is completed */ 3162 first before continuing or has to wait until it is completed */
3161 if (NULL != socket->read_handle) return NULL; 3163 if (NULL != socket->read_handle) return NULL;
3162 3164
3163 GNUNET_assert (NULL != proc); 3165 GNUNET_assert (NULL != proc);
3164 3166
3165 switch (socket->state) 3167 switch (socket->state)
3166 { 3168 {
3167 case STATE_RECEIVE_CLOSED: 3169 case STATE_RECEIVE_CLOSED:
3168 case STATE_RECEIVE_CLOSE_WAIT: 3170 case STATE_RECEIVE_CLOSE_WAIT:
3169 case STATE_CLOSED: 3171 case STATE_CLOSED:
3170 case STATE_CLOSE_WAIT: 3172 case STATE_CLOSE_WAIT:
3171 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); 3173 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3174 LOG (GNUNET_ERROR_TYPE_DEBUG,
3173 "%s() END\n", 3175 "%s() END\n",
3174 __func__); 3176 __func__);
3175 return NULL; 3177 return NULL;
3176 default: 3178 default:
3177 break; 3179 break;
3178 } 3180 }
3179 3181
3180 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); 3182 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
3181 read_handle->proc = proc; 3183 read_handle->proc = proc;
@@ -3185,20 +3187,20 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3185 /* Check if we have a packet at bitmap 0 */ 3187 /* Check if we have a packet at bitmap 0 */
3186 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 3188 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3187 0)) 3189 0))
3188 { 3190 {
3189 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, 3191 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3190 socket); 3192 socket);
3191 3193
3192 } 3194 }
3193 3195
3194 /* Setup the read timeout task */ 3196 /* Setup the read timeout task */
3195 socket->read_io_timeout_task_id = 3197 socket->read_io_timeout_task_id =
3196 GNUNET_SCHEDULER_add_delayed (timeout, 3198 GNUNET_SCHEDULER_add_delayed (timeout,
3197 &read_io_timeout, 3199 &read_io_timeout,
3198 socket); 3200 socket);
3199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3201 LOG (GNUNET_ERROR_TYPE_DEBUG,
3200 "%s() END\n", 3202 "%s() END\n",
3201 __func__); 3203 __func__);
3202 return read_handle; 3204 return read_handle;
3203} 3205}
3204 3206
@@ -3218,16 +3220,16 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3218 GNUNET_assert (socket->write_handle == ioh); 3220 GNUNET_assert (socket->write_handle == ioh);
3219 3221
3220 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) 3222 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
3221 { 3223 {
3222 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); 3224 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
3223 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 3225 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3224 } 3226 }
3225 3227
3226 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 3228 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3227 { 3229 {
3228 if (NULL == ioh->messages[packet]) break; 3230 if (NULL == ioh->messages[packet]) break;
3229 GNUNET_free (ioh->messages[packet]); 3231 GNUNET_free (ioh->messages[packet]);
3230 } 3232 }
3231 3233
3232 GNUNET_free (socket->write_handle); 3234 GNUNET_free (socket->write_handle);
3233 socket->write_handle = NULL; 3235 socket->write_handle = NULL;