aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-09-11 12:55:53 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-09-11 12:55:53 +0000
commited318284b8b6b48753fa50f8f138c9d44ec4aacb (patch)
tree55442485483bca05b6f7d0844a367738110458fe /src/stream
parentb1e2549a18ae16e0e101899b7c4739c5b153d1f6 (diff)
downloadgnunet-ed318284b8b6b48753fa50f8f138c9d44ec4aacb.tar.gz
gnunet-ed318284b8b6b48753fa50f8f138c9d44ec4aacb.zip
stream speedup fixes
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c178
1 files changed, 75 insertions, 103 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index d09182d2b..156dcd68f 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -1088,6 +1088,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1088 const struct GNUNET_ATS_Information*atsi) 1088 const struct GNUNET_ATS_Information*atsi)
1089{ 1089{
1090 const void *payload; 1090 const void *payload;
1091 struct GNUNET_TIME_Relative ack_deadline_rel;
1091 uint32_t bytes_needed; 1092 uint32_t bytes_needed;
1092 uint32_t relative_offset; 1093 uint32_t relative_offset;
1093 uint32_t relative_sequence_number; 1094 uint32_t relative_sequence_number;
@@ -1099,23 +1100,19 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1099 GNUNET_break_op (0); 1100 GNUNET_break_op (0);
1100 return GNUNET_SYSERR; 1101 return GNUNET_SYSERR;
1101 } 1102 }
1102 1103 if (0 != memcmp (sender, &socket->other_peer,
1103 if (0 != memcmp (sender, 1104 sizeof (struct GNUNET_PeerIdentity)))
1104 &socket->other_peer,
1105 sizeof (struct GNUNET_PeerIdentity)))
1106 { 1105 {
1107 LOG (GNUNET_ERROR_TYPE_DEBUG, 1106 LOG (GNUNET_ERROR_TYPE_DEBUG,
1108 "%s: Received DATA from non-confirming peer\n", 1107 "%s: Received DATA from non-confirming peer\n",
1109 GNUNET_i2s (&socket->other_peer)); 1108 GNUNET_i2s (&socket->other_peer));
1110 return GNUNET_YES; 1109 return GNUNET_YES;
1111 } 1110 }
1112
1113 switch (socket->state) 1111 switch (socket->state)
1114 { 1112 {
1115 case STATE_ESTABLISHED: 1113 case STATE_ESTABLISHED:
1116 case STATE_TRANSMIT_CLOSED: 1114 case STATE_TRANSMIT_CLOSED:
1117 case STATE_TRANSMIT_CLOSE_WAIT: 1115 case STATE_TRANSMIT_CLOSE_WAIT:
1118
1119 /* check if the message's sequence number is in the range we are 1116 /* check if the message's sequence number is in the range we are
1120 expecting */ 1117 expecting */
1121 relative_sequence_number = 1118 relative_sequence_number =
@@ -1136,8 +1133,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1136 socket); 1133 socket);
1137 } 1134 }
1138 return GNUNET_YES; 1135 return GNUNET_YES;
1139 } 1136 }
1140
1141 /* Check if we have already seen this message */ 1137 /* Check if we have already seen this message */
1142 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 1138 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1143 relative_sequence_number)) 1139 relative_sequence_number))
@@ -1151,20 +1147,14 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1151 { 1147 {
1152 socket->ack_task_id = 1148 socket->ack_task_id =
1153 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 1149 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1154 (msg->ack_deadline), 1150 (msg->ack_deadline), &ack_task, socket);
1155 &ack_task,
1156 socket);
1157 } 1151 }
1158 return GNUNET_YES; 1152 return GNUNET_YES;
1159 } 1153 }
1160
1161 LOG (GNUNET_ERROR_TYPE_DEBUG, 1154 LOG (GNUNET_ERROR_TYPE_DEBUG,
1162 "%s: Receiving DATA with sequence number: %u and size: %d from %s\n", 1155 "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
1163 GNUNET_i2s (&socket->other_peer), 1156 GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
1164 ntohl (msg->sequence_number), 1157 ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
1165 ntohs (msg->header.header.size),
1166 GNUNET_i2s (&socket->other_peer));
1167
1168 /* Check if we have to allocate the buffer */ 1158 /* Check if we have to allocate the buffer */
1169 size -= sizeof (struct GNUNET_STREAM_DataMessage); 1159 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1170 relative_offset = ntohl (msg->offset) - socket->read_offset; 1160 relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1181,54 +1171,67 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1181 { 1171 {
1182 LOG (GNUNET_ERROR_TYPE_DEBUG, 1172 LOG (GNUNET_ERROR_TYPE_DEBUG,
1183 "%s: Cannot accommodate packet %d as buffer is full\n", 1173 "%s: Cannot accommodate packet %d as buffer is full\n",
1184 GNUNET_i2s (&socket->other_peer), 1174 GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1185 ntohl (msg->sequence_number));
1186 return GNUNET_YES; 1175 return GNUNET_YES;
1187 } 1176 }
1188 } 1177 }
1189
1190 /* Copy Data to buffer */ 1178 /* Copy Data to buffer */
1191 payload = &msg[1]; 1179 payload = &msg[1];
1192 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); 1180 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1193 memcpy (socket->receive_buffer + relative_offset, 1181 memcpy (socket->receive_buffer + relative_offset, payload, size);
1194 payload,
1195 size);
1196 socket->receive_buffer_boundaries[relative_sequence_number] = 1182 socket->receive_buffer_boundaries[relative_sequence_number] =
1197 relative_offset + size; 1183 relative_offset + size;
1198
1199 /* Modify the ACK bitmap */ 1184 /* Modify the ACK bitmap */
1200 ackbitmap_modify_bit (&socket->ack_bitmap, 1185 ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1201 relative_sequence_number, 1186 GNUNET_YES);
1202 GNUNET_YES);
1203
1204 /* Start ACK sending task if one is not already present */ 1187 /* Start ACK sending task if one is not already present */
1188 ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1205 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) 1189 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1206 { 1190 {
1191 ack_deadline_rel =
1192 GNUNET_TIME_relative_min (ack_deadline_rel,
1193 GNUNET_TIME_relative_multiply
1194 (GNUNET_TIME_UNIT_SECONDS, 300));
1207 socket->ack_task_id = 1195 socket->ack_task_id =
1208 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 1196 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1209 (msg->ack_deadline), 1197 (msg->ack_deadline), &ack_task, socket);
1210 &ack_task, 1198 socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1211 socket); 1199 socket->ack_time_deadline = ack_deadline_rel;
1200 }
1201 else
1202 {
1203 struct GNUNET_TIME_Relative ack_time_past;
1204 struct GNUNET_TIME_Relative ack_time_remaining;
1205 struct GNUNET_TIME_Relative ack_time_min;
1206 ack_time_past =
1207 GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1208 ack_time_remaining = GNUNET_TIME_relative_subtract
1209 (socket->ack_time_deadline, ack_time_past);
1210 ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1211 ack_deadline_rel);
1212 if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1213 sizeof (struct GNUNET_TIME_Relative)))
1214 {
1215 ack_deadline_rel = ack_time_min;
1216 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1217 socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1218 &ack_task, socket);
1219 socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1220 socket->ack_time_deadline = ack_deadline_rel;
1221 }
1212 } 1222 }
1213
1214 if ((NULL != socket->read_handle) /* A read handle is waiting */ 1223 if ((NULL != socket->read_handle) /* A read handle is waiting */
1215 /* There is no current read task */ 1224 /* There is no current read task */
1216 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) 1225 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
1217 /* We have the first packet */ 1226 /* We have the first packet */
1218 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 1227 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1219 0)))
1220 { 1228 {
1221 LOG (GNUNET_ERROR_TYPE_DEBUG, 1229 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1222 "%s: Scheduling read processor\n", 1230 GNUNET_i2s (&socket->other_peer));
1223 GNUNET_i2s (&socket->other_peer)); 1231 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
1224 1232 socket);
1225 socket->read_task_id = 1233 }
1226 GNUNET_SCHEDULER_add_now (&call_read_processor,
1227 socket);
1228 }
1229
1230 break; 1234 break;
1231
1232 default: 1235 default:
1233 LOG (GNUNET_ERROR_TYPE_DEBUG, 1236 LOG (GNUNET_ERROR_TYPE_DEBUG,
1234 "%s: Received data message when it cannot be handled\n", 1237 "%s: Received data message when it cannot be handled\n",
@@ -1261,11 +1264,8 @@ client_handle_data (void *cls,
1261{ 1264{
1262 struct GNUNET_STREAM_Socket *socket = cls; 1265 struct GNUNET_STREAM_Socket *socket = cls;
1263 1266
1264 return handle_data (socket, 1267 return handle_data (socket, tunnel, sender,
1265 tunnel, 1268 (const struct GNUNET_STREAM_DataMessage *) message, atsi);
1266 sender,
1267 (const struct GNUNET_STREAM_DataMessage *) message,
1268 atsi);
1269} 1269}
1270 1270
1271 1271
@@ -1529,9 +1529,8 @@ client_handle_hello_ack (void *cls,
1529 const struct GNUNET_STREAM_HelloAckMessage *ack_msg; 1529 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1530 struct GNUNET_STREAM_HelloAckMessage *reply; 1530 struct GNUNET_STREAM_HelloAckMessage *reply;
1531 1531
1532 if (0 != memcmp (sender, 1532 if (0 != memcmp (sender, &socket->other_peer,
1533 &socket->other_peer, 1533 sizeof (struct GNUNET_PeerIdentity)))
1534 sizeof (struct GNUNET_PeerIdentity)))
1535 { 1534 {
1536 LOG (GNUNET_ERROR_TYPE_DEBUG, 1535 LOG (GNUNET_ERROR_TYPE_DEBUG,
1537 "%s: Received HELLO_ACK from non-confirming peer\n", 1536 "%s: Received HELLO_ACK from non-confirming peer\n",
@@ -1539,11 +1538,8 @@ client_handle_hello_ack (void *cls,
1539 return GNUNET_YES; 1538 return GNUNET_YES;
1540 } 1539 }
1541 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; 1540 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1542 LOG (GNUNET_ERROR_TYPE_DEBUG, 1541 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1543 "%s: Received HELLO_ACK from %s\n", 1542 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1544 GNUNET_i2s (&socket->other_peer),
1545 GNUNET_i2s (&socket->other_peer));
1546
1547 GNUNET_assert (socket->tunnel == tunnel); 1543 GNUNET_assert (socket->tunnel == tunnel);
1548 switch (socket->state) 1544 switch (socket->state)
1549 { 1545 {
@@ -1555,7 +1551,7 @@ client_handle_hello_ack (void *cls,
1555 (unsigned int) socket->read_sequence_number); 1551 (unsigned int) socket->read_sequence_number);
1556 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); 1552 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1557 reply = generate_hello_ack (socket, GNUNET_YES); 1553 reply = generate_hello_ack (socket, GNUNET_YES);
1558 queue_message (socket, &reply->header, &set_state_established, 1554 queue_message (socket, &reply->header, &set_state_established,
1559 NULL, GNUNET_NO); 1555 NULL, GNUNET_NO);
1560 return GNUNET_OK; 1556 return GNUNET_OK;
1561 case STATE_ESTABLISHED: 1557 case STATE_ESTABLISHED:
@@ -1568,8 +1564,7 @@ client_handle_hello_ack (void *cls,
1568 default: 1564 default:
1569 LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 1565 LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n",
1570 GNUNET_i2s (&socket->other_peer), 1566 GNUNET_i2s (&socket->other_peer),
1571 GNUNET_i2s (&socket->other_peer), 1567 GNUNET_i2s (&socket->other_peer), socket->state);
1572 socket->state);
1573 socket->state = STATE_CLOSED; // introduce STATE_ERROR? 1568 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1574 return GNUNET_SYSERR; 1569 return GNUNET_SYSERR;
1575 } 1570 }
@@ -1626,7 +1621,6 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1626 { 1621 {
1627 case STATE_ESTABLISHED: 1622 case STATE_ESTABLISHED:
1628 socket->state = STATE_RECEIVE_CLOSED; 1623 socket->state = STATE_RECEIVE_CLOSED;
1629
1630 /* Send TRANSMIT_CLOSE_ACK */ 1624 /* Send TRANSMIT_CLOSE_ACK */
1631 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 1625 reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1632 reply->header.type = 1626 reply->header.type =
@@ -1634,7 +1628,6 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1634 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); 1628 reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1635 queue_message (socket, reply, NULL, NULL, GNUNET_NO); 1629 queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1636 break; 1630 break;
1637
1638 default: 1631 default:
1639 /* FIXME: Call statistics? */ 1632 /* FIXME: Call statistics? */
1640 break; 1633 break;
@@ -1703,7 +1696,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1703 GNUNET_i2s (&socket->other_peer)); 1696 GNUNET_i2s (&socket->other_peer));
1704 return GNUNET_OK; 1697 return GNUNET_OK;
1705 } 1698 }
1706
1707 switch (operation) 1699 switch (operation)
1708 { 1700 {
1709 case SHUT_RDWR: 1701 case SHUT_RDWR:
@@ -1714,15 +1706,11 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1714 { 1706 {
1715 LOG (GNUNET_ERROR_TYPE_DEBUG, 1707 LOG (GNUNET_ERROR_TYPE_DEBUG,
1716 "%s: Received CLOSE_ACK when shutdown handle is not for " 1708 "%s: Received CLOSE_ACK when shutdown handle is not for "
1717 "SHUT_RDWR\n", 1709 "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1718 GNUNET_i2s (&socket->other_peer));
1719 return GNUNET_OK; 1710 return GNUNET_OK;
1720 } 1711 }
1721 1712 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1722 LOG (GNUNET_ERROR_TYPE_DEBUG, 1713 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1723 "%s: Received CLOSE_ACK from %s\n",
1724 GNUNET_i2s (&socket->other_peer),
1725 GNUNET_i2s (&socket->other_peer));
1726 socket->state = STATE_CLOSED; 1714 socket->state = STATE_CLOSED;
1727 break; 1715 break;
1728 default: 1716 default:
@@ -1732,7 +1720,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1732 return GNUNET_OK; 1720 return GNUNET_OK;
1733 } 1721 }
1734 break; 1722 break;
1735
1736 case SHUT_RD: 1723 case SHUT_RD:
1737 switch (socket->state) 1724 switch (socket->state)
1738 { 1725 {
@@ -1741,15 +1728,11 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1741 { 1728 {
1742 LOG (GNUNET_ERROR_TYPE_DEBUG, 1729 LOG (GNUNET_ERROR_TYPE_DEBUG,
1743 "%s: Received RECEIVE_CLOSE_ACK when shutdown handle " 1730 "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1744 "is not for SHUT_RD\n", 1731 "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1745 GNUNET_i2s (&socket->other_peer));
1746 return GNUNET_OK; 1732 return GNUNET_OK;
1747 } 1733 }
1748 1734 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1749 LOG (GNUNET_ERROR_TYPE_DEBUG, 1735 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1750 "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1751 GNUNET_i2s (&socket->other_peer),
1752 GNUNET_i2s (&socket->other_peer));
1753 socket->state = STATE_RECEIVE_CLOSED; 1736 socket->state = STATE_RECEIVE_CLOSED;
1754 break; 1737 break;
1755 default: 1738 default:
@@ -1758,7 +1741,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1758 GNUNET_i2s (&socket->other_peer)); 1741 GNUNET_i2s (&socket->other_peer));
1759 return GNUNET_OK; 1742 return GNUNET_OK;
1760 } 1743 }
1761
1762 break; 1744 break;
1763 case SHUT_WR: 1745 case SHUT_WR:
1764 switch (socket->state) 1746 switch (socket->state)
@@ -1772,25 +1754,20 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1772 GNUNET_i2s (&socket->other_peer)); 1754 GNUNET_i2s (&socket->other_peer));
1773 return GNUNET_OK; 1755 return GNUNET_OK;
1774 } 1756 }
1775 1757 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1776 LOG (GNUNET_ERROR_TYPE_DEBUG, 1758 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1777 "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1778 GNUNET_i2s (&socket->other_peer),
1779 GNUNET_i2s (&socket->other_peer));
1780 socket->state = STATE_TRANSMIT_CLOSED; 1759 socket->state = STATE_TRANSMIT_CLOSED;
1781 break; 1760 break;
1782 default: 1761 default:
1783 LOG (GNUNET_ERROR_TYPE_DEBUG, 1762 LOG (GNUNET_ERROR_TYPE_DEBUG,
1784 "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n", 1763 "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
1785 GNUNET_i2s (&socket->other_peer)); 1764 GNUNET_i2s (&socket->other_peer));
1786
1787 return GNUNET_OK; 1765 return GNUNET_OK;
1788 } 1766 }
1789 break; 1767 break;
1790 default: 1768 default:
1791 GNUNET_assert (0); 1769 GNUNET_assert (0);
1792 } 1770 }
1793
1794 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ 1771 if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
1795 shutdown_handle->completion_cb(shutdown_handle->completion_cls, 1772 shutdown_handle->completion_cb(shutdown_handle->completion_cls,
1796 operation); 1773 operation);
@@ -1800,7 +1777,7 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1800 GNUNET_SCHEDULER_cancel 1777 GNUNET_SCHEDULER_cancel
1801 (shutdown_handle->close_msg_retransmission_task_id); 1778 (shutdown_handle->close_msg_retransmission_task_id);
1802 shutdown_handle->close_msg_retransmission_task_id = 1779 shutdown_handle->close_msg_retransmission_task_id =
1803 GNUNET_SCHEDULER_NO_TASK; 1780 GNUNET_SCHEDULER_NO_TASK;
1804 } 1781 }
1805 GNUNET_free (shutdown_handle); /* Free shutdown handle */ 1782 GNUNET_free (shutdown_handle); /* Free shutdown handle */
1806 socket->shutdown_handle = NULL; 1783 socket->shutdown_handle = NULL;
@@ -3338,14 +3315,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3338 3315
3339 LOG (GNUNET_ERROR_TYPE_DEBUG, 3316 LOG (GNUNET_ERROR_TYPE_DEBUG,
3340 "%s\n", __func__); 3317 "%s\n", __func__);
3341
3342 /* Return NULL if there is already a write request pending */
3343 if (NULL != socket->write_handle) 3318 if (NULL != socket->write_handle)
3344 { 3319 {
3345 GNUNET_break (0); 3320 GNUNET_break (0);
3346 return NULL; 3321 return NULL;
3347 } 3322 }
3348
3349 switch (socket->state) 3323 switch (socket->state)
3350 { 3324 {
3351 case STATE_TRANSMIT_CLOSED: 3325 case STATE_TRANSMIT_CLOSED:
@@ -3371,7 +3345,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3371 case STATE_RECEIVE_CLOSE_WAIT: 3345 case STATE_RECEIVE_CLOSE_WAIT:
3372 break; 3346 break;
3373 } 3347 }
3374
3375 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size) 3348 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3376 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size; 3349 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size;
3377 num_needed_packets = 3350 num_needed_packets =
@@ -3408,25 +3381,24 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3408 io_handle->messages[packet]->sequence_number = 3381 io_handle->messages[packet]->sequence_number =
3409 htonl (socket->write_sequence_number++); 3382 htonl (socket->write_sequence_number++);
3410 io_handle->messages[packet]->offset = htonl (socket->write_offset); 3383 io_handle->messages[packet]->offset = htonl (socket->write_offset);
3411
3412 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value 3384 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3413 determined from RTT */ 3385 determined from RTT */
3414 io_handle->messages[packet]->ack_deadline = 3386 io_handle->messages[packet]->ack_deadline =
3415 GNUNET_TIME_relative_hton (ack_deadline); 3387 GNUNET_TIME_relative_hton (ack_deadline);
3416 data_msg = io_handle->messages[packet]; 3388 data_msg = io_handle->messages[packet];
3417 /* Copy data from given buffer to the packet */ 3389 /* Copy data from given buffer to the packet */
3418 memcpy (&data_msg[1], 3390 memcpy (&data_msg[1], sweep, payload_size);
3419 sweep,
3420 payload_size);
3421 sweep += payload_size; 3391 sweep += payload_size;
3422 socket->write_offset += payload_size; 3392 socket->write_offset += payload_size;
3423 } 3393 }
3394 /* ack the last data message. FIXME: remove when we figure out how to do this
3395 using RTT */
3396 io_handle->messages[num_needed_packets - 1]->ack_deadline =
3397 GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3424 socket->write_handle = io_handle; 3398 socket->write_handle = io_handle;
3425 write_data (socket); 3399 write_data (socket);
3426
3427 LOG (GNUNET_ERROR_TYPE_DEBUG, 3400 LOG (GNUNET_ERROR_TYPE_DEBUG,
3428 "%s() END\n", __func__); 3401 "%s() END\n", __func__);
3429
3430 return io_handle; 3402 return io_handle;
3431} 3403}
3432 3404