aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-06-28 19:42:01 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-06-28 19:42:01 +0000
commit3d1275946f7264fbd3baecfecddb3fd2e3a4fe57 (patch)
tree943a5d7a483c1d93d9d6e3848bd8d1efc6019bd4 /src/stream
parentb4b5e74393c6d1621bd5b907e9a1fc5843ecb2f2 (diff)
downloadgnunet-3d1275946f7264fbd3baecfecddb3fd2e3a4fe57.tar.gz
gnunet-3d1275946f7264fbd3baecfecddb3fd2e3a4fe57.zip
-control retransmission for HELLO and HELLO_ACK
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c221
1 files changed, 139 insertions, 82 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index df0710e80..168929b01 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -272,7 +272,12 @@ struct GNUNET_STREAM_Socket
272 /** 272 /**
273 * Task identifier for retransmission task after timeout 273 * Task identifier for retransmission task after timeout
274 */ 274 */
275 GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; 275 GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
276
277 /**
278 * Task identifier for retransmission of control messages
279 */
280 GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
276 281
277 /** 282 /**
278 * The task for sending timely Acks 283 * The task for sending timely Acks
@@ -576,7 +581,6 @@ send_message_notify (void *cls, size_t size, void *buf)
576 socket); 581 socket);
577 return 0; 582 return 0;
578 } 583 }
579
580 ret = ntohs (head->message->header.size); 584 ret = ntohs (head->message->header.size);
581 GNUNET_assert (size >= ret); 585 GNUNET_assert (size >= ret);
582 memcpy (buf, head->message, ret); 586 memcpy (buf, head->message, ret);
@@ -731,17 +735,16 @@ write_data (struct GNUNET_STREAM_Socket *socket);
731 * @param tc the Task context 735 * @param tc the Task context
732 */ 736 */
733static void 737static void
734retransmission_timeout_task (void *cls, 738data_retransmission_task (void *cls,
735 const struct GNUNET_SCHEDULER_TaskContext *tc) 739 const struct GNUNET_SCHEDULER_TaskContext *tc)
736{ 740{
737 struct GNUNET_STREAM_Socket *socket = cls; 741 struct GNUNET_STREAM_Socket *socket = cls;
738 742
739 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) 743 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
740 return; 744 return;
741
742 LOG (GNUNET_ERROR_TYPE_DEBUG, 745 LOG (GNUNET_ERROR_TYPE_DEBUG,
743 "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer)); 746 "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
744 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 747 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
745 write_data (socket); 748 write_data (socket);
746} 749}
747 750
@@ -925,11 +928,11 @@ write_data (struct GNUNET_STREAM_Socket *socket)
925 NULL); 928 NULL);
926 packet++; 929 packet++;
927 } 930 }
928 if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) 931 if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
929 socket->retransmission_timeout_task_id = 932 socket->data_retransmission_task_id =
930 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 933 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
931 (GNUNET_TIME_UNIT_SECONDS, 8), 934 (GNUNET_TIME_UNIT_SECONDS, 8),
932 &retransmission_timeout_task, 935 &data_retransmission_task,
933 socket); 936 socket);
934} 937}
935 938
@@ -1292,7 +1295,7 @@ client_handle_data (void *cls,
1292/** 1295/**
1293 * Callback to set state to ESTABLISHED 1296 * Callback to set state to ESTABLISHED
1294 * 1297 *
1295 * @param cls the closure from queue_message FIXME: document 1298 * @param cls the closure NULL;
1296 * @param socket the socket to requiring state change 1299 * @param socket the socket to requiring state change
1297 */ 1300 */
1298static void 1301static void
@@ -1305,6 +1308,10 @@ set_state_established (void *cls,
1305 socket->write_offset = 0; 1308 socket->write_offset = 0;
1306 socket->read_offset = 0; 1309 socket->read_offset = 0;
1307 socket->state = STATE_ESTABLISHED; 1310 socket->state = STATE_ESTABLISHED;
1311 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1312 socket->control_retransmission_task_id);
1313 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1314 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1308 if (NULL != socket->lsocket) 1315 if (NULL != socket->lsocket)
1309 { 1316 {
1310 LOG (GNUNET_ERROR_TYPE_DEBUG, 1317 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1321,7 +1328,7 @@ set_state_established (void *cls,
1321 GNUNET_free (socket); 1328 GNUNET_free (socket);
1322 } 1329 }
1323 } 1330 }
1324 else if (NULL != socket->open_cb) 1331 else
1325 socket->open_cb (socket->open_cls, socket); 1332 socket->open_cb (socket->open_cls, socket);
1326} 1333}
1327 1334
@@ -1337,7 +1344,7 @@ set_state_hello_wait (void *cls,
1337 struct GNUNET_STREAM_Socket *socket) 1344 struct GNUNET_STREAM_Socket *socket)
1338{ 1345{
1339 GNUNET_assert (STATE_INIT == socket->state); 1346 GNUNET_assert (STATE_INIT == socket->state);
1340 LOG (GNUNET_ERROR_TYPE_DEBUG, 1347 LOG (GNUNET_ERROR_TYPE_DEBUG,
1341 "%s: Attaining HELLO_WAIT state\n", 1348 "%s: Attaining HELLO_WAIT state\n",
1342 GNUNET_i2s (&socket->other_peer)); 1349 GNUNET_i2s (&socket->other_peer));
1343 socket->state = STATE_HELLO_WAIT; 1350 socket->state = STATE_HELLO_WAIT;
@@ -1416,41 +1423,102 @@ set_state_closed (void *cls,
1416 1423
1417 1424
1418/** 1425/**
1426 * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1427 *
1428 * @return the generate hello message
1429 */
1430static struct GNUNET_STREAM_MessageHeader *
1431generate_hello (void)
1432{
1433 struct GNUNET_STREAM_MessageHeader *msg;
1434
1435 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
1436 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1437 msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
1438 return msg;
1439}
1440
1441
1442/**
1419 * Returns a new HelloAckMessage. Also sets the write sequence number for the 1443 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1420 * socket 1444 * socket
1421 * 1445 *
1422 * @param socket the socket for which this HelloAckMessage has to be generated 1446 * @param socket the socket for which this HelloAckMessage has to be generated
1447 * @param generate_seq GNUNET_YES to generate the write sequence number,
1448 * GNUNET_NO to use the existing sequence number
1423 * @return the HelloAckMessage 1449 * @return the HelloAckMessage
1424 */ 1450 */
1425static struct GNUNET_STREAM_HelloAckMessage * 1451static struct GNUNET_STREAM_HelloAckMessage *
1426generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket) 1452generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1453 int generate_seq)
1427{ 1454{
1428 struct GNUNET_STREAM_HelloAckMessage *msg; 1455 struct GNUNET_STREAM_HelloAckMessage *msg;
1429 1456
1430 /* Get the random sequence number */ 1457 if (GNUNET_YES == generate_seq)
1431 if (GNUNET_YES == socket->testing_active) 1458 {
1432 socket->write_sequence_number = 1459 if (GNUNET_YES == socket->testing_active)
1433 socket->testing_set_write_sequence_number_value; 1460 socket->write_sequence_number =
1434 else 1461 socket->testing_set_write_sequence_number_value;
1435 socket->write_sequence_number = 1462 else
1436 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); 1463 socket->write_sequence_number =
1437 LOG (GNUNET_ERROR_TYPE_DEBUG, 1464 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1438 "%s: write sequence number %u\n", 1465 LOG (GNUNET_ERROR_TYPE_DEBUG,
1439 GNUNET_i2s (&socket->other_peer), 1466 "%s: write sequence number %u\n",
1440 (unsigned int) socket->write_sequence_number); 1467 GNUNET_i2s (&socket->other_peer),
1441 1468 (unsigned int) socket->write_sequence_number);
1469 }
1442 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); 1470 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1443 msg->header.header.size = 1471 msg->header.header.size =
1444 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage)); 1472 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1445 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); 1473 msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1446 msg->sequence_number = htonl (socket->write_sequence_number); 1474 msg->sequence_number = htonl (socket->write_sequence_number);
1447 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE); 1475 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1448
1449 return msg; 1476 return msg;
1450} 1477}
1451 1478
1452 1479
1453/** 1480/**
1481 * Task for retransmitting control messages if they aren't ACK'ed before a
1482 * deadline
1483 *
1484 * @param cls the socket
1485 * @param tc the Task context
1486 */
1487static void
1488control_retransmission_task (void *cls,
1489 const struct GNUNET_SCHEDULER_TaskContext *tc)
1490{
1491 struct GNUNET_STREAM_Socket *socket = cls;
1492
1493 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1494 return;
1495 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1496 switch (socket->status)
1497 {
1498 case STATE_INIT:
1499 GNUNET_break (0);
1500 break;
1501 case STATE_LISTEN:
1502 GNUNET_break (0);
1503 break;
1504 case STATE_HELLO_WAIT:
1505 if (NULL == socket->lsocket) /* We are client */
1506 queue_message (socket, generate_hello (), NULL, NULL);
1507 else
1508 queue_message (socket,
1509 (struct GNUNET_STREAM_MessageHeader *)
1510 generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
1511 break;
1512 default:
1513 GNUNET_break (0);
1514 }
1515 socket->control_retransmission_task_id =
1516 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1517 &control_retransmission_task, socket);
1518}
1519
1520
1521/**
1454 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK 1522 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1455 * 1523 *
1456 * @param cls the socket (set from GNUNET_MESH_connect) 1524 * @param cls the socket (set from GNUNET_MESH_connect)
@@ -1499,11 +1567,11 @@ client_handle_hello_ack (void *cls,
1499 GNUNET_i2s (&socket->other_peer), 1567 GNUNET_i2s (&socket->other_peer),
1500 (unsigned int) socket->read_sequence_number); 1568 (unsigned int) socket->read_sequence_number);
1501 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); 1569 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1502 reply = generate_hello_ack_msg (socket); 1570 reply = generate_hello_ack (socket, GNUNET_YES);
1503 queue_message (socket, 1571 queue_message (socket,
1504 &reply->header, 1572 &reply->header,
1505 &set_state_established, 1573 &set_state_established,
1506 NULL); 1574 NULL);
1507 return GNUNET_OK; 1575 return GNUNET_OK;
1508 case STATE_ESTABLISHED: 1576 case STATE_ESTABLISHED:
1509 case STATE_RECEIVE_CLOSE_WAIT: 1577 case STATE_RECEIVE_CLOSE_WAIT:
@@ -2087,31 +2155,34 @@ server_handle_hello (void *cls,
2087 return GNUNET_YES; 2155 return GNUNET_YES;
2088 } 2156 }
2089 2157
2090 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 2158 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2091 ntohs (message->type));
2092 GNUNET_assert (socket->tunnel == tunnel); 2159 GNUNET_assert (socket->tunnel == tunnel);
2093 LOG (GNUNET_ERROR_TYPE_DEBUG, 2160 LOG (GNUNET_ERROR_TYPE_DEBUG,
2094 "%s: Received HELLO from %s\n", 2161 "%s: Received HELLO from %s\n",
2095 GNUNET_i2s (&socket->other_peer), 2162 GNUNET_i2s (&socket->other_peer),
2096 GNUNET_i2s (&socket->other_peer)); 2163 GNUNET_i2s (&socket->other_peer));
2097 2164
2098 if (STATE_INIT == socket->state) 2165 switch (socket->status)
2099 { 2166 {
2100 reply = generate_hello_ack_msg (socket); 2167 case STATE_INIT:
2168 reply = generate_hello_ack (socket, GNUNET_YES);
2101 queue_message (socket, 2169 queue_message (socket,
2102 &reply->header, 2170 &reply->header,
2103 &set_state_hello_wait, 2171 &set_state_hello_wait,
2104 NULL); 2172 NULL);
2105 } 2173 break;
2106 else 2174 default:
2107 {
2108 LOG (GNUNET_ERROR_TYPE_DEBUG, 2175 LOG (GNUNET_ERROR_TYPE_DEBUG,
2109 "%s: Client sent HELLO when in state %d\n", 2176 "%s: Client sent HELLO when in state %d\n",
2110 GNUNET_i2s (&socket->other_peer), 2177 GNUNET_i2s (&socket->other_peer),
2111 socket->state); 2178 socket->state);
2112 /* FIXME: Send RESET? */ 2179 /* FIXME: Send RESET? */
2113
2114 } 2180 }
2181 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2182 socket->control_retransmission_task_id);
2183 socket->control_retransmission_task_id =
2184 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2185 &control_retransmission_task, socket);
2115 return GNUNET_OK; 2186 return GNUNET_OK;
2116} 2187}
2117 2188
@@ -2427,8 +2498,6 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2427 GNUNET_i2s (&socket->other_peer)); 2498 GNUNET_i2s (&socket->other_peer));
2428 return GNUNET_OK; 2499 return GNUNET_OK;
2429 } 2500 }
2430 /* FIXME: increment in the base sequence number is breaking current flow
2431 */
2432 if (!((socket->write_sequence_number 2501 if (!((socket->write_sequence_number
2433 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) 2502 - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2434 { 2503 {
@@ -2450,10 +2519,10 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2450 GNUNET_i2s (&socket->other_peer)); 2519 GNUNET_i2s (&socket->other_peer));
2451 2520
2452 /* Cancel the retransmission task */ 2521 /* Cancel the retransmission task */
2453 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) 2522 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2454 { 2523 {
2455 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); 2524 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2456 socket->retransmission_timeout_task_id = 2525 socket->data_retransmission_task_id =
2457 GNUNET_SCHEDULER_NO_TASK; 2526 GNUNET_SCHEDULER_NO_TASK;
2458 } 2527 }
2459 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 2528 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
@@ -2665,30 +2734,23 @@ mesh_peer_connect_callback (void *cls,
2665 GNUNET_i2s(peer)); 2734 GNUNET_i2s(peer));
2666 return; 2735 return;
2667 } 2736 }
2668
2669 LOG (GNUNET_ERROR_TYPE_DEBUG, 2737 LOG (GNUNET_ERROR_TYPE_DEBUG,
2670 "%s: Target peer %s connected\n", 2738 "%s: Target peer %s connected\n",
2671 GNUNET_i2s (&socket->other_peer), 2739 GNUNET_i2s (&socket->other_peer),
2672 GNUNET_i2s (&socket->other_peer)); 2740 GNUNET_i2s (&socket->other_peer));
2673
2674 /* Set state to INIT */ 2741 /* Set state to INIT */
2675 socket->state = STATE_INIT; 2742 socket->state = STATE_INIT;
2676
2677 /* Send HELLO message */ 2743 /* Send HELLO message */
2678 message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); 2744 message = generate_hello ();
2679 message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
2680 message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
2681 queue_message (socket, 2745 queue_message (socket,
2682 message, 2746 message,
2683 &set_state_hello_wait, 2747 &set_state_hello_wait,
2684 NULL); 2748 NULL);
2685 2749 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2686 /* Call open callback */ 2750 socket->control_retransmission_task_id);
2687 if (NULL == socket->open_cb) 2751 socket->control_retransmission_task_id =
2688 { 2752 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2689 LOG (GNUNET_ERROR_TYPE_DEBUG, 2753 &control_retransmission_task, socket);
2690 "STREAM_open callback is NULL\n");
2691 }
2692} 2754}
2693 2755
2694 2756
@@ -2752,15 +2814,12 @@ new_tunnel_notify (void *cls,
2752 socket->retransmit_timeout = lsocket->retransmit_timeout; 2814 socket->retransmit_timeout = lsocket->retransmit_timeout;
2753 socket->testing_active = lsocket->testing_active; 2815 socket->testing_active = lsocket->testing_active;
2754 socket->testing_set_write_sequence_number_value = 2816 socket->testing_set_write_sequence_number_value =
2755 lsocket->testing_set_write_sequence_number_value; 2817 lsocket->testing_set_write_sequence_number_value;
2756
2757 LOG (GNUNET_ERROR_TYPE_DEBUG, 2818 LOG (GNUNET_ERROR_TYPE_DEBUG,
2758 "%s: Peer %s initiated tunnel to us\n", 2819 "%s: Peer %s initiated tunnel to us\n",
2759 GNUNET_i2s (&socket->other_peer), 2820 GNUNET_i2s (&socket->other_peer),
2760 GNUNET_i2s (&socket->other_peer)); 2821 GNUNET_i2s (&socket->other_peer));
2761
2762 /* FIXME: Copy MESH handle from lsocket to socket */ 2822 /* FIXME: Copy MESH handle from lsocket to socket */
2763
2764 return socket; 2823 return socket;
2765} 2824}
2766 2825
@@ -2814,10 +2873,10 @@ tunnel_cleaner (void *cls,
2814 GNUNET_SCHEDULER_cancel (socket->ack_task_id); 2873 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2815 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; 2874 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2816 } 2875 }
2817 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) 2876 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2818 { 2877 {
2819 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); 2878 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2820 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 2879 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2821 } 2880 }
2822 /* FIXME: Cancel all other tasks using socket->tunnel */ 2881 /* FIXME: Cancel all other tasks using socket->tunnel */
2823 socket->tunnel = NULL; 2882 socket->tunnel = NULL;
@@ -2905,7 +2964,8 @@ lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2905 * @param target the target peer to which the stream has to be opened 2964 * @param target the target peer to which the stream has to be opened
2906 * @param app_port the application port number which uniquely identifies this 2965 * @param app_port the application port number which uniquely identifies this
2907 * stream 2966 * stream
2908 * @param open_cb this function will be called after stream has be established 2967 * @param open_cb this function will be called after stream has be established;
2968 * cannot be NULL
2909 * @param open_cb_cls the closure for open_cb 2969 * @param open_cb_cls the closure for open_cb
2910 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END 2970 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2911 * @return if successful it returns the stream socket; NULL if stream cannot be 2971 * @return if successful it returns the stream socket; NULL if stream cannot be
@@ -2922,17 +2982,17 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2922 struct GNUNET_STREAM_Socket *socket; 2982 struct GNUNET_STREAM_Socket *socket;
2923 enum GNUNET_STREAM_Option option; 2983 enum GNUNET_STREAM_Option option;
2924 GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; 2984 GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
2925 va_list vargs; /* Variable arguments */ 2985 va_list vargs;
2926 2986
2927 LOG (GNUNET_ERROR_TYPE_DEBUG, 2987 LOG (GNUNET_ERROR_TYPE_DEBUG,
2928 "%s\n", __func__); 2988 "%s\n", __func__);
2989 GNUNET_assert (NULL != open_cb);
2929 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); 2990 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2930 socket->other_peer = *target; 2991 socket->other_peer = *target;
2931 socket->open_cb = open_cb; 2992 socket->open_cb = open_cb;
2932 socket->open_cls = open_cb_cls; 2993 socket->open_cls = open_cb_cls;
2933 /* Set defaults */ 2994 /* Set defaults */
2934 socket->retransmit_timeout = 2995 socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2935 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
2936 socket->testing_active = GNUNET_NO; 2996 socket->testing_active = GNUNET_NO;
2937 va_start (vargs, open_cb_cls); /* Parse variable args */ 2997 va_start (vargs, open_cb_cls); /* Parse variable args */
2938 do { 2998 do {
@@ -2972,10 +3032,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2972 GNUNET_free (socket); 3032 GNUNET_free (socket);
2973 return NULL; 3033 return NULL;
2974 } 3034 }
2975
2976 /* Now create the mesh tunnel to target */ 3035 /* Now create the mesh tunnel to target */
2977 LOG (GNUNET_ERROR_TYPE_DEBUG, 3036 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
2978 "Creating MESH Tunnel\n");
2979 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, 3037 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
2980 NULL, /* Tunnel context */ 3038 NULL, /* Tunnel context */
2981 &mesh_peer_connect_callback, 3039 &mesh_peer_connect_callback,
@@ -2984,9 +3042,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2984 GNUNET_assert (NULL != socket->tunnel); 3042 GNUNET_assert (NULL != socket->tunnel);
2985 GNUNET_MESH_peer_request_connect_add (socket->tunnel, 3043 GNUNET_MESH_peer_request_connect_add (socket->tunnel,
2986 &socket->other_peer); 3044 &socket->other_peer);
2987 3045 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
2988 LOG (GNUNET_ERROR_TYPE_DEBUG,
2989 "%s() END\n", __func__);
2990 return socket; 3046 return socket;
2991} 3047}
2992 3048
@@ -3088,6 +3144,7 @@ GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3088{ 3144{
3089 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id) 3145 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3090 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id); 3146 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3147 handle->socket->shutdown_handle = NULL;
3091 GNUNET_free (handle); 3148 GNUNET_free (handle);
3092} 3149}
3093 3150
@@ -3114,22 +3171,24 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3114 GNUNET_STREAM_io_write_cancel (socket->write_handle); 3171 GNUNET_STREAM_io_write_cancel (socket->write_handle);
3115 //socket->write_handle = NULL; 3172 //socket->write_handle = NULL;
3116 } 3173 }
3117
3118 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 3174 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
3119 { 3175 {
3120 /* socket closed with read task pending!? */ 3176 /* socket closed with read task pending!? */
3121 GNUNET_break (0); 3177 GNUNET_break (0);
3122 GNUNET_SCHEDULER_cancel (socket->read_task_id); 3178 GNUNET_SCHEDULER_cancel (socket->read_task_id);
3123 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; 3179 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3124 } 3180 }
3125
3126 /* Terminate the ack'ing tasks if they are still present */ 3181 /* Terminate the ack'ing tasks if they are still present */
3127 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) 3182 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3128 { 3183 {
3129 GNUNET_SCHEDULER_cancel (socket->ack_task_id); 3184 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3130 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; 3185 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3131 } 3186 }
3132 3187 /* Terminate the control retransmission tasks */
3188 if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3189 {
3190 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3191 }
3133 /* Clear Transmit handles */ 3192 /* Clear Transmit handles */
3134 if (NULL != socket->transmit_handle) 3193 if (NULL != socket->transmit_handle)
3135 { 3194 {
@@ -3143,7 +3202,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3143 socket->ack_msg = NULL; 3202 socket->ack_msg = NULL;
3144 socket->ack_transmit_handle = NULL; 3203 socket->ack_transmit_handle = NULL;
3145 } 3204 }
3146
3147 /* Clear existing message queue */ 3205 /* Clear existing message queue */
3148 while (NULL != (head = socket->queue_head)) { 3206 while (NULL != (head = socket->queue_head)) {
3149 GNUNET_CONTAINER_DLL_remove (socket->queue_head, 3207 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
@@ -3213,8 +3271,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3213 } 3271 }
3214 lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */ 3272 lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
3215 /* Set defaults */ 3273 /* Set defaults */
3216 lsocket->retransmit_timeout = 3274 lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3217 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
3218 lsocket->testing_active = GNUNET_NO; 3275 lsocket->testing_active = GNUNET_NO;
3219 lsocket->listen_ok_cb = NULL; 3276 lsocket->listen_ok_cb = NULL;
3220 listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */ 3277 listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
@@ -3491,10 +3548,10 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
3491 GNUNET_assert (NULL != socket->write_handle); 3548 GNUNET_assert (NULL != socket->write_handle);
3492 GNUNET_assert (socket->write_handle == ioh); 3549 GNUNET_assert (socket->write_handle == ioh);
3493 3550
3494 if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) 3551 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3495 { 3552 {
3496 GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); 3553 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3497 socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 3554 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3498 } 3555 }
3499 3556
3500 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 3557 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)