aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-05-13 00:48:24 +0000
committerChristian Grothoff <christian@grothoff.org>2012-05-13 00:48:24 +0000
commit761e32ca4ffae07d145081c8273eb68fb505194b (patch)
tree3c078fcf219fdb38aad459d328e6dd370c5ddb01 /src
parent1524fd5835311f123d95e672dcae459630797fe0 (diff)
downloadgnunet-761e32ca4ffae07d145081c8273eb68fb505194b.tar.gz
gnunet-761e32ca4ffae07d145081c8273eb68fb505194b.zip
-major rewrite of gnunet-service-transport_neighbours state machine
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-transport.c5
-rw-r--r--src/transport/gnunet-service-transport_neighbours.c4025
-rw-r--r--src/transport/gnunet-service-transport_neighbours.h30
3 files changed, 2156 insertions, 1904 deletions
diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c
index 5148a1497..f029f0efe 100644
--- a/src/transport/gnunet-service-transport.c
+++ b/src/transport/gnunet-service-transport.c
@@ -265,8 +265,8 @@ plugin_env_receive_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
265 ats_count); 265 ats_count);
266 break; 266 break;
267 case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK: 267 case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK:
268 GST_neighbours_handle_ack (message, peer, &address, session, ats, 268 GST_neighbours_handle_session_ack (message, peer, &address, session, ats,
269 ats_count); 269 ats_count);
270 break; 270 break;
271 case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT: 271 case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
272 GST_neighbours_handle_disconnect_message (peer, message); 272 GST_neighbours_handle_disconnect_message (peer, message);
@@ -434,7 +434,6 @@ ats_request_address_change (void *cls,
434 GST_neighbours_force_disconnect (&address->peer); 434 GST_neighbours_force_disconnect (&address->peer);
435 return; 435 return;
436 } 436 }
437 /* will never return GNUNET_YES since connection is to be established */
438 GST_neighbours_switch_to_address (&address->peer, address, session, ats, 437 GST_neighbours_switch_to_address (&address->peer, address, session, ats,
439 ats_count, bandwidth_in, 438 ats_count, bandwidth_in,
440 bandwidth_out); 439 bandwidth_out);
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c
index 9ee0bcfdb..faccf3695 100644
--- a/src/transport/gnunet-service-transport_neighbours.c
+++ b/src/transport/gnunet-service-transport_neighbours.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2010,2011 Christian Grothoff (and other contributing authors) 3 (C) 2010,2011,2012 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -22,6 +22,10 @@
22 * @file transport/gnunet-service-transport_neighbours.c 22 * @file transport/gnunet-service-transport_neighbours.c
23 * @brief neighbour management 23 * @brief neighbour management
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - "address_change_cb" is NEVER invoked; when should we call this one exactly?
28 * - TEST, TEST, TEST...
25 */ 29 */
26#include "platform.h" 30#include "platform.h"
27#include "gnunet_ats_service.h" 31#include "gnunet_ats_service.h"
@@ -42,6 +46,12 @@
42#define NEIGHBOUR_TABLE_SIZE 256 46#define NEIGHBOUR_TABLE_SIZE 256
43 47
44/** 48/**
49 * Time we give plugin to transmit DISCONNECT message before the
50 * neighbour entry self-destructs.
51 */
52#define DISCONNECT_SENT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
53
54/**
45 * How often must a peer violate bandwidth quotas before we start 55 * How often must a peer violate bandwidth quotas before we start
46 * to simply drop its messages? 56 * to simply drop its messages?
47 */ 57 */
@@ -56,31 +66,47 @@
56 */ 66 */
57#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 67#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58 68
59#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 69/**
70 * How long are we willing to wait for a response from ATS before timing out?
71 */
72#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
60 73
61#define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100) 74/**
75 * How long are we willing to wait for an ACK from the other peer before
76 * giving up on our connect operation?
77 */
78#define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
62 79
80/**
81 * How long are we willing to wait for a successful reconnect if
82 * an existing connection went down? Much shorter than the
83 * usual SETUP_CONNECTION_TIMEOUT as we do not inform the
84 * higher layers about the disconnect during this period.
85 */
63#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) 86#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
64 87
65#define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
66
67/** 88/**
68 * Entry in neighbours. 89 * How long are we willing to wait for a response from the blacklist
90 * subsystem before timing out?
69 */ 91 */
70struct NeighbourMapEntry; 92#define BLACKLIST_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
93
71 94
72GNUNET_NETWORK_STRUCT_BEGIN 95GNUNET_NETWORK_STRUCT_BEGIN
73 96
74/** 97/**
75 * Message a peer sends to another to indicate its 98 * Message a peer sends to another to indicate that it intends to
76 * preference for communicating via a particular 99 * setup a connection/session for data exchange. A 'SESSION_CONNECT'
77 * session (and the desire to establish a real 100 * should be answered with a 'SESSION_CONNECT_ACK' with the same body
78 * connection). 101 * to confirm. A 'SESSION_CONNECT_ACK' should then be followed with
102 * a 'SESSION_ACK'. Once the 'SESSION_ACK' is received, both peers
103 * should be connected.
79 */ 104 */
80struct SessionConnectMessage 105struct SessionConnectMessage
81{ 106{
82 /** 107 /**
83 * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT' 108 * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
109 * or 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK'
84 */ 110 */
85 struct GNUNET_MessageHeader header; 111 struct GNUNET_MessageHeader header;
86 112
@@ -98,6 +124,12 @@ struct SessionConnectMessage
98}; 124};
99 125
100 126
127/**
128 * Message we send to the other peer to notify him that we intentionally
129 * are disconnecting (to reduce timeouts). This is just a friendly
130 * notification, peers must not rely on always receiving disconnect
131 * messages.
132 */
101struct SessionDisconnectMessage 133struct SessionDisconnectMessage
102{ 134{
103 /** 135 /**
@@ -135,8 +167,10 @@ struct SessionDisconnectMessage
135 struct GNUNET_CRYPTO_RsaSignature signature; 167 struct GNUNET_CRYPTO_RsaSignature signature;
136 168
137}; 169};
170
138GNUNET_NETWORK_STRUCT_END 171GNUNET_NETWORK_STRUCT_END
139 172
173
140/** 174/**
141 * For each neighbour we keep a list of messages 175 * For each neighbour we keep a list of messages
142 * that we still want to transmit to the neighbour. 176 * that we still want to transmit to the neighbour.
@@ -155,12 +189,6 @@ struct MessageQueue
155 struct MessageQueue *prev; 189 struct MessageQueue *prev;
156 190
157 /** 191 /**
158 * Once this message is actively being transmitted, which
159 * neighbour is it associated with?
160 */
161 struct NeighbourMapEntry *n;
162
163 /**
164 * Function to call once we're done. 192 * Function to call once we're done.
165 */ 193 */
166 GST_NeighbourSendContinuation cont; 194 GST_NeighbourSendContinuation cont;
@@ -189,6 +217,67 @@ struct MessageQueue
189}; 217};
190 218
191 219
220/**
221 * Possible state of a neighbour. Initially, we are S_NOT_CONNECTED.
222 *
223 * Then, there are two main paths. If we receive a CONNECT message, we
224 * first run a check against the blacklist and ask ATS for a
225 * suggestion. (S_CONNECT_RECV_ATS). If the blacklist comes back
226 * positive, we give the address to ATS. If ATS makes a suggestion,
227 * we ALSO give that suggestion to the blacklist
228 * (S_CONNECT_RECV_BLACKLIST). Once the blacklist approves the
229 * address we got from ATS, we send our CONNECT_ACK and go to
230 * S_CONNECT_RECV_ACK. If we receive a SESSION_ACK, we go to
231 * S_CONNECTED (and notify everyone about the new connection). If the
232 * operation times out, we go to S_DISCONNECT.
233 *
234 * The other case is where we transmit a CONNECT message first. We
235 * start with S_INIT_ATS. If we get an address, we enter
236 * S_INIT_BLACKLIST and check the blacklist. If the blacklist is OK
237 * with the connection, we actually send the CONNECT message and go to
238 * state S_CONNECT_SENT. Once we receive a CONNECT_ACK, we go to
239 * S_CONNECTED (and notify everyone about the new connection and send
240 * back a SESSION_ACK). If the operation times out, we go to
241 * S_DISCONNECT.
242 *
243 * If the session is in trouble (i.e. transport-level disconnect or
244 * timeout), we go to S_RECONNECT_ATS where we ask ATS for a new
245 * address (we don't notify anyone about the disconnect yet). Once we
246 * have a new address, we go to S_RECONNECT_BLACKLIST to check the new
247 * address against the blacklist. If the blacklist approves, we enter
248 * S_RECONNECT_SENT and send a CONNECT message. If we receive a
249 * CONNECT_ACK, we go to S_CONNECTED and nobody noticed that we had
250 * trouble; we also send a SESSION_ACK at this time just in case. If
251 * the operation times out, we go to S_DISCONNECT (and notify everyone
252 * about the lost connection).
253 *
254 * If ATS decides to switch addresses while we have a normal
255 * connection, we go to S_CONNECTED_SWITCHING_BLACKLIST to check the
256 * new address against the blacklist. If the blacklist approves, we
257 * go to S_CONNECTED_SWITCHING_CONNECT_SENT and send a
258 * SESSION_CONNECT. If we get a SESSION_ACK back, we switch the
259 * primary connection to the suggested alternative from ATS, go back
260 * to S_CONNECTED and send a SESSION_ACK to the other peer just to be
261 * sure. If the operation times out (or the blacklist disapproves),
262 * we go to S_CONNECTED (and notify ATS that the given alternative
263 * address is "invalid").
264 *
265 * Once a session is in S_DISCONNECT, it is cleaned up and then goes
266 * to (S_DISCONNECT_FINISHED). If we receive an explicit disconnect
267 * request, we can go from any state to S_DISCONNECT, possibly after
268 * generating disconnect notifications.
269 *
270 * Note that it is quite possible that while we are in any of these
271 * states, we could receive a 'CONNECT' request from the other peer.
272 * We then enter a 'weird' state where we pursue our own primary state
273 * machine (as described above), but with the 'send_connect_ack' flag
274 * set to 1. If our state machine allows us to send a 'CONNECT_ACK'
275 * (because we have an acceptable address), we send the 'CONNECT_ACK'
276 * and set the 'send_connect_ack' to 2. If we then receive a
277 * 'SESSION_ACK', we go to 'S_CONNECTED' (and reset 'send_connect_ack'
278 * to 0).
279 *
280 */
192enum State 281enum State
193{ 282{
194 /** 283 /**
@@ -197,36 +286,127 @@ enum State
197 S_NOT_CONNECTED, 286 S_NOT_CONNECTED,
198 287
199 /** 288 /**
200 * sent CONNECT message to other peer, waiting for CONNECT_ACK 289 * Asked to initiate connection, trying to get address from ATS
290 */
291 S_INIT_ATS,
292
293 /**
294 * Asked to initiate connection, trying to get address approved
295 * by blacklist.
296 */
297 S_INIT_BLACKLIST,
298
299 /**
300 * Sent CONNECT message to other peer, waiting for CONNECT_ACK
201 */ 301 */
202 S_CONNECT_SENT, 302 S_CONNECT_SENT,
203 303
204 /** 304 /**
205 * received CONNECT message to other peer, sending CONNECT_ACK 305 * Received a CONNECT, asking ATS about address suggestions.
206 */ 306 */
207 S_CONNECT_RECV, 307 S_CONNECT_RECV_ATS,
208 308
209 /** 309 /**
210 * received ACK or payload 310 * Received CONNECT from other peer, got an address, checking with blacklist.
311 */
312 S_CONNECT_RECV_BLACKLIST,
313
314 /**
315 * CONNECT request from other peer was SESSION_ACK'ed, waiting for
316 * SESSION_ACK.
317 */
318 S_CONNECT_RECV_ACK,
319
320 /**
321 * Got our CONNECT_ACK/SESSION_ACK, connection is up.
211 */ 322 */
212 S_CONNECTED, 323 S_CONNECTED,
213 324
214 /** 325 /**
215 * connection ended, fast reconnect 326 * Connection got into trouble, rest of the system still believes
327 * it to be up, but we're getting a new address from ATS.
216 */ 328 */
217 S_FAST_RECONNECT, 329 S_RECONNECT_ATS,
218 330
219 /** 331 /**
220 * Disconnect in progress 332 * Connection got into trouble, rest of the system still believes
333 * it to be up; we are checking the new address against the blacklist.
221 */ 334 */
222 S_DISCONNECT 335 S_RECONNECT_BLACKLIST,
336
337 /**
338 * Sent CONNECT over new address (either by ATS telling us to switch
339 * addresses or from RECONNECT_ATS); if this fails, we need to tell
340 * the rest of the system about a disconnect.
341 */
342 S_RECONNECT_SENT,
343
344 /**
345 * We have some primary connection, but ATS suggested we switch
346 * to some alternative; we're now checking the alternative against
347 * the blacklist.
348 */
349 S_CONNECTED_SWITCHING_BLACKLIST,
350
351 /**
352 * We have some primary connection, but ATS suggested we switch
353 * to some alternative; we now sent a CONNECT message for the
354 * alternative session to the other peer and waiting for a
355 * CONNECT_ACK to make this our primary connection.
356 */
357 S_CONNECTED_SWITCHING_CONNECT_SENT,
358
359 /**
360 * Disconnect in progress (we're sending the DISCONNECT message to the
361 * other peer; after that is finished, the state will be cleaned up).
362 */
363 S_DISCONNECT,
364
365 /**
366 * We're finished with the disconnect; clean up state now!
367 */
368 S_DISCONNECT_FINISHED
223}; 369};
224 370
225enum Address_State 371
372/**
373 * A possible address we could use to communicate with a neighbour.
374 */
375struct NeighbourAddress
226{ 376{
227 USED, 377
228 UNUSED, 378 /**
229 FRESH, 379 * Active session for this address.
380 */
381 struct Session *session;
382
383 /**
384 * Network-level address information.
385 */
386 struct GNUNET_HELLO_Address *address;
387
388 /**
389 * Timestamp of the 'SESSION_CONNECT' message we sent to the other
390 * peer for this address. Use to check that the ACK is in response
391 * to our most recent 'CONNECT'.
392 */
393 struct GNUNET_TIME_Absolute connect_timestamp;
394
395 /**
396 * Inbound bandwidth from ATS for this address.
397 */
398 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
399
400 /**
401 * Outbound bandwidth from ATS for this address.
402 */
403 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
404
405 /**
406 * Did we tell ATS that this is our 'active' address?
407 */
408 int ats_active;
409
230}; 410};
231 411
232 412
@@ -254,20 +434,15 @@ struct NeighbourMapEntry
254 struct MessageQueue *is_active; 434 struct MessageQueue *is_active;
255 435
256 /** 436 /**
257 * Active session for communicating with the peer. 437 * Primary address we currently use to communicate with the neighbour.
258 */ 438 */
259 struct Session *session; 439 struct NeighbourAddress primary_address;
260 440
261 /** 441 /**
262 * Address we currently use. 442 * Alternative address currently under consideration for communicating
443 * with the neighbour.
263 */ 444 */
264 struct GNUNET_HELLO_Address *address; 445 struct NeighbourAddress alternative_address;
265
266 /**
267 * Address we currently use.
268 */
269 struct GNUNET_HELLO_Address *fast_reconnect_address;
270
271 446
272 /** 447 /**
273 * Identity of this neighbour. 448 * Identity of this neighbour.
@@ -275,107 +450,133 @@ struct NeighbourMapEntry
275 struct GNUNET_PeerIdentity id; 450 struct GNUNET_PeerIdentity id;
276 451
277 /** 452 /**
278 * ID of task scheduled to run when this peer is about to 453 * Main task that drives this peer (timeouts, keepalives, etc.).
279 * time out (will free resources associated with the peer). 454 * Always runs the 'master_task'.
280 */ 455 */
281 GNUNET_SCHEDULER_TaskIdentifier timeout_task; 456 GNUNET_SCHEDULER_TaskIdentifier task;
282 457
283 /** 458 /**
284 * ID of task scheduled to send keepalives. 459 * At what time should we sent the next keep-alive message?
285 */ 460 */
286 GNUNET_SCHEDULER_TaskIdentifier keepalive_task; 461 struct GNUNET_TIME_Absolute keep_alive_time;
287 462
288 /** 463 /**
289 * ID of task scheduled to run when we should try transmitting 464 * At what time did we sent the last keep-alive message? Used
290 * the head of the message queue. 465 * to calculate round-trip time ("latency").
291 */ 466 */
292 GNUNET_SCHEDULER_TaskIdentifier transmission_task; 467 struct GNUNET_TIME_Absolute last_keep_alive_time;
293 468
294 /** 469 /**
295 * Tracker for inbound bandwidth. 470 * Timestamp we should include in our next CONNECT_ACK message.
471 * (only valid if 'send_connect_ack' is GNUNET_YES). Used to build
472 * our CONNECT_ACK message.
296 */ 473 */
297 struct GNUNET_BANDWIDTH_Tracker in_tracker; 474 struct GNUNET_TIME_Absolute connect_ack_timestamp;
298 475
299 /** 476 /**
300 * Inbound bandwidth from ATS, activated when connection is up 477 * Time where we should cut the connection (timeout) if we don't
478 * make progress in the state machine (or get a KEEPALIVE_RESPONSE
479 * if we are in S_CONNECTED).
301 */ 480 */
302 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; 481 struct GNUNET_TIME_Absolute timeout;
303 482
304 /** 483 /**
305 * Inbound bandwidth from ATS, activated when connection is up 484 * Latest calculated latency value
306 */ 485 */
307 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; 486 struct GNUNET_TIME_Relative latency;
308 487
309 /** 488 /**
310 * Timestamp of the 'SESSION_CONNECT' message we got from the other peer 489 * Tracker for inbound bandwidth.
311 */ 490 */
312 struct GNUNET_TIME_Absolute connect_ts; 491 struct GNUNET_BANDWIDTH_Tracker in_tracker;
313 492
314 /** 493 /**
315 * When did we sent the last keep-alive message? 494 * How often has the other peer (recently) violated the inbound
495 * traffic limit? Incremented by 10 per violation, decremented by 1
496 * per non-violation (for each time interval).
316 */ 497 */
317 struct GNUNET_TIME_Absolute keep_alive_sent; 498 unsigned int quota_violation_count;
318 499
319 /** 500 /**
320 * Latest calculated latency value 501 * The current state of the peer.
321 */ 502 */
322 struct GNUNET_TIME_Relative latency; 503 enum State state;
323 504
324 /** 505 /**
325 * Timeout for ATS 506 * Did we sent an KEEP_ALIVE message and are we expecting a response?
326 * We asked ATS for a new address for this peer
327 */ 507 */
328 GNUNET_SCHEDULER_TaskIdentifier ats_suggest; 508 int expect_latency_response;
329 509
330 /** 510 /**
331 * Delay for ATS request 511 * Flag to set if we still need to send a CONNECT_ACK message to the other peer
512 * (once we have an address to use and the peer has been allowed by our
513 * blacklist). Set to 1 if we need to send a CONNECT_ACK. Set to 2 if we
514 * did send a CONNECT_ACK and should go to 'S_CONNECTED' upon receiving
515 * a 'SESSION_ACK' (regardless of what our own state machine might say).
332 */ 516 */
333 GNUNET_SCHEDULER_TaskIdentifier ats_request; 517 int send_connect_ack;
334 518
335 /** 519};
336 * Task the resets the peer state after due to an pending
337 * unsuccessful connection setup
338 */
339 GNUNET_SCHEDULER_TaskIdentifier state_reset;
340 520
341 521
522/**
523 * Context for blacklist checks and the 'handle_test_blacklist_cont'
524 * function. Stores information about ongoing blacklist checks.
525 */
526struct BlackListCheckContext
527{
528
342 /** 529 /**
343 * How often has the other peer (recently) violated the inbound 530 * We keep blacklist checks in a DLL.
344 * traffic limit? Incremented by 10 per violation, decremented by 1
345 * per non-violation (for each time interval).
346 */ 531 */
347 unsigned int quota_violation_count; 532 struct BlackListCheckContext *next;
348
349 533
350 /** 534 /**
351 * The current state of the peer 535 * We keep blacklist checks in a DLL.
352 * Element of enum State
353 */ 536 */
354 int state; 537 struct BlackListCheckContext *prev;
355 538
356 /** 539 /**
357 * Did we sent an KEEP_ALIVE message and are we expecting a response? 540 * Address that is being checked.
358 */ 541 */
359 int expect_latency_response; 542 struct NeighbourAddress na;
543
544 /**
545 * ATS information about the address.
546 */
547 struct GNUNET_ATS_Information *ats;
360 548
361 /** 549 /**
362 * Was the address used successfully 550 * Handle to the ongoing blacklist check.
363 */ 551 */
364 int address_state; 552 struct GST_BlacklistCheck *bc;
365 553
366 /** 554 /**
367 * Fast reconnect attempts for identical address 555 * Size of the 'ats' array.
368 */ 556 */
369 unsigned int fast_reconnect_attempts; 557 uint32_t ats_count;
558
370}; 559};
371 560
372 561
373/** 562/**
374 * All known neighbours and their HELLOs. 563 * Hash map from peer identities to the respective 'struct NeighbourMapEntry'.
375 */ 564 */
376static struct GNUNET_CONTAINER_MultiHashMap *neighbours; 565static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
377 566
378/** 567/**
568 * We keep blacklist checks in a DLL so that we can find
569 * the 'sessions' in their 'struct NeighbourAddress' if
570 * a session goes down.
571 */
572static struct BlackListCheckContext *bc_head;
573
574/**
575 * We keep blacklist checks in a DLL.
576 */
577static struct BlackListCheckContext *bc_tail;
578
579/**
379 * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb 580 * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
380 */ 581 */
381static void *callback_cls; 582static void *callback_cls;
@@ -398,10 +599,13 @@ static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
398/** 599/**
399 * counter for connected neighbours 600 * counter for connected neighbours
400 */ 601 */
401static int neighbours_connected; 602static unsigned int neighbours_connected;
603
604/**
605 * Number of bytes we have currently queued for transmission.
606 */
607static unsigned long long bytes_in_send_queue;
402 608
403static unsigned int bytes_in_send_queue;
404static unsigned int bytes_received;
405 609
406/** 610/**
407 * Lookup a neighbour entry in the neighbours hash map. 611 * Lookup a neighbour entry in the neighbours hash map.
@@ -412,461 +616,311 @@ static unsigned int bytes_received;
412static struct NeighbourMapEntry * 616static struct NeighbourMapEntry *
413lookup_neighbour (const struct GNUNET_PeerIdentity *pid) 617lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
414{ 618{
619 if (NULL == neighbours)
620 return NULL;
415 return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey); 621 return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
416} 622}
417 623
624
418/** 625/**
419 * Disconnect from the given neighbour, clean up the record. 626 * Test if we're connected to the given peer.
420 * 627 *
421 * @param n neighbour to disconnect from 628 * @param n neighbour entry of peer to test
629 * @return GNUNET_YES if we are connected, GNUNET_NO if not
422 */ 630 */
423static void
424disconnect_neighbour (struct NeighbourMapEntry *n);
425
426#define change_state(n, state, ...) change (n, state, __LINE__)
427
428static int
429is_connecting (struct NeighbourMapEntry *n)
430{
431 if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
432 return GNUNET_YES;
433 return GNUNET_NO;
434}
435
436static int
437is_connected (struct NeighbourMapEntry *n)
438{
439 if (n->state == S_CONNECTED)
440 return GNUNET_YES;
441 return GNUNET_NO;
442}
443
444static int 631static int
445is_disconnecting (struct NeighbourMapEntry *n) 632test_connected (struct NeighbourMapEntry *n)
446{ 633{
447 if (n->state == S_DISCONNECT) 634 if (NULL == n)
448 return GNUNET_YES; 635 return GNUNET_NO;
449 return GNUNET_NO; 636 switch (n->state)
450}
451
452static const char *
453print_state (int state)
454{
455 switch (state)
456 { 637 {
457 case S_CONNECTED: 638 case S_NOT_CONNECTED:
458 return "S_CONNECTED"; 639 case S_INIT_ATS:
459 break; 640 case S_INIT_BLACKLIST:
460 case S_CONNECT_RECV:
461 return "S_CONNECT_RECV";
462 break;
463 case S_CONNECT_SENT: 641 case S_CONNECT_SENT:
464 return "S_CONNECT_SENT"; 642 case S_CONNECT_RECV_ATS:
465 break; 643 case S_CONNECT_RECV_BLACKLIST:
644 case S_CONNECT_RECV_ACK:
645 return GNUNET_NO;
646 case S_CONNECTED:
647 case S_RECONNECT_ATS:
648 case S_RECONNECT_BLACKLIST:
649 case S_RECONNECT_SENT:
650 case S_CONNECTED_SWITCHING_BLACKLIST:
651 case S_CONNECTED_SWITCHING_CONNECT_SENT:
652 return GNUNET_YES;
466 case S_DISCONNECT: 653 case S_DISCONNECT:
467 return "S_DISCONNECT"; 654 case S_DISCONNECT_FINISHED:
468 break; 655 return GNUNET_NO;
469 case S_NOT_CONNECTED:
470 return "S_NOT_CONNECTED";
471 break;
472 case S_FAST_RECONNECT:
473 return "S_FAST_RECONNECT";
474 break;
475 default: 656 default:
476 GNUNET_break (0); 657 GNUNET_break (0);
477 break; 658 break;
478 } 659 }
479 return NULL; 660 return GNUNET_SYSERR;
480} 661}
481 662
482static int
483change (struct NeighbourMapEntry *n, int state, int line);
484
485static void
486ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
487
488 663
664/**
665 * Send information about a new outbound quota to our clients.
666 *
667 * @param target affected peer
668 * @param quota new quota
669 */
489static void 670static void
490reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 671send_outbound_quota (const struct GNUNET_PeerIdentity *target,
672 struct GNUNET_BANDWIDTH_Value32NBO quota)
491{ 673{
492 struct NeighbourMapEntry *n = cls; 674 struct QuotaSetMessage q_msg;
493
494 if (n == NULL)
495 return;
496
497 n->state_reset = GNUNET_SCHEDULER_NO_TASK;
498 if (n->state == S_CONNECTED)
499 return;
500 GNUNET_STATISTICS_update (GST_stats,
501 gettext_noop
502 ("# failed connection attempts due to timeout"), 1,
503 GNUNET_NO);
504 /* resetting state */
505
506 if (n->state == S_FAST_RECONNECT)
507 {
508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 "Fast reconnect time out, disconnecting peer `%s'\n",
510 GNUNET_i2s (&n->id));
511 disconnect_neighbour(n);
512 return;
513 }
514 675
515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 676 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
516 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n", 677 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
517 GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__); 678 ntohl (quota.value__), GNUNET_i2s (target));
518 679 q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
519 n->state = S_NOT_CONNECTED; 680 q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
520 681 q_msg.quota = quota;
521 /* destroying address */ 682 q_msg.peer = (*target);
522 if (n->address != NULL) 683 GST_clients_broadcast (&q_msg.header, GNUNET_NO);
523 {
524 GNUNET_assert (strlen (n->address->transport_name) > 0);
525 GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
526 }
527
528 /* request new address */
529 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
530 GNUNET_SCHEDULER_cancel (n->ats_suggest);
531 n->ats_suggest =
532 GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
533 n);
534 GNUNET_ATS_suggest_address (GST_ats, &n->id);
535} 684}
536 685
537static int
538change (struct NeighbourMapEntry *n, int state, int line)
539{
540 int previous_state;
541 /* allowed transitions */
542 int allowed = GNUNET_NO;
543
544 previous_state = n->state;
545 686
546 switch (n->state) 687/**
547 { 688 * We don't need a given neighbour address any more.
548 case S_NOT_CONNECTED: 689 * Release its resources and give appropriate notifications
549 if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) || 690 * to ATS and other subsystems.
550 (state == S_DISCONNECT)) 691 *
551 allowed = GNUNET_YES; 692 * @param na address we are done with; 'na' itself must NOT be 'free'd, only the contents!
552 break; 693 */
553 case S_CONNECT_RECV: 694static void
554 allowed = GNUNET_YES; 695free_address (struct NeighbourAddress *na)
555 break; 696{
556 case S_CONNECT_SENT: 697 if (GNUNET_YES == na->ats_active)
557 allowed = GNUNET_YES;
558 break;
559 case S_CONNECTED:
560 if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
561 allowed = GNUNET_YES;
562 break;
563 case S_DISCONNECT:
564 break;
565 case S_FAST_RECONNECT:
566 if ((state == S_CONNECTED) || (state == S_DISCONNECT))
567 allowed = GNUNET_YES;
568 break;
569 default:
570 GNUNET_break (0);
571 break;
572 }
573 if (allowed == GNUNET_NO)
574 {
575 char *old = GNUNET_strdup (print_state (n->state));
576 char *new = GNUNET_strdup (print_state (state));
577
578 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
579 "Illegal state transition from `%s' to `%s' in line %u \n", old,
580 new, line);
581 GNUNET_break (0);
582 GNUNET_free (old);
583 GNUNET_free (new);
584 return GNUNET_SYSERR;
585 }
586 {
587 char *old = GNUNET_strdup (print_state (n->state));
588 char *new = GNUNET_strdup (print_state (state));
589
590 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
592 GNUNET_i2s (&n->id), n, old, new, line);
593 GNUNET_free (old);
594 GNUNET_free (new);
595 }
596 n->state = state;
597
598 switch (n->state)
599 {
600 case S_FAST_RECONNECT:
601 case S_CONNECT_RECV:
602 case S_CONNECT_SENT:
603 if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
604 GNUNET_SCHEDULER_cancel (n->state_reset);
605 n->state_reset =
606 GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
607 break;
608 case S_CONNECTED:
609 case S_NOT_CONNECTED:
610 case S_DISCONNECT:
611 if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
612 {
613#if DEBUG_TRANSPORT
614 char *old = GNUNET_strdup (print_state (n->state));
615 char *new = GNUNET_strdup (print_state (state));
616
617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618 "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
619 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
620 GNUNET_free (old);
621 GNUNET_free (new);
622#endif
623 GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
624 GNUNET_SCHEDULER_cancel (n->state_reset);
625 n->state_reset = GNUNET_SCHEDULER_NO_TASK;
626 }
627 break;
628
629 default:
630 GNUNET_assert (0);
631 }
632
633 if (NULL != address_change_cb)
634 { 698 {
635 if (n->state == S_CONNECTED) 699 GST_validation_set_address_use (na->address, na->session, GNUNET_NO, __LINE__);
636 address_change_cb (callback_cls, &n->id, n->address); 700 GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_NO);
637 else if (previous_state == S_CONNECTED)
638 address_change_cb (callback_cls, &n->id, NULL);
639 } 701 }
640 702 na->ats_active = GNUNET_NO;
641 return GNUNET_OK; 703 if (NULL != na->address)
642}
643
644static ssize_t
645send_with_session (struct NeighbourMapEntry *n,
646 const char *msgbuf, size_t msgbuf_size,
647 uint32_t priority,
648 struct GNUNET_TIME_Relative timeout,
649 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
650{
651 struct GNUNET_TRANSPORT_PluginFunctions *papi;
652 size_t ret = GNUNET_SYSERR;
653
654 GNUNET_assert (n != NULL);
655 GNUNET_assert (n->session != NULL);
656
657 papi = GST_plugins_find (n->address->transport_name);
658 if (papi == NULL)
659 { 704 {
660 if (cont != NULL) 705 GNUNET_HELLO_address_free (na->address);
661 cont (cont_cls, &n->id, GNUNET_SYSERR); 706 na->address = NULL;
662 return GNUNET_SYSERR;
663 } 707 }
664 708 na->session = NULL;
665 ret = papi->send (papi->cls,
666 n->session,
667 msgbuf, msgbuf_size,
668 0,
669 timeout,
670 cont, cont_cls);
671
672 if ((ret == -1) && (cont != NULL))
673 cont (cont_cls, &n->id, GNUNET_SYSERR);
674 return ret;
675} 709}
676 710
677/**
678 * Task invoked to start a transmission to another peer.
679 *
680 * @param cls the 'struct NeighbourMapEntry'
681 * @param tc scheduler context
682 */
683static void
684transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
685
686 711
687/** 712/**
688 * We're done with our transmission attempt, continue processing. 713 * Initialize the 'struct NeighbourAddress'.
689 * 714 *
690 * @param cls the 'struct MessageQueue' of the message 715 * @param peer identity of the peer to switch the address for
691 * @param receiver intended receiver 716 * @param address address of the other peer, NULL if other peer
692 * @param success whether it worked or not 717 * connected to us
718 * @param session session to use (or NULL, in which case an
719 * address must be setup)
720 * @param bandwidth_in inbound quota to be used when connection is up
721 * @param bandwidth_out outbound quota to be used when connection is up
722 * @param is_active GNUNET_YES to mark this as the active address with ATS
693 */ 723 */
694static void 724static void
695transmit_send_continuation (void *cls, 725set_address (struct NeighbourAddress *na,
696 const struct GNUNET_PeerIdentity *receiver, 726 const struct GNUNET_HELLO_Address *address,
697 int success) 727 struct Session *session,
728 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
729 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
730 int is_active)
698{ 731{
699 struct MessageQueue *mq = cls; 732 struct GNUNET_TRANSPORT_PluginFunctions *papi;
700 struct NeighbourMapEntry *n;
701 struct NeighbourMapEntry *tmp;
702 733
703 tmp = lookup_neighbour (receiver); 734 if (NULL == (papi = GST_plugins_find (address->transport_name)))
704 n = mq->n;
705 if ((NULL != n) && (tmp != NULL) && (tmp == n))
706 { 735 {
707 GNUNET_assert (n->is_active == mq); 736 GNUNET_break (0);
708 n->is_active = NULL; 737 return;
709 if (success == GNUNET_YES) 738 }
739 if (session == na->session)
740 {
741 na->bandwidth_in = bandwidth_in;
742 na->bandwidth_out = bandwidth_out;
743 if (is_active != na->ats_active)
710 { 744 {
711 GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); 745 na->ats_active = is_active;
712 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); 746 GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, is_active);
747 GST_validation_set_address_use (na->address, na->session, is_active, __LINE__);
713 } 748 }
749 if (GNUNET_YES == is_active)
750 {
751 /* FIXME: is this the right place to set quotas? */
752 GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in);
753 send_outbound_quota (&address->peer, bandwidth_out);
754 }
755 return;
714 } 756 }
757 free_address (na);
758 if (NULL == session)
759 session = papi->get_session (papi->cls, address);
760 if (NULL == session)
761 {
762 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763 "Failed to obtain new session for peer `%s' and address '%s'\n",
764 GNUNET_i2s (&address->peer), GST_plugins_a2s (address));
765 GNUNET_ATS_address_destroyed (GST_ats, address, NULL);
766 return;
767 }
768 na->address = GNUNET_HELLO_address_copy (address);
769 na->bandwidth_in = bandwidth_in;
770 na->bandwidth_out = bandwidth_out;
771 na->session = session;
772 na->ats_active = is_active;
773 if (GNUNET_YES == is_active)
774 {
775 /* Telling ATS about new session */
776 GNUNET_ATS_address_update (GST_ats, na->address, na->session, NULL, 0);
777 GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_YES);
778 GST_validation_set_address_use (na->address, na->session, GNUNET_YES, __LINE__);
715 779
716 GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size); 780 /* FIXME: is this the right place to set quotas? */
717 bytes_in_send_queue -= mq->message_buf_size; 781 GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in);
718 GNUNET_STATISTICS_set (GST_stats, 782 send_outbound_quota (&address->peer, bandwidth_out);
719 gettext_noop 783 }
720 ("# bytes in message queue for other peers"),
721 bytes_in_send_queue, GNUNET_NO);
722
723 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
724 ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
725 (success == GNUNET_OK) ? "successful" : "FAILED");
726 if (NULL != mq->cont)
727 mq->cont (mq->cont_cls, success);
728 GNUNET_free (mq);
729} 784}
730 785
731 786
732/** 787/**
733 * Check the ready list for the given neighbour and if a plugin is 788 * Free a neighbour map entry.
734 * ready for transmission (and if we have a message), do so!
735 * 789 *
736 * @param n target peer for which to transmit 790 * @param n entry to free
737 */ 791 */
738static void 792static void
739try_transmission_to_peer (struct NeighbourMapEntry *n) 793free_neighbour (struct NeighbourMapEntry *n)
740{ 794{
741 struct MessageQueue *mq; 795 struct MessageQueue *mq;
742 struct GNUNET_TIME_Relative timeout; 796 struct GNUNET_TRANSPORT_PluginFunctions *papi;
743 ssize_t ret;
744 797
745 if (n->is_active != NULL) 798 GNUNET_assert (GNUNET_YES ==
746 { 799 GNUNET_CONTAINER_multihashmap_remove (neighbours,
747 GNUNET_break (0); 800 &n->id.hashPubKey, n));
748 return; /* transmission already pending */ 801 n->is_active = NULL; /* always free'd by its own continuation! */
749 }
750 if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
751 {
752 GNUNET_break (0);
753 return; /* currently waiting for bandwidth */
754 }
755 while (NULL != (mq = n->messages_head))
756 {
757 timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
758 if (timeout.rel_value > 0)
759 break;
760 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
761 n->is_active = mq;
762 mq->n = n;
763 transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
764 }
765 if (NULL == mq)
766 return; /* no more messages */
767 802
768 if (n->address == NULL) 803 /* fail messages currently in the queue */
804 while (NULL != (mq = n->messages_head))
769 { 805 {
770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
771 GNUNET_i2s (&n->id));
772 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); 806 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
773 transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); 807 if (NULL != mq->cont)
774 GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); 808 mq->cont (mq->cont_cls, GNUNET_SYSERR);
775 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); 809 GNUNET_free (mq);
776 return;
777 }
778
779 if (GST_plugins_find (n->address->transport_name) == NULL)
780 {
781 GNUNET_break (0);
782 return;
783 } 810 }
784 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); 811 /* It is too late to send other peer disconnect notifications, but at
785 n->is_active = mq; 812 least internally we need to get clean... */
786 mq->n = n; 813 if (GNUNET_YES == test_connected (n))
787
788 if ((n->address->address_length == 0) && (n->session == NULL))
789 { 814 {
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", 815 GNUNET_STATISTICS_set (GST_stats,
791 GNUNET_i2s (&n->id)); 816 gettext_noop ("# peers connected"),
792 transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); 817 --neighbours_connected,
793 GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); 818 GNUNET_NO);
794 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); 819 disconnect_notify_cb (callback_cls, &n->id);
795 return;
796 } 820 }
821 n->state = S_DISCONNECT_FINISHED;
822
823 /* FIXME-PLUGIN-API: This does not seem to guarantee that all
824 transport sessions eventually get killed due to inactivity; they
825 MUST have their own timeout logic (but at least TCP doesn't have
826 one yet). Are we sure that EVERY 'session' of a plugin is
827 actually cleaned up this way!? Note that if we are switching
828 between two TCP sessions to the same peer, the existing plugin
829 API gives us not even the means to selectively kill only one of
830 them! Killing all sessions like this seems to be very, very
831 wrong. */
832 if ( (NULL != n->primary_address.address) &&
833 (NULL != (papi = GST_plugins_find (n->primary_address.address->transport_name))) )
834 papi->disconnect (papi->cls, &n->id);
835
836 /* cut transport-level connection */
837 free_address (&n->primary_address);
838 free_address (&n->alternative_address);
839
840 // FIXME-ATS-API: we might want to be more specific about
841 // which states we do this from in the future (ATS should
842 // have given us a 'suggest_address' handle, and if we have
843 // such a handle, we should cancel the operation here!
844 GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
797 845
798 ret = send_with_session(n, 846 if (GNUNET_SCHEDULER_NO_TASK != n->task)
799 mq->message_buf, mq->message_buf_size,
800 0, timeout,
801 &transmit_send_continuation, mq);
802
803 if (ret == -1)
804 { 847 {
805 /* failure, but 'send' would not call continuation in this case, 848 GNUNET_SCHEDULER_cancel (n->task);
806 * so we need to do it here! */ 849 n->task = GNUNET_SCHEDULER_NO_TASK;
807 transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
808 } 850 }
809 851 /* free rest of memory */
852 GNUNET_free (n);
810} 853}
811 854
812 855
813/** 856/**
814 * Task invoked to start a transmission to another peer. 857 * Transmit a message using the current session of the given
858 * neighbour.
815 * 859 *
816 * @param cls the 'struct NeighbourMapEntry' 860 * @param n entry for the recipient
817 * @param tc scheduler context 861 * @param msgbuf buffer to transmit
862 * @param msgbuf_size number of bytes in buffer
863 * @param priority transmission priority
864 * @param timeout transmission timeout
865 * @param cont continuation to call when finished (can be NULL)
866 * @param cont_cls closure for cont
818 */ 867 */
819static void 868static void
820transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 869send_with_session (struct NeighbourMapEntry *n,
870 const char *msgbuf, size_t msgbuf_size,
871 uint32_t priority,
872 struct GNUNET_TIME_Relative timeout,
873 GNUNET_TRANSPORT_TransmitContinuation cont,
874 void *cont_cls)
821{ 875{
822 struct NeighbourMapEntry *n = cls; 876 struct GNUNET_TRANSPORT_PluginFunctions *papi;
823 877
824 GNUNET_assert (NULL != lookup_neighbour (&n->id)); 878 GNUNET_assert (n->primary_address.session != NULL);
825 n->transmission_task = GNUNET_SCHEDULER_NO_TASK; 879 if ( ( (NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name))) ||
826 try_transmission_to_peer (n); 880 (-1 == papi->send (papi->cls,
881 n->primary_address.session,
882 msgbuf, msgbuf_size,
883 priority,
884 timeout,
885 cont, cont_cls))) &&
886 (NULL != cont) )
887 cont (cont_cls, &n->id, GNUNET_SYSERR);
888 GNUNET_break (NULL != papi);
827} 889}
828 890
829 891
830/** 892/**
831 * Initialize the neighbours subsystem. 893 * Function called when the 'DISCONNECT' message has been sent by the
894 * plugin. Frees the neighbour --- if the entry still exists.
832 * 895 *
833 * @param cls closure for callbacks 896 * @param cls NULL
834 * @param connect_cb function to call if we connect to a peer 897 * @param target identity of the neighbour that was disconnected
835 * @param disconnect_cb function to call if we disconnect from a peer 898 * @param result GNUNET_OK if the disconnect got out successfully
836 * @param peer_address_cb function to call if we change an active address
837 * of a neighbour
838 */ 899 */
839void
840GST_neighbours_start (void *cls,
841 GNUNET_TRANSPORT_NotifyConnect connect_cb,
842 GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
843 GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
844{
845 callback_cls = cls;
846 connect_notify_cb = connect_cb;
847 disconnect_notify_cb = disconnect_cb;
848 address_change_cb = peer_address_cb;
849 neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
850 neighbours_connected = 0;
851 bytes_in_send_queue = 0;
852 bytes_received = 0;
853}
854
855
856static void 900static void
857send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target, 901send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
858 int result) 902 int result)
859{ 903{
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 904 struct NeighbourMapEntry *n;
861 "Sending DISCONNECT message to peer `%4s': %i\n", 905
862 GNUNET_i2s (target), result); 906 n = lookup_neighbour (target);
907 if (NULL == n)
908 return; /* already gone */
909 if (S_DISCONNECT != n->state)
910 return; /* have created a fresh entry since */
911 n->state = S_DISCONNECT_FINISHED;
912 free_neighbour (n);
863} 913}
864 914
865 915
866static int 916/**
867send_disconnect (struct NeighbourMapEntry * n) 917 * Transmit a DISCONNECT message to the other peer.
918 *
919 * @param n neighbour to send DISCONNECT message.
920 */
921static void
922send_disconnect (struct NeighbourMapEntry *n)
868{ 923{
869 size_t ret;
870 struct SessionDisconnectMessage disconnect_msg; 924 struct SessionDisconnectMessage disconnect_msg;
871 925
872 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 926 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -890,23 +944,31 @@ send_disconnect (struct NeighbourMapEntry * n)
890 &disconnect_msg.purpose, 944 &disconnect_msg.purpose,
891 &disconnect_msg.signature)); 945 &disconnect_msg.signature));
892 946
893 ret = send_with_session (n, 947 send_with_session (n,
894 (const char *) &disconnect_msg, sizeof (disconnect_msg), 948 (const char *) &disconnect_msg, sizeof (disconnect_msg),
895 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, 949 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
896 &send_disconnect_cont, NULL); 950 &send_disconnect_cont, NULL);
897
898 if (ret == GNUNET_SYSERR)
899 return GNUNET_SYSERR;
900
901 GNUNET_STATISTICS_update (GST_stats, 951 GNUNET_STATISTICS_update (GST_stats,
902 gettext_noop 952 gettext_noop
903 ("# peers disconnected due to external request"), 1, 953 ("# DISCONNECT messages sent"), 1,
904 GNUNET_NO); 954 GNUNET_NO);
905 return GNUNET_OK;
906} 955}
907 956
908 957
909/** 958/**
959 * Master task run for every neighbour. Performs all of the time-related
960 * activities (keep alive, send next message, disconnect if idle, finish
961 * clean up after disconnect).
962 *
963 * @param cls the 'struct NeighbourMapEntry' for which we are running
964 * @param tc scheduler context (unused)
965 */
966static void
967master_task (void *cls,
968 const struct GNUNET_SCHEDULER_TaskContext *tc);
969
970
971/**
910 * Disconnect from the given neighbour, clean up the record. 972 * Disconnect from the given neighbour, clean up the record.
911 * 973 *
912 * @param n neighbour to disconnect from 974 * @param n neighbour to disconnect from
@@ -914,844 +976,533 @@ send_disconnect (struct NeighbourMapEntry * n)
914static void 976static void
915disconnect_neighbour (struct NeighbourMapEntry *n) 977disconnect_neighbour (struct NeighbourMapEntry *n)
916{ 978{
917 struct MessageQueue *mq; 979 /* depending on state, notify neighbour and/or upper layers of this peer
918 int previous_state; 980 about disconnect */
919 981 switch (n->state)
920 previous_state = n->state;
921
922 if (is_disconnecting (n))
923 return;
924
925 /* send DISCONNECT MESSAGE */
926 if (previous_state == S_CONNECTED)
927 {
928 if (GNUNET_OK == send_disconnect (n))
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
930 GNUNET_i2s (&n->id));
931 else
932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933 "Could not send DISCONNECT_MSG to `%s'\n",
934 GNUNET_i2s (&n->id));
935 }
936
937 change_state (n, S_DISCONNECT);
938
939 if (previous_state == S_CONNECTED)
940 {
941 GNUNET_assert (NULL != n->address);
942 if (n->address_state == USED)
943 {
944 GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
945 GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
946 n->address_state = UNUSED;
947 }
948 }
949
950 if (n->address != NULL)
951 {
952 struct GNUNET_TRANSPORT_PluginFunctions *papi;
953
954 papi = GST_plugins_find (n->address->transport_name);
955 if (papi != NULL)
956 papi->disconnect (papi->cls, &n->id);
957 }
958 while (NULL != (mq = n->messages_head))
959 {
960 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
961 if (NULL != mq->cont)
962 mq->cont (mq->cont_cls, GNUNET_SYSERR);
963 GNUNET_free (mq);
964 }
965 if (NULL != n->is_active)
966 {
967 n->is_active->n = NULL;
968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969 "Failing transmission of active message due to disconnect\n");
970 if (NULL != n->is_active->cont)
971 n->is_active->cont (n->is_active->cont_cls, GNUNET_SYSERR);
972 GNUNET_free (n->is_active);
973 n->is_active = NULL;
974 }
975
976 switch (previous_state)
977 { 982 {
983 case S_NOT_CONNECTED:
984 case S_INIT_ATS:
985 case S_INIT_BLACKLIST:
986 /* other peer is completely unaware of us, no need to send DISCONNECT */
987 n->state = S_DISCONNECT_FINISHED;
988 free_neighbour (n);
989 return;
990 case S_CONNECT_SENT:
991 send_disconnect (n);
992 n->state = S_DISCONNECT;
993 break;
994 case S_CONNECT_RECV_ATS:
995 case S_CONNECT_RECV_BLACKLIST:
996 /* we never ACK'ed the other peer's request, no need to send DISCONNECT */
997 n->state = S_DISCONNECT_FINISHED;
998 free_neighbour (n);
999 return;
1000 case S_CONNECT_RECV_ACK:
1001 /* we DID ACK the other peer's request, must send DISCONNECT */
1002 send_disconnect (n);
1003 n->state = S_DISCONNECT;
1004 break;
978 case S_CONNECTED: 1005 case S_CONNECTED:
979 GNUNET_assert (neighbours_connected > 0); 1006 case S_RECONNECT_ATS:
980 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task); 1007 case S_RECONNECT_BLACKLIST:
981 GNUNET_SCHEDULER_cancel (n->keepalive_task); 1008 case S_RECONNECT_SENT:
982 n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; 1009 case S_CONNECTED_SWITCHING_BLACKLIST:
983 n->expect_latency_response = GNUNET_NO; 1010 case S_CONNECTED_SWITCHING_CONNECT_SENT:
984 neighbours_connected--; 1011 /* we are currently connected, need to send disconnect and do
985 GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected, 1012 internal notifications and update statistics */
986 GNUNET_NO); 1013 send_disconnect (n);
1014 GNUNET_STATISTICS_set (GST_stats,
1015 gettext_noop ("# peers connected"),
1016 --neighbours_connected,
1017 GNUNET_NO);
987 disconnect_notify_cb (callback_cls, &n->id); 1018 disconnect_notify_cb (callback_cls, &n->id);
1019 n->state = S_DISCONNECT;
988 break; 1020 break;
989 case S_FAST_RECONNECT: 1021 case S_DISCONNECT:
990 GNUNET_STATISTICS_update (GST_stats, 1022 /* already disconnected, ignore */
991 gettext_noop ("# fast reconnects failed"), 1, 1023 break;
992 GNUNET_NO); 1024 case S_DISCONNECT_FINISHED:
993 disconnect_notify_cb (callback_cls, &n->id); 1025 /* already cleaned up, how did we get here!? */
1026 GNUNET_assert (0);
994 break; 1027 break;
995 default: 1028 default:
1029 GNUNET_break (0);
996 break; 1030 break;
997 } 1031 }
998 1032 /* schedule timeout to clean up */
999 GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); 1033 if (GNUNET_SCHEDULER_NO_TASK != n->task)
1000 1034 GNUNET_SCHEDULER_cancel (n->task);
1001 GNUNET_assert (GNUNET_YES == 1035 n->task = GNUNET_SCHEDULER_add_delayed (DISCONNECT_SENT_TIMEOUT,
1002 GNUNET_CONTAINER_multihashmap_remove (neighbours, 1036 &master_task, n);
1003 &n->id.hashPubKey, n));
1004 if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
1005 {
1006 GNUNET_SCHEDULER_cancel (n->ats_suggest);
1007 n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1008 }
1009 if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
1010 {
1011 GNUNET_SCHEDULER_cancel (n->ats_request);
1012 n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1013 }
1014
1015 if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1016 {
1017 GNUNET_SCHEDULER_cancel (n->timeout_task);
1018 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1019 }
1020 if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1021 {
1022 GNUNET_SCHEDULER_cancel (n->transmission_task);
1023 n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1024 }
1025 if (NULL != n->fast_reconnect_address)
1026 {
1027 GNUNET_HELLO_address_free (n->fast_reconnect_address);
1028 n->fast_reconnect_address = NULL;
1029 }
1030 if (NULL != n->address)
1031 {
1032 GNUNET_HELLO_address_free (n->address);
1033 n->address = NULL;
1034 }
1035 n->session = NULL;
1036 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1037 GNUNET_i2s (&n->id), n);
1038 GNUNET_free (n);
1039} 1037}
1040 1038
1041 1039
1042/** 1040/**
1043 * Peer has been idle for too long. Disconnect. 1041 * We're done with our transmission attempt, continue processing.
1044 *
1045 * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1046 * @param tc scheduler context
1047 */
1048static void
1049neighbour_timeout_task (void *cls,
1050 const struct GNUNET_SCHEDULER_TaskContext *tc)
1051{
1052 struct NeighbourMapEntry *n = cls;
1053
1054 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1056 GNUNET_i2s (&n->id));
1057
1058 GNUNET_STATISTICS_update (GST_stats,
1059 gettext_noop
1060 ("# peers disconnected due to timeout"), 1,
1061 GNUNET_NO);
1062 disconnect_neighbour (n);
1063}
1064
1065
1066/**
1067 * Send another keepalive message.
1068 * 1042 *
1069 * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle 1043 * @param cls the 'struct MessageQueue' of the message
1070 * @param tc scheduler context 1044 * @param receiver intended receiver
1045 * @param success whether it worked or not
1071 */ 1046 */
1072static void 1047static void
1073neighbour_keepalive_task (void *cls, 1048transmit_send_continuation (void *cls,
1074 const struct GNUNET_SCHEDULER_TaskContext *tc) 1049 const struct GNUNET_PeerIdentity *receiver,
1050 int success)
1075{ 1051{
1076 struct NeighbourMapEntry *n = cls; 1052 struct MessageQueue *mq = cls;
1077 struct GNUNET_MessageHeader m; 1053 struct NeighbourMapEntry *n;
1078 int ret;
1079
1080 GNUNET_assert (S_CONNECTED == n->state);
1081 n->keepalive_task =
1082 GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1083 &neighbour_keepalive_task, n);
1084
1085 GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1086 GNUNET_NO);
1087 m.size = htons (sizeof (struct GNUNET_MessageHeader));
1088 m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1089
1090 ret = send_with_session (n,
1091 (const void *) &m, sizeof (m),
1092 UINT32_MAX /* priority */ ,
1093 GNUNET_TIME_UNIT_FOREVER_REL,
1094 NULL, NULL);
1095 1054
1096 n->expect_latency_response = GNUNET_NO; 1055 n = lookup_neighbour (receiver);
1097 n->keep_alive_sent = GNUNET_TIME_absolute_get_zero (); 1056 if (n->is_active == mq)
1098 if (ret != GNUNET_SYSERR)
1099 { 1057 {
1100 n->expect_latency_response = GNUNET_YES; 1058 /* this is still "our" neighbour, remove us from its queue
1101 n->keep_alive_sent = GNUNET_TIME_absolute_get (); 1059 and allow it to send the next message now */
1060 n->is_active = NULL;
1061 GNUNET_SCHEDULER_cancel (n->task);
1062 n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
1102 } 1063 }
1103 1064 GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size);
1104} 1065 bytes_in_send_queue -= mq->message_buf_size;
1105 1066 GNUNET_STATISTICS_set (GST_stats,
1106 1067 gettext_noop
1107/** 1068 ("# bytes in message queue for other peers"),
1108 * Disconnect from the given neighbour. 1069 bytes_in_send_queue, GNUNET_NO);
1109 * 1070 if (GNUNET_OK == success)
1110 * @param cls unused
1111 * @param key hash of neighbour's public key (not used)
1112 * @param value the 'struct NeighbourMapEntry' of the neighbour
1113 */
1114static int
1115disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1116{
1117 struct NeighbourMapEntry *n = value;
1118
1119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1120 GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1121 if (S_CONNECTED == n->state)
1122 GNUNET_STATISTICS_update (GST_stats, 1071 GNUNET_STATISTICS_update (GST_stats,
1123 gettext_noop 1072 gettext_noop
1124 ("# peers disconnected due to global disconnect"), 1073 ("# messages transmitted to other peers"),
1125 1, GNUNET_NO); 1074 1, GNUNET_NO);
1126 disconnect_neighbour (n); 1075 else
1127 return GNUNET_OK; 1076 GNUNET_STATISTICS_update (GST_stats,
1128} 1077 gettext_noop
1129 1078 ("# transmission failures for messages to other peers"),
1130 1079 1, GNUNET_NO);
1131static void 1080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1081 "Sending message to `%s' of type %u was a %s\n",
1133{ 1082 GNUNET_i2s (receiver),
1134 struct NeighbourMapEntry *n = cls; 1083 ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
1135 1084 (success == GNUNET_OK) ? "success" : "FAILURE");
1136 n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; 1085 if (NULL != mq->cont)
1137 1086 mq->cont (mq->cont_cls, success);
1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1087 GNUNET_free (mq);
1139 "ATS did not suggested address to connect to peer `%s'\n",
1140 GNUNET_i2s (&n->id));
1141
1142 GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1143 GNUNET_NO);
1144
1145
1146 disconnect_neighbour (n);
1147}
1148
1149/**
1150 * Cleanup the neighbours subsystem.
1151 */
1152void
1153GST_neighbours_stop ()
1154{
1155 // This can happen during shutdown
1156 if (neighbours == NULL)
1157 {
1158 return;
1159 }
1160
1161 GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1162 NULL);
1163 GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1164// GNUNET_assert (neighbours_connected == 0);
1165 neighbours = NULL;
1166 callback_cls = NULL;
1167 connect_notify_cb = NULL;
1168 disconnect_notify_cb = NULL;
1169 address_change_cb = NULL;
1170} 1088}
1171 1089
1172struct ContinutionContext
1173{
1174 struct GNUNET_HELLO_Address *address;
1175
1176 struct Session *session;
1177};
1178
1179static void
1180send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1181 struct GNUNET_BANDWIDTH_Value32NBO quota)
1182{
1183 struct QuotaSetMessage q_msg;
1184
1185 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1187 ntohl (quota.value__), GNUNET_i2s (target));
1188 q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1189 q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1190 q_msg.quota = quota;
1191 q_msg.peer = (*target);
1192 GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1193}
1194 1090
1195/** 1091/**
1196 * We tried to send a SESSION_CONNECT message to another peer. If this 1092 * Check the message list for the given neighbour and if we can
1197 * succeeded, we change the state. If it failed, we should tell 1093 * send a message, do so. This function should only be called
1198 * ATS to not use this address anymore (until it is re-validated). 1094 * if the connection is at least generally ready for transmission.
1095 * While we will only send one message at a time, no bandwidth
1096 * quota management is performed here. If a message was given to
1097 * the plugin, the continuation will automatically re-schedule
1098 * the 'master' task once the next message might be transmitted.
1199 * 1099 *
1200 * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried 1100 * @param n target peer for which to transmit
1201 * @param target peer to send the message to
1202 * @param success GNUNET_OK on success
1203 */ 1101 */
1204static void 1102static void
1205send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target, 1103try_transmission_to_peer (struct NeighbourMapEntry *n)
1206 int success)
1207{ 1104{
1208 struct ContinutionContext *cc = cls; 1105 struct MessageQueue *mq;
1209 struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer); 1106 struct GNUNET_TIME_Relative timeout;
1210 1107
1211 if (GNUNET_YES != success) 1108 if (NULL == n->primary_address.address)
1212 { 1109 {
1213 GNUNET_assert (strlen (cc->address->transport_name) > 0); 1110 /* no address, why are we here? */
1214 GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); 1111 GNUNET_break (0);
1112 return;
1215 } 1113 }
1216 if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT)) 1114 if ((0 == n->primary_address.address->address_length) &&
1115 (NULL == n->primary_address.session))
1217 { 1116 {
1218 GNUNET_HELLO_address_free (cc->address); 1117 /* no address, why are we here? */
1219 GNUNET_free (cc); 1118 GNUNET_break (0);
1220 return; 1119 return;
1221 } 1120 }
1222 1121 if (NULL != n->is_active)
1223 if ((GNUNET_YES == success) &&
1224 ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1225 { 1122 {
1226 change_state (n, S_CONNECT_SENT); 1123 /* transmission already pending */
1227 GNUNET_HELLO_address_free (cc->address); 1124 return;
1228 GNUNET_free (cc);
1229 return;
1230 } 1125 }
1231 1126
1232 if ((GNUNET_NO == success) && 1127 /* timeout messages from the queue that are past their due date */
1233 ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) 1128 while (NULL != (mq = n->messages_head))
1234 { 1129 {
1235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1130 timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
1236 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n", 1131 if (timeout.rel_value > 0)
1237 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); 1132 break;
1238 change_state (n, S_NOT_CONNECTED); 1133 GNUNET_STATISTICS_update (GST_stats,
1239 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) 1134 gettext_noop
1240 GNUNET_SCHEDULER_cancel (n->ats_suggest); 1135 ("# messages timed out while in transport queue"),
1241 n->ats_suggest = 1136 1, GNUNET_NO);
1242 GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel, 1137 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
1243 n); 1138 n->is_active = mq;
1244 GNUNET_ATS_suggest_address (GST_ats, &n->id); 1139 transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
1245 } 1140 }
1246 GNUNET_HELLO_address_free (cc->address); 1141 if (NULL == mq)
1247 GNUNET_free (cc); 1142 return; /* no more messages */
1143 GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
1144 n->is_active = mq;
1145 send_with_session (n,
1146 mq->message_buf, mq->message_buf_size,
1147 0 /* priority */, timeout,
1148 &transmit_send_continuation, mq);
1248} 1149}
1249 1150
1250 1151
1152/**
1153 * Send keepalive message to the neighbour. Must only be called
1154 * if we are on 'connected' state. Will internally determine
1155 * if a keepalive is truly needed (so can always be called).
1156 *
1157 * @param n neighbour that went idle and needs a keepalive
1158 */
1251static void 1159static void
1252ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1160send_keepalive (struct NeighbourMapEntry *n)
1253{ 1161{
1254 struct NeighbourMapEntry *n = cls; 1162 struct GNUNET_MessageHeader m;
1255 n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1256 1163
1257 n->ats_suggest = 1164 GNUNET_assert (S_CONNECTED == n->state);
1258 GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, 1165 if (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time).rel_value > 0)
1259 n); 1166 return; /* no keepalive needed at this time */
1167 m.size = htons (sizeof (struct GNUNET_MessageHeader));
1168 m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1169 send_with_session (n,
1170 (const void *) &m, sizeof (m),
1171 UINT32_MAX /* priority */,
1172 KEEPALIVE_FREQUENCY,
1173 NULL, NULL);
1174 GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1175 GNUNET_NO);
1176 n->expect_latency_response = GNUNET_YES;
1177 n->last_keep_alive_time = GNUNET_TIME_absolute_get ();
1178 n->keep_alive_time = GNUNET_TIME_relative_to_absolute (KEEPALIVE_FREQUENCY);
1260} 1179}
1261 1180
1262 1181
1263/** 1182/**
1264 * We tried to switch addresses with an peer already connected. If it failed, 1183 * Keep the connection to the given neighbour alive longer,
1265 * we should tell ATS to not use this address anymore (until it is re-validated). 1184 * we received a KEEPALIVE (or equivalent); send a response.
1266 * 1185 *
1267 * @param cls the 'struct NeighbourMapEntry' 1186 * @param neighbour neighbour to keep alive (by sending keep alive response)
1268 * @param target peer to send the message to
1269 * @param success GNUNET_OK on success
1270 */ 1187 */
1271static void 1188void
1272send_switch_address_continuation (void *cls, 1189GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1273 const struct GNUNET_PeerIdentity *target,
1274 int success)
1275{ 1190{
1276 struct ContinutionContext *cc = cls;
1277 struct NeighbourMapEntry *n; 1191 struct NeighbourMapEntry *n;
1192 struct GNUNET_MessageHeader m;
1278 1193
1279 if (neighbours == NULL) 1194 if (NULL == (n = lookup_neighbour (neighbour)))
1280 {
1281 GNUNET_HELLO_address_free (cc->address);
1282 GNUNET_free (cc);
1283 return; /* neighbour is going away */
1284 }
1285
1286 n = lookup_neighbour (&cc->address->peer);
1287 if ((n == NULL) || (is_disconnecting (n)))
1288 {
1289 GNUNET_HELLO_address_free (cc->address);
1290 GNUNET_free (cc);
1291 return; /* neighbour is going away */
1292 }
1293
1294 GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1295 if (GNUNET_YES != success)
1296 { 1195 {
1297 1196 GNUNET_STATISTICS_update (GST_stats,
1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1197 gettext_noop
1299 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n", 1198 ("# KEEPALIVE messages discarded (peer unknown)"),
1300 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session); 1199 1, GNUNET_NO);
1301
1302 GNUNET_assert (strlen (cc->address->transport_name) > 0);
1303 GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1304
1305 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1306 {
1307 GNUNET_SCHEDULER_cancel (n->ats_suggest);
1308 n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1309 }
1310
1311 if (n->state == S_FAST_RECONNECT)
1312 {
1313 if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1314 {
1315 /* Throtteled ATS request for fast reconnect */
1316 struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1317 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318 "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1319 n->ats_request =
1320 GNUNET_SCHEDULER_add_delayed (delay,
1321 ats_request,
1322 n);
1323 }
1324 }
1325 else
1326 {
1327 /* Immediate ATS request for connected peers */
1328 n->ats_suggest =
1329 GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1330 n);
1331 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1332 }
1333 GNUNET_HELLO_address_free (cc->address);
1334 GNUNET_free (cc);
1335 return; 1200 return;
1336 } 1201 }
1337 /* Tell ATS that switching addresses was successful */ 1202 if (NULL == n->primary_address.session)
1338 switch (n->state)
1339 { 1203 {
1340 case S_CONNECTED: 1204 GNUNET_STATISTICS_update (GST_stats,
1341 if (n->address_state == FRESH) 1205 gettext_noop
1342 { 1206 ("# KEEPALIVE messages discarded (no session)"),
1343 GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES, __LINE__); 1207 1, GNUNET_NO);
1344 GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0); 1208 return;
1345 GNUNET_break (cc->session == n->session);
1346 GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1347 n->address_state = USED;
1348 }
1349 break;
1350 case S_FAST_RECONNECT:
1351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352 "Successful fast reconnect to peer `%s'\n",
1353 GNUNET_i2s (&n->id));
1354 change_state (n, S_CONNECTED);
1355 neighbours_connected++;
1356 GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1357 GNUNET_NO);
1358
1359 if (n->address_state == FRESH)
1360 {
1361 GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES, __LINE__);
1362 GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1363 GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1364 n->address_state = USED;
1365 }
1366
1367 if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1368 n->keepalive_task =
1369 GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1370
1371 /* Updating quotas */
1372 GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1373 send_outbound_quota (target, n->bandwidth_out);
1374
1375 default:
1376 break;
1377 } 1209 }
1378 GNUNET_HELLO_address_free (cc->address); 1210 /* send reply to allow neighbour to measure latency */
1379 GNUNET_free (cc); 1211 m.size = htons (sizeof (struct GNUNET_MessageHeader));
1212 m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1213 send_with_session(n,
1214 (const void *) &m, sizeof (m),
1215 UINT32_MAX /* priority */,
1216 KEEPALIVE_FREQUENCY,
1217 NULL, NULL);
1380} 1218}
1381 1219
1382 1220
1383/** 1221/**
1384 * We tried to send a SESSION_CONNECT message to another peer. If this 1222 * We received a KEEP_ALIVE_RESPONSE message and use this to calculate
1385 * succeeded, we change the state. If it failed, we should tell 1223 * latency to this peer. Pass the updated information (existing ats
1386 * ATS to not use this address anymore (until it is re-validated). 1224 * plus calculated latency) to ATS.
1387 * 1225 *
1388 * @param cls the 'struct NeighbourMapEntry' 1226 * @param neighbour neighbour to keep alive
1389 * @param target peer to send the message to 1227 * @param ats performance data
1390 * @param success GNUNET_OK on success 1228 * @param ats_count number of entries in ats
1391 */ 1229 */
1392static void 1230void
1393send_connect_ack_continuation (void *cls, 1231GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1394 const struct GNUNET_PeerIdentity *target, 1232 const struct GNUNET_ATS_Information *ats,
1395 int success) 1233 uint32_t ats_count)
1396{ 1234{
1397 struct ContinutionContext *cc = cls;
1398 struct NeighbourMapEntry *n; 1235 struct NeighbourMapEntry *n;
1236 uint32_t latency;
1237 struct GNUNET_ATS_Information ats_new[ats_count + 1];
1399 1238
1400 if (neighbours == NULL) 1239 if (NULL == (n = lookup_neighbour (neighbour)))
1401 {
1402 GNUNET_HELLO_address_free (cc->address);
1403 GNUNET_free (cc);
1404 return; /* neighbour is going away */
1405 }
1406
1407 n = lookup_neighbour (&cc->address->peer);
1408 if ((n == NULL) || (is_disconnecting (n)))
1409 { 1240 {
1410 GNUNET_HELLO_address_free (cc->address); 1241 GNUNET_STATISTICS_update (GST_stats,
1411 GNUNET_free (cc); 1242 gettext_noop
1412 return; /* neighbour is going away */ 1243 ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
1244 1, GNUNET_NO);
1245 return;
1413 } 1246 }
1414 1247 if ( (S_CONNECTED != n->state) ||
1415 if (GNUNET_YES == success) 1248 (GNUNET_YES != n->expect_latency_response) )
1416 { 1249 {
1417 GNUNET_HELLO_address_free (cc->address); 1250 GNUNET_STATISTICS_update (GST_stats,
1418 GNUNET_free (cc); 1251 gettext_noop
1419 return; /* sending successful */ 1252 ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
1253 1, GNUNET_NO);
1254 return;
1420 } 1255 }
1421 1256 n->expect_latency_response = GNUNET_NO;
1422 /* sending failed, ask for next address */ 1257 n->latency = GNUNET_TIME_absolute_get_duration (n->last_keep_alive_time);
1423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1258 n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1424 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n", 1259 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); 1260 "Latency for peer `%s' is %llu ms\n",
1426 if (n->state != S_NOT_CONNECTED) 1261 GNUNET_i2s (&n->id), n->latency.rel_value);
1427 change_state (n, S_NOT_CONNECTED); 1262 memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
1428 GNUNET_assert (strlen (cc->address->transport_name) > 0); 1263 /* append latency */
1429 GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); 1264 ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
1430 1265 if (n->latency.rel_value > UINT32_MAX)
1431 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) 1266 latency = UINT32_MAX;
1432 GNUNET_SCHEDULER_cancel (n->ats_suggest); 1267 else
1433 n->ats_suggest = 1268 latency = n->latency.rel_value;
1434 GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, 1269 ats_new[ats_count].value = htonl (latency);
1435 n); 1270 GNUNET_ATS_address_update (GST_ats,
1436 GNUNET_ATS_suggest_address (GST_ats, &n->id); 1271 n->primary_address.address,
1437 GNUNET_HELLO_address_free (cc->address); 1272 n->primary_address.session, ats_new,
1438 GNUNET_free (cc); 1273 ats_count + 1);
1439} 1274}
1440 1275
1441 1276
1442/** 1277/**
1443 * For an existing neighbour record, set the active connection to 1278 * We have received a message from the given sender. How long should
1444 * use the given address. 1279 * we delay before receiving more? (Also used to keep the peer marked
1280 * as live).
1445 * 1281 *
1446 * @param peer identity of the peer to switch the address for 1282 * @param sender sender of the message
1447 * @param address address of the other peer, NULL if other peer 1283 * @param size size of the message
1448 * connected to us 1284 * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1449 * @param session session to use (or NULL) 1285 * GNUNET_NO if the neighbour is not connected or violates the quota,
1450 * @param ats performance data 1286 * GNUNET_SYSERR if the connection is not fully up yet
1451 * @param ats_count number of entries in ats 1287 * @return how long to wait before reading more from this sender
1452 * @param bandwidth_in inbound quota to be used when connection is up
1453 * @param bandwidth_out outbound quota to be used when connection is up
1454 * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1455 * connection is not up (yet)
1456 */ 1288 */
1457int 1289struct GNUNET_TIME_Relative
1458GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, 1290GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1459 const struct GNUNET_HELLO_Address 1291 *sender, ssize_t size, int *do_forward)
1460 *address,
1461 struct Session *session,
1462 const struct GNUNET_ATS_Information *ats,
1463 uint32_t ats_count,
1464 struct GNUNET_BANDWIDTH_Value32NBO
1465 bandwidth_in,
1466 struct GNUNET_BANDWIDTH_Value32NBO
1467 bandwidth_out)
1468{ 1292{
1469 struct NeighbourMapEntry *n; 1293 struct NeighbourMapEntry *n;
1470 struct SessionConnectMessage connect_msg; 1294 struct GNUNET_TIME_Relative ret;
1471 struct ContinutionContext *cc; 1295
1472 size_t msg_len; 1296 if (NULL == neighbours)
1473 size_t ret;
1474
1475 if (neighbours == NULL)
1476 { 1297 {
1477 /* This can happen during shutdown */ 1298 *do_forward = GNUNET_NO;
1478 return GNUNET_NO; 1299 return GNUNET_TIME_UNIT_FOREVER_REL; /* This can happen during shutdown */
1479 } 1300 }
1480 n = lookup_neighbour (peer); 1301 if (NULL == (n = lookup_neighbour (sender)))
1481 if (NULL == n)
1482 return GNUNET_NO;
1483 if (n->state == S_DISCONNECT)
1484 { 1302 {
1485 /* We are disconnecting, nothing to do here */ 1303 GST_neighbours_try_connect (sender);
1486 return GNUNET_NO; 1304 if (NULL == (n = lookup_neighbour (sender)))
1305 {
1306 GNUNET_STATISTICS_update (GST_stats,
1307 gettext_noop
1308 ("# messages discarded due to lack of neighbour record"),
1309 1, GNUNET_NO);
1310 *do_forward = GNUNET_NO;
1311 return GNUNET_TIME_UNIT_ZERO;
1312 }
1487 } 1313 }
1488 GNUNET_assert (address->transport_name != NULL); 1314 if (! test_connected (n))
1489 if ((session == NULL) && (0 == address->address_length))
1490 { 1315 {
1491 GNUNET_break_op (0); 1316 *do_forward = GNUNET_SYSERR;
1492 /* FIXME: is this actually possible? When does this happen? */ 1317 return GNUNET_TIME_UNIT_ZERO;
1493 if (strlen (address->transport_name) > 0)
1494 GNUNET_ATS_address_destroyed (GST_ats, address, session);
1495 GNUNET_ATS_suggest_address (GST_ats, peer);
1496 return GNUNET_NO;
1497 } 1318 }
1498 1319 if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1499 /* checks successful and neighbour != NULL */
1500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1501 "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1502 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1503 session,
1504 GNUNET_i2s (peer),
1505 print_state (n->state));
1506
1507 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1508 { 1320 {
1509 GNUNET_SCHEDULER_cancel (n->ats_suggest); 1321 n->quota_violation_count++;
1510 n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; 1322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1324 n->in_tracker.available_bytes_per_s__,
1325 n->quota_violation_count);
1326 /* Discount 32k per violation */
1327 GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1511 } 1328 }
1512 1329 else
1513
1514 if (n->state == S_FAST_RECONNECT)
1515 { 1330 {
1516 /* Throtteled fast reconnect */ 1331 if (n->quota_violation_count > 0)
1517
1518 if (NULL == n->fast_reconnect_address)
1519 {
1520 n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1521
1522 }
1523 else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1524 { 1332 {
1525 n->fast_reconnect_attempts ++; 1333 /* try to add 32k back */
1526 1334 GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1527 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1335 n->quota_violation_count--;
1528 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1529 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1530 } 1336 }
1531 } 1337 }
1532 else 1338 if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1533 {
1534 n->fast_reconnect_attempts = 0;
1535 }
1536
1537 /* do not switch addresses just update quotas */
1538 if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1539 (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1540 (n->session == session))
1541 { 1339 {
1542 n->bandwidth_in = bandwidth_in; 1340 GNUNET_STATISTICS_update (GST_stats,
1543 n->bandwidth_out = bandwidth_out; 1341 gettext_noop
1544 GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); 1342 ("# bandwidth quota violations by other peers"),
1545 send_outbound_quota (peer, n->bandwidth_out); 1343 1, GNUNET_NO);
1546 return GNUNET_NO; 1344 *do_forward = GNUNET_NO;
1345 return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1547 } 1346 }
1548 if (n->state == S_CONNECTED) 1347 *do_forward = GNUNET_YES;
1348 ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1349 if (ret.rel_value > 0)
1549 { 1350 {
1550 /* mark old address as no longer used */ 1351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551 GNUNET_assert (NULL != n->address); 1352 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1552 if (n->address_state == USED) 1353 (unsigned long long) n->in_tracker.
1553 { 1354 consumption_since_last_update__,
1554 GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__); 1355 (unsigned int) n->in_tracker.available_bytes_per_s__,
1555 GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO); 1356 (unsigned long long) ret.rel_value);
1556 n->address_state = UNUSED; 1357 GNUNET_STATISTICS_update (GST_stats,
1557 } 1358 gettext_noop ("# ms throttling suggested"),
1359 (int64_t) ret.rel_value, GNUNET_NO);
1558 } 1360 }
1361 return ret;
1362}
1559 1363
1560 /* set new address */
1561 if (NULL != n->address)
1562 GNUNET_HELLO_address_free (n->address);
1563 n->address = GNUNET_HELLO_address_copy (address);
1564 n->address_state = FRESH;
1565 n->bandwidth_in = bandwidth_in;
1566 n->bandwidth_out = bandwidth_out;
1567 GNUNET_SCHEDULER_cancel (n->timeout_task);
1568 n->timeout_task =
1569 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1570 &neighbour_timeout_task, n);
1571
1572 if (NULL != address_change_cb && n->state == S_CONNECTED)
1573 address_change_cb (callback_cls, &n->id, n->address);
1574 1364
1575 /* Obtain an session for this address from plugin */ 1365/**
1576 struct GNUNET_TRANSPORT_PluginFunctions *papi; 1366 * Transmit a message to the given target using the active connection.
1577 papi = GST_plugins_find (address->transport_name); 1367 *
1368 * @param target destination
1369 * @param msg message to send
1370 * @param msg_size number of bytes in msg
1371 * @param timeout when to fail with timeout
1372 * @param cont function to call when done
1373 * @param cont_cls closure for 'cont'
1374 */
1375void
1376GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1377 size_t msg_size, struct GNUNET_TIME_Relative timeout,
1378 GST_NeighbourSendContinuation cont, void *cont_cls)
1379{
1380 struct NeighbourMapEntry *n;
1381 struct MessageQueue *mq;
1578 1382
1579 if (papi == NULL) 1383 /* All ove these cases should never happen; they are all API violations.
1384 But we check anyway, just to be sure. */
1385 if (NULL == (n = lookup_neighbour (target)))
1580 { 1386 {
1581 /* we don't have the plugin for this address */ 1387 GNUNET_break (0);
1582 GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); 1388 if (NULL != cont)
1583 1389 cont (cont_cls, GNUNET_SYSERR);
1584 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) 1390 return;
1585 GNUNET_SCHEDULER_cancel (n->ats_suggest);
1586 n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1587 ats_suggest_cancel,
1588 n);
1589 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1590 GNUNET_HELLO_address_free (n->address);
1591 n->address = NULL;
1592 n->session = NULL;
1593 return GNUNET_NO;
1594 } 1391 }
1595 1392 if (GNUNET_YES != test_connected (n))
1596 if (session == NULL)
1597 { 1393 {
1598 n->session = papi->get_session (papi->cls, address); 1394 GNUNET_break (0);
1599 /* Session could not be initiated */ 1395 if (NULL != cont)
1600 if (n->session == NULL) 1396 cont (cont_cls, GNUNET_SYSERR);
1601 { 1397 return;
1602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603 "Failed to obtain new session %p for peer `%s' and address '%s'\n",
1604 n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1605
1606 GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1607
1608 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1609 GNUNET_SCHEDULER_cancel (n->ats_suggest);
1610 n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1611 ats_suggest_cancel,
1612 n);
1613 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1614 GNUNET_HELLO_address_free (n->address);
1615 n->address = NULL;
1616 n->session = NULL;
1617 return GNUNET_NO;
1618 }
1619
1620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1621 "Obtained new session %p for peer `%s' and address '%s'\n",
1622 n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1623 /* Telling ATS about new session */
1624 GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1625 } 1398 }
1626 else 1399 if ((NULL == n->primary_address.session) && (NULL == n->primary_address.address))
1627 { 1400 {
1628 n->session = session; 1401 GNUNET_break (0);
1629 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1402 if (NULL != cont)
1630 "Using existing session %p for peer `%s' and address '%s'\n", 1403 cont (cont_cls, GNUNET_SYSERR);
1631 n->session, 1404 return;
1632 GNUNET_i2s (&n->id),
1633 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1634 } 1405 }
1635 1406
1636 switch (n->state) 1407 bytes_in_send_queue += msg_size;
1637 { 1408 GNUNET_STATISTICS_set (GST_stats,
1638 case S_NOT_CONNECTED: 1409 gettext_noop
1639 case S_CONNECT_SENT: 1410 ("# bytes in message queue for other peers"),
1640 msg_len = sizeof (struct SessionConnectMessage); 1411 bytes_in_send_queue, GNUNET_NO);
1641 connect_msg.header.size = htons (msg_len); 1412 mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1642 connect_msg.header.type = 1413 mq->cont = cont;
1643 htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); 1414 mq->cont_cls = cont_cls;
1644 connect_msg.reserved = htonl (0); 1415 memcpy (&mq[1], msg, msg_size);
1645 connect_msg.timestamp = 1416 mq->message_buf = (const char *) &mq[1];
1646 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); 1417 mq->message_buf_size = msg_size;
1647 1418 mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1648 cc = GNUNET_malloc (sizeof (struct ContinutionContext)); 1419 GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1649 cc->session = n->session; 1420 if (NULL != n->is_active)
1650 cc->address = GNUNET_HELLO_address_copy (address); 1421 return;
1651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1422 GNUNET_SCHEDULER_cancel (n->task);
1652 "Sending CONNECT message to %s\n", 1423 n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
1653 GNUNET_i2s (&n->id));
1654 if (n->state != S_CONNECT_SENT)
1655 change_state (n, S_CONNECT_SENT);
1656 ret = send_with_session (n,
1657 (const char *) &connect_msg, msg_len,
1658 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1659 &send_connect_continuation, cc);
1660
1661 return GNUNET_NO;
1662 case S_CONNECT_RECV:
1663 /* We received a CONNECT message and asked ATS for an address */
1664 msg_len = sizeof (struct SessionConnectMessage);
1665 connect_msg.header.size = htons (msg_len);
1666 connect_msg.header.type =
1667 htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1668 connect_msg.reserved = htonl (0);
1669 connect_msg.timestamp =
1670 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1671 cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1672 cc->session = n->session;
1673 cc->address = GNUNET_HELLO_address_copy (address);
1674
1675 ret = send_with_session(n,
1676 (const void *) &connect_msg, msg_len,
1677 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1678 &send_connect_ack_continuation,
1679 cc);
1680 return GNUNET_NO;
1681 case S_CONNECTED:
1682 case S_FAST_RECONNECT:
1683 /* connected peer is switching addresses or tries fast reconnect */
1684 msg_len = sizeof (struct SessionConnectMessage);
1685 connect_msg.header.size = htons (msg_len);
1686 connect_msg.header.type =
1687 htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1688 connect_msg.reserved = htonl (0);
1689 connect_msg.timestamp =
1690 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1691 cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1692 cc->session = n->session;
1693 cc->address = GNUNET_HELLO_address_copy (address);
1694 ret = send_with_session(n,
1695 (const void *) &connect_msg, msg_len,
1696 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1697 &send_switch_address_continuation, cc);
1698 if (ret == GNUNET_SYSERR)
1699 {
1700 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1701 "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1702 GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1703 }
1704 return GNUNET_NO;
1705 default:
1706 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1707 "Invalid connection state to switch addresses %u \n", n->state);
1708 GNUNET_break_op (0);
1709 return GNUNET_NO;
1710 }
1711} 1424}
1712 1425
1713 1426
1714/** 1427/**
1715 * Obtain current latency information for the given neighbour. 1428 * Send a SESSION_CONNECT message via the given address.
1716 * 1429 *
1717 * @param peer 1430 * @param na address to use
1718 * @return observed latency of the address, FOREVER if the address was
1719 * never successfully validated
1720 */ 1431 */
1721struct GNUNET_TIME_Relative 1432static void
1722GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer) 1433send_session_connect (struct NeighbourAddress *na)
1723{ 1434{
1724 struct NeighbourMapEntry *n; 1435 struct GNUNET_TRANSPORT_PluginFunctions *papi;
1725 1436 struct SessionConnectMessage connect_msg;
1726 n = lookup_neighbour (peer); 1437
1727 if ((NULL == n) || ((n->address == NULL) && (n->session == NULL))) 1438 if (NULL == (papi = GST_plugins_find (na->address->transport_name)))
1728 return GNUNET_TIME_UNIT_FOREVER_REL; 1439 {
1729 1440 GNUNET_break (0);
1730 return n->latency; 1441 return;
1442 }
1443 if (NULL == na->session)
1444 na->session = papi->get_session (papi->cls, na->address);
1445 if (NULL == na->session)
1446 {
1447 GNUNET_break (0);
1448 return;
1449 }
1450 na->connect_timestamp = GNUNET_TIME_absolute_get ();
1451 connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
1452 connect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1453 connect_msg.reserved = htonl (0);
1454 connect_msg.timestamp = GNUNET_TIME_absolute_hton (na->connect_timestamp);
1455 (void) papi->send (papi->cls,
1456 na->session,
1457 (const char *) &connect_msg, sizeof (struct SessionConnectMessage),
1458 UINT_MAX,
1459 GNUNET_TIME_UNIT_FOREVER_REL,
1460 NULL, NULL);
1731} 1461}
1732 1462
1463
1733/** 1464/**
1734 * Obtain current address information for the given neighbour. 1465 * Send a SESSION_CONNECT_ACK message via the given address.
1735 * 1466 *
1736 * @param peer 1467 * @param address address to use
1737 * @return address currently used 1468 * @param session session to use
1469 * @param timestamp timestamp to use for the ACK message
1738 */ 1470 */
1739struct GNUNET_HELLO_Address * 1471static void
1740GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer) 1472send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address,
1473 struct Session *session,
1474 struct GNUNET_TIME_Absolute timestamp)
1741{ 1475{
1742 struct NeighbourMapEntry *n; 1476 struct GNUNET_TRANSPORT_PluginFunctions *papi;
1743 1477 struct SessionConnectMessage connect_msg;
1744 n = lookup_neighbour (peer); 1478
1745 if ((NULL == n) || ((n->address == NULL) && (n->session == NULL))) 1479 if (NULL == (papi = GST_plugins_find (address->transport_name)))
1746 return NULL; 1480 {
1747 1481 GNUNET_break (0);
1748 return n->address; 1482 return;
1483 }
1484 if (NULL == session)
1485 session = papi->get_session (papi->cls, address);
1486 if (NULL == session)
1487 {
1488 GNUNET_break (0);
1489 return;
1490 }
1491 connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
1492 connect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1493 connect_msg.reserved = htonl (0);
1494 connect_msg.timestamp = GNUNET_TIME_absolute_hton (timestamp);
1495 (void) papi->send (papi->cls,
1496 session,
1497 (const char *) &connect_msg, sizeof (struct SessionConnectMessage),
1498 UINT_MAX,
1499 GNUNET_TIME_UNIT_FOREVER_REL,
1500 NULL, NULL);
1749} 1501}
1750 1502
1751 1503
1752
1753/** 1504/**
1754 * Create an entry in the neighbour map for the given peer 1505 * Create a fresh entry in the neighbour map for the given peer
1755 * 1506 *
1756 * @param peer peer to create an entry for 1507 * @param peer peer to create an entry for
1757 * @return new neighbour map entry 1508 * @return new neighbour map entry
@@ -1762,17 +1513,16 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1762 struct NeighbourMapEntry *n; 1513 struct NeighbourMapEntry *n;
1763 1514
1764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765 "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer)); 1516 "Creating new neighbour entry for `%s'\n",
1517 GNUNET_i2s (peer));
1766 n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); 1518 n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1767 n->id = *peer; 1519 n->id = *peer;
1768 n->state = S_NOT_CONNECTED; 1520 n->state = S_NOT_CONNECTED;
1769 n->latency = GNUNET_TIME_relative_get_forever (); 1521 n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
1770 GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, 1522 GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1771 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, 1523 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1772 MAX_BANDWIDTH_CARRY_S); 1524 MAX_BANDWIDTH_CARRY_S);
1773 n->timeout_task = 1525 n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
1774 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1775 &neighbour_timeout_task, n);
1776 GNUNET_assert (GNUNET_OK == 1526 GNUNET_assert (GNUNET_OK ==
1777 GNUNET_CONTAINER_multihashmap_put (neighbours, 1527 GNUNET_CONTAINER_multihashmap_put (neighbours,
1778 &n->id.hashPubKey, n, 1528 &n->id.hashPubKey, n,
@@ -1782,6 +1532,32 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1782 1532
1783 1533
1784/** 1534/**
1535 * Check if the two given addresses are the same.
1536 * Actually only checks if the sessions are non-NULL
1537 * (which they should be) and then if they are identical;
1538 * the actual addresses don't matter if the session
1539 * pointers match anyway, and we must have session pointers
1540 * at this time.
1541 *
1542 * @param a1 first address to compare
1543 * @param a2 other address to compare
1544 * @return GNUNET_NO if the addresses do not match, GNUNET_YES if they do match
1545 */
1546static int
1547address_matches (const struct NeighbourAddress *a1,
1548 const struct NeighbourAddress *a2)
1549{
1550 if ( (NULL == a1->session) ||
1551 (NULL == a2->session) )
1552 {
1553 GNUNET_break (0);
1554 return 0;
1555 }
1556 return (a1->session == a2->session) ? GNUNET_YES : GNUNET_NO;
1557}
1558
1559
1560/**
1785 * Try to create a connection to the given target (eventually). 1561 * Try to create a connection to the given target (eventually).
1786 * 1562 *
1787 * @param target peer to try to connect to 1563 * @param target peer to try to connect to
@@ -1791,562 +1567,1169 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1791{ 1567{
1792 struct NeighbourMapEntry *n; 1568 struct NeighbourMapEntry *n;
1793 1569
1794 // This can happen during shutdown 1570 if (NULL == neighbours)
1795 if (neighbours == NULL) 1571 return; /* during shutdown, do nothing */
1796 { 1572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797 return; 1573 "Asked to connect to peer `%s'\n",
1798 }
1799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1800 GNUNET_i2s (target)); 1574 GNUNET_i2s (target));
1801 if (0 == 1575 if (0 ==
1802 memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity))) 1576 memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1803 { 1577 {
1804 /* my own hello */ 1578 /* refuse to connect to myself */
1579 /* FIXME: can this happen? Is this not an API violation? */
1580 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1581 "Refusing to try to connect to myself.\n");
1805 return; 1582 return;
1806 } 1583 }
1807 n = lookup_neighbour (target); 1584 n = lookup_neighbour (target);
1808
1809 if (NULL != n) 1585 if (NULL != n)
1810 { 1586 {
1811 if ((S_CONNECTED == n->state) || (is_connecting (n))) 1587 switch (n->state)
1812 return; /* already connecting or connected */ 1588 {
1813 if (is_disconnecting (n)) 1589 case S_NOT_CONNECTED:
1814 change_state (n, S_NOT_CONNECTED); 1590 /* this should not be possible */
1591 GNUNET_break (0);
1592 free_neighbour (n);
1593 break;
1594 case S_INIT_ATS:
1595 case S_INIT_BLACKLIST:
1596 case S_CONNECT_SENT:
1597 case S_CONNECT_RECV_ATS:
1598 case S_CONNECT_RECV_BLACKLIST:
1599 case S_CONNECT_RECV_ACK:
1600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601 "Ignoring request to try to connect to `%s', already trying!\n",
1602 GNUNET_i2s (target));
1603 return; /* already trying */
1604 case S_CONNECTED:
1605 case S_RECONNECT_ATS:
1606 case S_RECONNECT_BLACKLIST:
1607 case S_RECONNECT_SENT:
1608 case S_CONNECTED_SWITCHING_BLACKLIST:
1609 case S_CONNECTED_SWITCHING_CONNECT_SENT:
1610 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1611 "Ignoring request to try to connect, already connected to `%s'!\n",
1612 GNUNET_i2s (target));
1613 return; /* already connected */
1614 case S_DISCONNECT:
1615 /* get rid of remains, ready to re-try immediately */
1616 free_neighbour (n);
1617 break;
1618 case S_DISCONNECT_FINISHED:
1619 /* should not be possible */
1620 GNUNET_assert (0);
1621 default:
1622 GNUNET_break (0);
1623 free_neighbour (n);
1624 break;
1625 }
1815 } 1626 }
1816 1627 n = setup_neighbour (target);
1817 1628 n->state = S_INIT_ATS;
1818 if (n == NULL) 1629 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
1819 n = setup_neighbour (target); 1630 GNUNET_ATS_suggest_address (GST_ats, target);
1820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1821 "Asking ATS for suggested address to connect to peer `%s'\n",
1822 GNUNET_i2s (&n->id));
1823 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1824} 1631}
1825 1632
1633
1826/** 1634/**
1827 * Test if we're connected to the given peer. 1635 * Function called with the result of a blacklist check.
1828 * 1636 *
1829 * @param target peer to test 1637 * @param cls closure with the 'struct BlackListCheckContext'
1830 * @return GNUNET_YES if we are connected, GNUNET_NO if not 1638 * @param peer peer this check affects
1639 * @param result GNUNET_OK if the address is allowed
1831 */ 1640 */
1832int 1641static void
1833GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) 1642handle_test_blacklist_cont (void *cls,
1643 const struct GNUNET_PeerIdentity *peer,
1644 int result)
1834{ 1645{
1646 struct BlackListCheckContext *bcc = cls;
1835 struct NeighbourMapEntry *n; 1647 struct NeighbourMapEntry *n;
1836 1648
1837 // This can happen during shutdown 1649 bcc->bc = NULL;
1838 if (neighbours == NULL) 1650 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1651 "Connection to new address of peer `%s' based on blacklist is `%s'\n",
1652 GNUNET_i2s (peer),
1653 (GNUNET_OK == result) ? "allowed" : "FORBIDDEN");
1654 if (GNUNET_OK == result)
1839 { 1655 {
1840 return GNUNET_NO; 1656 /* valid new address, let ATS know! */
1657 GNUNET_ATS_address_update (GST_ats,
1658 bcc->na.address,
1659 bcc->na.session,
1660 bcc->ats, bcc->ats_count);
1661 }
1662 if (NULL == (n = lookup_neighbour (peer)))
1663 goto cleanup; /* nobody left to care about new address */
1664 switch (n->state)
1665 {
1666 case S_NOT_CONNECTED:
1667 /* this should not be possible */
1668 GNUNET_break (0);
1669 free_neighbour (n);
1670 break;
1671 case S_INIT_ATS:
1672 /* still waiting on ATS suggestion */
1673 break;
1674 case S_INIT_BLACKLIST:
1675 /* check if the address the blacklist was fine with matches
1676 ATS suggestion, if so, we can move on! */
1677 if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address))
1678 break; /* result for an address we currently don't care about */
1679 if (GNUNET_OK == result)
1680 {
1681 n->timeout = GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEOUT);
1682 n->state = S_CONNECT_SENT;
1683 send_session_connect (&n->primary_address);
1684 if (1 == n->send_connect_ack)
1685 {
1686 n->send_connect_ack = 2;
1687 send_session_connect_ack_message (bcc->na.address,
1688 bcc->na.session,
1689 n->connect_ack_timestamp);
1690 }
1691 }
1692 else
1693 {
1694 GNUNET_ATS_address_destroyed (GST_ats,
1695 bcc->na.address,
1696 NULL);
1697 n->state = S_INIT_ATS;
1698 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
1699 // FIXME: do we need to ask ATS again for suggestions?
1700 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1701 }
1702 break;
1703 case S_CONNECT_SENT:
1704 /* waiting on CONNECT_ACK, don't care about blacklist */
1705 break;
1706 case S_CONNECT_RECV_ATS:
1707 /* still waiting on ATS suggestion, don't care about blacklist */
1708 break;
1709 case S_CONNECT_RECV_BLACKLIST:
1710 if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address))
1711 break; /* result for an address we currently don't care about */
1712 if (GNUNET_OK == result)
1713 {
1714 n->timeout = GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEOUT);
1715 n->state = S_CONNECT_RECV_ACK;
1716 send_session_connect_ack_message (bcc->na.address,
1717 bcc->na.session,
1718 n->connect_ack_timestamp);
1719 if (1 == n->send_connect_ack)
1720 n->send_connect_ack = 2;
1721 }
1722 else
1723 {
1724 GNUNET_ATS_address_destroyed (GST_ats,
1725 bcc->na.address,
1726 NULL);
1727 n->state = S_INIT_ATS;
1728 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
1729 // FIXME: do we need to ask ATS again for suggestions?
1730 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1731 }
1732 break;
1733 case S_CONNECT_RECV_ACK:
1734 /* waiting on SESSION_ACK, don't care about blacklist */
1735 break;
1736 case S_CONNECTED:
1737 /* already connected, don't care about blacklist */
1738 break;
1739 case S_RECONNECT_ATS:
1740 /* still waiting on ATS suggestion, don't care about blacklist */
1741 break;
1742 case S_RECONNECT_BLACKLIST:
1743 if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address))
1744 break; /* result for an address we currently don't care about */
1745 if (GNUNET_OK == result)
1746 {
1747 send_session_connect (&n->primary_address);
1748 n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT);
1749 n->state = S_RECONNECT_SENT;
1750 if (1 == n->send_connect_ack)
1751 {
1752 n->send_connect_ack = 2;
1753 send_session_connect_ack_message (bcc->na.address,
1754 bcc->na.session,
1755 n->connect_ack_timestamp);
1756 }
1757 }
1758 else
1759 {
1760 GNUNET_ATS_address_destroyed (GST_ats,
1761 bcc->na.address,
1762 NULL);
1763 n->state = S_RECONNECT_ATS;
1764 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
1765 // FIXME: do we need to ask ATS again for suggestions?
1766 GNUNET_ATS_suggest_address (GST_ats, &n->id);
1767 }
1768 break;
1769 case S_RECONNECT_SENT:
1770 /* waiting on CONNECT_ACK, don't care about blacklist */
1771 break;
1772 case S_CONNECTED_SWITCHING_BLACKLIST:
1773 if (GNUNET_YES != address_matches (&bcc->na, &n->alternative_address))
1774 break; /* result for an address we currently don't care about */
1775 if (GNUNET_OK == result)
1776 {
1777 send_session_connect (&n->alternative_address);
1778 n->state = S_CONNECTED_SWITCHING_CONNECT_SENT;
1779 }
1780 else
1781 {
1782 GNUNET_ATS_address_destroyed (GST_ats,
1783 bcc->na.address,
1784 NULL);
1785 free_address (&n->alternative_address);
1786 n->state = S_CONNECTED;
1787 }
1788 break;
1789 case S_CONNECTED_SWITCHING_CONNECT_SENT:
1790 /* waiting on CONNECT_ACK, don't care about blacklist */
1791 break;
1792 case S_DISCONNECT:
1793 /* Nothing to do here, ATS will already do what can be done */
1794 break;
1795 case S_DISCONNECT_FINISHED:
1796 /* should not be possible */
1797 GNUNET_assert (0);
1798 break;
1799 default:
1800 GNUNET_break (0);
1801 free_neighbour (n);
1802 break;
1841 } 1803 }
1804 cleanup:
1805 GNUNET_HELLO_address_free (bcc->na.address);
1806 GNUNET_CONTAINER_DLL_remove (bc_head,
1807 bc_tail,
1808 bcc);
1809 GNUNET_free (bcc);
1810}
1842 1811
1843 n = lookup_neighbour (target);
1844 1812
1845 if ((NULL == n) || (S_CONNECTED != n->state)) 1813/**
1846 return GNUNET_NO; /* not connected */ 1814 * We want to know if connecting to a particular peer via
1847 return GNUNET_YES; 1815 * a particular address is allowed. Check it!
1816 *
1817 * @param peer identity of the peer to switch the address for
1818 * @param ts time at which the check was initiated
1819 * @param address address of the other peer, NULL if other peer
1820 * connected to us
1821 * @param session session to use (or NULL)
1822 * @param ats performance data
1823 * @param ats_count number of entries in ats (excluding 0-termination)
1824 */
1825static void
1826check_blacklist (const struct GNUNET_PeerIdentity *peer,
1827 struct GNUNET_TIME_Absolute ts,
1828 const struct GNUNET_HELLO_Address *address,
1829 struct Session *session,
1830 const struct GNUNET_ATS_Information *ats,
1831 uint32_t ats_count)
1832{
1833 struct BlackListCheckContext *bcc;
1834 struct GST_BlacklistCheck *bc;
1835
1836 bcc =
1837 GNUNET_malloc (sizeof (struct BlackListCheckContext) +
1838 sizeof (struct GNUNET_ATS_Information) * ats_count);
1839 bcc->ats_count = ats_count;
1840 bcc->na.address = GNUNET_HELLO_address_copy (address);
1841 bcc->na.session = session;
1842 bcc->na.connect_timestamp = ts;
1843 bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
1844 memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
1845 GNUNET_CONTAINER_DLL_insert (bc_head,
1846 bc_tail,
1847 bcc);
1848 if (NULL != (bc = GST_blacklist_test_allowed (peer,
1849 address->transport_name,
1850 &handle_test_blacklist_cont, bcc)))
1851 bcc->bc = bc;
1852 /* if NULL == bc, 'cont' was already called and 'bcc' already free'd, so
1853 we must only store 'bc' if 'bc' is non-NULL... */
1848} 1854}
1849 1855
1856
1850/** 1857/**
1851 * A session was terminated. Take note. 1858 * We received a 'SESSION_CONNECT' message from the other peer.
1859 * Consider switching to it.
1852 * 1860 *
1853 * @param peer identity of the peer where the session died 1861 * @param message possibly a 'struct SessionConnectMessage' (check format)
1854 * @param session session that is gone 1862 * @param peer identity of the peer to switch the address for
1863 * @param address address of the other peer, NULL if other peer
1864 * connected to us
1865 * @param session session to use (or NULL)
1866 * @param ats performance data
1867 * @param ats_count number of entries in ats (excluding 0-termination)
1855 */ 1868 */
1856void 1869void
1857GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, 1870GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
1858 struct Session *session) 1871 const struct GNUNET_PeerIdentity *peer,
1872 const struct GNUNET_HELLO_Address *address,
1873 struct Session *session,
1874 const struct GNUNET_ATS_Information *ats,
1875 uint32_t ats_count)
1859{ 1876{
1877 const struct SessionConnectMessage *scm;
1860 struct NeighbourMapEntry *n; 1878 struct NeighbourMapEntry *n;
1879 struct GNUNET_TIME_Absolute ts;
1861 1880
1862 if (neighbours == NULL) 1881 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1882 "Received CONNECT message from peer `%s'\n",
1883 GNUNET_i2s (peer));
1884 if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1863 { 1885 {
1864 /* This can happen during shutdown */ 1886 GNUNET_break_op (0);
1865 return; 1887 return;
1866 } 1888 }
1867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p to peer `%s' ended \n", 1889 if (NULL == neighbours)
1868 session, GNUNET_i2s (peer)); 1890 return; /* we're shutting down */
1891 scm = (const struct SessionConnectMessage *) message;
1892 GNUNET_break_op (0 == ntohl (scm->reserved));
1893 ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
1869 n = lookup_neighbour (peer); 1894 n = lookup_neighbour (peer);
1870 if (NULL == n) 1895 if (NULL == n)
1871 return; 1896 n = setup_neighbour (peer);
1872 if (session != n->session) 1897 n->send_connect_ack = 1;
1873 return; /* doesn't affect us */ 1898 n->connect_ack_timestamp = ts;
1874 if (n->state == S_CONNECTED) 1899 switch (n->state)
1875 { 1900 {
1876 if (n->address_state == USED) 1901 case S_NOT_CONNECTED:
1877 { 1902 n->state = S_CONNECT_RECV_ATS;
1878 GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__); 1903 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
1879 GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO); 1904 GNUNET_ATS_suggest_address (GST_ats, peer);
1880 n->address_state = UNUSED; 1905 check_blacklist (peer, ts, address, session, ats, ats_count);
1881 1906 break;
1882 } 1907 case S_INIT_ATS:
1883 } 1908 case S_INIT_BLACKLIST:
1884 1909 case S_CONNECT_SENT:
1885 if (NULL != n->address) 1910 case S_CONNECT_RECV_ATS:
1886 { 1911 case S_CONNECT_RECV_BLACKLIST:
1887 GNUNET_HELLO_address_free (n->address); 1912 case S_CONNECT_RECV_ACK:
1888 n->address = NULL; 1913 /* It can never hurt to have an alternative address in the above cases,
1889 } 1914 see if it is allowed */
1890 n->session = NULL; 1915 check_blacklist (peer, ts, address, session, ats, ats_count);
1891 1916 break;
1892 /* not connected anymore anyway, shouldn't matter */ 1917 case S_CONNECTED:
1893 if (S_CONNECTED != n->state) 1918 /* we are already connected and can thus send the ACK immediately;
1894 return; 1919 still, it can never hurt to have an alternative address, so also
1895 1920 tell ATS about it */
1896 if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK) 1921 GNUNET_assert (NULL != n->primary_address.address);
1897 { 1922 GNUNET_assert (NULL != n->primary_address.session);
1898 GNUNET_SCHEDULER_cancel (n->keepalive_task); 1923 n->send_connect_ack = 0;
1899 n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; 1924 send_session_connect_ack_message (n->primary_address.address,
1900 n->expect_latency_response = GNUNET_NO; 1925 n->primary_address.session, ts);
1926 check_blacklist (peer, ts, address, session, ats, ats_count);
1927 break;
1928 case S_RECONNECT_ATS:
1929 case S_RECONNECT_BLACKLIST:
1930 case S_RECONNECT_SENT:
1931 /* It can never hurt to have an alternative address in the above cases,
1932 see if it is allowed */
1933 check_blacklist (peer, ts, address, session, ats, ats_count);
1934 break;
1935 case S_CONNECTED_SWITCHING_BLACKLIST:
1936 case S_CONNECTED_SWITCHING_CONNECT_SENT:
1937 /* we are already connected and can thus send the ACK immediately;
1938 still, it can never hurt to have an alternative address, so also
1939 tell ATS about it */
1940 GNUNET_assert (NULL != n->primary_address.address);
1941 GNUNET_assert (NULL != n->primary_address.session);
1942 n->send_connect_ack = 0;
1943 send_session_connect_ack_message (n->primary_address.address,
1944 n->primary_address.session, ts);
1945 check_blacklist (peer, ts, address, session, ats, ats_count);
1946 break;
1947 case S_DISCONNECT:
1948 /* get rid of remains, ready to re-try */
1949 free_neighbour (n);
1950 n = setup_neighbour (peer);
1951 n->state = S_CONNECT_RECV_ATS;
1952 GNUNET_ATS_suggest_address (GST_ats, peer);
1953 check_blacklist (peer, ts, address, session, ats, ats_count);
1954 break;
1955 case S_DISCONNECT_FINISHED:
1956 /* should not be possible */
1957 GNUNET_assert (0);
1958 break;
1959 default:
1960 GNUNET_break (0);
1961 free_neighbour (n);
1962 break;
1901 } 1963 }
1902
1903 /* connected, try fast reconnect */
1904 /* statistics "transport" : "# peers connected" -= 1
1905 * neighbours_connected -= 1
1906 * BUT: no disconnect_cb to notify clients about disconnect
1907 */
1908
1909 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1910 GNUNET_i2s (peer));
1911
1912 GNUNET_assert (neighbours_connected > 0);
1913 change_state (n, S_FAST_RECONNECT);
1914 neighbours_connected--;
1915 GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1916 GNUNET_NO);
1917
1918
1919 /* We are connected, so ask ATS to switch addresses */
1920 GNUNET_SCHEDULER_cancel (n->timeout_task);
1921 n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1922 &neighbour_timeout_task, n);
1923 /* try QUICKLY to re-establish a connection, reduce timeout! */
1924 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1925 GNUNET_SCHEDULER_cancel (n->ats_suggest);
1926 n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1927 &ats_suggest_cancel,
1928 n);
1929 GNUNET_ATS_suggest_address (GST_ats, peer);
1930} 1964}
1931 1965
1932 1966
1933/** 1967/**
1934 * Transmit a message to the given target using the active connection. 1968 * For an existing neighbour record, set the active connection to
1969 * use the given address.
1935 * 1970 *
1936 * @param target destination 1971 * @param peer identity of the peer to switch the address for
1937 * @param msg message to send 1972 * @param address address of the other peer, NULL if other peer
1938 * @param msg_size number of bytes in msg 1973 * connected to us
1939 * @param timeout when to fail with timeout 1974 * @param session session to use (or NULL)
1940 * @param cont function to call when done 1975 * @param ats performance data
1941 * @param cont_cls closure for 'cont' 1976 * @param ats_count number of entries in ats
1977 * @param bandwidth_in inbound quota to be used when connection is up
1978 * @param bandwidth_out outbound quota to be used when connection is up
1942 */ 1979 */
1943void 1980void
1944GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, 1981GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1945 size_t msg_size, struct GNUNET_TIME_Relative timeout, 1982 const struct GNUNET_HELLO_Address *address,
1946 GST_NeighbourSendContinuation cont, void *cont_cls) 1983 struct Session *session,
1984 const struct GNUNET_ATS_Information *ats,
1985 uint32_t ats_count,
1986 struct GNUNET_BANDWIDTH_Value32NBO
1987 bandwidth_in,
1988 struct GNUNET_BANDWIDTH_Value32NBO
1989 bandwidth_out)
1947{ 1990{
1948 struct NeighbourMapEntry *n; 1991 struct NeighbourMapEntry *n;
1949 struct MessageQueue *mq; 1992 struct GNUNET_TRANSPORT_PluginFunctions *papi;
1993
1994 GNUNET_assert (address->transport_name != NULL);
1995 if (NULL == (n = lookup_neighbour (peer)))
1996 return;
1950 1997
1951 // This can happen during shutdown 1998 /* Obtain an session for this address from plugin */
1952 if (neighbours == NULL) 1999 if (NULL == (papi = GST_plugins_find (address->transport_name)))
1953 { 2000 {
2001 /* we don't have the plugin for this address */
2002 GNUNET_ATS_address_destroyed (GST_ats, address, NULL);
1954 return; 2003 return;
1955 } 2004 }
1956 2005 if ((NULL == session) && (0 == address->address_length))
1957 n = lookup_neighbour (target);
1958 if ((n == NULL) || (!is_connected (n)))
1959 { 2006 {
1960 GNUNET_STATISTICS_update (GST_stats, 2007 GNUNET_break (0);
1961 gettext_noop 2008 if (strlen (address->transport_name) > 0)
1962 ("# messages not sent (no such peer or not connected)"), 2009 GNUNET_ATS_address_destroyed (GST_ats, address, session);
1963 1, GNUNET_NO);
1964 if (n == NULL)
1965 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1966 "Could not send message to peer `%s': unknown neighbour",
1967 GNUNET_i2s (target));
1968 else if (!is_connected (n))
1969 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1970 "Could not send message to peer `%s': not connected\n",
1971 GNUNET_i2s (target));
1972 if (NULL != cont)
1973 cont (cont_cls, GNUNET_SYSERR);
1974 return; 2010 return;
1975 } 2011 }
1976 2012 if (NULL == session)
1977 if ((n->session == NULL) && (n->address == NULL)) 2013 session = papi->get_session (papi->cls, address);
2014 if (NULL == session)
1978 { 2015 {
1979 GNUNET_STATISTICS_update (GST_stats,
1980 gettext_noop
1981 ("# messages not sent (no such peer or not connected)"),
1982 1, GNUNET_NO);
1983 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984 "Could not send message to peer `%s': no address available\n", 2017 "Failed to obtain new session for peer `%s' and address '%s'\n",
1985 GNUNET_i2s (target)); 2018 GNUNET_i2s (&address->peer), GST_plugins_a2s (address));
1986 if (NULL != cont) 2019 GNUNET_ATS_address_destroyed (GST_ats, address, NULL);
1987 cont (cont_cls, GNUNET_SYSERR);
1988 return; 2020 return;
1989 } 2021 }
1990 2022 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991 GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader)); 2023 "ATS tells us to switch to address '%s' for peer `%s'\n",
1992 bytes_in_send_queue += msg_size; 2024 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1993 GNUNET_STATISTICS_set (GST_stats, 2025 GNUNET_i2s (peer));
1994 gettext_noop 2026 switch (n->state)
1995 ("# bytes in message queue for other peers"), 2027 {
1996 bytes_in_send_queue, GNUNET_NO); 2028 case S_NOT_CONNECTED:
1997 mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); 2029 GNUNET_break (0);
1998 mq->cont = cont; 2030 free_neighbour (n);
1999 mq->cont_cls = cont_cls; 2031 return;
2000 /* FIXME: this memcpy can be up to 7% of our total runtime! */ 2032 case S_INIT_ATS:
2001 memcpy (&mq[1], msg, msg_size); 2033 set_address (&n->primary_address,
2002 mq->message_buf = (const char *) &mq[1]; 2034 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2003 mq->message_buf_size = msg_size; 2035 n->state = S_INIT_BLACKLIST;
2004 mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); 2036 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2005 GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); 2037 check_blacklist (&n->id,
2006 2038 n->connect_ack_timestamp,
2007 if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && 2039 address, session, ats, ats_count);
2008 (NULL == n->is_active)) 2040 break;
2009 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); 2041 case S_INIT_BLACKLIST:
2042 /* ATS suggests a different address, switch again */
2043 set_address (&n->primary_address,
2044 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2045 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2046 check_blacklist (&n->id,
2047 n->connect_ack_timestamp,
2048 address, session, ats, ats_count);
2049 break;
2050 case S_CONNECT_SENT:
2051 /* ATS suggests a different address, switch again */
2052 set_address (&n->primary_address,
2053 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2054 n->state = S_INIT_BLACKLIST;
2055 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2056 check_blacklist (&n->id,
2057 n->connect_ack_timestamp,
2058 address, session, ats, ats_count);
2059 break;
2060 case S_CONNECT_RECV_ATS:
2061 set_address (&n->primary_address,
2062 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2063 n->state = S_CONNECT_RECV_BLACKLIST;
2064 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2065 check_blacklist (&n->id,
2066 n->connect_ack_timestamp,
2067 address, session, ats, ats_count);
2068 break;
2069 case S_CONNECT_RECV_BLACKLIST:
2070 /* ATS asks us to switch while we were trying to connect; switch to new
2071 address and check blacklist again */
2072 set_address (&n->primary_address,
2073 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2074 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2075 check_blacklist (&n->id,
2076 n->connect_ack_timestamp,
2077 address, session, ats, ats_count);
2078 break;
2079 case S_CONNECTED:
2080 GNUNET_assert (NULL != n->primary_address.address);
2081 GNUNET_assert (NULL != n->primary_address.session);
2082 if (n->primary_address.session == session)
2083 {
2084 /* not an address change, just a quota change */
2085 set_address (&n->primary_address,
2086 address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
2087 break;
2088 }
2089 /* ATS asks us to switch a life connection; see if we can get
2090 a CONNECT_ACK on it before we actually do this! */
2091 set_address (&n->alternative_address,
2092 address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
2093 n->state = S_CONNECTED_SWITCHING_BLACKLIST;
2094 check_blacklist (&n->id,
2095 GNUNET_TIME_absolute_get (),
2096 address, session, ats, ats_count);
2097 break;
2098 case S_RECONNECT_ATS:
2099 set_address (&n->primary_address,
2100 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2101 n->state = S_RECONNECT_BLACKLIST;
2102 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2103 check_blacklist (&n->id,
2104 n->connect_ack_timestamp,
2105 address, session, ats, ats_count);
2106 break;
2107 case S_RECONNECT_BLACKLIST:
2108 /* ATS asks us to switch while we were trying to reconnect; switch to new
2109 address and check blacklist again */
2110 set_address (&n->primary_address,
2111 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2112 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2113 check_blacklist (&n->id,
2114 n->connect_ack_timestamp,
2115 address, session, ats, ats_count);
2116 break;
2117 case S_RECONNECT_SENT:
2118 /* ATS asks us to switch while we were trying to reconnect; switch to new
2119 address and check blacklist again */
2120 set_address (&n->primary_address,
2121 address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
2122 n->state = S_RECONNECT_BLACKLIST;
2123 n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
2124 check_blacklist (&n->id,
2125 n->connect_ack_timestamp,
2126 address, session, ats, ats_count);
2127 break;
2128 case S_CONNECTED_SWITCHING_BLACKLIST:
2129 if (n->primary_address.session == session)
2130 {
2131 /* ATS switches back to still-active session */
2132 free_address (&n->alternative_address);
2133 n->state = S_CONNECTED;
2134 break;
2135 }
2136 /* ATS asks us to switch a life connection, update blacklist check */
2137 set_address (&n->alternative_address,
2138 address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
2139 check_blacklist (&n->id,
2140 GNUNET_TIME_absolute_get (),
2141 address, session, ats, ats_count);
2142 break;
2143 case S_CONNECTED_SWITCHING_CONNECT_SENT:
2144 if (n->primary_address.session == session)
2145 {
2146 /* ATS switches back to still-active session */
2147 free_address (&n->alternative_address);
2148 n->state = S_CONNECTED;
2149 break;
2150 }
2151 /* ATS asks us to switch a life connection, update blacklist check */
2152 set_address (&n->alternative_address,
2153 address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
2154 n->state = S_CONNECTED_SWITCHING_BLACKLIST;
2155 check_blacklist (&n->id,
2156 GNUNET_TIME_absolute_get (),
2157 address, session, ats, ats_count);
2158 break;
2159 case S_DISCONNECT:
2160 /* not going to switch addresses while disconnecting */
2161 return;
2162 case S_DISCONNECT_FINISHED:
2163 GNUNET_assert (0);
2164 break;
2165 default:
2166 GNUNET_break (0);
2167 break;
2168 }
2010} 2169}
2011 2170
2012 2171
2013/** 2172/**
2014 * We have received a message from the given sender. How long should 2173 * Master task run for every neighbour. Performs all of the time-related
2015 * we delay before receiving more? (Also used to keep the peer marked 2174 * activities (keep alive, send next message, disconnect if idle, finish
2016 * as live). 2175 * clean up after disconnect).
2017 * 2176 *
2018 * @param sender sender of the message 2177 * @param cls the 'struct NeighbourMapEntry' for which we are running
2019 * @param size size of the message 2178 * @param tc scheduler context (unused)
2020 * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2021 * GNUNET_NO if the neighbour is not connected or violates the quota,
2022 * GNUNET_SYSERR if the connection is not fully up yet
2023 * @return how long to wait before reading more from this sender
2024 */ 2179 */
2025struct GNUNET_TIME_Relative 2180static void
2026GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity 2181master_task (void *cls,
2027 *sender, ssize_t size, int *do_forward) 2182 const struct GNUNET_SCHEDULER_TaskContext *tc)
2028{ 2183{
2029 struct NeighbourMapEntry *n; 2184 struct NeighbourMapEntry *n = cls;
2030 struct GNUNET_TIME_Relative ret; 2185 struct GNUNET_TIME_Relative delay;
2031
2032 // This can happen during shutdown
2033 if (neighbours == NULL)
2034 {
2035 return GNUNET_TIME_UNIT_FOREVER_REL;
2036 }
2037 2186
2038 n = lookup_neighbour (sender); 2187 n->task = GNUNET_SCHEDULER_NO_TASK;
2039 if (n == NULL) 2188 delay = GNUNET_TIME_absolute_get_remaining (n->timeout);
2189 switch (n->state)
2040 { 2190 {
2041 GST_neighbours_try_connect (sender); 2191 case S_NOT_CONNECTED:
2042 n = lookup_neighbour (sender); 2192 /* invalid state for master task, clean up */
2043 if (NULL == n) 2193 GNUNET_break (0);
2194 n->state = S_DISCONNECT_FINISHED;
2195 free_neighbour (n);
2196 return;
2197 case S_INIT_ATS:
2198 if (0 == delay.rel_value)
2044 { 2199 {
2045 GNUNET_STATISTICS_update (GST_stats, 2200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2046 gettext_noop 2201 "Connection to `%s' timed out waiting for ATS to provide address\n",
2047 ("# messages discarded due to lack of neighbour record"), 2202 GNUNET_i2s (&n->id));
2048 1, GNUNET_NO); 2203 n->state = S_DISCONNECT_FINISHED;
2049 *do_forward = GNUNET_NO; 2204 free_neighbour (n);
2050 return GNUNET_TIME_UNIT_ZERO; 2205 return;
2051 } 2206 }
2052 } 2207 break;
2053 if (!is_connected (n)) 2208 case S_INIT_BLACKLIST:
2054 { 2209 if (0 == delay.rel_value)
2055 *do_forward = GNUNET_SYSERR;
2056 return GNUNET_TIME_UNIT_ZERO;
2057 }
2058 if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2059 {
2060 n->quota_violation_count++;
2061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2063 n->in_tracker.available_bytes_per_s__,
2064 n->quota_violation_count);
2065 /* Discount 32k per violation */
2066 GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2067 }
2068 else
2069 {
2070 if (n->quota_violation_count > 0)
2071 { 2210 {
2072 /* try to add 32k back */ 2211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2073 GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); 2212 "Connection to `%s' timed out waiting for BLACKLIST to approve address\n",
2074 n->quota_violation_count--; 2213 GNUNET_i2s (&n->id));
2214 n->state = S_DISCONNECT_FINISHED;
2215 free_neighbour (n);
2216 return;
2075 } 2217 }
2076 } 2218 break;
2077 if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) 2219 case S_CONNECT_SENT:
2078 { 2220 if (0 == delay.rel_value)
2079 GNUNET_STATISTICS_update (GST_stats, 2221 {
2080 gettext_noop 2222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2081 ("# bandwidth quota violations by other peers"), 2223 "Connection to `%s' timed out waiting for other peer to send CONNECT_ACK\n",
2082 1, GNUNET_NO); 2224 GNUNET_i2s (&n->id));
2083 *do_forward = GNUNET_NO; 2225 disconnect_neighbour (n);
2084 return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; 2226 return;
2085 } 2227 }
2086 *do_forward = GNUNET_YES; 2228 break;
2087 ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); 2229 case S_CONNECT_RECV_ATS:
2088 if (ret.rel_value > 0) 2230 if (0 == delay.rel_value)
2089 { 2231 {
2232 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2233 "Connection to `%s' timed out waiting ATS to provide address to use for CONNECT_ACK\n",
2234 GNUNET_i2s (&n->id));
2235 n->state = S_DISCONNECT_FINISHED;
2236 free_neighbour (n);
2237 return;
2238 }
2239 break;
2240 case S_CONNECT_RECV_BLACKLIST:
2241 if (0 == delay.rel_value)
2242 {
2243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244 "Connection to `%s' timed out waiting BLACKLIST to approve address to use for CONNECT_ACK\n",
2245 GNUNET_i2s (&n->id));
2246 n->state = S_DISCONNECT_FINISHED;
2247 free_neighbour (n);
2248 return;
2249 }
2250 break;
2251 case S_CONNECT_RECV_ACK:
2252 if (0 == delay.rel_value)
2253 {
2254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2255 "Connection to `%s' timed out waiting for other peer to send SESSION_ACK\n",
2256 GNUNET_i2s (&n->id));
2257 disconnect_neighbour (n);
2258 return;
2259 }
2260 break;
2261 case S_CONNECTED:
2262 if (0 == delay.rel_value)
2263 {
2264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2265 "Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs\n",
2266 GNUNET_i2s (&n->id));
2267 disconnect_neighbour (n);
2268 return;
2269 }
2270 try_transmission_to_peer (n);
2271 send_keepalive (n);
2272 break;
2273 case S_RECONNECT_ATS:
2274 if (0 == delay.rel_value)
2275 {
2276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2277 "Connection to `%s' timed out, waiting for ATS replacement address\n",
2278 GNUNET_i2s (&n->id));
2279 disconnect_neighbour (n);
2280 return;
2281 }
2282 break;
2283 case S_RECONNECT_BLACKLIST:
2284 if (0 == delay.rel_value)
2285 {
2286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2287 "Connection to `%s' timed out, waiting for BLACKLIST to approve replacement address\n",
2288 GNUNET_i2s (&n->id));
2289 disconnect_neighbour (n);
2290 return;
2291 }
2292 break;
2293 case S_RECONNECT_SENT:
2294 if (0 == delay.rel_value)
2295 {
2296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2297 "Connection to `%s' timed out, waiting for other peer to CONNECT_ACK replacement address\n",
2298 GNUNET_i2s (&n->id));
2299 disconnect_neighbour (n);
2300 return;
2301 }
2302 break;
2303 case S_CONNECTED_SWITCHING_BLACKLIST:
2304 if (0 == delay.rel_value)
2305 {
2306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2307 "Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs\n",
2308 GNUNET_i2s (&n->id));
2309 disconnect_neighbour (n);
2310 return;
2311 }
2312 try_transmission_to_peer (n);
2313 send_keepalive (n);
2314 break;
2315 case S_CONNECTED_SWITCHING_CONNECT_SENT:
2316 if (0 == delay.rel_value)
2317 {
2318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2319 "Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs (after trying to CONNECT on alternative address)\n",
2320 GNUNET_i2s (&n->id));
2321 disconnect_neighbour (n);
2322 return;
2323 }
2324 try_transmission_to_peer (n);
2325 send_keepalive (n);
2326 break;
2327 case S_DISCONNECT:
2090 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2091 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n", 2329 "Cleaning up connection to `%s' after sending DISCONNECT\n",
2092 (unsigned long long) n->in_tracker. 2330 GNUNET_i2s (&n->id));
2093 consumption_since_last_update__, 2331 n->state = S_DISCONNECT_FINISHED;
2094 (unsigned int) n->in_tracker.available_bytes_per_s__, 2332 free_neighbour (n);
2095 (unsigned long long) ret.rel_value); 2333 return;
2096 GNUNET_STATISTICS_update (GST_stats, 2334 case S_DISCONNECT_FINISHED:
2097 gettext_noop ("# ms throttling suggested"), 2335 /* how did we get here!? */
2098 (int64_t) ret.rel_value, GNUNET_NO); 2336 GNUNET_assert (0);
2099 } 2337 break;
2100 return ret; 2338 default:
2339 GNUNET_break (0);
2340 break;
2341 }
2342 delay = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time),
2343 delay);
2344 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->task);
2345 n->task = GNUNET_SCHEDULER_add_delayed (delay,
2346 &master_task,
2347 n);
2101} 2348}
2102 2349
2103 2350
2104/** 2351/**
2105 * Keep the connection to the given neighbour alive longer, 2352 * Send a SESSION_ACK message to the neighbour to confirm that we
2106 * we received a KEEPALIVE (or equivalent). 2353 * got his CONNECT_ACK.
2107 * 2354 *
2108 * @param neighbour neighbour to keep alive 2355 * @param n neighbour to send the SESSION_ACK to
2109 */ 2356 */
2110void 2357static void
2111GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) 2358send_session_ack_message (struct NeighbourMapEntry *n)
2112{ 2359{
2113 struct NeighbourMapEntry *n; 2360 struct GNUNET_MessageHeader msg;
2114
2115 // This can happen during shutdown
2116 if (neighbours == NULL)
2117 {
2118 return;
2119 }
2120
2121 n = lookup_neighbour (neighbour);
2122 if (NULL == n)
2123 {
2124 GNUNET_STATISTICS_update (GST_stats,
2125 gettext_noop
2126 ("# KEEPALIVE messages discarded (not connected)"),
2127 1, GNUNET_NO);
2128 return;
2129 }
2130 GNUNET_SCHEDULER_cancel (n->timeout_task);
2131 n->timeout_task =
2132 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2133 &neighbour_timeout_task, n);
2134
2135 /* send reply to measure latency */
2136 if (S_CONNECTED != n->state)
2137 return;
2138
2139 struct GNUNET_MessageHeader m;
2140
2141 m.size = htons (sizeof (struct GNUNET_MessageHeader));
2142 m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2143 2361
2144 send_with_session(n, 2362 msg.size = htons (sizeof (struct GNUNET_MessageHeader));
2145 (const void *) &m, sizeof (m), 2363 msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2146 UINT32_MAX, 2364 (void) send_with_session(n,
2147 GNUNET_TIME_UNIT_FOREVER_REL, 2365 (const char *) &msg, sizeof (struct GNUNET_MessageHeader),
2148 NULL, NULL); 2366 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2367 NULL, NULL);
2149} 2368}
2150 2369
2370
2151/** 2371/**
2152 * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency 2372 * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2153 * to this peer 2373 * Consider switching to it.
2154 * 2374 *
2155 * @param neighbour neighbour to keep alive 2375 * @param message possibly a 'struct SessionConnectMessage' (check format)
2376 * @param peer identity of the peer to switch the address for
2377 * @param address address of the other peer, NULL if other peer
2378 * connected to us
2379 * @param session session to use (or NULL)
2156 * @param ats performance data 2380 * @param ats performance data
2157 * @param ats_count number of entries in ats 2381 * @param ats_count number of entries in ats
2158 */ 2382 */
2159void 2383void
2160GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, 2384GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2385 const struct GNUNET_PeerIdentity *peer,
2386 const struct GNUNET_HELLO_Address *address,
2387 struct Session *session,
2161 const struct GNUNET_ATS_Information *ats, 2388 const struct GNUNET_ATS_Information *ats,
2162 uint32_t ats_count) 2389 uint32_t ats_count)
2163{ 2390{
2391 const struct SessionConnectMessage *scm;
2392 struct GNUNET_TIME_Absolute ts;
2164 struct NeighbourMapEntry *n; 2393 struct NeighbourMapEntry *n;
2165 struct GNUNET_ATS_Information *ats_new;
2166 uint32_t latency;
2167 2394
2168 if (neighbours == NULL) 2395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2396 "Received CONNECT_ACK message from peer `%s'\n",
2397 GNUNET_i2s (peer));
2398 if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2169 { 2399 {
2170 // This can happen during shutdown 2400 GNUNET_break_op (0);
2171 return; 2401 return;
2172 } 2402 }
2173 2403 scm = (const struct SessionConnectMessage *) message;
2174 n = lookup_neighbour (neighbour); 2404 GNUNET_break_op (ntohl (scm->reserved) == 0);
2175 if ((NULL == n) || (n->state != S_CONNECTED)) 2405 if (NULL == (n = lookup_neighbour (peer)))
2176 { 2406 {
2177 GNUNET_STATISTICS_update (GST_stats, 2407 GNUNET_STATISTICS_update (GST_stats,
2178 gettext_noop 2408 gettext_noop
2179 ("# KEEPALIVE_RESPONSE messages discarded (not connected)"), 2409 ("# unexpected CONNECT_ACK messages (no peer)"),
2180 1, GNUNET_NO); 2410 1, GNUNET_NO);
2181 return; 2411 return;
2182 } 2412 }
2183 if (n->expect_latency_response != GNUNET_YES) 2413 ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2414 switch (n->state)
2184 { 2415 {
2416 case S_NOT_CONNECTED:
2417 GNUNET_break (0);
2418 free_neighbour (n);
2419 return;
2420 case S_INIT_ATS:
2421 case S_INIT_BLACKLIST:
2185 GNUNET_STATISTICS_update (GST_stats, 2422 GNUNET_STATISTICS_update (GST_stats,
2186 gettext_noop 2423 gettext_noop
2187 ("# KEEPALIVE_RESPONSE messages discarded (not expected)"), 2424 ("# unexpected CONNECT_ACK messages (not ready)"),
2188 1, GNUNET_NO); 2425 1, GNUNET_NO);
2189 return; 2426 break;
2427 case S_CONNECT_SENT:
2428 n->state = S_CONNECTED;
2429 n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2430 GNUNET_STATISTICS_set (GST_stats,
2431 gettext_noop ("# peers connected"),
2432 ++neighbours_connected,
2433 GNUNET_NO);
2434 connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2435 set_address (&n->primary_address,
2436 n->primary_address.address,
2437 n->primary_address.session,
2438 n->primary_address.bandwidth_in,
2439 n->primary_address.bandwidth_out,
2440 GNUNET_YES);
2441 send_session_ack_message (n);
2442 break;
2443 case S_CONNECT_RECV_ATS:
2444 case S_CONNECT_RECV_BLACKLIST:
2445 case S_CONNECT_RECV_ACK:
2446 GNUNET_STATISTICS_update (GST_stats,
2447 gettext_noop
2448 ("# unexpected CONNECT_ACK messages (not ready)"),
2449 1, GNUNET_NO);
2450 break;
2451 case S_CONNECTED:
2452 /* duplicate CONNECT_ACK, let's answer by duplciate SESSION_ACK just in case */
2453 send_session_ack_message (n);
2454 break;
2455 case S_RECONNECT_ATS:
2456 case S_RECONNECT_BLACKLIST:
2457 /* we didn't expect any CONNECT_ACK, as we are waiting for ATS
2458 to give us a new address... */
2459 GNUNET_STATISTICS_update (GST_stats,
2460 gettext_noop
2461 ("# unexpected CONNECT_ACK messages (waiting on ATS)"),
2462 1, GNUNET_NO);
2463 break;
2464 case S_RECONNECT_SENT:
2465 /* new address worked; go back to connected! */
2466 n->state = S_CONNECTED;
2467 send_session_ack_message (n);
2468 break;
2469 case S_CONNECTED_SWITCHING_BLACKLIST:
2470 /* duplicate CONNECT_ACK, let's answer by duplciate SESSION_ACK just in case */
2471 send_session_ack_message (n);
2472 break;
2473 case S_CONNECTED_SWITCHING_CONNECT_SENT:
2474 /* new address worked; adopt it and go back to connected! */
2475 n->state = S_CONNECTED;
2476 n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2477 GNUNET_assert (GNUNET_NO == n->alternative_address.ats_active);
2478 set_address (&n->primary_address,
2479 n->alternative_address.address,
2480 n->alternative_address.session,
2481 n->alternative_address.bandwidth_in,
2482 n->alternative_address.bandwidth_out,
2483 GNUNET_YES);
2484 free_address (&n->alternative_address);
2485 send_session_ack_message (n);
2486 break;
2487 case S_DISCONNECT:
2488 GNUNET_STATISTICS_update (GST_stats,
2489 gettext_noop
2490 ("# unexpected CONNECT_ACK messages (disconnecting)"),
2491 1, GNUNET_NO);
2492 break;
2493 case S_DISCONNECT_FINISHED:
2494 GNUNET_assert (0);
2495 break;
2496 default:
2497 GNUNET_break (0);
2498 break;
2190 } 2499 }
2191 n->expect_latency_response = GNUNET_NO; 2500}
2192 2501
2193 GNUNET_assert (n->keep_alive_sent.abs_value !=
2194 GNUNET_TIME_absolute_get_zero ().abs_value);
2195 n->latency =
2196 GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2197 GNUNET_TIME_absolute_get ());
2198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2199 GNUNET_i2s (&n->id), n->latency.rel_value);
2200 2502
2201 if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value) 2503/**
2504 * A session was terminated. Take note; if needed, try to get
2505 * an alternative address from ATS.
2506 *
2507 * @param peer identity of the peer where the session died
2508 * @param session session that is gone
2509 */
2510void
2511GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
2512 struct Session *session)
2513{
2514 struct NeighbourMapEntry *n;
2515 struct BlackListCheckContext *bcc;
2516 struct BlackListCheckContext *bcc_next;
2517
2518 /* make sure to cancel all ongoing blacklist checks involving 'session' */
2519 bcc_next = bc_head;
2520 while (NULL != (bcc = bcc_next))
2202 { 2521 {
2203 GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count); 2522 bcc_next = bcc->next;
2523 if (bcc->na.session == session)
2524 {
2525 GST_blacklist_test_cancel (bcc->bc);
2526 GNUNET_HELLO_address_free (bcc->na.address);
2527 GNUNET_CONTAINER_DLL_remove (bc_head,
2528 bc_tail,
2529 bcc);
2530 GNUNET_free (bcc);
2531 }
2204 } 2532 }
2205 else 2533 if (NULL == (n = lookup_neighbour (peer)))
2534 return; /* can't affect us */
2535 if (session != n->primary_address.session)
2206 { 2536 {
2207 ats_new = 2537 if (session == n->alternative_address.session)
2208 GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * 2538 {
2209 (ats_count + 1)); 2539 free_address (&n->alternative_address);
2210 memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); 2540 if ( (S_CONNECTED_SWITCHING_BLACKLIST == n->state) ||
2211 2541 (S_CONNECTED_SWITCHING_CONNECT_SENT == n->state) )
2212 /* add latency */ 2542 n->state = S_CONNECTED;
2213 ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); 2543 else
2214 if (n->latency.rel_value > UINT32_MAX) 2544 GNUNET_break (0);
2215 latency = UINT32_MAX; 2545 }
2216 else 2546 return; /* doesn't affect us further */
2217 latency = n->latency.rel_value; 2547 }
2218 ats_new[ats_count].value = htonl (latency); 2548
2549 n->expect_latency_response = GNUNET_NO;
2219 2550
2220 GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, 2551 switch (n->state)
2221 ats_count + 1); 2552 {
2222 GNUNET_free (ats_new); 2553 case S_NOT_CONNECTED:
2554 GNUNET_break (0);
2555 free_neighbour (n);
2556 return;
2557 case S_INIT_ATS:
2558 GNUNET_break (0);
2559 free_neighbour (n);
2560 return;
2561 case S_INIT_BLACKLIST:
2562 case S_CONNECT_SENT:
2563 free_address (&n->primary_address);
2564 n->state = S_INIT_ATS;
2565 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
2566 // FIXME: need to ask ATS for suggestions again?
2567 GNUNET_ATS_suggest_address (GST_ats, &n->id);
2568 break;
2569 case S_CONNECT_RECV_ATS:
2570 case S_CONNECT_RECV_BLACKLIST:
2571 case S_CONNECT_RECV_ACK:
2572 /* error on inbound session; free neighbour entirely */
2573 free_address (&n->primary_address);
2574 free_neighbour (n);
2575 return;
2576 case S_CONNECTED:
2577 free_address (&n->primary_address);
2578 n->state = S_RECONNECT_ATS;
2579 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
2580 /* FIXME: is this ATS call needed? */
2581 GNUNET_ATS_suggest_address (GST_ats, &n->id);
2582 break;
2583 case S_RECONNECT_ATS:
2584 /* we don't have an address, how can it go down? */
2585 GNUNET_break (0);
2586 break;
2587 case S_RECONNECT_BLACKLIST:
2588 case S_RECONNECT_SENT:
2589 n->state = S_RECONNECT_ATS;
2590 n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
2591 // FIXME: need to ask ATS for suggestions again?
2592 GNUNET_ATS_suggest_address (GST_ats, &n->id);
2593 break;
2594 case S_CONNECTED_SWITCHING_BLACKLIST:
2595 /* primary went down while we were checking secondary against
2596 blacklist, adopt secondary as primary */
2597 free_address (&n->primary_address);
2598 n->primary_address = n->alternative_address;
2599 memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress));
2600 n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT);
2601 n->state = S_RECONNECT_BLACKLIST;
2602 break;
2603 case S_CONNECTED_SWITCHING_CONNECT_SENT:
2604 /* primary went down while we were waiting for CONNECT_ACK on secondary;
2605 secondary as primary */
2606 free_address (&n->primary_address);
2607 n->primary_address = n->alternative_address;
2608 memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress));
2609 n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT);
2610 n->state = S_RECONNECT_SENT;
2611 break;
2612 case S_DISCONNECT:
2613 free_address (&n->primary_address);
2614 break;
2615 case S_DISCONNECT_FINISHED:
2616 GNUNET_assert (0);
2617 break;
2618 default:
2619 GNUNET_break (0);
2620 break;
2223 } 2621 }
2622 if (GNUNET_SCHEDULER_NO_TASK != n->task)
2623 GNUNET_SCHEDULER_cancel (n->task);
2624 n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
2224} 2625}
2225 2626
2226 2627
2227/** 2628/**
2228 * Change the incoming quota for the given peer. 2629 * We received a 'SESSION_ACK' message from the other peer.
2630 * If we sent a 'CONNECT_ACK' last, this means we are now
2631 * connected. Otherwise, do nothing.
2229 * 2632 *
2230 * @param neighbour identity of peer to change qutoa for 2633 * @param message possibly a 'struct SessionConnectMessage' (check format)
2231 * @param quota new quota 2634 * @param peer identity of the peer to switch the address for
2635 * @param address address of the other peer, NULL if other peer
2636 * connected to us
2637 * @param session session to use (or NULL)
2638 * @param ats performance data
2639 * @param ats_count number of entries in ats
2232 */ 2640 */
2233void 2641void
2234GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, 2642GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
2235 struct GNUNET_BANDWIDTH_Value32NBO quota) 2643 const struct GNUNET_PeerIdentity *peer,
2644 const struct GNUNET_HELLO_Address *address,
2645 struct Session *session,
2646 const struct GNUNET_ATS_Information *ats,
2647 uint32_t ats_count)
2236{ 2648{
2237 struct NeighbourMapEntry *n; 2649 struct NeighbourMapEntry *n;
2238 2650
2239 // This can happen during shutdown 2651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2240 if (neighbours == NULL) 2652 "Received SESSION_ACK message from peer `%s'\n",
2653 GNUNET_i2s (peer));
2654 if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2241 { 2655 {
2656 GNUNET_break_op (0);
2242 return; 2657 return;
2243 } 2658 }
2244 2659 if (NULL == (n = lookup_neighbour (peer)))
2245 n = lookup_neighbour (neighbour); 2660 return;
2246 if (n == NULL) 2661 /* check if we are in a plausible state for having sent
2662 a CONNECT_ACK. If not, return, otherwise break */
2663 if ( ( (S_CONNECT_RECV_ACK != n->state) &&
2664 (S_CONNECT_SENT != n->state) ) ||
2665 (2 != n->send_connect_ack) )
2247 { 2666 {
2248 GNUNET_STATISTICS_update (GST_stats, 2667 GNUNET_STATISTICS_update (GST_stats,
2249 gettext_noop 2668 gettext_noop ("# unexpected SESSION ACK messages"), 1,
2250 ("# SET QUOTA messages ignored (no such peer)"), 2669 GNUNET_NO);
2251 1, GNUNET_NO);
2252 return; 2670 return;
2253 } 2671 }
2254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2672 n->state = S_CONNECTED;
2255 "Setting inbound quota of %u Bps for peer `%s' to all clients\n", 2673 n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2256 ntohl (quota.value__), GNUNET_i2s (&n->id)); 2674 GNUNET_STATISTICS_set (GST_stats,
2257 GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); 2675 gettext_noop ("# peers connected"),
2258 if (0 != ntohl (quota.value__)) 2676 ++neighbours_connected,
2259 return; 2677 GNUNET_NO);
2260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n", 2678 connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2261 GNUNET_i2s (&n->id), "SET_QUOTA"); 2679 set_address (&n->primary_address,
2262 if (is_connected (n)) 2680 n->primary_address.address,
2263 GNUNET_STATISTICS_update (GST_stats, 2681 n->primary_address.session,
2264 gettext_noop ("# disconnects due to quota of 0"), 2682 n->primary_address.bandwidth_in,
2265 1, GNUNET_NO); 2683 n->primary_address.bandwidth_out,
2266 disconnect_neighbour (n); 2684 GNUNET_YES);
2267} 2685}
2268 2686
2269 2687
2270/** 2688/**
2271 * Closure for the neighbours_iterate function. 2689 * Test if we're connected to the given peer.
2272 */
2273struct IteratorContext
2274{
2275 /**
2276 * Function to call on each connected neighbour.
2277 */
2278 GST_NeighbourIterator cb;
2279
2280 /**
2281 * Closure for 'cb'.
2282 */
2283 void *cb_cls;
2284};
2285
2286
2287/**
2288 * Call the callback from the closure for each connected neighbour.
2289 * 2690 *
2290 * @param cls the 'struct IteratorContext' 2691 * @param target peer to test
2291 * @param key the hash of the public key of the neighbour 2692 * @return GNUNET_YES if we are connected, GNUNET_NO if not
2292 * @param value the 'struct NeighbourMapEntry'
2293 * @return GNUNET_OK (continue to iterate)
2294 */ 2693 */
2295static int 2694int
2296neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) 2695GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
2297{ 2696{
2298 struct IteratorContext *ic = cls; 2697 return test_connected (lookup_neighbour (target));
2299 struct NeighbourMapEntry *n = value;
2300
2301 if (!is_connected (n))
2302 return GNUNET_OK;
2303
2304 ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2305 return GNUNET_OK;
2306} 2698}
2307 2699
2308 2700
2309/** 2701/**
2310 * Iterate over all connected neighbours. 2702 * Change the incoming quota for the given peer.
2311 *
2312 * @param cb function to call
2313 * @param cb_cls closure for cb
2314 */
2315void
2316GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2317{
2318 struct IteratorContext ic;
2319
2320 // This can happen during shutdown
2321 if (neighbours == NULL)
2322 {
2323 return;
2324 }
2325
2326 ic.cb = cb;
2327 ic.cb_cls = cb_cls;
2328 GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2329}
2330
2331/**
2332 * If we have an active connection to the given target, it must be shutdown.
2333 * 2703 *
2334 * @param target peer to disconnect from 2704 * @param neighbour identity of peer to change qutoa for
2705 * @param quota new quota
2335 */ 2706 */
2336void 2707void
2337GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) 2708GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2709 struct GNUNET_BANDWIDTH_Value32NBO quota)
2338{ 2710{
2339 struct NeighbourMapEntry *n; 2711 struct NeighbourMapEntry *n;
2340 2712
2341 // This can happen during shutdown 2713 if (NULL == (n = lookup_neighbour (neighbour)))
2342 if (neighbours == NULL)
2343 { 2714 {
2715 GNUNET_STATISTICS_update (GST_stats,
2716 gettext_noop
2717 ("# SET QUOTA messages ignored (no such peer)"),
2718 1, GNUNET_NO);
2344 return; 2719 return;
2345 } 2720 }
2346 2721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2347 n = lookup_neighbour (target); 2722 "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2348 if (NULL == n) 2723 ntohl (quota.value__), GNUNET_i2s (&n->id));
2349 return; /* not active */ 2724 GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2725 if (0 != ntohl (quota.value__))
2726 return;
2727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2728 GNUNET_i2s (&n->id), "SET_QUOTA");
2729 if (GNUNET_YES == test_connected (n))
2730 GNUNET_STATISTICS_update (GST_stats,
2731 gettext_noop ("# disconnects due to quota of 0"),
2732 1, GNUNET_NO);
2350 disconnect_neighbour (n); 2733 disconnect_neighbour (n);
2351} 2734}
2352 2735
@@ -2381,11 +2764,9 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2381 return; 2764 return;
2382 } 2765 }
2383 sdm = (const struct SessionDisconnectMessage *) msg; 2766 sdm = (const struct SessionDisconnectMessage *) msg;
2384 n = lookup_neighbour (peer); 2767 if (NULL == (n = lookup_neighbour (peer)))
2385 if (NULL == n)
2386 return; /* gone already */ 2768 return; /* gone already */
2387 if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= 2769 if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= n->connect_ack_timestamp.abs_value)
2388 n->connect_ts.abs_value)
2389 { 2770 {
2390 GNUNET_STATISTICS_update (GST_stats, 2771 GNUNET_STATISTICS_update (GST_stats,
2391 gettext_noop 2772 gettext_noop
@@ -2417,348 +2798,208 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2417 GNUNET_break_op (0); 2798 GNUNET_break_op (0);
2418 return; 2799 return;
2419 } 2800 }
2420 GST_neighbours_force_disconnect (peer); 2801 if (GNUNET_YES == test_connected (n))
2802 GNUNET_STATISTICS_update (GST_stats,
2803 gettext_noop
2804 ("# other peer asked to disconnect from us"), 1,
2805 GNUNET_NO);
2806 disconnect_neighbour (n);
2421} 2807}
2422 2808
2423 2809
2424/** 2810/**
2425 * We received a 'SESSION_CONNECT_ACK' message from the other peer. 2811 * Closure for the neighbours_iterate function.
2426 * Consider switching to it.
2427 *
2428 * @param message possibly a 'struct SessionConnectMessage' (check format)
2429 * @param peer identity of the peer to switch the address for
2430 * @param address address of the other peer, NULL if other peer
2431 * connected to us
2432 * @param session session to use (or NULL)
2433 * @param ats performance data
2434 * @param ats_count number of entries in ats
2435 */ 2812 */
2436void 2813struct IteratorContext
2437GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2438 const struct GNUNET_PeerIdentity *peer,
2439 const struct GNUNET_HELLO_Address *address,
2440 struct Session *session,
2441 const struct GNUNET_ATS_Information *ats,
2442 uint32_t ats_count)
2443{ 2814{
2444 const struct SessionConnectMessage *scm; 2815 /**
2445 struct GNUNET_MessageHeader msg; 2816 * Function to call on each connected neighbour.
2446 struct NeighbourMapEntry *n;
2447 size_t msg_len;
2448 size_t ret;
2449
2450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2451 "Received CONNECT_ACK message from peer `%s'\n",
2452 GNUNET_i2s (peer));
2453
2454 if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2455 {
2456 GNUNET_break_op (0);
2457 return;
2458 }
2459 scm = (const struct SessionConnectMessage *) message;
2460 GNUNET_break_op (ntohl (scm->reserved) == 0);
2461 n = lookup_neighbour (peer);
2462 if (NULL == n)
2463 {
2464 /* we did not send 'CONNECT' -- at least not recently */
2465 GNUNET_STATISTICS_update (GST_stats,
2466 gettext_noop
2467 ("# unexpected CONNECT_ACK messages (no peer)"),
2468 1, GNUNET_NO);
2469 return;
2470 }
2471
2472 /* Additional check
2473 *
2474 * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2475 *
2476 * We also received an CONNECT message, switched from SENDT to RECV and
2477 * ATS already suggested us an address after a successful blacklist check
2478 */ 2817 */
2818 GST_NeighbourIterator cb;
2479 2819
2480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2820 /**
2481 "Received CONNECT_ACK message from peer `%s' in state `%s'\n", 2821 * Closure for 'cb'.
2482 GNUNET_i2s (peer), 2822 */
2483 print_state(n->state)); 2823 void *cb_cls;
2484 2824};
2485 if ((n->address != NULL) && (n->state == S_CONNECTED))
2486 {
2487 /* After fast reconnect: send ACK (ACK) even when we are connected */
2488 msg_len = sizeof (msg);
2489 msg.size = htons (msg_len);
2490 msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2491
2492 ret = send_with_session(n,
2493 (const char *) &msg, msg_len,
2494 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2495 NULL, NULL);
2496
2497 if (ret == GNUNET_SYSERR)
2498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2499 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2500 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2501 return;
2502 }
2503
2504 if ((NULL == n->address) ||
2505 ((n->state != S_CONNECT_SENT) &&
2506 (n->state != S_CONNECT_RECV)))
2507 {
2508 GNUNET_STATISTICS_update (GST_stats,
2509 gettext_noop
2510 ("# unexpected CONNECT_ACK messages"), 1,
2511 GNUNET_NO);
2512 return;
2513 }
2514 if (n->state != S_CONNECTED)
2515 change_state (n, S_CONNECTED);
2516
2517 if (NULL != session)
2518 {
2519 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2520 "transport-ats",
2521 "Giving ATS session %p of plugin %s for peer %s\n",
2522 session, address->transport_name, GNUNET_i2s (peer));
2523 }
2524 GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2525 GNUNET_assert (NULL != n->address);
2526 2825
2527 if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2528 {
2529 GST_validation_set_address_use (n->address, n->session, GNUNET_YES, __LINE__);
2530 GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2531 n->address_state = USED;
2532 }
2533 2826
2534 GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); 2827/**
2828 * Call the callback from the closure for each connected neighbour.
2829 *
2830 * @param cls the 'struct IteratorContext'
2831 * @param key the hash of the public key of the neighbour
2832 * @param value the 'struct NeighbourMapEntry'
2833 * @return GNUNET_OK (continue to iterate)
2834 */
2835static int
2836neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2837{
2838 struct IteratorContext *ic = cls;
2839 struct NeighbourMapEntry *n = value;
2535 2840
2536 /* send ACK (ACK) */ 2841 if (GNUNET_YES == test_connected (n))
2537 msg_len = sizeof (msg); 2842 ic->cb (ic->cb_cls, &n->id, NULL, 0, n->primary_address.address);
2538 msg.size = htons (msg_len); 2843 return GNUNET_OK;
2539 msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); 2844}
2540 2845
2541 ret = send_with_session(n,
2542 (const char *) &msg, msg_len,
2543 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2544 NULL, NULL);
2545 2846
2546 if (ret == GNUNET_SYSERR) 2847/**
2547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2848 * Iterate over all connected neighbours.
2548 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n", 2849 *
2549 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); 2850 * @param cb function to call
2851 * @param cb_cls closure for cb
2852 */
2853void
2854GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2855{
2856 struct IteratorContext ic;
2550 2857
2858 if (NULL == neighbours)
2859 return; /* can happen during shutdown */
2860 ic.cb = cb;
2861 ic.cb_cls = cb_cls;
2862 GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2863}
2551 2864
2552 if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2553 n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2554 2865
2555 neighbours_connected++; 2866/**
2556 GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected, 2867 * If we have an active connection to the given target, it must be shutdown.
2557 GNUNET_NO); 2868 *
2558 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2869 * @param target peer to disconnect from
2559 "Notify about connect of `%4s' using address '%s' session %X LINE %u\n", 2870 */
2560 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session, 2871void
2561 __LINE__); 2872GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2562 connect_notify_cb (callback_cls, &n->id, ats, ats_count); 2873{
2563 send_outbound_quota (peer, n->bandwidth_out); 2874 struct NeighbourMapEntry *n;
2564 2875
2876 if (NULL == (n = lookup_neighbour (target)))
2877 return; /* not active */
2878 if (GNUNET_YES == test_connected (n))
2879 GNUNET_STATISTICS_update (GST_stats,
2880 gettext_noop
2881 ("# disconnected from peer upon explicit request"), 1,
2882 GNUNET_NO);
2883 disconnect_neighbour (n);
2565} 2884}
2566 2885
2567 2886
2568void 2887/**
2569GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, 2888 * Obtain current latency information for the given neighbour.
2570 const struct GNUNET_PeerIdentity *peer, 2889 *
2571 const struct GNUNET_HELLO_Address *address, 2890 * @param peer to get the latency for
2572 struct Session *session, 2891 * @return observed latency of the address, FOREVER if the
2573 const struct GNUNET_ATS_Information *ats, 2892 * the connection is not up
2574 uint32_t ats_count) 2893 */
2894struct GNUNET_TIME_Relative
2895GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
2575{ 2896{
2576 struct NeighbourMapEntry *n; 2897 struct NeighbourMapEntry *n;
2577 2898
2578 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2579 GNUNET_i2s (peer));
2580 if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2581 {
2582 GNUNET_break_op (0);
2583 return;
2584 }
2585 n = lookup_neighbour (peer); 2899 n = lookup_neighbour (peer);
2586 if (NULL == n) 2900 if (NULL == n)
2901 return GNUNET_TIME_UNIT_FOREVER_REL;
2902 switch (n->state)
2587 { 2903 {
2904 case S_CONNECTED:
2905 case S_RECONNECT_SENT:
2906 case S_RECONNECT_ATS:
2907 return n->latency;
2908 case S_NOT_CONNECTED:
2909 case S_INIT_BLACKLIST:
2910 case S_INIT_ATS:
2911 case S_CONNECT_SENT:
2912 case S_CONNECT_RECV_BLACKLIST:
2913 case S_DISCONNECT:
2914 case S_DISCONNECT_FINISHED:
2915 return GNUNET_TIME_UNIT_FOREVER_REL;
2916 default:
2588 GNUNET_break (0); 2917 GNUNET_break (0);
2589 return; 2918 break;
2590 }
2591 if (S_CONNECTED == n->state)
2592 return;
2593 if (!is_connecting (n))
2594 {
2595 GNUNET_STATISTICS_update (GST_stats,
2596 gettext_noop ("# unexpected ACK messages"), 1,
2597 GNUNET_NO);
2598 return;
2599 }
2600 change_state (n, S_CONNECTED);
2601 if (NULL != session)
2602 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2603 "transport-ats",
2604 "Giving ATS session %p of plugin %s for peer %s\n",
2605 session, address->transport_name, GNUNET_i2s (peer));
2606 GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2607 GNUNET_assert (n->address != NULL);
2608
2609 if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2610 {
2611 GST_validation_set_address_use (n->address, n->session, GNUNET_YES, __LINE__);
2612 GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2613 n->address_state = USED;
2614 } 2919 }
2615 2920 return GNUNET_TIME_UNIT_FOREVER_REL;
2616
2617 neighbours_connected++;
2618 GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2619 GNUNET_NO);
2620
2621 GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2622 if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2623 n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2625 "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2626 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2627 __LINE__);
2628 connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2629 send_outbound_quota (peer, n->bandwidth_out);
2630} 2921}
2631 2922
2632struct BlackListCheckContext
2633{
2634 struct GNUNET_ATS_Information *ats;
2635
2636 uint32_t ats_count;
2637 2923
2638 struct Session *session; 2924/**
2639 2925 * Obtain current address information for the given neighbour.
2640 struct GNUNET_HELLO_Address *address; 2926 *
2641 2927 * @param peer
2642 struct GNUNET_TIME_Absolute ts; 2928 * @return address currently used
2643}; 2929 */
2644 2930struct GNUNET_HELLO_Address *
2645 2931GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
2646static void
2647handle_connect_blacklist_cont (void *cls,
2648 const struct GNUNET_PeerIdentity *peer,
2649 int result)
2650{ 2932{
2651 struct NeighbourMapEntry *n; 2933 struct NeighbourMapEntry *n;
2652 struct BlackListCheckContext *bcc = cls;
2653
2654 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2655 "Blacklist check due to CONNECT message: `%s'\n",
2656 GNUNET_i2s (peer),
2657 (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2658 /* not allowed */
2659 if (GNUNET_OK != result)
2660 {
2661 GNUNET_HELLO_address_free (bcc->address);
2662 GNUNET_free (bcc);
2663 return;
2664 }
2665 2934
2666 n = lookup_neighbour (peer); 2935 n = lookup_neighbour (peer);
2667 if (NULL == n) 2936 if (NULL == n)
2668 n = setup_neighbour (peer); 2937 return NULL;
2669 2938 return n->primary_address.address;
2670 if (bcc->ts.abs_value > n->connect_ts.abs_value)
2671 {
2672 if (NULL != bcc->session)
2673 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2674 "transport-ats",
2675 "Giving ATS session %p of address `%s' for peer %s\n",
2676 bcc->session, GST_plugins_a2s (bcc->address),
2677 GNUNET_i2s (peer));
2678 /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2679
2680 GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2681 bcc->ats_count);
2682 n->connect_ts = bcc->ts;
2683 }
2684
2685 GNUNET_HELLO_address_free (bcc->address);
2686 GNUNET_free (bcc);
2687
2688 if (n->state != S_CONNECT_RECV)
2689 change_state (n, S_CONNECT_RECV);
2690
2691
2692 /* Ask ATS for an address to connect via that address */
2693 if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2694 GNUNET_SCHEDULER_cancel (n->ats_suggest);
2695 n->ats_suggest =
2696 GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2697 n);
2698 GNUNET_ATS_suggest_address (GST_ats, peer);
2699} 2939}
2700 2940
2941
2701/** 2942/**
2702 * We received a 'SESSION_CONNECT' message from the other peer. 2943 * Initialize the neighbours subsystem.
2703 * Consider switching to it.
2704 * 2944 *
2705 * @param message possibly a 'struct SessionConnectMessage' (check format) 2945 * @param cls closure for callbacks
2706 * @param peer identity of the peer to switch the address for 2946 * @param connect_cb function to call if we connect to a peer
2707 * @param address address of the other peer, NULL if other peer 2947 * @param disconnect_cb function to call if we disconnect from a peer
2708 * connected to us 2948 * @param peer_address_cb function to call if we change an active address
2709 * @param session session to use (or NULL) 2949 * of a neighbour
2710 * @param ats performance data
2711 * @param ats_count number of entries in ats (excluding 0-termination)
2712 */ 2950 */
2713void 2951void
2714GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, 2952GST_neighbours_start (void *cls,
2715 const struct GNUNET_PeerIdentity *peer, 2953 GNUNET_TRANSPORT_NotifyConnect connect_cb,
2716 const struct GNUNET_HELLO_Address *address, 2954 GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
2717 struct Session *session, 2955 GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
2718 const struct GNUNET_ATS_Information *ats,
2719 uint32_t ats_count)
2720{ 2956{
2721 const struct SessionConnectMessage *scm; 2957 callback_cls = cls;
2722 struct BlackListCheckContext *bcc = NULL; 2958 connect_notify_cb = connect_cb;
2723 struct NeighbourMapEntry *n; 2959 disconnect_notify_cb = disconnect_cb;
2724 2960 address_change_cb = peer_address_cb;
2725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2961 neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
2726 "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer)); 2962}
2727 if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2728 {
2729 GNUNET_break_op (0);
2730 return;
2731 }
2732 2963
2733 scm = (const struct SessionConnectMessage *) message;
2734 GNUNET_break_op (ntohl (scm->reserved) == 0);
2735 2964
2736 GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); 2965/**
2966 * Disconnect from the given neighbour.
2967 *
2968 * @param cls unused
2969 * @param key hash of neighbour's public key (not used)
2970 * @param value the 'struct NeighbourMapEntry' of the neighbour
2971 * @return GNUNET_OK (continue to iterate)
2972 */
2973static int
2974disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
2975{
2976 struct NeighbourMapEntry *n = value;
2737 2977
2738 n = lookup_neighbour (peer); 2978 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2739 if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state))) 2979 "Disconnecting peer `%4s', %s\n",
2740 { 2980 GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
2741 /* connected peer switches addresses or is trying to do a fast reconnect*/ 2981 free_neighbour (n);
2742 return; 2982 return GNUNET_OK;
2743 } 2983}
2744 2984
2745 2985
2746 /* we are not connected to this peer */ 2986/**
2747 /* do blacklist check */ 2987 * Cleanup the neighbours subsystem.
2748 bcc = 2988 */
2749 GNUNET_malloc (sizeof (struct BlackListCheckContext) + 2989void
2750 sizeof (struct GNUNET_ATS_Information) * (ats_count + 1)); 2990GST_neighbours_stop ()
2751 bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); 2991{
2752 bcc->ats_count = ats_count + 1; 2992 if (NULL == neighbours)
2753 bcc->address = GNUNET_HELLO_address_copy (address); 2993 return;
2754 bcc->session = session; 2994 GNUNET_CONTAINER_multihashmap_iterate (neighbours,
2755 bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; 2995 &disconnect_all_neighbours,
2756 memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); 2996 NULL);
2757 bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); 2997 GNUNET_CONTAINER_multihashmap_destroy (neighbours);
2758 bcc->ats[ats_count].value = 2998 neighbours = NULL;
2759 htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value); 2999 callback_cls = NULL;
2760 GST_blacklist_test_allowed (peer, address->transport_name, 3000 connect_notify_cb = NULL;
2761 &handle_connect_blacklist_cont, bcc); 3001 disconnect_notify_cb = NULL;
3002 address_change_cb = NULL;
2762} 3003}
2763 3004
2764 3005
diff --git a/src/transport/gnunet-service-transport_neighbours.h b/src/transport/gnunet-service-transport_neighbours.h
index 33fa1dac5..23091b750 100644
--- a/src/transport/gnunet-service-transport_neighbours.h
+++ b/src/transport/gnunet-service-transport_neighbours.h
@@ -127,6 +127,7 @@ GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
127void 127void
128GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour); 128GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour);
129 129
130
130/** 131/**
131 * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency 132 * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
132 * to this peer 133 * to this peer
@@ -212,10 +213,8 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
212 * @param ats_count number of entries in ats 213 * @param ats_count number of entries in ats
213 * @param bandwidth_in inbound quota to be used when connection is up 214 * @param bandwidth_in inbound quota to be used when connection is up
214 * @param bandwidth_out outbound quota to be used when connection is up 215 * @param bandwidth_out outbound quota to be used when connection is up
215 * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
216 * connection is not up (yet)
217 */ 216 */
218int 217void
219GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, 218GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
220 const struct GNUNET_HELLO_Address 219 const struct GNUNET_HELLO_Address
221 *address, struct Session *session, 220 *address, struct Session *session,
@@ -266,13 +265,26 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
266 const struct GNUNET_ATS_Information *ats, 265 const struct GNUNET_ATS_Information *ats,
267 uint32_t ats_count); 266 uint32_t ats_count);
268 267
268
269/**
270 * We received a 'SESSION_ACK' message from the other peer.
271 * FIXME: describe what this means!
272 *
273 * @param message possibly a 'struct SessionConnectMessage' (check format)
274 * @param peer identity of the peer to switch the address for
275 * @param address address of the other peer, NULL if other peer
276 * connected to us
277 * @param session session to use (or NULL)
278 * @param ats performance data
279 * @param ats_count number of entries in ats
280 */
269void 281void
270GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, 282GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
271 const struct GNUNET_PeerIdentity *peer, 283 const struct GNUNET_PeerIdentity *peer,
272 const struct GNUNET_HELLO_Address *address, 284 const struct GNUNET_HELLO_Address *address,
273 struct Session *session, 285 struct Session *session,
274 const struct GNUNET_ATS_Information *ats, 286 const struct GNUNET_ATS_Information *ats,
275 uint32_t ats_count); 287 uint32_t ats_count);
276 288
277 289
278/** 290/**