aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-02-16 16:26:48 +0000
committerChristian Grothoff <christian@grothoff.org>2010-02-16 16:26:48 +0000
commit67342da57d1baeea6a486b9bf9feac670d53c8d2 (patch)
tree3206ba2310922e4e9ff6e51affbad3ae99bc5fb9
parent17c88acd603c4f8ee1805c0db851cc9ce112f75f (diff)
downloadgnunet-67342da57d1baeea6a486b9bf9feac670d53c8d2.tar.gz
gnunet-67342da57d1baeea6a486b9bf9feac670d53c8d2.zip
making things nice by breaking tons
-rw-r--r--src/core/gnunet-service-core.c2
-rw-r--r--src/include/gnunet_protocols.h24
-rw-r--r--src/include/gnunet_transport_service.h4
-rw-r--r--src/transport/gnunet-service-transport.c244
-rw-r--r--src/transport/test_transport_api.c64
-rw-r--r--src/transport/test_transport_api_tcp_peer2.conf2
-rw-r--r--src/transport/transport.h42
-rw-r--r--src/transport/transport_api.c1725
8 files changed, 867 insertions, 1240 deletions
diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c
index 47f6b41c2..08c1c5455 100644
--- a/src/core/gnunet-service-core.c
+++ b/src/core/gnunet-service-core.c
@@ -1096,7 +1096,7 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
1096 n); 1096 n);
1097 if (n->th == NULL) 1097 if (n->th == NULL)
1098 { 1098 {
1099 /* message request too large (oops) */ 1099 /* message request too large or duplicate request */
1100 GNUNET_break (0); 1100 GNUNET_break (0);
1101 /* discard encrypted message */ 1101 /* discard encrypted message */
1102 GNUNET_assert (NULL != (m = n->encrypted_head)); 1102 GNUNET_assert (NULL != (m = n->encrypted_head));
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 249ba3de0..e43143168 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -164,48 +164,36 @@ extern "C"
164#define GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA 26 164#define GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA 26
165 165
166/** 166/**
167 * Message telling transport to try to connect to the
168 * given peer.
169 */
170#define GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT 27
171
172/**
173 * Response to another peer confirming that communication was
174 * established.
175 */
176#define GNUNET_MESSAGE_TYPE_TRANSPORT_ACK 28
177
178/**
179 * Request to look addresses of peers in server. 167 * Request to look addresses of peers in server.
180 */ 168 */
181#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP 29 169#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP 27
182 170
183/** 171/**
184 * Response to the address lookup request. 172 * Response to the address lookup request.
185 */ 173 */
186#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY 30 174#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY 28
187 175
188/** 176/**
189 * Change in blacklisting status of a peer. 177 * Change in blacklisting status of a peer.
190 */ 178 */
191#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST 31 179#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST 29
192 180
193/** 181/**
194 * Request to transport to notify us about any blacklisting status 182 * Request to transport to notify us about any blacklisting status
195 * changes on this connection (and to immediately send all 183 * changes on this connection (and to immediately send all
196 * active blacklist entries). 184 * active blacklist entries).
197 */ 185 */
198#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST_NOTIFY 32 186#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST_NOTIFY 30
199 187
200/** 188/**
201 * Transport PING message 189 * Transport PING message
202 */ 190 */
203#define GNUNET_MESSAGE_TYPE_TRANSPORT_PING 33 191#define GNUNET_MESSAGE_TYPE_TRANSPORT_PING 32
204 192
205/** 193/**
206 * Transport PONG message 194 * Transport PONG message
207 */ 195 */
208#define GNUNET_MESSAGE_TYPE_TRANSPORT_PONG 34 196#define GNUNET_MESSAGE_TYPE_TRANSPORT_PONG 33
209 197
210/** 198/**
211 * Request addition of a HELLO 199 * Request addition of a HELLO
diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h
index 650dd2e68..7a3ccbf7b 100644
--- a/src/include/gnunet_transport_service.h
+++ b/src/include/gnunet_transport_service.h
@@ -22,10 +22,6 @@
22 * @file include/gnunet_transport_service.h 22 * @file include/gnunet_transport_service.h
23 * @brief low-level P2P IO 23 * @brief low-level P2P IO
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - define API for blacklisting, un-blacklisting and notifications
28 * about blacklisted peers
29 */ 25 */
30 26
31#ifndef GNUNET_TRANSPORT_SERVICE_H 27#ifndef GNUNET_TRANSPORT_SERVICE_H
diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c
index e2616f078..854eef773 100644
--- a/src/transport/gnunet-service-transport.c
+++ b/src/transport/gnunet-service-transport.c
@@ -197,6 +197,12 @@ struct ForeignAddressList
197 */ 197 */
198 unsigned int connect_attempts; 198 unsigned int connect_attempts;
199 199
200 /**
201 * DV distance to this peer (1 if no DV is used).
202 * FIXME: need to set this from transport plugins!
203 */
204 uint32_t distance;
205
200}; 206};
201 207
202 208
@@ -446,6 +452,11 @@ struct NeighborList
446 struct GNUNET_TIME_Relative latency; 452 struct GNUNET_TIME_Relative latency;
447 453
448 /** 454 /**
455 * DV distance to this peer (1 if no DV is used).
456 */
457 uint32_t distance;
458
459 /**
449 * How many bytes have we received since the "last_quota_update" 460 * How many bytes have we received since the "last_quota_update"
450 * timestamp? 461 * timestamp?
451 */ 462 */
@@ -465,9 +476,8 @@ struct NeighborList
465 unsigned int quota_violation_count; 476 unsigned int quota_violation_count;
466 477
467 /** 478 /**
468 * Have we seen an ACK from this neighbor in the past? 479 * Have we seen an PONG from this neighbor in the past (and
469 * (used to make up a fake ACK for clients connecting after 480 * not had a disconnect since)?
470 * the neighbor connected to us).
471 */ 481 */
472 int received_pong; 482 int received_pong;
473 483
@@ -771,7 +781,6 @@ static struct CheckHelloValidatedContext *chvc_tail;
771static struct GNUNET_CONTAINER_MultiHashMap *validation_map; 781static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
772 782
773 783
774
775/** 784/**
776 * The peer specified by the given neighbor has timed-out or a plugin 785 * The peer specified by the given neighbor has timed-out or a plugin
777 * has disconnected. We may either need to do nothing (other plugins 786 * has disconnected. We may either need to do nothing (other plugins
@@ -1000,6 +1009,30 @@ transmit_to_client (struct TransportClient *client,
1000 1009
1001 1010
1002/** 1011/**
1012 * Transmit a 'SEND_OK' notification to the given client for the
1013 * given neighbor.
1014 *
1015 * @param client who to notify
1016 * @param n neighbor to notify about
1017 * @param result status code for the transmission request
1018 */
1019static void
1020transmit_send_ok (struct TransportClient *client,
1021 struct NeighborList *n,
1022 int result)
1023{
1024 struct SendOkMessage send_ok_msg;
1025
1026 send_ok_msg.header.size = htons (sizeof (send_ok_msg));
1027 send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1028 send_ok_msg.success = htonl (result);
1029 send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
1030 send_ok_msg.peer = n->id;
1031 transmit_to_client (client, &send_ok_msg.header, GNUNET_NO);
1032}
1033
1034
1035/**
1003 * Function called by the GNUNET_TRANSPORT_TransmitFunction 1036 * Function called by the GNUNET_TRANSPORT_TransmitFunction
1004 * upon "completion" of a send request. This tells the API 1037 * upon "completion" of a send request. This tells the API
1005 * that it is now legal to send another message to the given 1038 * that it is now legal to send another message to the given
@@ -1019,60 +1052,32 @@ transmit_send_continuation (void *cls,
1019 int result) 1052 int result)
1020{ 1053{
1021 struct MessageQueue *mq = cls; 1054 struct MessageQueue *mq = cls;
1022 /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */
1023 struct SendOkMessage send_ok_msg;
1024 struct NeighborList *n; 1055 struct NeighborList *n;
1025 1056
1026 GNUNET_assert (mq != NULL);
1027 n = find_neighbor(&mq->neighbor_id); 1057 n = find_neighbor(&mq->neighbor_id);
1028 if (n == NULL) /* Neighbor must have been removed asynchronously! */ 1058 GNUNET_assert (n != NULL);
1029 return; 1059 if (mq->specific_address != NULL)
1030
1031 /* Otherwise, let's make sure we've got the right peer */
1032 GNUNET_assert (0 ==
1033 memcmp (&n->id, target,
1034 sizeof (struct GNUNET_PeerIdentity)));
1035
1036 if (result == GNUNET_OK)
1037 { 1060 {
1038 if (mq->specific_address != NULL) 1061 if (result == GNUNET_OK)
1039 { 1062 {
1040 mq->specific_address->timeout = 1063 mq->specific_address->timeout =
1041 GNUNET_TIME_relative_to_absolute 1064 GNUNET_TIME_relative_to_absolute
1042 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 1065 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1043 mq->specific_address->connected = GNUNET_YES; 1066 mq->specific_address->connected = GNUNET_YES;
1044 } 1067 }
1045 } 1068 else
1046 else 1069 {
1047 { 1070 mq->specific_address->connected = GNUNET_NO;
1048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1071 }
1049 "Transmission to peer `%s' failed, marking connection as down.\n", 1072 if (! mq->internal_msg)
1050 GNUNET_i2s (target)); 1073 mq->specific_address->in_transmit = GNUNET_NO;
1051 if (mq->specific_address != NULL)
1052 mq->specific_address->connected = GNUNET_NO;
1053 } 1074 }
1054 if ( (! mq->internal_msg) &&
1055 (mq->specific_address != NULL) )
1056 mq->specific_address->in_transmit = GNUNET_NO;
1057
1058 if (mq->client != NULL) 1075 if (mq->client != NULL)
1059 { 1076 transmit_send_ok (mq->client, n, result);
1060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061 "Notifying client %p about transmission to peer `%4s'.\n",
1062 mq->client, GNUNET_i2s (target));
1063 send_ok_msg.header.size = htons (sizeof (send_ok_msg));
1064 send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1065 send_ok_msg.success = htonl (result);
1066 send_ok_msg.peer = n->id;
1067 transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
1068 }
1069 GNUNET_free (mq); 1077 GNUNET_free (mq);
1070 /* one plugin just became ready again, try transmitting 1078 try_transmission_to_peer (n);
1071 another message (if available) */ 1079 if (result != GNUNET_OK)
1072 if (result == GNUNET_OK) 1080 disconnect_neighbor (n, GNUNET_YES);
1073 try_transmission_to_peer (n);
1074 else
1075 disconnect_neighbor (n, GNUNET_YES);
1076} 1081}
1077 1082
1078 1083
@@ -1159,8 +1164,12 @@ try_transmission_to_peer (struct NeighborList *neighbor)
1159 min_latency = GNUNET_TIME_UNIT_FOREVER_REL; 1164 min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
1160 rl = NULL; 1165 rl = NULL;
1161 mq = neighbor->messages_head; 1166 mq = neighbor->messages_head;
1167 /* FIXME: support bi-directional use of TCP */
1162 if (mq->specific_address == NULL) 1168 if (mq->specific_address == NULL)
1163 mq->specific_address = find_ready_address(neighbor); 1169 mq->specific_address = find_ready_address(neighbor);
1170 GNUNET_CONTAINER_DLL_remove (neighbor->messages_head,
1171 neighbor->messages_tail,
1172 mq);
1164 if (mq->specific_address == NULL) 1173 if (mq->specific_address == NULL)
1165 { 1174 {
1166#if DEBUG_TRANSPORT 1175#if DEBUG_TRANSPORT
@@ -1169,14 +1178,14 @@ try_transmission_to_peer (struct NeighborList *neighbor)
1169 mq->message_buf_size, 1178 mq->message_buf_size,
1170 GNUNET_i2s (&mq->neighbor_id)); 1179 GNUNET_i2s (&mq->neighbor_id));
1171#endif 1180#endif
1181 if (mq->client != NULL)
1182 transmit_send_ok (mq->client, neighbor, GNUNET_NO);
1183 GNUNET_free (mq);
1172 return; /* nobody ready */ 1184 return; /* nobody ready */
1173 } 1185 }
1174 if (mq->specific_address->connected == GNUNET_NO) 1186 if (mq->specific_address->connected == GNUNET_NO)
1175 mq->specific_address->connect_attempts++; 1187 mq->specific_address->connect_attempts++;
1176 rl = mq->specific_address->ready_list; 1188 rl = mq->specific_address->ready_list;
1177 GNUNET_CONTAINER_DLL_remove (neighbor->messages_head,
1178 neighbor->messages_tail,
1179 mq);
1180 mq->plugin = rl->plugin; 1189 mq->plugin = rl->plugin;
1181 if (!mq->internal_msg) 1190 if (!mq->internal_msg)
1182 mq->specific_address->in_transmit = GNUNET_YES; 1191 mq->specific_address->in_transmit = GNUNET_YES;
@@ -1251,7 +1260,6 @@ transmit_to_peer (struct TransportClient *client,
1251 memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity)); 1260 memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity));
1252 mq->internal_msg = is_internal; 1261 mq->internal_msg = is_internal;
1253 mq->priority = priority; 1262 mq->priority = priority;
1254
1255 if (is_internal) 1263 if (is_internal)
1256 GNUNET_CONTAINER_DLL_insert (neighbor->messages_head, 1264 GNUNET_CONTAINER_DLL_insert (neighbor->messages_head,
1257 neighbor->messages_tail, 1265 neighbor->messages_tail,
@@ -1495,14 +1503,20 @@ plugin_env_notify_address (void *cls,
1495 */ 1503 */
1496static void 1504static void
1497notify_clients_connect (const struct GNUNET_PeerIdentity *peer, 1505notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1498 struct GNUNET_TIME_Relative latency) 1506 struct GNUNET_TIME_Relative latency,
1507 uint32_t distance)
1499{ 1508{
1500 struct ConnectInfoMessage cim; 1509 struct ConnectInfoMessage cim;
1501 struct TransportClient *cpos; 1510 struct TransportClient *cpos;
1502 1511
1512#if DEBUG_TRANSPORT
1513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1514 "Notifying clients about connection from `%s'\n",
1515 GNUNET_i2s (peer));
1516#endif
1503 cim.header.size = htons (sizeof (struct ConnectInfoMessage)); 1517 cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1504 cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 1518 cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1505 cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); 1519 cim.distance = htonl (distance);
1506 cim.latency = GNUNET_TIME_relative_hton (latency); 1520 cim.latency = GNUNET_TIME_relative_hton (latency);
1507 memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); 1521 memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1508 cpos = clients; 1522 cpos = clients;
@@ -1523,6 +1537,11 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1523 struct DisconnectInfoMessage dim; 1537 struct DisconnectInfoMessage dim;
1524 struct TransportClient *cpos; 1538 struct TransportClient *cpos;
1525 1539
1540#if DEBUG_TRANSPORT
1541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542 "Notifying clients about lost connection to `%s'\n",
1543 GNUNET_i2s (peer));
1544#endif
1526 dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); 1545 dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1527 dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); 1546 dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1528 dim.reserved = htonl (0); 1547 dim.reserved = htonl (0);
@@ -1612,6 +1631,7 @@ add_peer_address(struct NeighborList *neighbor,
1612 ret->expires = GNUNET_TIME_relative_to_absolute 1631 ret->expires = GNUNET_TIME_relative_to_absolute
1613 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 1632 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1614 ret->latency = GNUNET_TIME_relative_get_forever(); 1633 ret->latency = GNUNET_TIME_relative_get_forever();
1634 ret->distance = -1;
1615 ret->timeout = GNUNET_TIME_relative_to_absolute 1635 ret->timeout = GNUNET_TIME_relative_to_absolute
1616 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 1636 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1617 ret->ready_list = head; 1637 ret->ready_list = head;
@@ -1731,6 +1751,12 @@ check_pending_validation (void *cls,
1731 n->latency = fal->latency; 1751 n->latency = fal->latency;
1732 else 1752 else
1733 n->latency.value = (fal->latency.value + n->latency.value) / 2; 1753 n->latency.value = (fal->latency.value + n->latency.value) / 2;
1754 n->distance = fal->distance;
1755 if (GNUNET_NO == n->received_pong)
1756 {
1757 notify_clients_connect (&target, n->latency, n->distance);
1758 n->received_pong = GNUNET_YES;
1759 }
1734 } 1760 }
1735 1761
1736 /* clean up validation entry */ 1762 /* clean up validation entry */
@@ -1795,6 +1821,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
1795#endif 1821#endif
1796 return; 1822 return;
1797 } 1823 }
1824
1798#if 0 1825#if 0
1799 /* FIXME: add given address to potential pool of our addresses 1826 /* FIXME: add given address to potential pool of our addresses
1800 (for voting) */ 1827 (for voting) */
@@ -1809,7 +1836,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
1809 1836
1810static void 1837static void
1811neighbor_timeout_task (void *cls, 1838neighbor_timeout_task (void *cls,
1812 const struct GNUNET_SCHEDULER_TaskContext *tc) 1839 const struct GNUNET_SCHEDULER_TaskContext *tc)
1813{ 1840{
1814 struct NeighborList *n = cls; 1841 struct NeighborList *n = cls;
1815 1842
@@ -1861,13 +1888,13 @@ setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
1861 tp = tp->next; 1888 tp = tp->next;
1862 } 1889 }
1863 n->latency = GNUNET_TIME_UNIT_FOREVER_REL; 1890 n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
1891 n->distance = -1;
1864 n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, 1892 n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
1865 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 1893 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1866 &neighbor_timeout_task, n); 1894 &neighbor_timeout_task, n);
1867 transmit_to_peer (NULL, NULL, 0, 1895 transmit_to_peer (NULL, NULL, 0,
1868 (const char *) our_hello, GNUNET_HELLO_size(our_hello), 1896 (const char *) our_hello, GNUNET_HELLO_size(our_hello),
1869 GNUNET_NO, n); 1897 GNUNET_NO, n);
1870 notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
1871 return n; 1898 return n;
1872} 1899}
1873 1900
@@ -2266,28 +2293,15 @@ process_hello (struct TransportPlugin *plugin,
2266 * disconnect? 2293 * disconnect?
2267 */ 2294 */
2268static void 2295static void
2269disconnect_neighbor (struct NeighborList *current_handle, int check) 2296disconnect_neighbor (struct NeighborList *n, int check)
2270{ 2297{
2271 struct ReadyList *rpos; 2298 struct ReadyList *rpos;
2272 struct NeighborList *npos; 2299 struct NeighborList *npos;
2273 struct NeighborList *nprev; 2300 struct NeighborList *nprev;
2274 struct NeighborList *n;
2275 struct MessageQueue *mq; 2301 struct MessageQueue *mq;
2276 struct ForeignAddressList *peer_addresses; 2302 struct ForeignAddressList *peer_addresses;
2277 struct ForeignAddressList *peer_pos; 2303 struct ForeignAddressList *peer_pos;
2278 2304
2279 if (neighbors == NULL)
2280 return; /* We don't have any neighbors, so client has an already removed handle! */
2281
2282 npos = neighbors;
2283 while ((npos != NULL) && (current_handle != npos))
2284 npos = npos->next;
2285
2286 if (npos == NULL)
2287 return; /* Couldn't find neighbor in existing list, must have been already removed! */
2288 else
2289 n = npos;
2290
2291 if (GNUNET_YES == check) 2305 if (GNUNET_YES == check)
2292 { 2306 {
2293 rpos = n->plugins; 2307 rpos = n->plugins;
@@ -2303,10 +2317,10 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
2303 rpos = rpos->next; 2317 rpos = rpos->next;
2304 } 2318 }
2305 } 2319 }
2306
2307#if DEBUG_TRANSPORT 2320#if DEBUG_TRANSPORT
2308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 2321 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2309 "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id)); 2322 "Disconnecting from `%4s'\n",
2323 GNUNET_i2s (&n->id));
2310#endif 2324#endif
2311 /* remove n from neighbors list */ 2325 /* remove n from neighbors list */
2312 nprev = NULL; 2326 nprev = NULL;
@@ -2323,7 +2337,8 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
2323 nprev->next = n->next; 2337 nprev->next = n->next;
2324 2338
2325 /* notify all clients about disconnect */ 2339 /* notify all clients about disconnect */
2326 notify_clients_disconnect (&n->id); 2340 if (GNUNET_YES == n->received_pong)
2341 notify_clients_disconnect (&n->id);
2327 2342
2328 /* clean up all plugins, cancel connections and pending transmissions */ 2343 /* clean up all plugins, cancel connections and pending transmissions */
2329 while (NULL != (rpos = n->plugins)) 2344 while (NULL != (rpos = n->plugins))
@@ -2352,7 +2367,10 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
2352 GNUNET_free (mq); 2367 GNUNET_free (mq);
2353 } 2368 }
2354 if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) 2369 if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
2355 GNUNET_SCHEDULER_cancel (sched, n->timeout_task); 2370 {
2371 GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2372 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2373 }
2356 /* finally, free n itself */ 2374 /* finally, free n itself */
2357 GNUNET_free (n); 2375 GNUNET_free (n);
2358} 2376}
@@ -2504,9 +2522,10 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
2504 peer_address = add_peer_address(n, 2522 peer_address = add_peer_address(n,
2505 plugin->short_name, 2523 plugin->short_name,
2506 sender_address, 2524 sender_address,
2507 sender_address_len); 2525 sender_address_len);
2508 if (peer_address != NULL) 2526 if (peer_address != NULL)
2509 { 2527 {
2528 peer_address->distance = distance;
2510 if (peer_address->connected == GNUNET_NO) 2529 if (peer_address->connected == GNUNET_NO)
2511 { 2530 {
2512 peer_address->connected = GNUNET_YES; 2531 peer_address->connected = GNUNET_YES;
@@ -2520,10 +2539,12 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
2520 /* update traffic received amount ... */ 2539 /* update traffic received amount ... */
2521 msize = ntohs (message->size); 2540 msize = ntohs (message->size);
2522 n->last_received += msize; 2541 n->last_received += msize;
2523 GNUNET_SCHEDULER_cancel (sched, n->timeout_task); 2542 n->distance = distance;
2524 n->peer_timeout = 2543 n->peer_timeout =
2525 GNUNET_TIME_relative_to_absolute 2544 GNUNET_TIME_relative_to_absolute
2526 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 2545 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2546 GNUNET_SCHEDULER_cancel (sched,
2547 n->timeout_task);
2527 n->timeout_task = 2548 n->timeout_task =
2528 GNUNET_SCHEDULER_add_delayed (sched, 2549 GNUNET_SCHEDULER_add_delayed (sched,
2529 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2550 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
@@ -2551,6 +2572,11 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
2551 handle_pong(plugin, message, peer, sender_address, sender_address_len); 2572 handle_pong(plugin, message, peer, sender_address, sender_address_len);
2552 break; 2573 break;
2553 default: 2574 default:
2575 if (! n->received_pong)
2576 {
2577 GNUNET_break_op (0);
2578 break;
2579 }
2554#if DEBUG_TRANSPORT 2580#if DEBUG_TRANSPORT
2555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2581 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2556 "Received message of type %u from `%4s', sending to all clients.\n", 2582 "Received message of type %u from `%4s', sending to all clients.\n",
@@ -2591,8 +2617,6 @@ handle_start (void *cls,
2591 struct TransportClient *c; 2617 struct TransportClient *c;
2592 struct ConnectInfoMessage cim; 2618 struct ConnectInfoMessage cim;
2593 struct NeighborList *n; 2619 struct NeighborList *n;
2594 struct InboundMessage *im;
2595 struct GNUNET_MessageHeader *ack;
2596 2620
2597#if DEBUG_TRANSPORT 2621#if DEBUG_TRANSPORT
2598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2626,34 +2650,18 @@ handle_start (void *cls,
2626 /* tell new client about all existing connections */ 2650 /* tell new client about all existing connections */
2627 cim.header.size = htons (sizeof (struct ConnectInfoMessage)); 2651 cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2628 cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 2652 cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2629 cim.quota_out = 2653 n = neighbors;
2630 htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); 2654 while (n != NULL)
2631 /* FIXME: this ACK stuff is not nice! */ 2655 {
2632 im = GNUNET_malloc (sizeof (struct InboundMessage) + 2656 if (GNUNET_YES == n->received_pong)
2633 sizeof (struct GNUNET_MessageHeader)); 2657 {
2634 im->header.size = htons (sizeof (struct InboundMessage) + 2658 cim.id = n->id;
2635 sizeof (struct GNUNET_MessageHeader)); 2659 cim.latency = GNUNET_TIME_relative_hton (n->latency);
2636 im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); 2660 cim.distance = htonl (n->distance);
2637 im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); 2661 transmit_to_client (c, &cim.header, GNUNET_NO);
2638 ack = (struct GNUNET_MessageHeader *) &im[1];
2639 ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2640 ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2641 for (n = neighbors; n != NULL; n = n->next)
2642 {
2643 cim.id = n->id;
2644 cim.latency = GNUNET_TIME_relative_hton (n->latency);
2645 transmit_to_client (c, &cim.header, GNUNET_NO);
2646 if (n->received_pong)
2647 {
2648 im->peer = n->id;
2649 transmit_to_client (c, &im->header, GNUNET_NO);
2650 } 2662 }
2663 n = n->next;
2651 } 2664 }
2652 GNUNET_free (im);
2653 }
2654 else
2655 {
2656 fprintf(stderr, "Our hello is NULL!\n");
2657 } 2665 }
2658 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2666 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2659} 2667}
@@ -2725,7 +2733,7 @@ handle_send (void *cls,
2725 } 2733 }
2726 n = find_neighbor (&obm->peer); 2734 n = find_neighbor (&obm->peer);
2727 if (n == NULL) 2735 if (n == NULL)
2728 n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */ 2736 n = setup_new_neighbor (&obm->peer);
2729 tc = clients; 2737 tc = clients;
2730 while ((tc != NULL) && (tc->client != client)) 2738 while ((tc != NULL) && (tc->client != client))
2731 tc = tc->next; 2739 tc = tc->next;
@@ -2789,33 +2797,6 @@ handle_set_quota (void *cls,
2789} 2797}
2790 2798
2791 2799
2792/**
2793 * Handle TRY_CONNECT-message.
2794 *
2795 * @param cls closure (always NULL)
2796 * @param client identification of the client
2797 * @param message the actual message
2798 */
2799static void
2800handle_try_connect (void *cls,
2801 struct GNUNET_SERVER_Client *client,
2802 const struct GNUNET_MessageHeader *message)
2803{
2804 const struct TryConnectMessage *tcm;
2805 struct NeighborList *neighbor;
2806 tcm = (const struct TryConnectMessage *) message;
2807#if DEBUG_TRANSPORT
2808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2809 "Received `%s' request from client %p asking to connect to `%4s'\n",
2810 "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
2811#endif
2812 neighbor = find_neighbor(&tcm->peer);
2813 if (neighbor == NULL)
2814 setup_new_neighbor (&tcm->peer);
2815 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2816}
2817
2818
2819static void 2800static void
2820transmit_address_to_client (void *cls, const char *address) 2801transmit_address_to_client (void *cls, const char *address)
2821{ 2802{
@@ -2909,9 +2890,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
2909 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, 2890 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2910 {&handle_set_quota, NULL, 2891 {&handle_set_quota, NULL,
2911 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, 2892 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2912 {&handle_try_connect, NULL,
2913 GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2914 sizeof (struct TryConnectMessage)},
2915 {&handle_address_lookup, NULL, 2893 {&handle_address_lookup, NULL,
2916 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP, 2894 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
2917 0}, 2895 0},
diff --git a/src/transport/test_transport_api.c b/src/transport/test_transport_api.c
index b7e34f810..a43f971a0 100644
--- a/src/transport/test_transport_api.c
+++ b/src/transport/test_transport_api.c
@@ -81,7 +81,7 @@ static void
81end () 81end ()
82{ 82{
83 /* do work here */ 83 /* do work here */
84 GNUNET_assert (ok == 8); 84 GNUNET_assert (ok == 6);
85 GNUNET_SCHEDULER_cancel (sched, die_task); 85 GNUNET_SCHEDULER_cancel (sched, die_task);
86 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from transports!\n"); 86 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from transports!\n");
87 GNUNET_TRANSPORT_disconnect (p1.th); 87 GNUNET_TRANSPORT_disconnect (p1.th);
@@ -132,7 +132,7 @@ notify_receive (void *cls,
132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %d from peer (%p)!\n", 132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %d from peer (%p)!\n",
133 ntohs(message->type), cls); 133 ntohs(message->type), cls);
134 134
135 GNUNET_assert (ok == 7); 135 GNUNET_assert (ok == 5);
136 OKPP; 136 OKPP;
137 137
138 GNUNET_assert (MTYPE == ntohs (message->type)); 138 GNUNET_assert (MTYPE == ntohs (message->type));
@@ -144,23 +144,51 @@ notify_receive (void *cls,
144} 144}
145 145
146 146
147static size_t
148notify_ready (void *cls, size_t size, void *buf)
149{
150 struct GNUNET_MessageHeader *hdr;
151
152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
153 "Transmitting message to peer (%p) - %u!\n", cls, size);
154 GNUNET_assert (size >= 256);
155 GNUNET_assert (ok == 4);
156 OKPP;
157 if (buf != NULL)
158 {
159 hdr = buf;
160 hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
161 hdr->type = htons (MTYPE);
162 }
163
164 return sizeof (struct GNUNET_MessageHeader);
165}
166
167
147static void 168static void
148notify_connect (void *cls, 169notify_connect (void *cls,
149 const struct GNUNET_PeerIdentity *peer, 170 const struct GNUNET_PeerIdentity *peer,
150 struct GNUNET_TIME_Relative latency, 171 struct GNUNET_TIME_Relative latency,
151 uint32_t distance) 172 uint32_t distance)
152{ 173{
174 if (cls == &p1)
175 {
176 GNUNET_TRANSPORT_notify_transmit_ready (p1.th,
177 &p2.id,
178 256, 0, TIMEOUT, &notify_ready,
179 &p1);
180 }
153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 181 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
154 "Peer `%4s' connected to us (%p)!\n", GNUNET_i2s (peer), cls); 182 "Peer `%4s' connected to us (%p)!\n", GNUNET_i2s (peer), cls);
155 GNUNET_assert ((ok >= 1) && (ok <= 6));
156 OKPP;
157} 183}
158 184
159 185
160static void 186static void
161notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) 187notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
162{ 188{
163 ok--; 189 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
190 "Peer `%4s' disconnected (%p)!\n",
191 GNUNET_i2s (peer), cls);
164} 192}
165 193
166 194
@@ -186,27 +214,6 @@ setup_peer (struct PeerContext *p, const char *cfgname)
186} 214}
187 215
188 216
189static size_t
190notify_ready (void *cls, size_t size, void *buf)
191{
192 struct GNUNET_MessageHeader *hdr;
193
194 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
195 "Transmitting message to peer (%p) - %u!\n", cls, size);
196 GNUNET_assert (size >= 256);
197 GNUNET_assert ((ok >= 5) && (ok <= 6));
198 OKPP;
199 if (buf != NULL)
200 {
201 hdr = buf;
202 hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
203 hdr->type = htons (MTYPE);
204 }
205
206 return sizeof (struct GNUNET_MessageHeader);
207}
208
209
210static void 217static void
211exchange_hello_last (void *cls, 218exchange_hello_last (void *cls,
212 const struct GNUNET_MessageHeader *message) 219 const struct GNUNET_MessageHeader *message)
@@ -231,11 +238,6 @@ exchange_hello_last (void *cls,
231 /* both HELLOs exchanged, get ready to test transmission! */ 238 /* both HELLOs exchanged, get ready to test transmission! */
232 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
233 "Finished exchanging HELLOs, now waiting for transmission!\n"); 240 "Finished exchanging HELLOs, now waiting for transmission!\n");
234
235 GNUNET_TRANSPORT_notify_transmit_ready (p1.th,
236 &p2.id,
237 256, 0, TIMEOUT, &notify_ready,
238 &p1);
239} 241}
240 242
241static void 243static void
diff --git a/src/transport/test_transport_api_tcp_peer2.conf b/src/transport/test_transport_api_tcp_peer2.conf
index 7f7e78574..870a4809e 100644
--- a/src/transport/test_transport_api_tcp_peer2.conf
+++ b/src/transport/test_transport_api_tcp_peer2.conf
@@ -49,7 +49,7 @@ MINIMUM-FRIENDS = 0
49 49
50[transport] 50[transport]
51PLUGINS = tcp 51PLUGINS = tcp
52DEBUG = NO 52#DEBUG = YES
53PREFIX = 53PREFIX =
54ALLOW_SHUTDOWN = YES 54ALLOW_SHUTDOWN = YES
55ACCEPT_FROM6 = ::1; 55ACCEPT_FROM6 = ::1;
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 5e92bfb1e..a336cd9b0 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -30,7 +30,7 @@
30#include "gnunet_time_lib.h" 30#include "gnunet_time_lib.h"
31#include "gnunet_transport_service.h" 31#include "gnunet_transport_service.h"
32 32
33#define DEBUG_TRANSPORT GNUNET_NO 33#define DEBUG_TRANSPORT GNUNET_YES
34 34
35/** 35/**
36 * For how long do we allow unused bandwidth 36 * For how long do we allow unused bandwidth
@@ -57,10 +57,9 @@ struct ConnectInfoMessage
57 struct GNUNET_MessageHeader header; 57 struct GNUNET_MessageHeader header;
58 58
59 /** 59 /**
60 * Current quota for outbound traffic in bytes/ms. 60 * Transport distance metric (i.e. hops for DV)
61 * (should be equal to system default)
62 */ 61 */
63 uint32_t quota_out GNUNET_PACKED; 62 uint32_t distance;
64 63
65 /** 64 /**
66 * Latency estimate. 65 * Latency estimate.
@@ -72,11 +71,6 @@ struct ConnectInfoMessage
72 */ 71 */
73 struct GNUNET_PeerIdentity id; 72 struct GNUNET_PeerIdentity id;
74 73
75 /**
76 * Transport distance metric (i.e. hops for DV)
77 */
78 uint32_t distance;
79
80}; 74};
81 75
82 76
@@ -133,31 +127,6 @@ struct QuotaSetMessage
133 127
134 128
135/** 129/**
136 * Message used to ask the transport service to connect
137 * to a particular peer.
138 */
139struct TryConnectMessage
140{
141
142 /**
143 * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT.
144 */
145 struct GNUNET_MessageHeader header;
146
147 /**
148 * Always zero.
149 */
150 uint32_t reserved GNUNET_PACKED;
151
152 /**
153 * About which peer are we talking here?
154 */
155 struct GNUNET_PeerIdentity peer;
156
157};
158
159
160/**
161 * Message used to notify the transport API about a message 130 * Message used to notify the transport API about a message
162 * received from the network. The actual message follows. 131 * received from the network. The actual message follows.
163 */ 132 */
@@ -213,6 +182,11 @@ struct SendOkMessage
213 uint32_t success GNUNET_PACKED; 182 uint32_t success GNUNET_PACKED;
214 183
215 /** 184 /**
185 * Latency estimate.
186 */
187 struct GNUNET_TIME_RelativeNBO latency;
188
189 /**
216 * Which peer can send more now? 190 * Which peer can send more now?
217 */ 191 */
218 struct GNUNET_PeerIdentity peer; 192 struct GNUNET_PeerIdentity peer;
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
index ae07421d8..84de9cebe 100644
--- a/src/transport/transport_api.c
+++ b/src/transport/transport_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -25,6 +25,8 @@
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_client_lib.h" 27#include "gnunet_client_lib.h"
28#include "gnunet_constants.h"
29#include "gnunet_container_lib.h"
28#include "gnunet_arm_service.h" 30#include "gnunet_arm_service.h"
29#include "gnunet_hello_lib.h" 31#include "gnunet_hello_lib.h"
30#include "gnunet_protocols.h" 32#include "gnunet_protocols.h"
@@ -57,167 +59,221 @@
57 */ 59 */
58#define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 60#define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
59 61
62
60/** 63/**
61 * Entry in linked list of all of our current neighbours. 64 * What stage are we in for transmission processing?
62 */ 65 */
63struct NeighbourList 66enum TransmitStage
67 {
68 /**
69 * No active message.
70 */
71 TS_NEW = 0,
72
73 /**
74 * Message in local queue, not given to service.
75 */
76 TS_QUEUED = 1,
77
78 /**
79 * Message given to service, not confirmed (no SEND_OK).
80 */
81 TS_TRANSMITTED = 2,
82
83 /**
84 * One message was given to service and before it was confirmed,
85 * another one was already queued (waiting for SEND_OK to pass on
86 * to service).
87 */
88 TS_TRANSMITTED_QUEUED = 3
89 };
90
91
92/**
93 * Handle for a transmission-ready request.
94 */
95struct GNUNET_TRANSPORT_TransmitHandle
64{ 96{
65 97
66 /** 98 /**
67 * This is a linked list. 99 * Neighbour for this handle, NULL for control-traffic.
68 */ 100 */
69 struct NeighbourList *next; 101 struct NeighbourList *neighbour;
70 102
71 /** 103 /**
72 * Active transmit handle, can be NULL. Used to move 104 * Function to call when notify_size bytes are available
73 * from ready to wait list on disconnect and to block 105 * for transmission.
74 * two transmissions to the same peer from being scheduled
75 * at the same time.
76 */ 106 */
77 struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle; 107 GNUNET_CONNECTION_TransmitReadyNotify notify;
78 108
79 /** 109 /**
80 * Identity of this neighbour. 110 * Closure for notify.
81 */ 111 */
82 struct GNUNET_PeerIdentity id; 112 void *notify_cls;
83 113
84 /** 114 /**
85 * At what time did we reset last_sent last? 115 * transmit_ready task Id. The task is used to introduce the
116 * artificial delay that may be required to maintain the bandwidth
117 * limits. Later, this will be the ID of the "transmit_timeout"
118 * task which is used to signal a timeout if the transmission could
119 * not be done in a timely fashion.
86 */ 120 */
87 struct GNUNET_TIME_Absolute last_quota_update; 121 GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
88 122
89 /** 123 /**
90 * How many bytes have we sent since the "last_quota_update" 124 * Timeout for this request.
91 * timestamp?
92 */ 125 */
93 uint64_t last_sent; 126 struct GNUNET_TIME_Absolute timeout;
94 127
95 /** 128 /**
96 * Quota for outbound traffic to the neighbour in bytes/ms. 129 * How many bytes is our notify callback waiting for?
97 */ 130 */
98 uint32_t quota_out; 131 size_t notify_size;
99 132
100 /** 133 /**
101 * Set to GNUNET_YES if we are currently allowed to 134 * How important is this message?
102 * transmit a message to the transport service for this
103 * peer, GNUNET_NO otherwise.
104 */ 135 */
105 int transmit_ok; 136 unsigned int priority;
106 137
107#if ACK
108 /**
109 * Set to GNUNET_YES if we have received an ACK for the
110 * given peer. Peers that receive our HELLO always respond
111 * with an ACK to let us know that we are successfully
112 * communicating. Note that a PING can not be used for this
113 * since PINGs are only send if a HELLO address requires
114 * confirmation (and also, PINGs are not passed to the
115 * transport API itself).
116 */
117 int received_ack;
118#endif
119}; 138};
120 139
121 140
122/** 141/**
123 * Linked list of requests from clients for our HELLO 142 * Handle for a control message queue entry.
124 * that were deferred.
125 */ 143 */
126struct HelloWaitList 144struct ControlMessage
127{ 145{
128 146
129 /** 147 /**
130 * This is a linked list. 148 * This is a doubly-linked list.
131 */ 149 */
132 struct HelloWaitList *next; 150 struct ControlMessage *next;
133 151
134 /** 152 /**
135 * Reference back to our transport handle. 153 * This is a doubly-linked list.
136 */ 154 */
137 struct GNUNET_TRANSPORT_Handle *handle; 155 struct ControlMessage *prev;
138 156
139 /** 157 /**
140 * Callback to call once we got our HELLO. 158 * Overall transport handle.
141 */ 159 */
142 GNUNET_TRANSPORT_HelloUpdateCallback rec; 160 struct GNUNET_TRANSPORT_Handle *h;
143 161
144 /** 162 /**
145 * Closure for rec. 163 * Function to call when notify_size bytes are available
164 * for transmission.
146 */ 165 */
147 void *rec_cls; 166 GNUNET_CONNECTION_TransmitReadyNotify notify;
167
168 /**
169 * Closure for notify.
170 */
171 void *notify_cls;
172
173 /**
174 * transmit_ready task Id. The task is used to introduce the
175 * artificial delay that may be required to maintain the bandwidth
176 * limits. Later, this will be the ID of the "transmit_timeout"
177 * task which is used to signal a timeout if the transmission could
178 * not be done in a timely fashion.
179 */
180 GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
181
182 /**
183 * How many bytes is our notify callback waiting for?
184 */
185 size_t notify_size;
148 186
149}; 187};
150 188
151 189
152/** 190/**
153 * Opaque handle for a transmission-ready request. 191 * Entry in linked list of all of our current neighbours.
154 */ 192 */
155struct GNUNET_TRANSPORT_TransmitHandle 193struct NeighbourList
156{ 194{
157 195
158 /** 196 /**
159 * We keep the transmit handles that are waiting for 197 * This is a linked list.
160 * a transport-level connection in a doubly linked list.
161 */ 198 */
162 struct GNUNET_TRANSPORT_TransmitHandle *next; 199 struct NeighbourList *next;
163 200
164 /** 201 /**
165 * We keep the transmit handles that are waiting for 202 * Overall transport handle.
166 * a transport-level connection in a doubly linked list.
167 */ 203 */
168 struct GNUNET_TRANSPORT_TransmitHandle *prev; 204 struct GNUNET_TRANSPORT_Handle *h;
169 205
170 /** 206 /**
171 * Handle of the main transport data structure. 207 * Active transmit handle; available if 'transmit_forbidden'
208 * is GNUNET_NO.
172 */ 209 */
173 struct GNUNET_TRANSPORT_Handle *handle; 210 struct GNUNET_TRANSPORT_TransmitHandle transmit_handle;
174 211
175 /** 212 /**
176 * Neighbour for this handle, can be NULL if the service 213 * Identity of this neighbour.
177 * is not yet connected to the target.
178 */ 214 */
179 struct NeighbourList *neighbour; 215 struct GNUNET_PeerIdentity id;
180 216
181 /** 217 /**
182 * Which peer is this transmission going to be for? All 218 * At what time did we reset last_sent last?
183 * zeros if it is control-traffic to the service.
184 */ 219 */
185 struct GNUNET_PeerIdentity target; 220 struct GNUNET_TIME_Absolute last_quota_update;
186 221
187 /** 222 /**
188 * Function to call when notify_size bytes are available 223 * How many bytes have we sent since the "last_quota_update"
189 * for transmission. 224 * timestamp?
190 */ 225 */
191 GNUNET_CONNECTION_TransmitReadyNotify notify; 226 uint64_t last_sent;
192 227
193 /** 228 /**
194 * Closure for notify. 229 * Quota for outbound traffic to the neighbour in bytes/ms.
195 */ 230 */
196 void *notify_cls; 231 uint32_t quota_out;
197 232
198 /** 233 /**
199 * transmit_ready task Id. The task is used to introduce the 234 * Set to GNUNET_NO if we are currently allowed to accept a
200 * artificial delay that may be required to maintain the bandwidth 235 * message to the transport service for this peer, GNUNET_YES
201 * limits. Later, this will be the ID of the "transmit_timeout" 236 * if we have one and are waiting for transmission, GNUNET_SYSERR
202 * task which is used to signal a timeout if the transmission could 237 * if we are waiting for confirmation AND have already accepted
203 * not be done in a timely fashion. 238 * yet another message.
204 */ 239 */
205 GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; 240 enum TransmitStage transmit_stage;
206 241
207 /** 242 /**
208 * Timeout for this request. 243 * Have we received a notification that this peer is connected
244 * to us right now?
209 */ 245 */
210 struct GNUNET_TIME_Absolute timeout; 246 int is_connected;
247
248};
249
250
251/**
252 * Linked list of requests from clients for our HELLO that were
253 * deferred.
254 */
255struct HelloWaitList
256{
211 257
212 /** 258 /**
213 * How many bytes is our notify callback waiting for? 259 * This is a linked list.
214 */ 260 */
215 size_t notify_size; 261 struct HelloWaitList *next;
216 262
217 /** 263 /**
218 * How important is this message? 264 * Reference back to our transport handle.
219 */ 265 */
220 unsigned int priority; 266 struct GNUNET_TRANSPORT_Handle *handle;
267
268 /**
269 * Callback to call once we got our HELLO.
270 */
271 GNUNET_TRANSPORT_HelloUpdateCallback rec;
272
273 /**
274 * Closure for rec.
275 */
276 void *rec_cls;
221 277
222}; 278};
223 279
@@ -250,6 +306,16 @@ struct GNUNET_TRANSPORT_Handle
250 GNUNET_TRANSPORT_NotifyDisconnect nd_cb; 306 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
251 307
252 /** 308 /**
309 * Head of DLL of control messages.
310 */
311 struct ControlMessage *control_head;
312
313 /**
314 * Tail of DLL of control messages.
315 */
316 struct ControlMessage *control_tail;
317
318 /**
253 * The current HELLO message for this peer. Updated 319 * The current HELLO message for this peer. Updated
254 * whenever transports change their addresses. 320 * whenever transports change their addresses.
255 */ 321 */
@@ -266,26 +332,6 @@ struct GNUNET_TRANSPORT_Handle
266 struct GNUNET_CLIENT_TransmitHandle *network_handle; 332 struct GNUNET_CLIENT_TransmitHandle *network_handle;
267 333
268 /** 334 /**
269 * Linked list of transmit handles that are waiting for the
270 * transport to connect to the respective peer. When we
271 * receive notification that the transport connected to a
272 * peer, we go over this list and check if someone has already
273 * requested a transmission to the new peer; if so, we trigger
274 * the next step.
275 */
276 struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
277
278 /**
279 * Linked list of transmit handles that are waiting for the
280 * transport to be ready for transmission to the respective
281 * peer. When we
282 * receive notification that the transport disconnected from
283 * a peer, we go over this list and move the entry back to
284 * the connect_wait list.
285 */
286 struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
287
288 /**
289 * Linked list of pending requests for our HELLO. 335 * Linked list of pending requests for our HELLO.
290 */ 336 */
291 struct HelloWaitList *hwl_head; 337 struct HelloWaitList *hwl_head;
@@ -306,27 +352,34 @@ struct GNUNET_TRANSPORT_Handle
306 struct NeighbourList *neighbours; 352 struct NeighbourList *neighbours;
307 353
308 /** 354 /**
309 * ID of the task trying to reconnect to the 355 * ID of the task trying to reconnect to the service.
310 * service.
311 */ 356 */
312 GNUNET_SCHEDULER_TaskIdentifier reconnect_task; 357 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
313 358
314 /** 359 /**
315 * Delay until we try to reconnect. 360 * ID of the task trying to trigger transmission for a peer
361 * while maintaining bandwidth quotas.
316 */ 362 */
317 struct GNUNET_TIME_Relative reconnect_delay; 363 GNUNET_SCHEDULER_TaskIdentifier quota_task;
318 364
319 /** 365 /**
320 * Do we currently have a transmission pending? 366 * Delay until we try to reconnect.
321 * (schedule transmission was called but has not
322 * yet succeeded)?
323 */ 367 */
324 int transmission_scheduled; 368 struct GNUNET_TIME_Relative reconnect_delay;
369
325}; 370};
326 371
327 372
373// FIXME: replace with hash map!
374/**
375 * Get the neighbour list entry for the given peer
376 *
377 * @param h our context
378 * @param peer peer to look up
379 * @return NULL if no such peer entry exists
380 */
328static struct NeighbourList * 381static struct NeighbourList *
329find_neighbour (struct GNUNET_TRANSPORT_Handle *h, 382neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
330 const struct GNUNET_PeerIdentity *peer) 383 const struct GNUNET_PeerIdentity *peer)
331{ 384{
332 struct NeighbourList *pos; 385 struct NeighbourList *pos;
@@ -340,51 +393,186 @@ find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
340 393
341 394
342/** 395/**
343 * Schedule the task to send one message from the 396 * Schedule the task to send one message, either from the control
344 * connect_ready list to the service. 397 * list or the peer message queues to the service.
345 */ 398 */
346static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h); 399static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
347 400
348 401
349/** 402/**
350 * Transmit message to client... 403 * Function called by the scheduler when the timeout for bandwidth
404 * availablility for the target neighbour is reached.
405 *
406 * @param cls the 'struct GNUNET_TRANSPORT_Handle*'
407 * @param tc scheduler context
408 */
409static void
410quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
411{
412 struct GNUNET_TRANSPORT_Handle *h = cls;
413
414 h->quota_task = GNUNET_SCHEDULER_NO_TASK;
415 schedule_transmission (h);
416}
417
418
419/**
420 * Update the quota values for the given neighbour now.
421 *
422 * @param n neighbour to update
423 */
424static void
425update_quota (struct NeighbourList *n)
426{
427 struct GNUNET_TIME_Relative delta;
428 uint64_t allowed;
429 uint64_t remaining;
430
431 delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
432 allowed = delta.value * n->quota_out;
433 if (n->last_sent < allowed)
434 {
435 remaining = allowed - n->last_sent;
436 if (n->quota_out > 0)
437 remaining /= n->quota_out;
438 else
439 remaining = 0;
440 if (remaining > MAX_BANDWIDTH_CARRY)
441 remaining = MAX_BANDWIDTH_CARRY;
442 n->last_sent = 0;
443 n->last_quota_update = GNUNET_TIME_absolute_get ();
444 n->last_quota_update.value -= remaining;
445 }
446 else
447 {
448 n->last_sent -= allowed;
449 n->last_quota_update = GNUNET_TIME_absolute_get ();
450 }
451}
452
453
454/**
455 * Figure out which transmission to a peer can be done right now.
456 * If none can, schedule a task to call 'schedule_transmission'
457 * whenever a peer transmission can be done in the future and
458 * return NULL. Otherwise return the next transmission to be
459 * performed.
460 *
461 * @param h handle to transport
462 * @return NULL to wait longer before doing any peer transmissions
463 */
464static struct GNUNET_TRANSPORT_TransmitHandle *
465schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h)
466{
467 struct GNUNET_TRANSPORT_TransmitHandle *ret;
468 struct GNUNET_TRANSPORT_TransmitHandle *th;
469 struct NeighbourList *n;
470 struct NeighbourList *next;
471 struct GNUNET_TIME_Relative retry_time;
472 struct GNUNET_TIME_Relative duration;
473 uint64_t available;
474
475 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
476 {
477 GNUNET_SCHEDULER_cancel (h->sched,
478 h->quota_task);
479 h->quota_task = GNUNET_SCHEDULER_NO_TASK;
480 }
481 retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
482 ret = NULL;
483 next = h->neighbours;
484 while (NULL != (n = next))
485 {
486 next = n->next;
487 if (n->transmit_stage != TS_QUEUED)
488 continue; /* not eligible */
489 th = &n->transmit_handle;
490 /* check outgoing quota */
491 duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
492 if (duration.value > MIN_QUOTA_REFRESH_TIME)
493 {
494 update_quota (n);
495 duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
496 }
497 available = duration.value * n->quota_out;
498 if (available < n->last_sent + th->notify_size)
499 {
500 /* calculate how much bandwidth we'd still need to
501 accumulate and based on that how long we'll have
502 to wait... */
503 available = n->last_sent + th->notify_size - available;
504 duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
505 available / n->quota_out);
506 if (th->timeout.value <
507 GNUNET_TIME_relative_to_absolute (duration).value)
508 {
509 /* signal timeout! */
510#if DEBUG_TRANSPORT
511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512 "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n",
513 duration.value, GNUNET_i2s (&n->id));
514#endif
515 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
516 {
517 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
518 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
519 }
520 n->transmit_stage = TS_NEW;
521 if (NULL != th->notify)
522 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
523 continue;
524 }
525#if DEBUG_TRANSPORT
526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
527 "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
528 GNUNET_i2s (&n->id), duration.value);
529#endif
530 retry_time = GNUNET_TIME_relative_min (retry_time,
531 duration);
532 continue;
533 }
534#if DEBUG_TRANSPORT
535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
536 "Bandwidth available for transmission to `%4s'\n",
537 GNUNET_i2s (&n->id));
538#endif
539 if ( (ret == NULL) ||
540 (ret->priority < th->priority) )
541 ret = th;
542 }
543 if (ret == NULL)
544 h->quota_task = GNUNET_SCHEDULER_add_delayed (h->sched,
545 retry_time,
546 &quota_transmit_ready,
547 h);
548 return ret;
549}
550
551
552/**
553 * Transmit message(s) to service.
554 *
555 * @param cls handle to transport
556 * @param size number of bytes available in buf
557 * @param buf where to copy the message
558 * @return number of bytes copied to buf
351 */ 559 */
352static size_t 560static size_t
353transport_notify_ready (void *cls, size_t size, void *buf) 561transport_notify_ready (void *cls, size_t size, void *buf)
354{ 562{
355 struct GNUNET_TRANSPORT_Handle *h = cls; 563 struct GNUNET_TRANSPORT_Handle *h = cls;
564 struct ControlMessage *cm;
356 struct GNUNET_TRANSPORT_TransmitHandle *th; 565 struct GNUNET_TRANSPORT_TransmitHandle *th;
357 struct NeighbourList *n; 566 struct NeighbourList *n;
567 struct OutboundMessage obm;
358 size_t ret; 568 size_t ret;
569 size_t mret;
359 char *cbuf; 570 char *cbuf;
360 571
361 h->network_handle = NULL; 572 h->network_handle = NULL;
362 h->transmission_scheduled = GNUNET_NO;
363 if (buf == NULL) 573 if (buf == NULL)
364 { 574 {
365#if DEBUG_TRANSPORT 575 schedule_transmission (h);
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "Could not transmit to transport service, cancelling pending requests\n");
368#endif
369 th = h->connect_ready_head;
370 if (th->next != NULL)
371 th->next->prev = NULL;
372 h->connect_ready_head = th->next;
373 if (NULL != (n = th->neighbour))
374 {
375 GNUNET_assert (n->transmit_handle == th);
376 n->transmit_handle = NULL;
377 }
378 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
379 {
380 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
381 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
382 }
383 if (NULL != th->notify)
384 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
385 GNUNET_free (th);
386 if (h->connect_ready_head != NULL)
387 schedule_transmission (h); /* FIXME: is this ok? */
388 return 0; 576 return 0;
389 } 577 }
390#if DEBUG_TRANSPORT 578#if DEBUG_TRANSPORT
@@ -393,35 +581,64 @@ transport_notify_ready (void *cls, size_t size, void *buf)
393#endif 581#endif
394 cbuf = buf; 582 cbuf = buf;
395 ret = 0; 583 ret = 0;
396 h->network_handle = NULL; 584 while ( (NULL != (cm = h->control_head)) &&
397 h->transmission_scheduled = GNUNET_NO; 585 (cm->notify_size <= size) )
398 while ((h->connect_ready_head != NULL) && 586 {
399 (h->connect_ready_head->notify_size <= size)) 587 if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
588 {
589 GNUNET_SCHEDULER_cancel (h->sched, cm->notify_delay_task);
590 cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
591 }
592 GNUNET_CONTAINER_DLL_remove (h->control_head,
593 h->control_tail,
594 cm);
595 ret += cm->notify (cm->notify_cls, size, &cbuf[ret]);
596 GNUNET_free (cm);
597 size -= ret;
598 }
599 while ( (NULL != (th = schedule_peer_transmission (h))) &&
600 (th->notify_size <= size) )
400 { 601 {
401 th = h->connect_ready_head;
402 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) 602 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
403 { 603 {
404 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); 604 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
405 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; 605 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
406 } 606 }
407 GNUNET_assert (th->notify_size <= size); 607 n = th->neighbour;
408 if (th->next != NULL) 608 switch (n->transmit_stage)
409 th->next->prev = NULL; 609 {
410 h->connect_ready_head = th->next; 610 case TS_NEW:
411 if (NULL != (n = th->neighbour)) 611 GNUNET_break (0);
412 { 612 break;
413 GNUNET_assert (n->transmit_handle == th); 613 case TS_QUEUED:
414 n->transmit_handle = NULL; 614 n->transmit_stage = TS_TRANSMITTED;
415 } 615 break;
416 if (NULL != th->notify) 616 case TS_TRANSMITTED:
417 ret += th->notify (th->notify_cls, size, &cbuf[ret]); 617 GNUNET_break (0);
418 GNUNET_free (th); 618 break;
419 if (n != NULL) 619 case TS_TRANSMITTED_QUEUED:
420 n->last_sent += ret; 620 GNUNET_break (0);
421 size -= ret; 621 break;
622 default:
623 GNUNET_break (0);
624 }
625 GNUNET_assert (size >= sizeof (struct OutboundMessage));
626 mret = th->notify (th->notify_cls,
627 size - sizeof (struct OutboundMessage),
628 &cbuf[ret + sizeof (struct OutboundMessage)]);
629 GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
630 if (mret != 0)
631 {
632 obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
633 obm.header.size = htons (mret + sizeof (struct OutboundMessage));
634 obm.priority = htonl (th->priority);
635 obm.peer = n->id;
636 memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
637 ret += (mret + sizeof (struct OutboundMessage));
638 size -= (mret + sizeof (struct OutboundMessage));
639 }
422 } 640 }
423 if (h->connect_ready_head != NULL) 641 schedule_transmission (h);
424 schedule_transmission (h);
425#if DEBUG_TRANSPORT 642#if DEBUG_TRANSPORT
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "Transmitting %u bytes to transport service\n", ret); 644 "Transmitting %u bytes to transport service\n", ret);
@@ -431,155 +648,73 @@ transport_notify_ready (void *cls, size_t size, void *buf)
431 648
432 649
433/** 650/**
434 * Schedule the task to send one message from the 651 * Schedule the task to send one message, either from the control
435 * connect_ready list to the service. 652 * list or the peer message queues to the service.
436 */ 653 */
437static void 654static void
438schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) 655schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
439{ 656{
657 size_t size;
658 struct GNUNET_TIME_Relative timeout;
440 struct GNUNET_TRANSPORT_TransmitHandle *th; 659 struct GNUNET_TRANSPORT_TransmitHandle *th;
441 660
442 GNUNET_assert (NULL == h->network_handle); 661 if (NULL != h->network_handle)
662 return;
443 if (h->client == NULL) 663 if (h->client == NULL)
444 { 664 {
445 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 665 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
446 "Could not yet schedule transmission: we are not yet connected to the transport service!\n"); 666 "Could not yet schedule transmission: we are not yet connected to the transport service!\n");
447 return; /* not yet connected */ 667 return; /* not yet connected */
448 } 668 }
449 th = h->connect_ready_head; 669 if (NULL != h->control_head)
450 if (th == NULL)
451 return; /* no request pending */
452 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
453 { 670 {
454 /* remove existing time out task, will be integrated 671 size = h->control_head->notify_size;
455 with transmit_ready notification! */ 672 timeout = GNUNET_TIME_UNIT_FOREVER_REL;
456 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
457 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
458 }
459 h->transmission_scheduled = GNUNET_YES;
460 h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
461 th->notify_size,
462 GNUNET_TIME_absolute_get_remaining
463 (th->timeout),
464 GNUNET_NO,
465 &transport_notify_ready,
466 h);
467 GNUNET_assert (NULL != h->network_handle);
468}
469
470
471/**
472 * Insert the given transmit handle in the given sorted
473 * doubly linked list based on timeout.
474 *
475 * @param head pointer to the head of the linked list
476 * @param th element to insert into the list
477 */
478static void
479insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
480 struct GNUNET_TRANSPORT_TransmitHandle *th)
481{
482 struct GNUNET_TRANSPORT_TransmitHandle *pos;
483 struct GNUNET_TRANSPORT_TransmitHandle *prev;
484
485 pos = *head;
486 prev = NULL;
487 while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
488 {
489 prev = pos;
490 pos = pos->next;
491 }
492 if (prev == NULL)
493 {
494 th->next = *head;
495 if (th->next != NULL)
496 th->next->prev = th;
497 *head = th;
498 }
499 else
500 {
501 th->next = pos;
502 th->prev = prev;
503 prev->next = th;
504 if (pos != NULL)
505 pos->prev = th;
506 }
507}
508
509
510/**
511 * Cancel a pending notify delay task (if pending) and also remove the
512 * given transmit handle from whatever list is on.
513 *
514 * @param th handle for the transmission request to manipulate
515 */
516static void
517remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
518{
519 struct GNUNET_TRANSPORT_Handle *h;
520
521 h = th->handle;
522 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
523 {
524 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
525 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
526 }
527 if (th->prev == NULL)
528 {
529 if (th == h->connect_wait_head)
530 h->connect_wait_head = th->next;
531 else
532 h->connect_ready_head = th->next;
533 } 673 }
534 else 674 else
535 { 675 {
536 th->prev->next = th->next; 676 th = schedule_peer_transmission (h);
677 if (th == NULL)
678 {
679 /* no transmission ready right now */
680 return;
681 }
682 size = th->notify_size;
683 timeout = GNUNET_TIME_absolute_get_remaining (th->timeout);
537 } 684 }
538 if (th->next != NULL) 685 h->network_handle =
539 th->next->prev = th->prev; 686 GNUNET_CLIENT_notify_transmit_ready (h->client,
687 size,
688 timeout,
689 GNUNET_NO,
690 &transport_notify_ready,
691 h);
692 GNUNET_assert (NULL != h->network_handle);
540} 693}
541 694
542 695
543/** 696/**
544 * Schedule a request to connect to the given
545 * neighbour (and if successful, add the specified
546 * handle to the wait list).
547 *
548 * @param th handle for a request to transmit once we
549 * have connected
550 */
551static void try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th);
552
553
554/**
555 * Called when our transmit request timed out before any transport 697 * Called when our transmit request timed out before any transport
556 * reported success connecting to the desired peer or before the 698 * reported success connecting to the desired peer or before the
557 * transport was ready to receive. Signal error and free 699 * transport was ready to receive. Signal error and free
558 * TransmitHandle. 700 * TransmitHandle.
559 */ 701 */
560static void 702static void
561peer_transmit_timeout (void *cls, 703control_transmit_timeout (void *cls,
562 const struct GNUNET_SCHEDULER_TaskContext *tc) 704 const struct GNUNET_SCHEDULER_TaskContext *tc)
563{ 705{
564 struct GNUNET_TRANSPORT_TransmitHandle *th = cls; 706 struct ControlMessage *th = cls;
565 707
566 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; 708 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
567 if (th->neighbour != NULL)
568 th->neighbour->transmit_handle = NULL;
569#if DEBUG_TRANSPORT
570 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
571 "Request for transmission to peer `%s' timed out.\n",
572 GNUNET_i2s (&th->target));
573#endif
574 remove_from_any_list (th);
575 if (NULL != th->notify) 709 if (NULL != th->notify)
576 th->notify (th->notify_cls, 0, NULL); 710 th->notify (th->notify_cls, 0, NULL);
711 GNUNET_CONTAINER_DLL_remove (th->h->control_head,
712 th->h->control_tail,
713 th);
577 GNUNET_free (th); 714 GNUNET_free (th);
578} 715}
579 716
580 717
581
582
583/** 718/**
584 * Queue control request for transmission to the transport 719 * Queue control request for transmission to the transport
585 * service. 720 * service.
@@ -600,68 +735,31 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
600 GNUNET_CONNECTION_TransmitReadyNotify notify, 735 GNUNET_CONNECTION_TransmitReadyNotify notify,
601 void *notify_cls) 736 void *notify_cls)
602{ 737{
603 struct GNUNET_TRANSPORT_TransmitHandle *th; 738 struct ControlMessage *th;
604 739
605#if DEBUG_TRANSPORT 740#if DEBUG_TRANSPORT
606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 741 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
607 "Control transmit of %u bytes within %llums requested\n", 742 "Control transmit of %u bytes within %llums requested\n",
608 size, (unsigned long long) timeout.value); 743 size, (unsigned long long) timeout.value);
609#endif 744#endif
610 th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); 745 th = GNUNET_malloc (sizeof (struct ControlMessage));
611 th->handle = h; 746 th->h = h;
612 th->notify = notify; 747 th->notify = notify;
613 th->notify_cls = notify_cls; 748 th->notify_cls = notify_cls;
614 th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
615 th->notify_size = size; 749 th->notify_size = size;
616 th->notify_delay_task 750 th->notify_delay_task
617 = GNUNET_SCHEDULER_add_delayed (h->sched, 751 = GNUNET_SCHEDULER_add_delayed (h->sched,
618 timeout, &peer_transmit_timeout, th); 752 timeout, &control_transmit_timeout, th);
619 if (at_head) 753 if (at_head)
620 { 754 GNUNET_CONTAINER_DLL_insert (h->control_head,
621 th->next = h->connect_ready_head; 755 h->control_tail,
622 h->connect_ready_head = th; 756 th);
623 if (th->next != NULL)
624 th->next->prev = th;
625 }
626 else 757 else
627 { 758 GNUNET_CONTAINER_DLL_insert_after (h->control_head,
628 insert_transmit_handle (&h->connect_ready_head, th); 759 h->control_tail,
629 } 760 h->control_tail,
630 if (GNUNET_NO == h->transmission_scheduled) 761 th);
631 schedule_transmission (h); 762 schedule_transmission (h);
632}
633
634
635/**
636 * Update the quota values for the given neighbour now.
637 */
638static void
639update_quota (struct NeighbourList *n)
640{
641 struct GNUNET_TIME_Relative delta;
642 uint64_t allowed;
643 uint64_t remaining;
644
645 delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
646 allowed = delta.value * n->quota_out;
647 if (n->last_sent < allowed)
648 {
649 remaining = allowed - n->last_sent;
650 if (n->quota_out > 0)
651 remaining /= n->quota_out;
652 else
653 remaining = 0;
654 if (remaining > MAX_BANDWIDTH_CARRY)
655 remaining = MAX_BANDWIDTH_CARRY;
656 n->last_sent = 0;
657 n->last_quota_update = GNUNET_TIME_absolute_get ();
658 n->last_quota_update.value -= remaining;
659 }
660 else
661 {
662 n->last_sent -= allowed;
663 n->last_quota_update = GNUNET_TIME_absolute_get ();
664 }
665} 763}
666 764
667 765
@@ -681,6 +779,14 @@ struct SetQuotaContext
681}; 779};
682 780
683 781
782/**
783 * Send SET_QUOTA message to the service.
784 *
785 * @param cls the 'struct SetQuotaContext'
786 * @param size number of bytes available in buf
787 * @param buf where to copy the message
788 * @return number of bytes copied to buf
789 */
684static size_t 790static size_t
685send_set_quota (void *cls, size_t size, void *buf) 791send_set_quota (void *cls, size_t size, void *buf)
686{ 792{
@@ -699,7 +805,8 @@ send_set_quota (void *cls, size_t size, void *buf)
699#if DEBUG_TRANSPORT 805#if DEBUG_TRANSPORT
700 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
701 "Transmitting `%s' request with respect to `%4s'.\n", 807 "Transmitting `%s' request with respect to `%4s'.\n",
702 "SET_QUOTA", GNUNET_i2s (&sqc->target)); 808 "SET_QUOTA",
809 GNUNET_i2s (&sqc->target));
703#endif 810#endif
704 GNUNET_assert (size >= sizeof (struct QuotaSetMessage)); 811 GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
705 msg = buf; 812 msg = buf;
@@ -742,7 +849,7 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
742 struct NeighbourList *n; 849 struct NeighbourList *n;
743 struct SetQuotaContext *sqc; 850 struct SetQuotaContext *sqc;
744 851
745 n = find_neighbour (handle, target); 852 n = neighbour_find (handle, target);
746 if (n != NULL) 853 if (n != NULL)
747 { 854 {
748 update_quota (n); 855 update_quota (n);
@@ -830,6 +937,14 @@ GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
830} 937}
831 938
832 939
940/**
941 * Send HELLO message to the service.
942 *
943 * @param cls the HELLO message to send
944 * @param size number of bytes available in buf
945 * @param buf where to copy the message
946 * @return number of bytes copied to buf
947 */
833static size_t 948static size_t
834send_hello (void *cls, size_t size, void *buf) 949send_hello (void *cls, size_t size, void *buf)
835{ 950{
@@ -873,14 +988,6 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
873 struct GNUNET_MessageHeader *hc; 988 struct GNUNET_MessageHeader *hc;
874 uint16_t size; 989 uint16_t size;
875 990
876 if (handle->client == NULL)
877 {
878#if DEBUG_TRANSPORT
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "Not connected to transport service, dropping offered HELLO\n");
881#endif
882 return;
883 }
884 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); 991 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
885 size = ntohs (hello->size); 992 size = ntohs (hello->size);
886 GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); 993 GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
@@ -893,11 +1000,13 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
893 1000
894 1001
895/** 1002/**
896 * Function we use for handling incoming messages. 1003 * Transmit START message to service.
1004 *
1005 * @param cls unused
1006 * @param size number of bytes available in buf
1007 * @param buf where to copy the message
1008 * @return number of bytes copied to buf
897 */ 1009 */
898static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
899
900
901static size_t 1010static size_t
902send_start (void *cls, size_t size, void *buf) 1011send_start (void *cls, size_t size, void *buf)
903{ 1012{
@@ -905,9 +1014,10 @@ send_start (void *cls, size_t size, void *buf)
905 1014
906 if (buf == NULL) 1015 if (buf == NULL)
907 { 1016 {
1017 /* Can only be shutdown, just give up */
908#if DEBUG_TRANSPORT 1018#if DEBUG_TRANSPORT
909 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
910 "Timeout while trying to transmit `%s' request.\n", 1020 "Shutdown while trying to transmit `%s' request.\n",
911 "START"); 1021 "START");
912#endif 1022#endif
913 return 0; 1023 return 0;
@@ -924,186 +1034,101 @@ send_start (void *cls, size_t size, void *buf)
924 1034
925 1035
926/** 1036/**
927 * We're ready to transmit the request that the transport service 1037 * Free neighbour.
928 * should connect to a new peer. In addition to sending the
929 * request, schedule the next phase for the transmission processing
930 * that caused the connect request in the first place.
931 */
932static size_t
933request_connect (void *cls, size_t size, void *buf)
934{
935 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
936 struct TryConnectMessage *tcm;
937 struct GNUNET_TRANSPORT_Handle *h;
938
939 GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
940 h = th->handle;
941
942 if (buf == NULL)
943 {
944#if DEBUG_TRANSPORT
945 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
946 "Failed to transmit `%s' request for `%4s' to service.\n",
947 "TRY_CONNECT", GNUNET_i2s (&th->target));
948#endif
949 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
950 {
951 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
952 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
953 }
954 if (NULL != th->notify)
955 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
956 GNUNET_free (th);
957 return 0;
958 }
959#if DEBUG_TRANSPORT
960 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961 "Transmitting `%s' message for `%4s' (need connection in %llu ms).\n",
962 "TRY_CONNECT", GNUNET_i2s (&th->target),
963 GNUNET_TIME_absolute_get_remaining (th->timeout).value);
964#endif
965 GNUNET_assert (size >= sizeof (struct TryConnectMessage));
966 tcm = buf;
967 tcm->header.size = htons (sizeof (struct TryConnectMessage));
968 tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
969 tcm->reserved = htonl (0);
970 memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
971 th->notify_delay_task
972 = GNUNET_SCHEDULER_add_delayed (h->sched,
973 GNUNET_TIME_absolute_get_remaining
974 (th->timeout),
975 &peer_transmit_timeout, th);
976 insert_transmit_handle (&h->connect_wait_head, th);
977 return sizeof (struct TryConnectMessage);
978}
979
980
981/**
982 * Schedule a request to connect to the given
983 * neighbour (and if successful, add the specified
984 * handle to the wait list).
985 *
986 * @param th handle for a request to transmit once we
987 * have connected
988 */
989static void
990try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
991{
992 GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
993 schedule_control_transmit (th->handle,
994 sizeof (struct TryConnectMessage),
995 GNUNET_NO,
996 GNUNET_TIME_absolute_get_remaining (th->timeout),
997 &request_connect, th);
998}
999
1000
1001/**
1002 * Task for delayed attempts to reconnect to a peer.
1003 *
1004 * @param cls must be a transmit handle that determines the peer
1005 * to which we will try to connect
1006 * @param tc scheduler information about why we were triggered (not used)
1007 */
1008static void
1009try_connect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1010{
1011 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1012
1013 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1014 try_connect (th);
1015}
1016
1017
1018/**
1019 * Remove neighbour from our list. Will automatically
1020 * trigger a re-connect attempt if we have messages pending
1021 * for this peer.
1022 * 1038 *
1023 * @param h our state 1039 * @param h our state
1024 * @param peer the peer to remove 1040 * @param n the entry to free
1025 */ 1041 */
1026static void 1042static void
1027remove_neighbour (struct GNUNET_TRANSPORT_Handle *h, 1043neighbour_free (struct NeighbourList *n)
1028 const struct GNUNET_PeerIdentity *peer)
1029{ 1044{
1045 struct GNUNET_TRANSPORT_Handle *h;
1030 struct NeighbourList *prev; 1046 struct NeighbourList *prev;
1031 struct NeighbourList *pos; 1047 struct NeighbourList *pos;
1032 struct GNUNET_TRANSPORT_TransmitHandle *th;
1033 1048
1049 h = n->h;
1034#if DEBUG_TRANSPORT 1050#if DEBUG_TRANSPORT
1035 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036 "Removing neighbour `%s' from list of connected peers.\n", 1052 "Removing neighbour `%s' from list of connected peers.\n",
1037 GNUNET_i2s (peer)); 1053 GNUNET_i2s (&n->id));
1038#endif 1054#endif
1055 GNUNET_break (n->is_connected == GNUNET_NO);
1056 GNUNET_break (n->transmit_stage == TS_NEW);
1057
1039 prev = NULL; 1058 prev = NULL;
1040 pos = h->neighbours; 1059 pos = h->neighbours;
1041 while ((pos != NULL) && 1060 while (pos != n)
1042 (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
1043 { 1061 {
1044 prev = pos; 1062 prev = pos;
1045 pos = pos->next; 1063 pos = pos->next;
1046 } 1064 }
1047 if (pos == NULL)
1048 {
1049 GNUNET_break (0);
1050 return;
1051 }
1052 if (prev == NULL) 1065 if (prev == NULL)
1053 h->neighbours = pos->next; 1066 h->neighbours = n->next;
1054 else 1067 else
1055 prev->next = pos->next; 1068 prev->next = n->next;
1056 if (NULL != (th = pos->transmit_handle)) 1069 GNUNET_free (n);
1057 { 1070}
1058 pos->transmit_handle = NULL; 1071
1059 th->neighbour = NULL; 1072
1060 remove_from_any_list (th); 1073/**
1061 if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <= 1074 * Mark neighbour as disconnected.
1062 CONNECT_RETRY_TIMEOUT.value) 1075 *
1063 { 1076 * @param n the entry to mark as disconnected
1064 /* signal error */ 1077 */
1065 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1078static void
1066 _ 1079neighbour_disconnect (struct NeighbourList *n)
1067 ("Connection with `%4s' failed and timeout was in the past, giving up on message delivery.\n"), 1080{
1068 GNUNET_i2s (peer)); 1081 struct GNUNET_TRANSPORT_Handle *h = n->h;
1069 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); 1082#if DEBUG_TRANSPORT
1070 peer_transmit_timeout (th, NULL); 1083 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071 } 1084 "Removing neighbour `%s' from list of connected peers.\n",
1072 else 1085 GNUNET_i2s (&n->id));
1073 { 1086#endif
1074 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1087 GNUNET_break (n->is_connected == GNUNET_YES);
1075 _ 1088 n->is_connected = GNUNET_NO;
1076 ("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"),
1077 GNUNET_i2s (peer),
1078 GNUNET_TIME_absolute_get_remaining (th->timeout).value);
1079 /* try again in a bit */
1080 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
1081 th->notify_delay_task
1082 = GNUNET_SCHEDULER_add_delayed (h->sched,
1083 CONNECT_RETRY_TIMEOUT,
1084 &try_connect_task, th);
1085 }
1086 }
1087 if (h->nc_cb != NULL) 1089 if (h->nc_cb != NULL)
1088 h->nd_cb (h->cls, peer); 1090 h->nd_cb (h->cls, &n->id);
1089 GNUNET_free (pos); 1091 if (n->transmit_stage == TS_NEW)
1092 neighbour_free (n);
1090} 1093}
1091 1094
1092 1095
1093/** 1096/**
1097 * Function we use for handling incoming messages.
1098 *
1099 * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
1100 * @param msg message received, NULL on timeout or fatal error
1101 */
1102static void demultiplexer (void *cls,
1103 const struct GNUNET_MessageHeader *msg);
1104
1105
1106/**
1094 * Try again to connect to transport service. 1107 * Try again to connect to transport service.
1108 *
1109 * @param cls the handle to the transport service
1110 * @param tc scheduler context
1095 */ 1111 */
1096static void 1112static void
1097reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1113reconnect (void *cls,
1114 const struct GNUNET_SCHEDULER_TaskContext *tc)
1098{ 1115{
1099 struct GNUNET_TRANSPORT_Handle *h = cls; 1116 struct GNUNET_TRANSPORT_Handle *h = cls;
1100 struct GNUNET_TRANSPORT_TransmitHandle *pos; 1117 struct ControlMessage *pos;
1101 struct NeighbourList *n; 1118 struct NeighbourList *n;
1102 1119
1103 /* Forget about all neighbours that we used to be connected 1120 if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1104 to */ 1121 {
1105 while (NULL != (n = h->neighbours)) 1122 /* shutdown, just give up */
1106 remove_neighbour (h, &n->id); 1123 return;
1124 }
1125 /* Forget about all neighbours that we used to be connected to */
1126 n = h->neighbours;
1127 while (NULL != n)
1128 {
1129 neighbour_disconnect (n);
1130 n = n->next;
1131 }
1107#if DEBUG_TRANSPORT 1132#if DEBUG_TRANSPORT
1108 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); 1133 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
1109#endif 1134#endif
@@ -1111,20 +1136,16 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1111 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 1136 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1112 h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg); 1137 h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
1113 GNUNET_assert (h->client != NULL); 1138 GNUNET_assert (h->client != NULL);
1114 /* make sure we don't send "START" twice, 1139 /* make sure we don't send "START" twice, remove existing entry from
1115 remove existing entry from queue (if present) */ 1140 queue (if present) */
1116 pos = h->connect_ready_head; 1141 pos = h->control_head;
1117 while (pos != NULL) 1142 while (pos != NULL)
1118 { 1143 {
1119 if (pos->notify == &send_start) 1144 if (pos->notify == &send_start)
1120 { 1145 {
1121 if (pos->prev == NULL) 1146 GNUNET_CONTAINER_DLL_remove (h->control_head,
1122 h->connect_ready_head = pos->next; 1147 h->control_tail,
1123 else 1148 pos);
1124 pos->prev->next = pos->next;
1125 if (pos->next != NULL)
1126 pos->next->prev = pos->prev;
1127 GNUNET_assert (pos->neighbour == NULL);
1128 if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task) 1149 if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task)
1129 { 1150 {
1130 GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); 1151 GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
@@ -1147,6 +1168,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1147/** 1168/**
1148 * Function that will schedule the job that will try 1169 * Function that will schedule the job that will try
1149 * to connect us again to the client. 1170 * to connect us again to the client.
1171 *
1172 * @param h transport service to reconnect
1150 */ 1173 */
1151static void 1174static void
1152schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) 1175schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
@@ -1161,215 +1184,47 @@ schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1161 h->reconnect_task 1184 h->reconnect_task
1162 = GNUNET_SCHEDULER_add_delayed (h->sched, 1185 = GNUNET_SCHEDULER_add_delayed (h->sched,
1163 h->reconnect_delay, &reconnect, h); 1186 h->reconnect_delay, &reconnect, h);
1164 h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS; 1187 if (h->reconnect_delay.value == 0)
1165}
1166
1167
1168/**
1169 * We are connected to the respective peer, check the
1170 * bandwidth limits and schedule the transmission.
1171 */
1172static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
1173
1174
1175/**
1176 * Function called by the scheduler when the timeout
1177 * for bandwidth availablility for the target
1178 * neighbour is reached.
1179 */
1180static void
1181transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1182{
1183 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1184
1185 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1186 schedule_request (th);
1187}
1188
1189
1190/**
1191 * Remove the given transmit handle from the wait list. Does NOT free
1192 * it.
1193 */
1194static void
1195remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
1196{
1197 if (th->prev == NULL)
1198 th->handle->connect_wait_head = th->next;
1199 else
1200 th->prev->next = th->next;
1201 if (th->next != NULL)
1202 th->next->prev = th->prev;
1203}
1204
1205
1206/**
1207 * We are connected to the respective peer, check the
1208 * bandwidth limits and schedule the transmission.
1209 */
1210static void
1211schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
1212{
1213 struct GNUNET_TRANSPORT_Handle *h;
1214 struct GNUNET_TIME_Relative duration;
1215 struct NeighbourList *n;
1216 uint64_t available;
1217
1218 h = th->handle;
1219 n = th->neighbour;
1220 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1221 { 1188 {
1222 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); 1189 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1223 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1224 } 1190 }
1225 /* check outgoing quota */ 1191 else
1226 duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1227 if (duration.value > MIN_QUOTA_REFRESH_TIME)
1228 { 1192 {
1229 update_quota (n); 1193 h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
1230 duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); 1194 h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
1231 } 1195 h->reconnect_delay);
1232 available = duration.value * n->quota_out;
1233 if (available < n->last_sent + th->notify_size)
1234 {
1235 /* calculate how much bandwidth we'd still need to
1236 accumulate and based on that how long we'll have
1237 to wait... */
1238 available = n->last_sent + th->notify_size - available;
1239 duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1240 available / n->quota_out);
1241 if (th->timeout.value <
1242 GNUNET_TIME_relative_to_absolute (duration).value)
1243 {
1244 /* signal timeout! */
1245#if DEBUG_TRANSPORT
1246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247 "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n",
1248 duration.value, GNUNET_i2s (&th->target));
1249#endif
1250 remove_from_wait_list (th);
1251 if (NULL != th->notify)
1252 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
1253 GNUNET_free (th);
1254 return;
1255 }
1256#if DEBUG_TRANSPORT
1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258 "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
1259 GNUNET_i2s (&th->target), duration.value);
1260#endif
1261 th->notify_delay_task
1262 = GNUNET_SCHEDULER_add_delayed (h->sched,
1263 duration, &transmit_ready, th);
1264 return;
1265 } 1196 }
1266#if DEBUG_TRANSPORT
1267 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268 "Bandwidth available for transmission to `%4s'\n",
1269 GNUNET_i2s (&n->id));
1270#endif
1271 if (GNUNET_NO == n->transmit_ok)
1272 {
1273 /* we may be ready, but transport service is not;
1274 wait for SendOkMessage or timeout */
1275#if DEBUG_TRANSPORT
1276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277 "Need to wait for transport service `%s' message\n",
1278 "SEND_OK");
1279#endif
1280 th->notify_delay_task
1281 = GNUNET_SCHEDULER_add_delayed (h->sched,
1282 GNUNET_TIME_absolute_get_remaining
1283 (th->timeout), &peer_transmit_timeout,
1284 th);
1285 return;
1286 }
1287 n->transmit_ok = GNUNET_NO;
1288 remove_from_wait_list (th);
1289#if DEBUG_TRANSPORT
1290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291 "Moving message for `%4s' to ready list\n",
1292 GNUNET_i2s (&n->id));
1293#endif
1294 insert_transmit_handle (&h->connect_ready_head, th);
1295 if (GNUNET_NO == h->transmission_scheduled)
1296 schedule_transmission (h);
1297} 1197}
1298 1198
1299 1199
1300/** 1200/**
1301 * Add neighbour to our list 1201 * Add neighbour to our list
1302 */ 1202 */
1303static void 1203static struct NeighbourList *
1304add_neighbour (struct GNUNET_TRANSPORT_Handle *h, 1204neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
1305 uint32_t quota_out,
1306 struct GNUNET_TIME_Relative latency,
1307 uint16_t distance,
1308 const struct GNUNET_PeerIdentity *pid) 1205 const struct GNUNET_PeerIdentity *pid)
1309{ 1206{
1310 struct NeighbourList *n; 1207 struct NeighbourList *n;
1311 struct GNUNET_TRANSPORT_TransmitHandle *prev;
1312 struct GNUNET_TRANSPORT_TransmitHandle *pos;
1313 struct GNUNET_TRANSPORT_TransmitHandle *next;
1314 1208
1315 /* check for duplicates */ 1209 /* check for duplicates */
1316 if (NULL != find_neighbour (h, pid)) 1210 if (NULL != (n = neighbour_find (h, pid)))
1317 { 1211 {
1318 GNUNET_break (0); 1212 GNUNET_break (0);
1319 return; 1213 return n;
1320 } 1214 }
1321#if DEBUG_TRANSPORT 1215#if DEBUG_TRANSPORT
1322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1216 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323 "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid)); 1217 "Creating entry for neighbour `%4s'.\n",
1218 GNUNET_i2s (pid));
1324#endif 1219#endif
1325 n = GNUNET_malloc (sizeof (struct NeighbourList)); 1220 n = GNUNET_malloc (sizeof (struct NeighbourList));
1326 n->id = *pid; 1221 n->id = *pid;
1327 n->last_quota_update = GNUNET_TIME_absolute_get (); 1222 n->last_quota_update = GNUNET_TIME_absolute_get ();
1328 n->quota_out = quota_out;
1329 n->next = h->neighbours; 1223 n->next = h->neighbours;
1330 n->transmit_ok = GNUNET_YES; 1224 n->quota_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1331 h->neighbours = n; 1225 n->h = h;
1332 if (h->nc_cb != NULL) 1226 h->neighbours = n;
1333 h->nc_cb (h->cls, &n->id, latency, distance); 1227 return n;
1334 prev = NULL;
1335 pos = h->connect_wait_head;
1336 while (pos != NULL)
1337 {
1338 next = pos->next;
1339 if (0 == memcmp (pid,
1340 &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1341 {
1342 pos->neighbour = n;
1343 GNUNET_assert (NULL == n->transmit_handle);
1344 n->transmit_handle = pos;
1345 if (prev == NULL)
1346 h->connect_wait_head = next;
1347 else
1348 prev->next = next;
1349#if ACK
1350 if (GNUNET_YES == n->received_ack)
1351 {
1352#endif
1353#if DEBUG_TRANSPORT
1354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355 "Found pending request for `%4s' will trigger it now.\n",
1356 GNUNET_i2s (&pos->target));
1357#endif
1358 if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1359 {
1360 GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1361 pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1362 }
1363 schedule_request (pos);
1364#if ACK
1365 }
1366#endif
1367
1368 break;
1369 }
1370 prev = pos;
1371 pos = next;
1372 }
1373} 1228}
1374 1229
1375 1230
@@ -1377,7 +1232,6 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1377 * Connect to the transport service. Note that the connection may 1232 * Connect to the transport service. Note that the connection may
1378 * complete (or fail) asynchronously. 1233 * complete (or fail) asynchronously.
1379 * 1234 *
1380
1381 * @param sched scheduler to use 1235 * @param sched scheduler to use
1382 * @param cfg configuration to use 1236 * @param cfg configuration to use
1383 * @param cls closure for the callbacks 1237 * @param cls closure for the callbacks
@@ -1423,44 +1277,29 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1423#if DEBUG_TRANSPORT 1277#if DEBUG_TRANSPORT
1424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); 1278 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1425#endif 1279#endif
1426 while (NULL != (th = handle->connect_ready_head))
1427 {
1428 handle->connect_ready_head = th->next;
1429 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1430 {
1431 GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1432 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1433 }
1434 if (NULL != th->notify)
1435 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
1436 GNUNET_free (th);
1437 }
1438 while (NULL != (th = handle->connect_wait_head))
1439 {
1440 handle->connect_wait_head = th->next;
1441 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1442 {
1443 GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1444 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1445 }
1446 if (NULL != th->notify)
1447 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
1448 GNUNET_free (th);
1449 }
1450 while (NULL != (n = handle->neighbours)) 1280 while (NULL != (n = handle->neighbours))
1451 { 1281 {
1452 handle->neighbours = n->next; 1282 handle->neighbours = n->next;
1453 if (NULL != (th = n->transmit_handle)) 1283 switch (n->transmit_stage)
1454 { 1284 {
1455 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) 1285 case TS_NEW:
1456 { 1286 case TS_TRANSMITTED:
1457 GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); 1287 /* nothing to do */
1458 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; 1288 break;
1459 } 1289 case TS_QUEUED:
1460 if (NULL != th->notify) 1290 case TS_TRANSMITTED_QUEUED:
1461 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); 1291 th = &n->transmit_handle;
1462 GNUNET_free (th); 1292 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1463 } 1293 {
1294 GNUNET_SCHEDULER_cancel (handle->sched,
1295 th->notify_delay_task);
1296 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1297 }
1298 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
1299 break;
1300 default:
1301 GNUNET_break (0);
1302 }
1464 GNUNET_free (n); 1303 GNUNET_free (n);
1465 } 1304 }
1466 while (NULL != (hwl = handle->hwl_head)) 1305 while (NULL != (hwl = handle->hwl_head))
@@ -1479,6 +1318,11 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1479 GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task); 1318 GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1480 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 1319 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1481 } 1320 }
1321 if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
1322 {
1323 GNUNET_SCHEDULER_cancel (handle->sched, handle->quota_task);
1324 handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
1325 }
1482 GNUNET_free_non_null (handle->my_hello); 1326 GNUNET_free_non_null (handle->my_hello);
1483 handle->my_hello = NULL; 1327 handle->my_hello = NULL;
1484 GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport", 1328 GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport",
@@ -1502,10 +1346,9 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1502 1346
1503 1347
1504/** 1348/**
1505 * Type of a function to call when we receive a message 1349 * Function we use for handling incoming messages.
1506 * from the service.
1507 * 1350 *
1508 * @param cls closure 1351 * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
1509 * @param msg message received, NULL on timeout or fatal error 1352 * @param msg message received, NULL on timeout or fatal error
1510 */ 1353 */
1511static void 1354static void
@@ -1521,59 +1364,29 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1521 struct HelloWaitList *next_hwl; 1364 struct HelloWaitList *next_hwl;
1522 struct NeighbourList *n; 1365 struct NeighbourList *n;
1523 struct GNUNET_PeerIdentity me; 1366 struct GNUNET_PeerIdentity me;
1524 struct GNUNET_TRANSPORT_TransmitHandle *th;
1525
1526 struct GNUNET_TRANSPORT_TransmitHandle *prev;
1527 struct GNUNET_TRANSPORT_TransmitHandle *pos;
1528 struct GNUNET_TRANSPORT_TransmitHandle *next;
1529 uint16_t size; 1367 uint16_t size;
1530 1368
1531 if ((msg == NULL) || (h->client == NULL)) 1369 if (h->client == NULL)
1370 {
1371 /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1372 finish clean up work! */
1373 GNUNET_free (h);
1374 return;
1375 }
1376 if (msg == NULL)
1532 { 1377 {
1533 if (h->client != NULL)
1534 {
1535#if DEBUG_TRANSPORT 1378#if DEBUG_TRANSPORT
1536 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1379 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1537 "Error receiving from transport service, disconnecting temporarily.\n"); 1380 "Error receiving from transport service, disconnecting temporarily.\n");
1538#endif 1381#endif
1539 if (h->network_handle != NULL) 1382 if (h->network_handle != NULL)
1540 { 1383 {
1541 GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); 1384 GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
1542 h->network_handle = NULL; 1385 h->network_handle = NULL;
1543 h->transmission_scheduled = GNUNET_NO; 1386 }
1544 th = h->connect_ready_head; 1387 GNUNET_CLIENT_disconnect (h->client);
1545 /* add timeout again, we canceled the transmit_ready task! */ 1388 h->client = NULL;
1546 1389 schedule_reconnect (h);
1547 /*GNUNET_assert (th->notify_delay_task ==
1548 GNUNET_SCHEDULER_NO_TASK);*/
1549
1550 /* START - somehow we are getting here when th->notify_delay_task is already
1551 * set. Not sure why, so just checking and canceling instead of asserting and
1552 * dying. Probably not a *fix*. */
1553 if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1554 {
1555 GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1556 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1557 }
1558 /* END */
1559 GNUNET_assert (th->notify_delay_task ==
1560 GNUNET_SCHEDULER_NO_TASK);
1561 th->notify_delay_task =
1562 GNUNET_SCHEDULER_add_delayed (h->sched,
1563 GNUNET_TIME_absolute_get_remaining
1564 (th->timeout),
1565 &peer_transmit_timeout, th);
1566 }
1567 GNUNET_CLIENT_disconnect (h->client);
1568 h->client = NULL;
1569 schedule_reconnect (h);
1570 }
1571 else
1572 {
1573 /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1574 finish clean up work! */
1575 GNUNET_free (h);
1576 }
1577 return; 1390 return;
1578 } 1391 }
1579 GNUNET_CLIENT_receive (h->client, 1392 GNUNET_CLIENT_receive (h->client,
@@ -1624,81 +1437,16 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1624 "Receiving `%s' message for `%4s'.\n", 1437 "Receiving `%s' message for `%4s'.\n",
1625 "CONNECT", GNUNET_i2s (&cim->id)); 1438 "CONNECT", GNUNET_i2s (&cim->id));
1626#endif 1439#endif
1627 if (NULL == (n = find_neighbour(h, &cim->id))) 1440 n = neighbour_find (h, &cim->id);
1628 { 1441 if (n == NULL)
1629#if DEBUG_TRANSPORT 1442 n = neighbour_add (h,
1630 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1443 &cim->id);
1631 "Don't know neighbor, adding!\n"); 1444 GNUNET_break (n->is_connected == GNUNET_NO);
1632#endif 1445 n->is_connected = GNUNET_YES;
1633 add_neighbour (h, 1446 if (h->nc_cb != NULL)
1634 ntohl (cim->quota_out), 1447 h->nc_cb (h->cls, &n->id,
1635 GNUNET_TIME_relative_ntoh (cim->latency), ntohs(cim->distance), &cim->id); 1448 GNUNET_TIME_relative_ntoh (cim->latency),
1636 } 1449 ntohs (cim->distance));
1637 else
1638 {
1639#if DEBUG_TRANSPORT
1640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641 "Do know neighbor, scheduling transmission!\n");
1642#endif
1643#if ACK
1644 n->received_ack = GNUNET_YES;
1645#endif
1646 if (NULL != n->transmit_handle)
1647 {
1648#if DEBUG_TRANSPORT
1649 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1650 "Peer connected, scheduling delayed message for delivery now.\n");
1651#endif
1652 schedule_request (n->transmit_handle);
1653 }
1654 else
1655 {
1656#if DEBUG_TRANSPORT
1657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658 "Transmit handle is null... Checking for pending stuff(?)\n");
1659#endif
1660 prev = NULL;
1661 pos = h->connect_wait_head;
1662 while (pos != NULL)
1663 {
1664 next = pos->next;
1665 if (0 == memcmp (&cim->id,
1666 &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1667 {
1668 pos->neighbour = n;
1669 GNUNET_assert (NULL == n->transmit_handle);
1670 n->transmit_handle = pos;
1671 if (prev == NULL)
1672 h->connect_wait_head = next;
1673 else
1674 prev->next = next;
1675#if ACK
1676 if (GNUNET_YES == n->received_ack)
1677 {
1678#endif
1679 #if DEBUG_TRANSPORT
1680 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1681 "Found pending request for `%4s' will trigger it now.\n",
1682 GNUNET_i2s (&pos->target));
1683 #endif
1684 if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1685 {
1686 GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1687 pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1688 }
1689 schedule_request (pos);
1690#if ACK
1691 }
1692#endif
1693
1694 break;
1695 }
1696 prev = pos;
1697 pos = next;
1698 }
1699 }
1700 }
1701
1702 break; 1450 break;
1703 case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: 1451 case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1704 if (size != sizeof (struct DisconnectInfoMessage)) 1452 if (size != sizeof (struct DisconnectInfoMessage))
@@ -1710,9 +1458,13 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1710#if DEBUG_TRANSPORT 1458#if DEBUG_TRANSPORT
1711 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1459 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1712 "Receiving `%s' message for `%4s'.\n", 1460 "Receiving `%s' message for `%4s'.\n",
1713 "DISCONNECT", GNUNET_i2s (&dim->peer)); 1461 "DISCONNECT",
1462 GNUNET_i2s (&dim->peer));
1714#endif 1463#endif
1715 remove_neighbour (h, &dim->peer); 1464 n = neighbour_find (h, &cim->id);
1465 GNUNET_break (n != NULL);
1466 if (n != NULL)
1467 neighbour_disconnect (n);
1716 break; 1468 break;
1717 case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: 1469 case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1718 if (size != sizeof (struct SendOkMessage)) 1470 if (size != sizeof (struct SendOkMessage))
@@ -1726,21 +1478,26 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1726 "Receiving `%s' message, transmission %s.\n", "SEND_OK", 1478 "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1727 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); 1479 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
1728#endif 1480#endif
1729 n = find_neighbour (h, &okm->peer); 1481 n = neighbour_find (h, &okm->peer);
1730 GNUNET_assert (n != NULL); 1482 GNUNET_assert (n != NULL);
1731 n->transmit_ok = GNUNET_YES; 1483 switch (n->transmit_stage)
1732 if (n->transmit_handle != NULL) 1484 {
1733 { 1485 case TS_NEW:
1734#if DEBUG_TRANSPORT 1486 GNUNET_break (0);
1735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1487 break;
1736 "Processing pending message for `%4s'\n", 1488 case TS_QUEUED:
1737 GNUNET_i2s (&n->id)); 1489 GNUNET_break (0);
1738#endif 1490 break;
1739 GNUNET_SCHEDULER_cancel (h->sched, 1491 case TS_TRANSMITTED:
1740 n->transmit_handle->notify_delay_task); 1492 n->transmit_stage = TS_NEW;
1741 n->transmit_handle->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; 1493 break;
1742 schedule_request (n->transmit_handle); 1494 case TS_TRANSMITTED_QUEUED:
1743 } 1495 n->transmit_stage = TS_QUEUED;
1496 schedule_transmission (h);
1497 break;
1498 default:
1499 GNUNET_break (0);
1500 }
1744 break; 1501 break;
1745 case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: 1502 case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1746#if DEBUG_TRANSPORT 1503#if DEBUG_TRANSPORT
@@ -1761,42 +1518,20 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1761 GNUNET_break (0); 1518 GNUNET_break (0);
1762 break; 1519 break;
1763 } 1520 }
1764 switch (ntohs (imm->type))
1765 {
1766 case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
1767#if DEBUG_TRANSPORT
1768 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1769 "Receiving `%s' message from `%4s'.\n",
1770 "ACK", GNUNET_i2s (&im->peer));
1771#endif
1772 break;
1773 default:
1774#if DEBUG_TRANSPORT 1521#if DEBUG_TRANSPORT
1775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1522 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1776 "Received message of type %u from `%4s'.\n", 1523 "Received message of type %u from `%4s'.\n",
1777 ntohs (imm->type), GNUNET_i2s (&im->peer)); 1524 ntohs (imm->type), GNUNET_i2s (&im->peer));
1778#endif 1525#endif
1779 1526 n = neighbour_find (h, &im->peer);
1780 n = find_neighbour (h, &im->peer); 1527 if (n == NULL)
1781 if (n == NULL) 1528 {
1782 { 1529 GNUNET_break (0);
1783 GNUNET_break (0); 1530 break;
1784 break; 1531 }
1785 } 1532 if (h->rec != NULL)
1786 1533 h->rec (h->cls, &im->peer, imm,
1787 if (NULL != n->transmit_handle) 1534 GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance));
1788 {
1789#if DEBUG_TRANSPORT
1790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1791 "Peer connected, scheduling delayed message for delivery now.\n");
1792#endif
1793 schedule_request (n->transmit_handle);
1794 }
1795 if (h->rec != NULL)
1796 h->rec (h->cls, &im->peer, imm,
1797 GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance));
1798 break;
1799 }
1800 break; 1535 break;
1801 default: 1536 default:
1802 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1537 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1809,73 +1544,48 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1809} 1544}
1810 1545
1811 1546
1812struct ClientTransmitWrapper
1813{
1814 GNUNET_CONNECTION_TransmitReadyNotify notify;
1815 void *notify_cls;
1816 struct GNUNET_TRANSPORT_TransmitHandle *th;
1817};
1818
1819
1820/** 1547/**
1821 * Transmit message of a client destined for another 1548 * Called when our transmit request timed out before any transport
1822 * peer to the service. 1549 * reported success connecting to the desired peer or before the
1550 * transport was ready to receive. Signal error and free
1551 * TransmitHandle.
1552 *
1553 * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle*' that is timing out
1554 * @param tc scheduler context
1823 */ 1555 */
1824static size_t 1556static void
1825client_notify_wrapper (void *cls, size_t size, void *buf) 1557peer_transmit_timeout (void *cls,
1558 const struct GNUNET_SCHEDULER_TaskContext *tc)
1826{ 1559{
1827 struct ClientTransmitWrapper *ctw = cls; 1560 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1828 struct OutboundMessage *obm; 1561 struct NeighbourList *n;
1829 struct GNUNET_MessageHeader *hdr;
1830 size_t ret;
1831 1562
1832 if (size == 0) 1563 th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1833 { 1564 n = th->neighbour;
1834#if DEBUG_TRANSPORT 1565 switch (n->transmit_stage)
1835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1836 "Transmission request could not be satisfied.\n");
1837#endif
1838 if (NULL != ctw->notify)
1839 GNUNET_assert (0 == ctw->notify (ctw->notify_cls, 0, NULL));
1840 GNUNET_free (ctw);
1841 return 0;
1842 }
1843 GNUNET_assert (size >= sizeof (struct OutboundMessage));
1844 obm = buf;
1845 if (ctw->notify != NULL)
1846 ret = ctw->notify (ctw->notify_cls,
1847 size - sizeof (struct OutboundMessage),
1848 (void *) &obm[1]);
1849 else
1850 ret = 0;
1851 if (ret == 0)
1852 { 1566 {
1853 /* Need to reset flag, no SEND means no SEND_OK! */ 1567 case TS_NEW:
1854 ctw->th->neighbour->transmit_ok = GNUNET_YES; 1568 GNUNET_break (0);
1855 GNUNET_free (ctw); 1569 break;
1856 return 0; 1570 case TS_QUEUED:
1571 n->transmit_stage = TS_NEW;
1572 if (n->is_connected == GNUNET_NO)
1573 neighbour_free (n);
1574 break;
1575 case TS_TRANSMITTED:
1576 GNUNET_break (0);
1577 break;
1578 case TS_TRANSMITTED_QUEUED:
1579 n->transmit_stage = TS_TRANSMITTED;
1580 break;
1581 default:
1582 GNUNET_break (0);
1857 } 1583 }
1858 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); 1584 if (NULL != th->notify)
1859 hdr = (struct GNUNET_MessageHeader *) &obm[1]; 1585 th->notify (th->notify_cls, 0, NULL);
1860 GNUNET_assert (ntohs (hdr->size) == ret);
1861 GNUNET_assert (ret + sizeof (struct OutboundMessage) <
1862 GNUNET_SERVER_MAX_MESSAGE_SIZE);
1863#if DEBUG_TRANSPORT
1864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1865 "Transmitting `%s' message with data for `%4s'\n",
1866 "SEND", GNUNET_i2s (&ctw->th->target));
1867#endif
1868 ret += sizeof (struct OutboundMessage);
1869 obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1870 obm->header.size = htons (ret);
1871 obm->priority = htonl (ctw->th->priority);
1872 obm->peer = ctw->th->target;
1873 GNUNET_free (ctw);
1874 return ret;
1875} 1586}
1876 1587
1877 1588
1878
1879/** 1589/**
1880 * Check if we could queue a message of the given size for 1590 * Check if we could queue a message of the given size for
1881 * transmission. The transport service will take both its 1591 * transmission. The transport service will take both its
@@ -1905,10 +1615,8 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1905 GNUNET_CONNECTION_TransmitReadyNotify 1615 GNUNET_CONNECTION_TransmitReadyNotify
1906 notify, void *notify_cls) 1616 notify, void *notify_cls)
1907{ 1617{
1908 struct GNUNET_TRANSPORT_TransmitHandle *pos;
1909 struct GNUNET_TRANSPORT_TransmitHandle *th; 1618 struct GNUNET_TRANSPORT_TransmitHandle *th;
1910 struct NeighbourList *n; 1619 struct NeighbourList *n;
1911 struct ClientTransmitWrapper *ctw;
1912 1620
1913 if (size + sizeof (struct OutboundMessage) >= 1621 if (size + sizeof (struct OutboundMessage) >=
1914 GNUNET_SERVER_MAX_MESSAGE_SIZE) 1622 GNUNET_SERVER_MAX_MESSAGE_SIZE)
@@ -1926,66 +1634,39 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1926 "Asking transport service for transmission of %u bytes to peer `%4s'.\n", 1634 "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1927 size, GNUNET_i2s (target)); 1635 size, GNUNET_i2s (target));
1928#endif 1636#endif
1929 n = find_neighbour (handle, target); 1637 n = neighbour_find (handle, target);
1930 if ((n != NULL) && (n->transmit_handle != NULL)) 1638 if (n == NULL)
1931 return NULL; /* already have a request pending for this peer! */ 1639 n = neighbour_add (handle, target);
1932 ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper)); 1640 if (n == NULL)
1933 th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); 1641 return NULL;
1934 ctw->notify = notify; 1642 switch (n->transmit_stage)
1935 ctw->notify_cls = notify_cls; 1643 {
1936 ctw->th = th; 1644 case TS_NEW:
1937 th->handle = handle; 1645 n->transmit_stage = TS_QUEUED;
1646 break;
1647 case TS_QUEUED:
1648 break;
1649 case TS_TRANSMITTED:
1650 n->transmit_stage = TS_TRANSMITTED_QUEUED;
1651 break;
1652 case TS_TRANSMITTED_QUEUED:
1653 return NULL;
1654 break;
1655 default:
1656 GNUNET_break (0);
1657 return NULL;
1658 }
1659 th = &n->transmit_handle;
1938 th->neighbour = n; 1660 th->neighbour = n;
1939 th->target = *target; 1661 th->notify = notify;
1940 th->notify = &client_notify_wrapper; 1662 th->notify_cls = notify_cls;
1941 th->notify_cls = ctw;
1942 th->timeout = GNUNET_TIME_relative_to_absolute (timeout); 1663 th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1943 th->notify_size = size + sizeof (struct OutboundMessage); 1664 th->notify_size = size + sizeof (struct OutboundMessage);
1944 th->priority = priority; 1665 th->priority = priority;
1945 if (NULL == n)
1946 {
1947 pos = handle->connect_wait_head;
1948 while (pos != NULL)
1949 {
1950 GNUNET_assert (0 != memcmp (target,
1951 &pos->target,
1952 sizeof (struct GNUNET_PeerIdentity)));
1953 pos = pos->next;
1954 }
1955#if DEBUG_TRANSPORT
1956 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1957 "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
1958#endif
1959 try_connect (th);
1960 return th;
1961 }
1962
1963#if DEBUG_TRANSPORT
1964 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1965 "Transmission request queued for transmission to transport service.\n");
1966#endif
1967 GNUNET_assert (NULL == n->transmit_handle);
1968 n->transmit_handle = th;
1969 if (GNUNET_YES != n->transmit_ok)
1970 {
1971#if DEBUG_TRANSPORT
1972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973 "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
1974 GNUNET_i2s (target), timeout.value);
1975#endif
1976 th->notify_delay_task
1977 = GNUNET_SCHEDULER_add_delayed (handle->sched,
1978 timeout, &peer_transmit_timeout, th);
1979 return th;
1980 }
1981
1982#if DEBUG_TRANSPORT
1983 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984 "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
1985 GNUNET_i2s (target));
1986#endif
1987 th->notify_delay_task 1666 th->notify_delay_task
1988 = GNUNET_SCHEDULER_add_now (handle->sched, &transmit_ready, th); 1667 = GNUNET_SCHEDULER_add_delayed (handle->sched, timeout,
1668 &peer_transmit_timeout, th);
1669 schedule_transmission (handle);
1989 return th; 1670 return th;
1990} 1671}
1991 1672
@@ -1998,26 +1679,34 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1998 GNUNET_TRANSPORT_TransmitHandle 1679 GNUNET_TRANSPORT_TransmitHandle
1999 *th) 1680 *th)
2000{ 1681{
2001 struct GNUNET_TRANSPORT_Handle *h; 1682 struct NeighbourList *n;
2002 1683
1684 n = th->neighbour;
2003#if DEBUG_TRANSPORT 1685#if DEBUG_TRANSPORT
2004 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2005 "Transmission request of %u bytes to `%4s' was cancelled.\n", 1687 "Transmission request of %u bytes to `%4s' was cancelled.\n",
2006 th->notify_size - sizeof (struct OutboundMessage), 1688 th->notify_size - sizeof (struct OutboundMessage),
2007 GNUNET_i2s (&th->target)); 1689 GNUNET_i2s (&n->id));
2008#endif 1690#endif
2009 GNUNET_assert (th->notify == &client_notify_wrapper); 1691 switch (n->transmit_stage)
2010 remove_from_any_list (th);
2011 h = th->handle;
2012 if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
2013 { 1692 {
2014 GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); 1693 case TS_NEW:
2015 h->network_handle = NULL; 1694 GNUNET_break (0);
2016 h->transmission_scheduled = GNUNET_NO; 1695 break;
1696 case TS_QUEUED:
1697 n->transmit_stage = TS_NEW;
1698 if (n->is_connected == GNUNET_NO)
1699 neighbour_free (n);
1700 break;
1701 case TS_TRANSMITTED:
1702 GNUNET_break (0);
1703 break;
1704 case TS_TRANSMITTED_QUEUED:
1705 n->transmit_stage = TS_TRANSMITTED;
1706 break;
1707 default:
1708 GNUNET_break (0);
2017 } 1709 }
2018 GNUNET_free (th->notify_cls);
2019 GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
2020 GNUNET_free (th);
2021} 1710}
2022 1711
2023 1712