aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2013-07-31 08:07:58 +0000
committerBart Polot <bart@net.in.tum.de>2013-07-31 08:07:58 +0000
commit7bf56139d0cd7499ab4cd7c194769f6768ad37b4 (patch)
tree73f1e92a2720176af82b6c03c3781ab9fe42e256 /src
parentb87f43cb7ca0ceec933054e6b224b1b321eb7238 (diff)
downloadgnunet-7bf56139d0cd7499ab4cd7c194769f6768ad37b4.tar.gz
gnunet-7bf56139d0cd7499ab4cd7c194769f6768ad37b4.zip
- wip
Diffstat (limited to 'src')
-rw-r--r--src/mesh/Makefile.am3
-rw-r--r--src/mesh/gnunet-service-mesh-enc.c567
-rw-r--r--src/mesh/mesh_protocol_enc.h10
3 files changed, 319 insertions, 261 deletions
diff --git a/src/mesh/Makefile.am b/src/mesh/Makefile.am
index 6e8d95ec9..8667f5527 100644
--- a/src/mesh/Makefile.am
+++ b/src/mesh/Makefile.am
@@ -20,6 +20,9 @@ plugindir = $(libdir)/gnunet
20 20
21AM_CLFAGS = -g 21AM_CLFAGS = -g
22 22
23EXP_LIBEXEC = \
24 gnunet-service-mesh-enc
25
23libexec_PROGRAMS = \ 26libexec_PROGRAMS = \
24 gnunet-service-mesh $(EXP_LIBEXEC) 27 gnunet-service-mesh $(EXP_LIBEXEC)
25 28
diff --git a/src/mesh/gnunet-service-mesh-enc.c b/src/mesh/gnunet-service-mesh-enc.c
index 95151dfa3..17982133e 100644
--- a/src/mesh/gnunet-service-mesh-enc.c
+++ b/src/mesh/gnunet-service-mesh-enc.c
@@ -161,6 +161,7 @@ enum MeshConnectionState
161struct MeshClient; 161struct MeshClient;
162struct MeshPeer; 162struct MeshPeer;
163struct MeshTunnel2; 163struct MeshTunnel2;
164struct MeshConnection;
164struct MeshChannel; 165struct MeshChannel;
165struct MeshChannelReliability; 166struct MeshChannelReliability;
166 167
@@ -186,9 +187,14 @@ struct MeshPeerQueue
186 struct MeshPeer *peer; 187 struct MeshPeer *peer;
187 188
188 /** 189 /**
189 * Tunnel this message belongs to. 190 * Connection this message belongs to.
190 */ 191 */
191 struct MeshTunnel2 *tunnel; 192 struct MeshConnection *c;
193
194 /**
195 * Channel this message belongs to, if known.
196 */
197 struct MeshChannel *ch;
192 198
193 /** 199 /**
194 * Pointer to info stucture used as cls. 200 * Pointer to info stucture used as cls.
@@ -214,6 +220,11 @@ struct MeshPeerQueue
214struct MeshFlowControl 220struct MeshFlowControl
215{ 221{
216 /** 222 /**
223 * Peer
224 */
225 struct MeshPeer *peer;
226
227 /**
217 * Transmission queue to core DLL head 228 * Transmission queue to core DLL head
218 */ 229 */
219 struct MeshPeerQueue *queue_head; 230 struct MeshPeerQueue *queue_head;
@@ -983,26 +994,42 @@ static void
983connection_bck_keepalive (void *cls, 994connection_bck_keepalive (void *cls,
984 const struct GNUNET_SCHEDULER_TaskContext *tc); 995 const struct GNUNET_SCHEDULER_TaskContext *tc);
985 996
997
998/**
999 * Change the tunnel state.
1000 *
1001 * @param c Connection whose state to change.
1002 * @param state New state.
1003 */
1004static void
1005connection_change_state (struct MeshConnection* c,
1006 enum MeshConnectionState state);
1007
1008
1009
986/** 1010/**
987 * @brief Queue and pass message to core when possible. 1011 * @brief Queue and pass message to core when possible.
988 * 1012 *
989 * If type is payload (UNICAST, TO_ORIGIN) checks for queue status 1013 * If type is payload (UNICAST, TO_ORIGIN) checks for queue status and
990 * and accounts for it. In case the queue is full, the message is dropped and 1014 * accounts for it. In case the queue is full, the message is dropped and
991 * a break issued. 1015 * a break issued.
992 * 1016 *
993 * Otherwise, the message is treated as internal and allowed to go, 1017 * Otherwise, message is treated as internal and allowed to go regardless of
994 * regardless of queue status. 1018 * queue status.
995 * 1019 *
996 * @param cls Closure (@c type dependant). It will be used by queue_send to 1020 * @param cls Closure (@c type dependant). It will be used by queue_send to
997 * build the message to be sent if not already prebuilt. 1021 * build the message to be sent if not already prebuilt.
998 * @param type Type of the message, 0 for a raw message. 1022 * @param type Type of the message, 0 for a raw message.
999 * @param size Size of the message. 1023 * @param size Size of the message.
1000 * @param dst Neighbor to send message to. 1024 * @param dst Neighbor to send message to.
1001 * @param t Tunnel this message belongs to. 1025 * @param c Connection this message belongs to.
1026 * @param ch Channel this message belongs to, if applicable (otherwise NULL).
1002 */ 1027 */
1003static void 1028static void
1004queue_add (void *cls, uint16_t type, size_t size, 1029queue_add (void *cls, uint16_t type, size_t size,
1005 struct MeshPeer *dst, struct MeshTunnel2 *t); 1030 struct MeshPeer *dst,
1031 struct MeshConnection *c,
1032 struct MeshChannel *ch);
1006 1033
1007 1034
1008/** 1035/**
@@ -1157,6 +1184,38 @@ announce_id (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1157 1184
1158 1185
1159/** 1186/**
1187 * Get the previous hop in a connection
1188 *
1189 * @param c Connection.
1190 *
1191 * @return Short ID of the previous peer.
1192 */
1193GNUNET_PEER_Id
1194connection_get_prev_hop (struct MeshConnection *c)
1195{
1196 if (0 == c->own_pos || c->path->length < 2)
1197 return c->path->peers[0];
1198 return c->path->peers[c->own_pos - 1];
1199}
1200
1201
1202/**
1203 * Get the next hop in a connection
1204 *
1205 * @param c Connection.
1206 *
1207 * @return Short ID of the next peer.
1208 */
1209GNUNET_PEER_Id
1210connection_get_next_hop (struct MeshConnection *c)
1211{
1212 if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
1213 return c->path->peers[c->path->length - 1];
1214 return c->path->peers[c->own_pos + 1];
1215}
1216
1217
1218/**
1160 * Check if client has registered with the service and has not disconnected 1219 * Check if client has registered with the service and has not disconnected
1161 * 1220 *
1162 * @param client the client to check 1221 * @param client the client to check
@@ -1279,99 +1338,175 @@ send_local_ack (struct MeshChannel *ch,
1279} 1338}
1280 1339
1281 1340
1341/**
1342 * Pick a connection on which send the next data message.
1343 *
1344 * @param t Tunnel on which to send the message.
1345 * @param fwd Is this a fwd message?
1346 *
1347 * @return The connection on which to send the next message.
1348 */
1349static struct MeshConnection *
1350tunnel_get_connection (struct MeshTunnel2 *t, int fwd)
1351{
1352 struct MeshConnection *c;
1353 struct MeshConnection *best;
1354 struct MeshPeer *neighbor;
1355 GNUNET_PEER_Id id;
1356 unsigned int lowest_q;
1357
1358
1359 neighbor = NULL;
1360 best = NULL;
1361 lowest_q = UINT_MAX;
1362 for (c = t->connection_head; NULL != c; c = c->next)
1363 {
1364 if (MESH_CONNECTION_READY == c->state)
1365 {
1366 id = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
1367 neighbor = peer_get_short (id);
1368 if (NULL == neighbor->fc)
1369 {
1370 GNUNET_break (0);
1371 continue;
1372 }
1373 if (neighbor->fc->queue_n < lowest_q)
1374 {
1375 best = c;
1376 lowest_q = neighbor->fc->queue_n;
1377 }
1378 }
1379 }
1380 return best;
1381}
1382
1282 1383
1283/** 1384/**
1284 * Sends an already built message to a peer, properly registrating 1385 * Sends an already built message on a tunnel, properly registering
1285 * all used resources. 1386 * all used resources.
1286 * 1387 *
1287 * @param message Message to send. Function makes a copy of it. 1388 * @param message Message to send. Function makes a copy of it.
1288 * @param peer Short ID of the neighbor whom to send the message. 1389 * @param c Connection on which this message is transmitted.
1289 * @param t Tunnel on which this message is transmitted. 1390 * @param ch Channel on which this message is transmitted.
1391 * @param fwd Is this a fwd message?
1290 */ 1392 */
1291static void 1393static void
1292send_prebuilt_message (const struct GNUNET_MessageHeader *message, 1394send_prebuilt_message_connection (const struct GNUNET_MessageHeader *message,
1293 GNUNET_PEER_Id peer, 1395 struct MeshConnection *c,
1294 struct MeshTunnel2 *t) 1396 struct MeshChannel *ch,
1397 int fwd)
1295{ 1398{
1296 struct MeshPeer *neighbor; 1399 struct MeshPeer *neighbor;
1297 struct MeshPeerPath *p; 1400 GNUNET_PEER_Id id;
1298 void *data; 1401 void *data;
1299 size_t size; 1402 size_t size;
1300 uint16_t type; 1403 uint16_t type;
1301 1404
1302 if (0 == peer) 1405 id = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
1406 neighbor = peer_get_short (id);
1407 if (NULL == neighbor)
1408 {
1409 GNUNET_break (0);
1303 return; 1410 return;
1411 }
1304 1412
1305 size = ntohs (message->size); 1413 size = ntohs (message->size);
1306 data = GNUNET_malloc (size); 1414 data = GNUNET_malloc (size);
1307 memcpy (data, message, size); 1415 memcpy (data, message, size);
1308 type = ntohs(message->type); 1416 type = ntohs(message->type);
1309 if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type || 1417 if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
1310 GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type) 1418 GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
1311 { 1419 {
1312 struct GNUNET_MESH_Data *u; 1420 struct GNUNET_MESH_Data *u;
1313 1421
1314 u = (struct GNUNET_MESH_Data *) data; 1422 u = (struct GNUNET_MESH_Data *) data;
1315 u->ttl = htonl (ntohl (u->ttl) - 1); 1423 u->ttl = htonl (ntohl (u->ttl) - 1);
1316 } 1424 }
1317 neighbor = peer_get_short (peer); 1425
1318 for (p = neighbor->path_head; NULL != p; p = p->next)
1319 {
1320 if (2 >= p->length)
1321 {
1322 break;
1323 }
1324 }
1325 if (NULL == p)
1326 {
1327 GNUNET_break (0);
1328 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1329 " no direct connection to %s\n",
1330 GNUNET_i2s (GNUNET_PEER_resolve2 (peer)));
1331 GNUNET_free (data);
1332 return;
1333 }
1334 if (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK == type) // FIXME
1335 type = 0;
1336 queue_add (data, 1426 queue_add (data,
1337 type, 1427 type,
1338 size, 1428 size,
1339 neighbor, 1429 neighbor,
1340 t); 1430 c,
1431 ch);
1341} 1432}
1342 1433
1343 1434
1344GNUNET_PEER_Id 1435/**
1345connection_get_prev_hop (struct MeshConnection *c) 1436 * Sends an already built message on a tunnel, properly registering
1437 * all used resources.
1438 *
1439 * @param message Message to send. Function makes a copy of it.
1440 * @param t Tunnel on which this message is transmitted.
1441 * @param ch Channel on which this message is transmitted.
1442 * @param fwd Is this a fwd message?
1443 */
1444static void
1445send_prebuilt_message_tunnel (const struct GNUNET_MessageHeader *message,
1446 struct MeshTunnel2 *t,
1447 struct MeshChannel *ch,
1448 int fwd)
1346{ 1449{
1347 if (0 == c->own_pos || c->path->length < 2) 1450 struct MeshConnection *c;
1348 return c->path->peers[0]; 1451
1349 return c->path->peers[c->own_pos - 1]; 1452 c = tunnel_get_connection (t, fwd);
1453 if (NULL == c)
1454 {
1455 GNUNET_break (0);
1456 return;
1457 }
1458
1459 send_prebuilt_message_connection (message, c, ch, fwd);
1350} 1460}
1351 1461
1352 1462
1353GNUNET_PEER_Id 1463/**
1354connection_get_next_hop (struct MeshConnection *c) 1464 * Sends an already built message directly to a peer.
1465 *
1466 * @param message Message to send. Function makes a copy of it.
1467 * @param peer Tunnel on which this message is transmitted.
1468 */
1469static void
1470send_prebuilt_message_peer (const struct GNUNET_MessageHeader *message,
1471 struct MeshPeer *peer)
1355{ 1472{
1356 if ((c->path->length - 1) == c->own_pos || c->path->length < 2) 1473 void *data;
1357 return c->path->peers[c->path->length - 1]; 1474 size_t size;
1358 return c->path->peers[c->own_pos + 1]; 1475 uint16_t type;
1476
1477 if (NULL == peer)
1478 {
1479 GNUNET_break (0);
1480 return;
1481 }
1482
1483 size = ntohs (message->size);
1484 data = GNUNET_malloc (size);
1485 memcpy (data, message, size);
1486 type = ntohs(message->type);
1487
1488 queue_add (data,
1489 type,
1490 size,
1491 peer,
1492 NULL,
1493 NULL);
1359} 1494}
1360 1495
1361 1496
1362/** 1497/**
1363 * Sends a CREATE CONNECTION message for a path to a peer, properly registrating 1498 * Sends a CREATE CONNECTION message for a path to a peer.
1364 * all used resources. 1499 * Changes the connection and tunnel states if necessary.
1365 * 1500 *
1366 * @param t Tunnel for which the connection is created.
1367 * @param connection Connection to create. 1501 * @param connection Connection to create.
1368 */ 1502 */
1369static void 1503static void
1370send_connection_create (struct MeshTunnel2 *t, 1504send_connection_create (struct MeshConnection *connection)
1371 struct MeshConnection *connection)
1372{ 1505{
1373 struct MeshPeer *neighbor; 1506 struct MeshPeer *neighbor;
1507 struct MeshTunnel2 *t;
1374 1508
1509 t = connection->t;
1375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n"); 1510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
1376 neighbor = peer_get_short (connection_get_next_hop (connection)); 1511 neighbor = peer_get_short (connection_get_next_hop (connection));
1377 queue_add (connection, 1512 queue_add (connection,
@@ -1380,9 +1515,12 @@ send_connection_create (struct MeshTunnel2 *t,
1380 (connection->path->length * 1515 (connection->path->length *
1381 sizeof (struct GNUNET_PeerIdentity)), 1516 sizeof (struct GNUNET_PeerIdentity)),
1382 neighbor, 1517 neighbor,
1383 t); 1518 connection,
1519 NULL);
1384 if (MESH_TUNNEL_SEARCHING == t->state) 1520 if (MESH_TUNNEL_SEARCHING == t->state)
1385 tunnel_change_state (t, MESH_TUNNEL_WAITING); 1521 tunnel_change_state (t, MESH_TUNNEL_WAITING);
1522 if (MESH_CONNECTION_NEW == connection->state)
1523 connection_change_state (connection, MESH_CONNECTION_SENT);
1386} 1524}
1387 1525
1388 1526
@@ -1390,21 +1528,23 @@ send_connection_create (struct MeshTunnel2 *t,
1390 * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE 1528 * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
1391 * directed to us. 1529 * directed to us.
1392 * 1530 *
1393 * @param t Tunnel which to confirm.
1394 * @param connection Connection to confirm. 1531 * @param connection Connection to confirm.
1395 */ 1532 */
1396static void 1533static void
1397send_connection_ack (struct MeshTunnel2 *t, struct MeshConnection *connection) 1534send_connection_ack (struct MeshConnection *connection)
1398{ 1535{
1399 struct MeshPeer *neighbor; 1536 struct MeshPeer *neighbor;
1537 struct MeshTunnel2 *t;
1400 1538
1539 t = connection->t;
1401 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n"); 1540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n");
1402 neighbor = peer_get_short (connection_get_prev_hop (connection)); 1541 neighbor = peer_get_short (connection_get_prev_hop (connection));
1403 queue_add (connection, 1542 queue_add (connection,
1404 GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK, 1543 GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
1405 sizeof (struct GNUNET_MESH_ConnectionACK), 1544 sizeof (struct GNUNET_MESH_ConnectionACK),
1406 neighbor, 1545 neighbor,
1407 t); 1546 connection,
1547 NULL);
1408 if (MESH_TUNNEL_NEW == t->state) 1548 if (MESH_TUNNEL_NEW == t->state)
1409 tunnel_change_state (t, MESH_TUNNEL_WAITING); 1549 tunnel_change_state (t, MESH_TUNNEL_WAITING);
1410} 1550}
@@ -1425,7 +1565,7 @@ send_ack (GNUNET_PEER_Id peer, uint32_t ack)
1425 msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); 1565 msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
1426 msg.pid = htonl (ack); 1566 msg.pid = htonl (ack);
1427 1567
1428 send_prebuilt_message (&msg.header, peer, NULL); 1568 send_prebuilt_message_peer (&msg.header, peer_get_short (peer));
1429} 1569}
1430 1570
1431 1571
@@ -1771,27 +1911,27 @@ peer_connect (struct MeshPeer *peer)
1771 if (NULL != p) 1911 if (NULL != p)
1772 { 1912 {
1773 c = tunnel_use_path (t, p); 1913 c = tunnel_use_path (t, p);
1774 send_connection_create (t, c); 1914 send_connection_create (c);
1775 connection_change_state (c, MESH_CONNECTION_SENT);
1776 } 1915 }
1777 } 1916 }
1778 else if (NULL == peer->dhtget) 1917 else if (NULL == peer->dhtget)
1779 { 1918 {
1780 struct GNUNET_PeerIdentity id; 1919 const struct GNUNET_PeerIdentity *id;
1781 1920
1782 GNUNET_PEER_resolve (peer->id, &id); 1921 id = GNUNET_PEER_resolve2 (peer->id);
1783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1922 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784 " Starting DHT GET for peer %s\n", GNUNET_i2s (&id)); 1923 " Starting DHT GET for peer %s\n", GNUNET_i2s (id));
1785 peer->dhtget = GNUNET_DHT_get_start (dht_handle, /* handle */ 1924 peer->dhtget = GNUNET_DHT_get_start (dht_handle, /* handle */
1786 GNUNET_BLOCK_TYPE_MESH_PEER, /* type */ 1925 GNUNET_BLOCK_TYPE_MESH_PEER, /* type */
1787 &id.hashPubKey, /* key to search */ 1926 &id->hashPubKey, /* key to search */
1788 dht_replication_level, /* replication level */ 1927 dht_replication_level, /* replication level */
1789 GNUNET_DHT_RO_RECORD_ROUTE | 1928 GNUNET_DHT_RO_RECORD_ROUTE |
1790 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1929 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1791 NULL, /* xquery */ 1930 NULL, /* xquery */
1792 0, /* xquery bits */ 1931 0, /* xquery bits */
1793 &dht_get_id_handler, peer); 1932 &dht_get_id_handler, peer);
1794 tunnel_change_state (t, MESH_TUNNEL_SEARCHING); 1933 if (MESH_TUNNEL_NEW == t->state)
1934 tunnel_change_state (t, MESH_TUNNEL_SEARCHING);
1795 } 1935 }
1796 else 1936 else
1797 { 1937 {
@@ -1864,7 +2004,7 @@ peer_cancel_queues (GNUNET_PEER_Id neighbor, struct MeshTunnel2 *t)
1864 for (q = fc->queue_head; NULL != q; q = next) 2004 for (q = fc->queue_head; NULL != q; q = next)
1865 { 2005 {
1866 next = q->next; 2006 next = q->next;
1867 if (q->tunnel == t) 2007 if (q->peer->tunnel == t)
1868 { 2008 {
1869 if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == q->type || 2009 if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == q->type ||
1870 GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == q->type) 2010 GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == q->type)
@@ -2099,55 +2239,6 @@ peer_add_path_to_origin (struct MeshPeer *peer_info,
2099} 2239}
2100 2240
2101 2241
2102/**
2103 * Add a tunnel to the list of tunnels a peer participates in.
2104 * Update the tunnel's destination.
2105 *
2106 * @param p Peer to add to.
2107 * @param t Tunnel to add.
2108 */
2109static void
2110peer_add_tunnel (struct MeshPeer *p, struct MeshTunnel *t)
2111{
2112 if (0 != t->dest)
2113 {
2114 GNUNET_break (t->dest == p->id);
2115 return;
2116 }
2117 t->dest = p->id;
2118 GNUNET_PEER_change_rc (t->dest, 1);
2119 GNUNET_array_append (p->tunnels, p->ntunnels, t);
2120}
2121
2122
2123/**
2124 * Remove a tunnel from the list of tunnels a peer participates in.
2125 * Free the tunnel's destination.
2126 *
2127 * @param p Peer to clean.
2128 * @param t Tunnel to remove.
2129 */
2130static void
2131peer_remove_tunnel (struct MeshPeer *p, struct MeshTunnel *t)
2132{
2133 unsigned int i;
2134
2135 if (t->dest == p->id)
2136 {
2137 GNUNET_PEER_change_rc (t->dest, -1);
2138 t->dest = 0;
2139 }
2140 for (i = 0; i < p->ntunnels; i++)
2141 {
2142 if (p->tunnels[i] == t)
2143 {
2144 p->tunnels[i] = p->tunnels[p->ntunnels - 1];
2145 GNUNET_array_grow (p->tunnels, p->ntunnels, p->ntunnels - 1);
2146 return;
2147 }
2148 }
2149}
2150
2151 2242
2152/** 2243/**
2153 * Function called if the connection to the peer has been stalled for a while, 2244 * Function called if the connection to the peer has been stalled for a while,
@@ -2157,11 +2248,10 @@ peer_remove_tunnel (struct MeshPeer *p, struct MeshTunnel *t)
2157 * @param tc TaskContext. 2248 * @param tc TaskContext.
2158 */ 2249 */
2159static void 2250static void
2160tunnel_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 2251peer_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2161{ 2252{
2162 struct MeshFlowControl *fc = cls; 2253 struct MeshFlowControl *fc = cls;
2163 struct GNUNET_MESH_Poll msg; 2254 struct GNUNET_MESH_Poll msg;
2164 struct MeshTunnel *t = fc->t;
2165 GNUNET_PEER_Id peer; 2255 GNUNET_PEER_Id peer;
2166 2256
2167 fc->poll_task = GNUNET_SCHEDULER_NO_TASK; 2257 fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
@@ -2171,44 +2261,19 @@ tunnel_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2171 } 2261 }
2172 2262
2173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n"); 2263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
2264 peer = fc->peer->id;
2174 2265
2175 GNUNET_PEER_resolve (t->id.oid, &msg.oid);
2176
2177 if (fc == &t->prev_fc)
2178 {
2179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** prev peer!\n");
2180 peer = t->prev_hop;
2181 }
2182 else if (fc == &t->next_fc)
2183 {
2184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** next peer!\n");
2185 peer = t->next_hop;
2186 }
2187 else
2188 {
2189 GNUNET_break (0);
2190 return;
2191 }
2192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** peer: %s!\n", 2266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** peer: %s!\n",
2193 GNUNET_i2s(GNUNET_PEER_resolve2 (peer))); 2267 GNUNET_i2s (GNUNET_PEER_resolve2 (peer)));
2194 if (0 == peer)
2195 {
2196 if (GNUNET_YES == t->destroy)
2197 tunnel_destroy (t);
2198 else
2199 GNUNET_break (0);
2200 2268
2201 return;
2202 }
2203 msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL); 2269 msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
2204 msg.header.size = htons (sizeof (msg)); 2270 msg.header.size = htons (sizeof (msg));
2205 msg.tid = htonl (t->id.tid); 2271 msg.pid = htonl (fc->last_pid_sent);
2206 msg.pid = htonl (peer_get_first_payload_pid (peer_get_short (peer), t)); 2272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent);
2207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", ntohl (msg.pid)); 2273 send_prebuilt_message_peer (&msg.header, peer_get_short (peer));
2208 send_prebuilt_message (&msg.header, peer, t);
2209 fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time); 2274 fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
2210 fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, 2275 fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
2211 &tunnel_poll, fc); 2276 &peer_poll, fc);
2212} 2277}
2213 2278
2214 2279
@@ -2361,14 +2426,10 @@ channel_get_by_local_id (struct MeshClient *c, MESH_ChannelNumber chid)
2361static struct MeshTunnel * 2426static struct MeshTunnel *
2362channel_get_by_pi (GNUNET_PEER_Id pi, MESH_ChannelNumber tid) 2427channel_get_by_pi (GNUNET_PEER_Id pi, MESH_ChannelNumber tid)
2363{ 2428{
2364 struct MESH_TunnelID id; 2429// struct GNUNET_HashCode hash;
2365 struct GNUNET_HashCode hash;
2366 2430
2367 id.oid = pi; 2431// return GNUNET_CONTAINER_multihashmap_get (tunnels, &hash); FIXME
2368 id.tid = tid; 2432 return NULL;
2369
2370 GNUNET_CRYPTO_hash (&id, sizeof (struct MESH_TunnelID), &hash);
2371 return GNUNET_CONTAINER_multihashmap_get (tunnels, &hash);
2372} 2433}
2373 2434
2374 2435
@@ -2408,14 +2469,9 @@ tunnel_change_state (MeshTunnel2* t, MeshTunnelState state)
2408} 2469}
2409 2470
2410 2471
2411/**
2412 * Change the tunnel state.
2413 *
2414 * @param c Connection whose state to change.
2415 * @param state New state.
2416 */
2417static void 2472static void
2418connection_change_state (MeshConnection* c, MeshConnectionState state) 2473connection_change_state (struct MeshConnection* c,
2474 enum MeshConnectionState state)
2419{ 2475{
2420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2476 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2421 "Connection %s[%X] state was %s\n", 2477 "Connection %s[%X] state was %s\n",
@@ -2542,26 +2598,24 @@ tunnel_notify_connection_broken (MeshTunnel2* t,
2542/** 2598/**
2543 * Send an end-to-end FWD ACK message for the most recent in-sequence payload. 2599 * Send an end-to-end FWD ACK message for the most recent in-sequence payload.
2544 * 2600 *
2545 * @param t Tunnel this is about. 2601 * @param ch Channel this is about.
2546 * @param fwd Is for FWD traffic? (ACK dest->owner) 2602 * @param fwd Is for FWD traffic? (ACK dest->owner)
2547 */ 2603 */
2548static void 2604static void
2549tunnel_send_data_ack (struct MeshTunnel *t, int fwd) 2605channel_send_data_ack (struct MeshChannel *ch, int fwd)
2550{ 2606{
2551 struct GNUNET_MESH_DataACK msg; 2607 struct GNUNET_MESH_DataACK msg;
2552 struct MeshChannelReliability *rel; 2608 struct MeshChannelReliability *rel;
2553 struct MeshReliableMessage *copy; 2609 struct MeshReliableMessage *copy;
2554 GNUNET_PEER_Id hop;
2555 uint64_t mask; 2610 uint64_t mask;
2556 unsigned int delta; 2611 unsigned int delta;
2557 2612
2558 rel = fwd ? t->bck_rel : t->fwd_rel; 2613 rel = fwd ? ch->bck_rel : ch->fwd_rel;
2559 hop = fwd ? t->prev_hop : t->next_hop;
2560 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2561 "send_data_ack for %u\n", 2615 "send_data_ack for %u\n",
2562 rel->mid_recv - 1); 2616 rel->mid_recv - 1);
2563 2617
2564 if (GNUNET_NO == t->reliable) 2618 if (GNUNET_NO == ch->reliable)
2565 { 2619 {
2566 GNUNET_break_op (0); 2620 GNUNET_break_op (0);
2567 return; 2621 return;
@@ -2586,7 +2640,7 @@ tunnel_send_data_ack (struct MeshTunnel *t, int fwd)
2586 } 2640 }
2587 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " final futures %llX\n", msg.futures); 2641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " final futures %llX\n", msg.futures);
2588 2642
2589 send_prebuilt_message (&msg.header, hop, t); 2643 send_prebuilt_message_tunnel (&msg.header, t, ch, fwd);
2590 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n"); 2644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
2591} 2645}
2592 2646
@@ -2594,17 +2648,18 @@ tunnel_send_data_ack (struct MeshTunnel *t, int fwd)
2594/** 2648/**
2595 * Send an ACK informing the predecessor about the available buffer space. 2649 * Send an ACK informing the predecessor about the available buffer space.
2596 * In case there is no predecessor, inform the owning client. 2650 * In case there is no predecessor, inform the owning client.
2597 * If buffering is off, send only on behalf of children or self if endpoint. 2651 *
2598 * If buffering is on, send when sent to children and buffer space is free.
2599 * Note that although the name is fwd_ack, the FWD mean forward *traffic*, 2652 * Note that although the name is fwd_ack, the FWD mean forward *traffic*,
2600 * the ACK itself goes "back" (towards root). 2653 * the ACK itself goes "back" (towards root).
2601 * 2654 *
2602 * @param t Tunnel on which to send the ACK. 2655 * @param ch Channel on which to send the ACK.
2656 * @param c Connection on which to send the ACK.
2603 * @param type Type of message that triggered the ACK transmission. 2657 * @param type Type of message that triggered the ACK transmission.
2604 * @param fwd Is this FWD ACK? (Going dest->owner) 2658 * @param fwd Is this FWD ACK? (Going dest->owner)
2605 */ 2659 */
2606static void 2660static void
2607tunnel_send_ack (struct MeshTunnel *t, uint16_t type, int fwd) 2661channel_send_ack (struct MeshChannel *ch, struct MeshConnection *c,
2662 uint16_t type, int fwd)
2608{ 2663{
2609 struct MeshChannelReliability *rel; 2664 struct MeshChannelReliability *rel;
2610 struct MeshFlowControl *next_fc; 2665 struct MeshFlowControl *next_fc;
@@ -2616,12 +2671,12 @@ tunnel_send_ack (struct MeshTunnel *t, uint16_t type, int fwd)
2616 uint32_t ack; 2671 uint32_t ack;
2617 int delta; 2672 int delta;
2618 2673
2619 rel = fwd ? t->fwd_rel : t->bck_rel; 2674 rel = fwd ? ch->fwd_rel : ch->bck_rel;
2620 c = fwd ? t->client : t->owner; 2675 c = fwd ? ch->client : ch->owner;
2621 o = fwd ? t->owner : t->client; 2676 o = fwd ? ch->owner : ch->client;
2677 hop = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c);
2622 next_fc = fwd ? &t->next_fc : &t->prev_fc; 2678 next_fc = fwd ? &t->next_fc : &t->prev_fc;
2623 prev_fc = fwd ? &t->prev_fc : &t->next_fc; 2679 prev_fc = fwd ? &t->prev_fc : &t->next_fc;
2624 hop = fwd ? t->prev_hop : t->next_hop;
2625 2680
2626 switch (type) 2681 switch (type)
2627 { 2682 {
@@ -3079,28 +3134,26 @@ connection_keepalive (struct MeshConnection *c, int fwd)
3079 msg = (struct GNUNET_MESH_TunnelKeepAlive *) cbuf; 3134 msg = (struct GNUNET_MESH_TunnelKeepAlive *) cbuf;
3080 msg->header.size = htons (size); 3135 msg->header.size = htons (size);
3081 msg->header.type = htons (type); 3136 msg->header.type = htons (type);
3082 msg->oid = *(GNUNET_PEER_resolve2 (t->id.oid)); 3137 msg->oid = *(GNUNET_PEER_resolve2 (c->t->id.oid));
3083 msg->tid = htonl (t->id.tid); 3138 msg->tid = htonl (c->t->id.tid);
3084 send_prebuilt_message (&msg->header, hop, t); 3139 send_prebuilt_message (&msg->header, hop, c->t);
3085} 3140}
3086 3141
3087 3142
3088/** 3143/**
3089 * Send create (PATH_CREATE/PATH_ACK) packets for a tunnel. 3144 * Send CONNECTION_{CREATE/ACK} packets for a connection.
3090 * 3145 *
3091 * @param t Tunnel for which to send the message. 3146 * @param c Connection for which to send the message.
3092 * @param fwd If GNUNET_YES, send CREATE, otherwise send ACK. 3147 * @param fwd If GNUNET_YES, send CREATE, otherwise send ACK.
3093 */ 3148 */
3094static void 3149static void
3095connection_recreate (struct MeshTunnel *t, int fwd) 3150connection_recreate (struct MeshConnection *c, int fwd)
3096{ 3151{
3097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
3098 "sending path recreate for tunnel %s[%X]\n",
3099 GNUNET_i2s (GNUNET_PEER_resolve2 (t->id.oid)), t->id.tid);
3100 if (fwd) 3153 if (fwd)
3101 send_path_create (t); 3154 send_connection_create (c);
3102 else 3155 else
3103 send_path_ack (t); 3156 send_connection_ack (c);
3104} 3157}
3105 3158
3106 3159
@@ -3125,10 +3178,10 @@ connection_maintain (struct MeshConnection *c, int fwd)
3125 case MESH_CONNECTION_NEW: 3178 case MESH_CONNECTION_NEW:
3126 GNUNET_break (0); 3179 GNUNET_break (0);
3127 case MESH_CONNECTION_SENT: 3180 case MESH_CONNECTION_SENT:
3128 connection_recreate (t, fwd); 3181 connection_recreate (c, fwd);
3129 break; 3182 break;
3130 case MESH_CONNECTION_READY: 3183 case MESH_CONNECTION_READY:
3131 connection_keepalive (t, fwd); 3184 connection_keepalive (c, fwd);
3132 break; 3185 break;
3133 default: 3186 default:
3134 break; 3187 break;
@@ -3695,12 +3748,12 @@ queue_get_next (const struct MeshPeer *peer)
3695 struct MeshPeerQueue *q; 3748 struct MeshPeerQueue *q;
3696 3749
3697 struct GNUNET_MESH_Data *dmsg; 3750 struct GNUNET_MESH_Data *dmsg;
3698 struct MeshTunnel* t; 3751 struct MeshTunnel2 *t;
3699 uint32_t pid; 3752 uint32_t pid;
3700 uint32_t ack; 3753 uint32_t ack;
3701 3754
3702 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* selecting message\n"); 3755 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* selecting message\n");
3703 for (q = peer->queue_head; NULL != q; q = q->next) 3756 for (q = peer->fc->queue_head; NULL != q; q = q->next)
3704 { 3757 {
3705 t = q->tunnel; 3758 t = q->tunnel;
3706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3748,14 +3801,20 @@ queue_send (void *cls, size_t size, void *buf)
3748 struct MeshPeer *peer = cls; 3801 struct MeshPeer *peer = cls;
3749 struct GNUNET_MessageHeader *msg; 3802 struct GNUNET_MessageHeader *msg;
3750 struct MeshPeerQueue *queue; 3803 struct MeshPeerQueue *queue;
3751 struct MeshTunnel *t; 3804 struct MeshTunnel2 *t;
3752 struct GNUNET_PeerIdentity dst_id; 3805 struct GNUNET_PeerIdentity *dst_id;
3753 struct MeshFlowControl *fc; 3806 struct MeshFlowControl *fc;
3754 size_t data_size; 3807 size_t data_size;
3755 uint32_t pid; 3808 uint32_t pid;
3756 uint16_t type; 3809 uint16_t type;
3757 3810
3758 peer->core_transmit = NULL; 3811 fc = peer->fc;
3812 if (NULL == fc)
3813 {
3814 GNUNET_break (0);
3815 return 0;
3816 }
3817 fc->core_transmit = NULL;
3759 3818
3760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n"); 3819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n");
3761 3820
@@ -3770,28 +3829,28 @@ queue_send (void *cls, size_t size, void *buf)
3770 if (NULL == queue) 3829 if (NULL == queue)
3771 { 3830 {
3772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not ready, return\n"); 3831 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not ready, return\n");
3773 if (NULL == peer->queue_head) 3832 if (NULL == fc->queue_head)
3774 GNUNET_break (0); /* Core tmt_rdy should've been canceled */ 3833 GNUNET_break (0); /* Core tmt_rdy should've been canceled */
3775 return 0; 3834 return 0;
3776 } 3835 }
3777 3836
3778 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not empty\n"); 3837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not empty\n");
3779 3838
3780 GNUNET_PEER_resolve (peer->id, &dst_id); 3839 dst_id = GNUNET_PEER_resolve (peer->id);
3781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3840 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3782 "* towards %s\n", 3841 "* towards %s\n",
3783 GNUNET_i2s (&dst_id)); 3842 GNUNET_i2s (dst_id));
3784 /* Check if buffer size is enough for the message */ 3843 /* Check if buffer size is enough for the message */
3785 if (queue->size > size) 3844 if (queue->size > size)
3786 { 3845 {
3787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3846 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3788 "* not enough room, reissue\n"); 3847 "* not enough room, reissue\n");
3789 peer->core_transmit = 3848 fc->core_transmit =
3790 GNUNET_CORE_notify_transmit_ready (core_handle, 3849 GNUNET_CORE_notify_transmit_ready (core_handle,
3791 GNUNET_NO, 3850 GNUNET_NO,
3792 0, 3851 0,
3793 GNUNET_TIME_UNIT_FOREVER_REL, 3852 GNUNET_TIME_UNIT_FOREVER_REL,
3794 &dst_id, 3853 dst_id,
3795 queue->size, 3854 queue->size,
3796 &queue_send, 3855 &queue_send,
3797 peer); 3856 peer);
@@ -3799,9 +3858,7 @@ queue_send (void *cls, size_t size, void *buf)
3799 } 3858 }
3800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* size ok\n"); 3859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* size ok\n");
3801 3860
3802 t = queue->tunnel; 3861 t = queue->peer->tunnel;
3803 GNUNET_assert (0 < t->pending_messages);
3804 t->pending_messages--;
3805 type = 0; 3862 type = 0;
3806 3863
3807 /* Fill buf */ 3864 /* Fill buf */
@@ -3829,11 +3886,14 @@ queue_send (void *cls, size_t size, void *buf)
3829 break; 3886 break;
3830 case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE: 3887 case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
3831 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path create\n"); 3888 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path create\n");
3832 data_size = send_core_path_create (queue->cls, size, buf); 3889 data_size = send_core_connection_create (queue->cls, size, buf);
3833 break; 3890 break;
3834 case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK: 3891 case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
3835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path ack\n"); 3892 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path ack\n");
3836 data_size = send_core_path_ack (queue->cls, size, buf); 3893 if (NULL != t->client)
3894 data_size = send_core_connection_ack (queue->cls, size, buf);
3895 else
3896 data_size = send_core_data_raw (queue->cls, size, buf);
3837 break; 3897 break;
3838 default: 3898 default:
3839 GNUNET_break (0); 3899 GNUNET_break (0);
@@ -3939,56 +3999,40 @@ queue_send (void *cls, size_t size, void *buf)
3939} 3999}
3940 4000
3941 4001
3942/**
3943 * @brief Queue and pass message to core when possible.
3944 *
3945 * If type is payload (UNICAST, TO_ORIGIN) checks for queue status and
3946 * accounts for it. In case the queue is full, the message is dropped and
3947 * a break issued.
3948 *
3949 * Otherwise, message is treated as internal and allowed to go regardless of
3950 * queue status.
3951 *
3952 * @param cls Closure (@c type dependant). It will be used by queue_send to
3953 * build the message to be sent if not already prebuilt.
3954 * @param type Type of the message, 0 for a raw message.
3955 * @param size Size of the message.
3956 * @param dst Neighbor to send message to.
3957 * @param t Tunnel this message belongs to.
3958 */
3959static void 4002static void
3960queue_add (void *cls, uint16_t type, size_t size, 4003queue_add (void *cls, uint16_t type, size_t size,
3961 struct MeshPeer *dst, struct MeshTunnel *t) 4004 struct MeshPeer *dst,
4005 struct MeshConnection *c,
4006 struct MeshChannel *ch)
3962{ 4007{
3963 struct MeshPeerQueue *queue; 4008 struct MeshPeerQueue *queue;
3964 struct MeshFlowControl *fc; 4009 struct MeshFlowControl *fc;
4010 struct MeshTunnel2 *t;
3965 int priority; 4011 int priority;
3966 4012
3967 fc = NULL; 4013 fc = NULL;
3968 priority = GNUNET_NO; 4014 priority = GNUNET_NO;
3969 if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) 4015 if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
3970 { 4016 GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
3971 fc = &t->next_fc;
3972 }
3973 else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
3974 { 4017 {
3975 fc = &t->prev_fc; 4018 fc = dst->fc;
3976 } 4019 }
3977 if (NULL != fc) 4020 if (NULL != fc)
3978 { 4021 {
3979 if (fc->queue_n >= t->queue_max) 4022 if (fc->queue_n >= fc->queue_max)
3980 { 4023 {
3981 /* If this isn't a retransmission, drop the message */ 4024 /* If this isn't a retransmission, drop the message */
3982 if (GNUNET_NO == t->reliable || 4025 if (NULL != ch &&
3983 (NULL == t->owner && GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) || 4026 (GNUNET_NO == ch->reliable ||
3984 (NULL == t->client && GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)) 4027 (NULL == ch->owner && GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) ||
4028 (NULL == ch->client && GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)))
3985 { 4029 {
3986 GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)", 4030 GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3987 1, GNUNET_NO); 4031 1, GNUNET_NO);
3988 GNUNET_break (0); 4032 GNUNET_break (0);
3989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 4033 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3990 "queue full: %u/%u\n", 4034 "queue full: %u/%u\n",
3991 fc->queue_n, t->queue_max); 4035 fc->queue_n, fc->queue_max);
3992 return; /* Drop this message */ 4036 return; /* Drop this message */
3993 } 4037 }
3994 priority = GNUNET_YES; 4038 priority = GNUNET_YES;
@@ -3997,25 +4041,28 @@ queue_add (void *cls, uint16_t type, size_t size,
3997 if (GMC_is_pid_bigger(fc->last_pid_sent + 1, fc->last_ack_recv) && 4041 if (GMC_is_pid_bigger(fc->last_pid_sent + 1, fc->last_ack_recv) &&
3998 GNUNET_SCHEDULER_NO_TASK == fc->poll_task) 4042 GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
3999 fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, 4043 fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
4000 &tunnel_poll, 4044 &peer_poll,
4001 fc); 4045 dst);
4002 } 4046 }
4003 queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); 4047 queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
4004 queue->cls = cls; 4048 queue->cls = cls;
4005 queue->type = type; 4049 queue->type = type;
4006 queue->size = size; 4050 queue->size = size;
4007 queue->peer = dst; 4051 queue->peer = dst;
4008 queue->tunnel = t; 4052 queue->c = c;
4053 queue->ch = ch;
4009 if (GNUNET_YES == priority) 4054 if (GNUNET_YES == priority)
4010 { 4055 {
4011 struct GNUNET_MESH_Data *d; 4056 struct GNUNET_MESH_Data *d;
4012 uint32_t prev; 4057 uint32_t prev;
4013 uint32_t next; 4058 uint32_t next;
4014 4059
4015 GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue); 4060 GNUNET_CONTAINER_DLL_insert (dst->fc->queue_head,
4061 dst->fc->queue_tail,
4062 queue);
4016 d = (struct GNUNET_MESH_Data *) queue->cls; 4063 d = (struct GNUNET_MESH_Data *) queue->cls;
4017 prev = d->pid; 4064 prev = d->pid;
4018 for (queue = dst->queue_tail; NULL != queue; queue = queue->prev) 4065 for (queue = dst->fc->queue_tail; NULL != queue; queue = queue->prev)
4019 { 4066 {
4020 if (queue->type != type) 4067 if (queue->type != type)
4021 continue; 4068 continue;
@@ -4026,11 +4073,13 @@ queue_add (void *cls, uint16_t type, size_t size,
4026 } 4073 }
4027 } 4074 }
4028 else 4075 else
4029 GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); 4076 GNUNET_CONTAINER_DLL_insert_tail (dst->fc->queue_head,
4077 dst->fc->queue_tail,
4078 queue);
4030 4079
4031 if (NULL == dst->core_transmit) 4080 if (NULL == dst->fc->core_transmit)
4032 { 4081 {
4033 dst->core_transmit = 4082 dst->fc->core_transmit =
4034 GNUNET_CORE_notify_transmit_ready (core_handle, 4083 GNUNET_CORE_notify_transmit_ready (core_handle,
4035 0, 4084 0,
4036 0, 4085 0,
@@ -4040,7 +4089,6 @@ queue_add (void *cls, uint16_t type, size_t size,
4040 &queue_send, 4089 &queue_send,
4041 dst); 4090 dst);
4042 } 4091 }
4043 t->pending_messages++;
4044} 4092}
4045 4093
4046 4094
@@ -4995,6 +5043,7 @@ dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp,
4995 /* Count connections */ 5043 /* Count connections */
4996 for (c = peer->tunnel->connection_head, i = 0; NULL != c; c = c->next, i++); 5044 for (c = peer->tunnel->connection_head, i = 0; NULL != c; c = c->next, i++);
4997 5045
5046 /* If we already have 3 (or more (?!)) connections, it's enough */
4998 if (3 <= i) 5047 if (3 <= i)
4999 return; 5048 return;
5000 5049
@@ -5699,6 +5748,12 @@ core_connect (void *cls, const struct GNUNET_PeerIdentity *peer)
5699 path->peers[0] = myid; 5748 path->peers[0] = myid;
5700 GNUNET_PEER_change_rc (myid, 1); 5749 GNUNET_PEER_change_rc (myid, 1);
5701 peer_add_path (peer_info, path, GNUNET_YES); 5750 peer_add_path (peer_info, path, GNUNET_YES);
5751 if (NULL == peer_info->fc)
5752 {
5753 peer_info->fc = GNUNET_new (struct MeshFlowControl);
5754 fc_init (peer_info->fc);
5755 peer_info->fc->peer = peer_info;
5756 }
5702 return; 5757 return;
5703} 5758}
5704 5759
@@ -5715,6 +5770,7 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
5715 struct MeshPeer *pi; 5770 struct MeshPeer *pi;
5716 struct MeshPeerQueue *q; 5771 struct MeshPeerQueue *q;
5717 struct MeshPeerQueue *n; 5772 struct MeshPeerQueue *n;
5773 struct MeshFlowControl *fc;
5718 5774
5719 DEBUG_CONN ("Peer disconnected\n"); 5775 DEBUG_CONN ("Peer disconnected\n");
5720 pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); 5776 pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey);
@@ -5723,25 +5779,34 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
5723 GNUNET_break (0); 5779 GNUNET_break (0);
5724 return; 5780 return;
5725 } 5781 }
5726 q = pi->queue_head; 5782 fc = pi->fc;
5783 if (NULL != fc)
5784 {
5785 GNUNET_break (0);
5786 return;
5787 }
5788 pi->fc = NULL;
5789
5790 q = fc->queue_head;
5727 while (NULL != q) 5791 while (NULL != q)
5728 { 5792 {
5729 n = q->next; 5793 n = q->next;
5730 /* TODO try to reroute this traffic instead */ 5794 queue_destroy (q, GNUNET_YES);
5731 queue_destroy(q, GNUNET_YES);
5732 q = n; 5795 q = n;
5733 } 5796 }
5734 if (NULL != pi->core_transmit) 5797 if (NULL != fc->core_transmit)
5735 { 5798 GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit);
5736 GNUNET_CORE_notify_transmit_ready_cancel(pi->core_transmit); 5799 if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
5737 pi->core_transmit = NULL; 5800 GNUNET_SCHEDULER_cancel (fc->poll_task);
5738 } 5801
5739 peer_remove_path (pi, pi->id, myid); 5802 peer_remove_path (pi, pi->id, myid);
5740 if (myid == pi->id) 5803 if (myid == pi->id)
5741 { 5804 {
5742 DEBUG_CONN (" (self)\n"); 5805 DEBUG_CONN (" (self)\n");
5743 } 5806 }
5744 GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO); 5807 GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
5808 GNUNET_free (fc);
5809
5745 return; 5810 return;
5746} 5811}
5747 5812
diff --git a/src/mesh/mesh_protocol_enc.h b/src/mesh/mesh_protocol_enc.h
index 907fd3ee7..6292bac40 100644
--- a/src/mesh/mesh_protocol_enc.h
+++ b/src/mesh/mesh_protocol_enc.h
@@ -201,16 +201,6 @@ struct GNUNET_MESH_Poll
201 struct GNUNET_MessageHeader header; 201 struct GNUNET_MessageHeader header;
202 202
203 /** 203 /**
204 * TID of the tunnel
205 */
206 uint32_t tid GNUNET_PACKED;
207
208 /**
209 * OID of the tunnel
210 */
211 struct GNUNET_PeerIdentity oid;
212
213 /**
214 * Last packet sent. 204 * Last packet sent.
215 */ 205 */
216 uint32_t pid GNUNET_PACKED; 206 uint32_t pid GNUNET_PACKED;