aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api_new.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-06-04 13:00:16 +0000
committerChristian Grothoff <christian@grothoff.org>2011-06-04 13:00:16 +0000
commitde4de0f76f88209a6cd2d78f512c793e099552f5 (patch)
tree7fbb939fc9aeb8a61e0fcff67707b2b9c055a35f /src/transport/transport_api_new.c
parentabda91687b433defaad5425c484b142416172e2a (diff)
downloadgnunet-de4de0f76f88209a6cd2d78f512c793e099552f5.tar.gz
gnunet-de4de0f76f88209a6cd2d78f512c793e099552f5.zip
fixes
Diffstat (limited to 'src/transport/transport_api_new.c')
-rw-r--r--src/transport/transport_api_new.c107
1 files changed, 73 insertions, 34 deletions
diff --git a/src/transport/transport_api_new.c b/src/transport/transport_api_new.c
index 02f3fc421..0080937ca 100644
--- a/src/transport/transport_api_new.c
+++ b/src/transport/transport_api_new.c
@@ -24,8 +24,6 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * 25 *
26 * TODO: 26 * TODO:
27 * - support 'try connect' in transport service
28 * - add timeout (see FIXME)
29 * - adjust testcases to use new 'try connect' style (should be easy, breaks API compatibility!) 27 * - adjust testcases to use new 'try connect' style (should be easy, breaks API compatibility!)
30 * - adjust core service to use new 'try connect' style (should be MUCH nicer there as well!) 28 * - adjust core service to use new 'try connect' style (should be MUCH nicer there as well!)
31 * - test test test 29 * - test test test
@@ -69,7 +67,7 @@ struct GNUNET_TRANSPORT_TransmitHandle
69 /** 67 /**
70 * Neighbour for this handle, NULL for control messages. 68 * Neighbour for this handle, NULL for control messages.
71 */ 69 */
72 struct NeighbourList *neighbour; 70 struct Neighbour *neighbour;
73 71
74 /** 72 /**
75 * Function to call when notify_size bytes are available 73 * Function to call when notify_size bytes are available
@@ -88,6 +86,12 @@ struct GNUNET_TRANSPORT_TransmitHandle
88 struct GNUNET_TIME_Absolute timeout; 86 struct GNUNET_TIME_Absolute timeout;
89 87
90 /** 88 /**
89 * Task to trigger request timeout if the request is stalled due to
90 * congestion.
91 */
92 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
93
94 /**
91 * How many bytes is our notify callback waiting for? 95 * How many bytes is our notify callback waiting for?
92 */ 96 */
93 size_t notify_size; 97 size_t notify_size;
@@ -399,7 +403,7 @@ demultiplexer (void *cls,
399 const struct SendOkMessage *okm; 403 const struct SendOkMessage *okm;
400 struct HelloWaitList *hwl; 404 struct HelloWaitList *hwl;
401 struct HelloWaitList *next_hwl; 405 struct HelloWaitList *next_hwl;
402 struct NeighbourList *n; 406 struct Neighbour *n;
403 struct GNUNET_PeerIdentity me; 407 struct GNUNET_PeerIdentity me;
404 uint16_t size; 408 uint16_t size;
405 uint32_t ats_count; 409 uint32_t ats_count;
@@ -525,6 +529,9 @@ demultiplexer (void *cls,
525 if ( (n->th != NULL) && 529 if ( (n->th != NULL) &&
526 (n->hn == NULL) ) 530 (n->hn == NULL) )
527 { 531 {
532 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->th->timeout_task);
533 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
534 n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
528 /* we've been waiting for this (congestion, not quota, 535 /* we've been waiting for this (congestion, not quota,
529 caused delayed transmission) */ 536 caused delayed transmission) */
530 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, 537 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
@@ -581,6 +588,29 @@ demultiplexer (void *cls,
581 588
582 589
583/** 590/**
591 * A transmission request could not be satisfied because of
592 * network congestion. Notify the initiator and clean up.
593 *
594 * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle'
595 * @param tc scheduler context
596 */
597static void
598timeout_request_due_to_congestion (void *cls,
599 const struct GNUNET_SCHEDULER_TaskContext *tc)
600{
601 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
602 struct Neighbour *n = th->neighbour;
603
604 n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
605 GNUNET_assert (th == n->th);
606 GNUNET_assert (NULL == n->hn);
607 n->th = NULL;
608 th->notify (th->notify_cls, 0, NULL);
609 GNUNET_free (th);
610}
611
612
613/**
584 * Transmit message(s) to service. 614 * Transmit message(s) to service.
585 * 615 *
586 * @param cls handle to transport 616 * @param cls handle to transport
@@ -592,11 +622,13 @@ static size_t
592transport_notify_ready (void *cls, size_t size, void *buf) 622transport_notify_ready (void *cls, size_t size, void *buf)
593{ 623{
594 struct GNUNET_TRANSPORT_Handle *h = cls; 624 struct GNUNET_TRANSPORT_Handle *h = cls;
595 size_t ssize;
596 struct GNUNET_TRANSPORT_TransmitHandle *th; 625 struct GNUNET_TRANSPORT_TransmitHandle *th;
597 struct Neighbour *n; 626 struct Neighbour *n;
598 char *cbuf; 627 char *cbuf;
628 struct OutboundMessage obm;
599 size_t ret; 629 size_t ret;
630 size_t nret;
631 size_t mret;
600 632
601 GNUNET_assert (NULL != h->client); 633 GNUNET_assert (NULL != h->client);
602 h->cth = NULL; 634 h->cth = NULL;
@@ -637,8 +669,10 @@ transport_notify_ready (void *cls, size_t size, void *buf)
637 /* peer not ready, wait for notification! */ 669 /* peer not ready, wait for notification! */
638 GNUNET_CONTAINER_heap_remove_node (n->hn); 670 GNUNET_CONTAINER_heap_remove_node (n->hn);
639 n->hn = NULL; 671 n->hn = NULL;
640 /* FIXME: hitting transport-level congestion, add 672 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->th->timeout_task);
641 a timeout task for 'th' in this case! */ 673 n->th->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (n->th->timeout),
674 &timeout_request_due_to_congestion,
675 n->th);
642 continue; 676 continue;
643 } 677 }
644 th = n->th; 678 th = n->th;
@@ -848,7 +882,6 @@ reconnect (void *cls,
848 const struct GNUNET_SCHEDULER_TaskContext *tc) 882 const struct GNUNET_SCHEDULER_TaskContext *tc)
849{ 883{
850 struct GNUNET_TRANSPORT_Handle *h = cls; 884 struct GNUNET_TRANSPORT_Handle *h = cls;
851 struct ControlMessage *pos;
852 885
853 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 886 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
854 if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 887 if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
@@ -882,33 +915,35 @@ reconnect (void *cls,
882static void 915static void
883disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) 916disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
884{ 917{
918 struct GNUNET_TRANSPORT_TransmitHandle *th;
919
885 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 920 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
886 /* Forget about all neighbours that we used to be connected to */ 921 /* Forget about all neighbours that we used to be connected to */
887 GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 922 GNUNET_CONTAINER_multihashmap_iterate(h->neighbours,
888 &neighbour_delete, 923 &neighbour_delete,
889 NULL); 924 NULL);
890 if (NULL != handle->cth) 925 if (NULL != h->cth)
891 { 926 {
892 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); 927 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
893 handle->cth = NULL; 928 h->cth = NULL;
894 } 929 }
895 if (NULL != handle->client) 930 if (NULL != h->client)
896 { 931 {
897 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); 932 GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
898 handle->client = NULL; 933 h->client = NULL;
899 } 934 }
900 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) 935 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
901 { 936 {
902 GNUNET_SCHEDULER_cancel (h->quota_task); 937 GNUNET_SCHEDULER_cancel (h->quota_task);
903 h->quota_task = GNUNET_SCHEDULER_NO_TASK; 938 h->quota_task = GNUNET_SCHEDULER_NO_TASK;
904 } 939 }
905 while ( (NULL != (th = handle->control_head))) 940 while ( (NULL != (th = h->control_head)))
906 { 941 {
907 GNUNET_CONTAINER_DLL_remove (handle->control_head, 942 GNUNET_CONTAINER_DLL_remove (h->control_head,
908 handle->control_tail, 943 h->control_tail,
909 cm); 944 th);
910 cm->notify (cm->notify_cls, 0, NULL); 945 th->notify (th->notify_cls, 0, NULL);
911 GNUNET_free (cm); 946 GNUNET_free (th);
912 } 947 }
913#if DEBUG_TRANSPORT 948#if DEBUG_TRANSPORT
914 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1037,7 +1072,7 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
1037 1072
1038 1073
1039/** 1074/**
1040 * Send TRY_CONNECT message to the service. 1075 * Send REQUEST_CONNECT message to the service.
1041 * 1076 *
1042 * @param cls the 'struct GNUNET_PeerIdentity' 1077 * @param cls the 'struct GNUNET_PeerIdentity'
1043 * @param size number of bytes available in buf 1078 * @param size number of bytes available in buf
@@ -1048,7 +1083,7 @@ static size_t
1048send_try_connect (void *cls, size_t size, void *buf) 1083send_try_connect (void *cls, size_t size, void *buf)
1049{ 1084{
1050 struct GNUNET_PeerIdentity *pid = cls; 1085 struct GNUNET_PeerIdentity *pid = cls;
1051 struct TryConnectMessage msg; 1086 struct TransportRequestConnectMessage msg;
1052 1087
1053 if (buf == NULL) 1088 if (buf == NULL)
1054 { 1089 {
@@ -1058,17 +1093,17 @@ send_try_connect (void *cls, size_t size, void *buf)
1058#if DEBUG_TRANSPORT 1093#if DEBUG_TRANSPORT
1059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060 "Transmitting `%s' request with respect to `%4s'.\n", 1095 "Transmitting `%s' request with respect to `%4s'.\n",
1061 "TRY_CONNECT", 1096 "REQUEST_CONNECT",
1062 GNUNET_i2s (&sqc->target)); 1097 GNUNET_i2s (&sqc->target));
1063#endif 1098#endif
1064 GNUNET_assert (size >= sizeof (struct TryConnectMessage)); 1099 GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
1065 msg.header.size = htons (sizeof (struct TryConnectMessage)); 1100 msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
1066 msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT); 1101 msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
1067 msg.reserved = htonl (0); 1102 msg.reserved = htonl (0);
1068 msg.peer = *pid; 1103 msg.peer = *pid;
1069 memcpy (buf, &msg, sizeof (msg)); 1104 memcpy (buf, &msg, sizeof (msg));
1070 GNUNET_free (pid); 1105 GNUNET_free (pid);
1071 return sizeof (struct TryConnectMessage); 1106 return sizeof (struct TransportRequestConnectMessage);
1072} 1107}
1073 1108
1074 1109
@@ -1088,7 +1123,7 @@ GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
1088 pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); 1123 pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
1089 *pid = *target; 1124 *pid = *target;
1090 schedule_control_transmit (handle, 1125 schedule_control_transmit (handle,
1091 sizeof (struct TryConnectMessage), 1126 sizeof (struct TransportRequestConnectMessage),
1092 &send_try_connect, pid); 1127 &send_try_connect, pid);
1093} 1128}
1094 1129
@@ -1272,7 +1307,7 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1272 ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1307 ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1273 ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE); 1308 ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE);
1274 ret->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1309 ret->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1275 ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); 1310 ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
1276 return ret; 1311 return ret;
1277} 1312}
1278 1313
@@ -1285,8 +1320,6 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1285void 1320void
1286GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) 1321GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1287{ 1322{
1288 struct GNUNET_TRANSPORT_TransmitHandle *cm;
1289
1290#if DEBUG_TRANSPORT 1323#if DEBUG_TRANSPORT
1291 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292 "Transport disconnect called!\n"); 1325 "Transport disconnect called!\n");
@@ -1370,13 +1403,13 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1370 th->priority = priority; 1403 th->priority = priority;
1371 n->th = th; 1404 n->th = th;
1372 /* calculate when our transmission should be ready */ 1405 /* calculate when our transmission should be ready */
1373 delay = GNUNET_BANDWIDTH_tracker_get_delay (n->out_tracker, size); 1406 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
1374 if (delay.rel_value > timeout.rel_value) 1407 if (delay.rel_value > timeout.rel_value)
1375 delay.rel_value = 0; /* notify immediately (with failure) */ 1408 delay.rel_value = 0; /* notify immediately (with failure) */
1376 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, 1409 n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1377 n, 1410 n,
1378 delay.rel_value); 1411 delay.rel_value);
1379 schedule_transmission (h); 1412 schedule_transmission (handle);
1380 return th; 1413 return th;
1381} 1414}
1382 1415
@@ -1401,6 +1434,12 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitH
1401 GNUNET_CONTAINER_heap_remove_node (n->hn); 1434 GNUNET_CONTAINER_heap_remove_node (n->hn);
1402 n->hn = NULL; 1435 n->hn = NULL;
1403 } 1436 }
1437 else
1438 {
1439 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != th->timeout_task);
1440 GNUNET_SCHEDULER_cancel (th->timeout_task);
1441 th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1442 }
1404 GNUNET_free (th); 1443 GNUNET_free (th);
1405} 1444}
1406 1445