From 7961bd44ccf6e76aa8d82154381030f332f7fb6d Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Thu, 27 Oct 2011 19:14:50 +0000 Subject: --- src/transport/gnunet-service-transport_3way.c | 580 ++++++ .../gnunet-service-transport_neighbours_3way.c | 2071 ++++++++++++++++++++ .../gnunet-service-transport_neighbours_3way.h | 272 +++ .../gnunet-service-transport_neighbours_fsm.c | 2071 -------------------- 4 files changed, 2923 insertions(+), 2071 deletions(-) create mode 100644 src/transport/gnunet-service-transport_3way.c create mode 100644 src/transport/gnunet-service-transport_neighbours_3way.c create mode 100644 src/transport/gnunet-service-transport_neighbours_3way.h delete mode 100644 src/transport/gnunet-service-transport_neighbours_fsm.c diff --git a/src/transport/gnunet-service-transport_3way.c b/src/transport/gnunet-service-transport_3way.c new file mode 100644 index 000000000..a24d6c1b1 --- /dev/null +++ b/src/transport/gnunet-service-transport_3way.c @@ -0,0 +1,580 @@ +/* + This file is part of GNUnet. + (C) 2010,2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file transport/gnunet-service-transport-new.c + * @brief + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" +#include "gnunet_transport_service.h" +#include "gnunet_peerinfo_service.h" +#include "gnunet_ats_service.h" +#include "gnunet-service-transport.h" +#include "gnunet-service-transport_blacklist.h" +#include "gnunet-service-transport_clients.h" +#include "gnunet-service-transport_hello.h" +#include "gnunet-service-transport_neighbours.h" +#include "gnunet-service-transport_plugins.h" +#include "gnunet-service-transport_validation.h" +#include "transport.h" + +/* globals */ + +/** + * Statistics handle. + */ +struct GNUNET_STATISTICS_Handle *GST_stats; + +/** + * Configuration handle. + */ +const struct GNUNET_CONFIGURATION_Handle *GST_cfg; + +/** + * Configuration handle. + */ +struct GNUNET_PeerIdentity GST_my_identity; + +/** + * Handle to peerinfo service. + */ +struct GNUNET_PEERINFO_Handle *GST_peerinfo; + +/** + * Our public key. + */ +struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded GST_my_public_key; + +/** + * Our private key. + */ +struct GNUNET_CRYPTO_RsaPrivateKey *GST_my_private_key; + +/** + * ATS handle. + */ +struct GNUNET_ATS_SchedulingHandle *GST_ats; + + +/** + * Transmit our HELLO message to the given (connected) neighbour. + * + * @param cls the 'HELLO' message + * @param target a connected neighbour + * @param ats performance information (unused) + * @param ats_count number of records in ats (unused) + * @param transport plugin + * @param addr address + * @param addrlen address length + */ +static void +transmit_our_hello (void *cls, const struct GNUNET_PeerIdentity *target, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count, + const char * transport, + const void * addr, + size_t addrlen) +{ + const struct GNUNET_MessageHeader *hello = cls; + + GST_neighbours_send (target, (const char *) hello, ntohs (hello->size), + GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION, NULL, NULL); +} + + +/** + * My HELLO has changed. Tell everyone who should know. + * + * @param cls unused + * @param hello new HELLO + */ +static void +process_hello_update (void *cls, const struct GNUNET_MessageHeader *hello) +{ + GST_clients_broadcast (hello, GNUNET_NO); + GST_neighbours_iterate (&transmit_our_hello, (void *) hello); +} + + + +/** + * We received some payload. Prepare to pass it on to our clients. + * + * @param peer (claimed) identity of the other peer + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again -- FIXME! + * @param ats performance information + * @param ats_count number of records in ats + * @return how long the plugin should wait until receiving more data + */ +static struct GNUNET_TIME_Relative +process_payload (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) +{ + struct GNUNET_TIME_Relative ret; + int do_forward; + struct InboundMessage *im; + size_t size = sizeof (struct InboundMessage) + ntohs (message->size) + sizeof (struct GNUNET_ATS_Information) * ats_count; + char buf[size]; + struct GNUNET_ATS_Information *ap; + + ret = GNUNET_TIME_UNIT_ZERO; + do_forward = GNUNET_SYSERR; + ret = + GST_neighbours_calculate_receive_delay (peer, + (message == + NULL) ? 0 : + ntohs (message->size), + &do_forward); + + im = (struct InboundMessage*) buf; + im->header.size = htons (size); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->ats_count = htonl (ats_count); + im->peer = *peer; + ap = (struct GNUNET_ATS_Information*) &im[1]; + memcpy (ap, ats, ats_count * sizeof (struct GNUNET_ATS_Information)); + memcpy (&ap[ats_count], message, ntohs (message->size)); + + switch (do_forward) + { + case GNUNET_YES: + GST_clients_broadcast (&im->header, GNUNET_YES); + break; + case GNUNET_NO: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Discarded %u bytes of type %u from %s: quota violated or no neighbour record!\n"), + ntohs (message->size), + ntohs (message->type), + GNUNET_i2s (peer)); + break; + case GNUNET_SYSERR: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Discarded %u bytes of type %u from %s: connection is down!\n"), + ntohs (message->size), + ntohs (message->type), + GNUNET_i2s (peer)); + /* FIXME: store until connection is up? This is virtually always a SETKEY and a PING... */ + break; + default: + GNUNET_break (0); + break; + } + return ret; +} + + +/** + * Function called by the transport for each received message. + * This function should also be called with "NULL" for the + * message to signal that the other peer disconnected. + * + * @param cls closure, const char* with the name of the plugin we received the message from + * @param peer (claimed) identity of the other peer + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again -- FIXME! + * @param ats performance information + * @param ats_count number of records in ats + * @param session identifier used for this session (NULL for plugins + * that do not offer bi-directional communication to the sender + * using the same "connection") + * @param sender_address binary address of the sender (if we established the + * connection or are otherwise sure of it; should be NULL + * for inbound TCP/UDP connections since it it not clear + * that we could establish ourselves a connection to that + * IP address and get the same system) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative +plugin_env_receive_callback (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count, struct Session *session, + const char *sender_address, + uint16_t sender_address_len) +{ + const char *plugin_name = cls; + struct GNUNET_TIME_Relative ret; + uint16_t type; + + ret = GNUNET_TIME_UNIT_ZERO; + if (NULL == message) + goto end; + type = ntohs (message->type); +#if DEBUG_TRANSPORT + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Received Message with type %u\n", type); +#endif + + switch (type) + { + case GNUNET_MESSAGE_TYPE_HELLO: + GST_validation_handle_hello (message); + return ret; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Processing `%s' from `%s'\n", "PING", + (sender_address != NULL) ? GST_plugins_a2s (plugin_name, + sender_address, + sender_address_len) + : ""); +#endif + GST_validation_handle_ping (peer, message, plugin_name, session, + sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Processing `%s' from `%s'\n", "PONG", + (sender_address != NULL) ? GST_plugins_a2s (plugin_name, + sender_address, + sender_address_len) + : ""); +#endif + GST_validation_handle_pong (peer, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT: + GST_neighbours_handle_connect (message, + peer, + plugin_name, sender_address, sender_address_len, + session, ats, ats_count); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK: + GST_neighbours_handle_connect_ack (message, + peer, + plugin_name, sender_address, sender_address_len, + session, ats, ats_count); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK: + GST_neighbours_handle_ack (message, + peer, + plugin_name, sender_address, sender_address_len, + session, ats, ats_count); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT: + GST_neighbours_handle_disconnect_message (peer, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE: + GST_neighbours_keepalive (peer); + break; + default: + /* should be payload */ + ret = process_payload (peer, + message, + ats, ats_count); + break; + } + end: +#if 1 + /* FIXME: this should not be needed, and not sure it's good to have it, but without + this connections seem to go extra-slow */ + GNUNET_ATS_address_update (GST_ats, peer, + plugin_name, sender_address, sender_address_len, + session, + ats, ats_count); +#endif +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Allowing receive from peer %s to continue in %llu ms\n", + GNUNET_i2s (peer), + (unsigned long long) ret.rel_value); +#endif + return ret; +} + + +/** + * Function that will be called for each address the transport + * is aware that it might be reachable under. Update our HELLO. + * + * @param cls name of the plugin (const char*) + * @param add_remove should the address added (YES) or removed (NO) from the + * set of valid addresses? + * @param addr one of the addresses of the host + * the specific address format depends on the transport + * @param addrlen length of the address + */ +static void +plugin_env_address_change_notification (void *cls, int add_remove, + const void *addr, size_t addrlen) +{ + const char *plugin_name = cls; + + GST_hello_modify_addresses (add_remove, plugin_name, addr, addrlen); +} + + +/** + * Function that will be called whenever the plugin internally + * cleans up a session pointer and hence the service needs to + * discard all of those sessions as well. Plugins that do not + * use sessions can simply omit calling this function and always + * use NULL wherever a session pointer is needed. This function + * should be called BEFORE a potential "TransmitContinuation" + * from the "TransmitFunction". + * + * @param cls closure + * @param peer which peer was the session for + * @param session which session is being destoyed + */ +static void +plugin_env_session_end (void *cls, const struct GNUNET_PeerIdentity *peer, + struct Session *session) +{ +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Session %X to peer `%s' ended \n", + session, GNUNET_i2s (peer)); +#endif + if (NULL != session) + GNUNET_log_from (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, + "transport-ats", + "Telling ATS to destroy session %p from peer %s\n", + session, + GNUNET_i2s (peer)); + GNUNET_ATS_address_destroyed (GST_ats, peer, NULL, NULL, 0, session); + GST_neighbours_session_terminated (peer, session); +} + + +/** + * Function called by ATS to notify the callee that the + * assigned bandwidth or address for a given peer was changed. If the + * callback is called with address/bandwidth assignments of zero, the + * ATS disconnect function will still be called once the disconnect + * actually happened. + * + * @param cls closure + * @param peer identity of the peer + * @param plugin_name name of the transport plugin, NULL to disconnect + * @param session session to use (if available) + * @param plugin_addr address to use (if available) + * @param plugin_addr_len number of bytes in addr + * @param bandwidth_out assigned outbound bandwidth for the connection, 0 to disconnect from peer + * @param bandwidth_in assigned inbound bandwidth for the connection, 0 to disconnect from peer + */ +static void +ats_request_address_change (void *cls, const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const void *plugin_addr, size_t plugin_addr_len, + struct Session *session, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, + const struct GNUNET_ATS_Information * ats, + uint32_t ats_count) +{ + uint32_t bw_in = ntohl (bandwidth_in.value__); + uint32_t bw_out = ntohl (bandwidth_out.value__); + + /* ATS tells me to disconnect from peer*/ + if ((bw_in == 0) && (bw_out == 0)) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ATS tells me to disconnect from peer `%s'\n", + GNUNET_i2s (peer)); +#endif + GST_neighbours_force_disconnect(peer); + return; + } + /* will never return GNUNET_YES since connection is to be established */ + GST_neighbours_switch_to_address (peer, plugin_name, plugin_addr, + plugin_addr_len, session, ats, ats_count, + bandwidth_in, bandwidth_out); +} + + +/** + * Function called to notify transport users that another + * peer connected to us. + * + * @param cls closure + * @param peer the peer that connected + * @param ats performance data + * @param ats_count number of entries in ats + */ +static void +neighbours_connect_notification (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_ATS_Information + *ats, uint32_t ats_count) +{ + size_t len = sizeof (struct ConnectInfoMessage) + + ats_count * sizeof (struct GNUNET_ATS_Information); + char buf[len]; + struct ConnectInfoMessage *connect_msg = (struct ConnectInfoMessage *) buf; + struct GNUNET_ATS_Information *ap; + + connect_msg->header.size = htons (sizeof (buf)); + connect_msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + connect_msg->ats_count = htonl (ats_count); + connect_msg->id = *peer; + ap = (struct GNUNET_ATS_Information *) &connect_msg[1]; + memcpy (ap, ats, + ats_count * sizeof (struct GNUNET_ATS_Information)); + GST_clients_broadcast (&connect_msg->header, GNUNET_NO); +} + + +/** + * Function called to notify transport users that another + * peer disconnected from us. + * + * @param cls closure + * @param peer the peer that disconnected + */ +static void +neighbours_disconnect_notification (void *cls, + const struct GNUNET_PeerIdentity *peer) +{ + struct DisconnectInfoMessage disconnect_msg; + + disconnect_msg.header.size = htons (sizeof (struct DisconnectInfoMessage)); + disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); + disconnect_msg.reserved = htonl (0); + disconnect_msg.peer = *peer; + GST_clients_broadcast (&disconnect_msg.header, GNUNET_NO); +} + + +/** + * Function called when the service shuts down. Unloads our plugins + * and cancels pending validations. + * + * @param cls closure, unused + * @param tc task context (unused) + */ +static void +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GST_validation_stop (); + GST_plugins_unload (); + GST_neighbours_stop (); + GNUNET_ATS_scheduling_done (GST_ats); + GST_ats = NULL; + GST_clients_stop (); + GST_blacklist_stop (); + GST_hello_stop (); + + if (GST_peerinfo != NULL) + { + GNUNET_PEERINFO_disconnect (GST_peerinfo); + GST_peerinfo = NULL; + } + if (GST_stats != NULL) + { + GNUNET_STATISTICS_destroy (GST_stats, GNUNET_NO); + GST_stats = NULL; + } + if (GST_my_private_key != NULL) + { + GNUNET_CRYPTO_rsa_key_free (GST_my_private_key); + GST_my_private_key = NULL; + } +} + + +/** + * Initiate transport service. + * + * @param cls closure + * @param server the initialized server + * @param c configuration to use + */ +static void +run (void *cls, struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + char *keyfile; + + /* setup globals */ + GST_cfg = c; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_filename (c, "GNUNETD", "HOSTKEY", + &keyfile)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _ + ("Transport service is lacking key configuration settings. Exiting.\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } + GST_my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile); + GNUNET_free (keyfile); + if (GST_my_private_key == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Transport service could not access hostkey. Exiting.\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } + GST_stats = GNUNET_STATISTICS_create ("transport", c); + GST_peerinfo = GNUNET_PEERINFO_connect (c); + GNUNET_CRYPTO_rsa_key_get_public (GST_my_private_key, &GST_my_public_key); + GNUNET_CRYPTO_hash (&GST_my_public_key, sizeof (GST_my_public_key), + &GST_my_identity.hashPubKey); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, + NULL); + if (GST_peerinfo == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Could not access PEERINFO service. Exiting.\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } + + /* start subsystems */ + GST_hello_start (&process_hello_update, NULL); + GST_blacklist_start (server); + GST_plugins_load (&plugin_env_receive_callback, + &plugin_env_address_change_notification, + &plugin_env_session_end); + GST_ats = GNUNET_ATS_scheduling_init (GST_cfg, &ats_request_address_change, NULL); + GST_neighbours_start (NULL, &neighbours_connect_notification, + &neighbours_disconnect_notification); + GST_clients_start (server); + GST_validation_start (); +} + + +/** + * The main function for the transport service. + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, char *const *argv) +{ + return (GNUNET_OK == + GNUNET_SERVICE_run (argc, argv, "transport", + GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1; +} + +/* end of file gnunet-service-transport-new.c */ diff --git a/src/transport/gnunet-service-transport_neighbours_3way.c b/src/transport/gnunet-service-transport_neighbours_3way.c new file mode 100644 index 000000000..fd3ad37f9 --- /dev/null +++ b/src/transport/gnunet-service-transport_neighbours_3way.c @@ -0,0 +1,2071 @@ +/* + This file is part of GNUnet. + (C) 2010,2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file transport/gnunet-service-transport_neighbours.c + * @brief neighbour management + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_ats_service.h" +#include "gnunet-service-transport_neighbours.h" +#include "gnunet-service-transport_plugins.h" +#include "gnunet-service-transport_validation.h" +#include "gnunet-service-transport_clients.h" +#include "gnunet-service-transport.h" +#include "gnunet_peerinfo_service.h" +#include "gnunet-service-transport_blacklist.h" +#include "gnunet_constants.h" +#include "transport.h" + + +/** + * Size of the neighbour hash map. + */ +#define NEIGHBOUR_TABLE_SIZE 256 + +/** + * How often must a peer violate bandwidth quotas before we start + * to simply drop its messages? + */ +#define QUOTA_VIOLATION_DROP_THRESHOLD 10 + +/** + * How often do we send KEEPALIVE messages to each of our neighbours? + * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we + * send 3 keepalives in each interval, so 3 messages would need to be + * lost in a row for a disconnect). + */ +#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90) + + +/** + * Entry in neighbours. + */ +struct NeighbourMapEntry; + +/** + * Message a peer sends to another to indicate its + * preference for communicating via a particular + * session (and the desire to establish a real + * connection). + */ +struct SessionConnectMessage +{ + /** + * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT' + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Absolute time at the sender. Only the most recent connect + * message implies which session is preferred by the sender. + */ + struct GNUNET_TIME_AbsoluteNBO timestamp; + +}; + + +struct SessionDisconnectMessage +{ + /** + * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT' + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Purpose of the signature. Extends over the timestamp. + * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT. + */ + struct GNUNET_CRYPTO_RsaSignaturePurpose purpose; + + /** + * Absolute time at the sender. Only the most recent connect + * message implies which session is preferred by the sender. + */ + struct GNUNET_TIME_AbsoluteNBO timestamp; + + /** + * Public key of the sender. + */ + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key; + + /** + * Signature of the peer that sends us the disconnect. Only + * valid if the timestamp is AFTER the timestamp from the + * corresponding 'CONNECT' message. + */ + struct GNUNET_CRYPTO_RsaSignature signature; + +}; + + +/** + * For each neighbour we keep a list of messages + * that we still want to transmit to the neighbour. + */ +struct MessageQueue +{ + + /** + * This is a doubly linked list. + */ + struct MessageQueue *next; + + /** + * This is a doubly linked list. + */ + struct MessageQueue *prev; + + /** + * Once this message is actively being transmitted, which + * neighbour is it associated with? + */ + struct NeighbourMapEntry *n; + + /** + * Function to call once we're done. + */ + GST_NeighbourSendContinuation cont; + + /** + * Closure for 'cont' + */ + void *cont_cls; + + /** + * The message(s) we want to transmit, GNUNET_MessageHeader(s) + * stuck together in memory. Allocated at the end of this struct. + */ + const char *message_buf; + + /** + * Size of the message buf + */ + size_t message_buf_size; + + /** + * At what time should we fail? + */ + struct GNUNET_TIME_Absolute timeout; + +}; + +enum State +{ + /* fresh peer or completely disconnected */ + S_NOT_CONNECTED = 0, + /* sent CONNECT message to other peer, waiting for CONNECT_ACK */ + S_CONNECT_SENT = 1, + /* received CONNECT message to other peer, sending CONNECT_ACK */ + S_CONNECT_RECV = 4, + /* sent CONNECT_ACK message to other peer, wait for ACK or payload */ + S_CONNECT_RECV_ACK_SENT = 8, + /* received ACK or payload */ + S_CONNECTED = 16, + /* Disconnect in progress */ + S_DISCONNECT = 32 +}; + +/** + * Entry in neighbours. + */ +struct NeighbourMapEntry +{ + + /** + * Head of list of messages we would like to send to this peer; + * must contain at most one message per client. + */ + struct MessageQueue *messages_head; + + /** + * Tail of list of messages we would like to send to this peer; must + * contain at most one message per client. + */ + struct MessageQueue *messages_tail; + + /** + * Performance data for the peer. + */ + //struct GNUNET_ATS_Information *ats; + + /** + * Are we currently trying to send a message? If so, which one? + */ + struct MessageQueue *is_active; + + /** + * Active session for communicating with the peer. + */ + struct Session *session; + + /** + * Name of the plugin we currently use. + */ + char *plugin_name; + + /** + * Address used for communicating with the peer, NULL for inbound connections. + */ + void *addr; + + /** + * Number of bytes in 'addr'. + */ + size_t addrlen; + + /** + * Identity of this neighbour. + */ + struct GNUNET_PeerIdentity id; + + /** + * ID of task scheduled to run when this peer is about to + * time out (will free resources associated with the peer). + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * ID of task scheduled to send keepalives. + */ + GNUNET_SCHEDULER_TaskIdentifier keepalive_task; + + /** + * ID of task scheduled to run when we should try transmitting + * the head of the message queue. + */ + GNUNET_SCHEDULER_TaskIdentifier transmission_task; + + /** + * Tracker for inbound bandwidth. + */ + struct GNUNET_BANDWIDTH_Tracker in_tracker; + + /** + * Inbound bandwidth from ATS, activated when connection is up + */ + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; + + /** + * Inbound bandwidth from ATS, activated when connection is up + */ + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; + + /** + * Timestamp of the 'SESSION_CONNECT' message we got from the other peer + */ + struct GNUNET_TIME_Absolute connect_ts; + + /** + * How often has the other peer (recently) violated the inbound + * traffic limit? Incremented by 10 per violation, decremented by 1 + * per non-violation (for each time interval). + */ + unsigned int quota_violation_count; + + /** + * Number of values in 'ats' array. + */ + //unsigned int ats_count; + + + /** + * Do we currently consider this neighbour connected? (as far as + * the connect/disconnect callbacks are concerned)? + */ + //int is_connected; + + int state; + +}; + + +/** + * All known neighbours and their HELLOs. + */ +static struct GNUNET_CONTAINER_MultiHashMap *neighbours; + +/** + * Closure for connect_notify_cb and disconnect_notify_cb + */ +static void *callback_cls; + +/** + * Function to call when we connected to a neighbour. + */ +static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb; + +/** + * Function to call when we disconnected from a neighbour. + */ +static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb; + +/** + * counter for connected neighbours + */ +static int neighbours_connected; + +/** + * Lookup a neighbour entry in the neighbours hash map. + * + * @param pid identity of the peer to look up + * @return the entry, NULL if there is no existing record + */ +static struct NeighbourMapEntry * +lookup_neighbour (const struct GNUNET_PeerIdentity *pid) +{ + return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey); +} + +#define change_state(n, state, ...) change (n, state, __LINE__) + +static int +is_connecting (struct NeighbourMapEntry * n) +{ + if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED)) + return GNUNET_YES; + return GNUNET_NO; +} + +static int +is_connected (struct NeighbourMapEntry * n) +{ + if (n->state == S_CONNECTED) + return GNUNET_YES; + return GNUNET_NO; +} + +static int +is_disconnecting (struct NeighbourMapEntry * n) +{ + if (n->state == S_DISCONNECT) + return GNUNET_YES; + return GNUNET_NO; +} + +static const char * +print_state (int state) +{ + switch (state) { + case S_CONNECTED: + return "S_CONNECTED"; + break; + case S_CONNECT_RECV: + return "S_CONNECT_RECV"; + break; + case S_CONNECT_RECV_ACK_SENT: + return"S_CONNECT_RECV_ACK_SENT"; + break; + case S_CONNECT_SENT: + return "S_CONNECT_SENT"; + break; + case S_DISCONNECT: + return "S_DISCONNECT"; + break; + case S_NOT_CONNECTED: + return "S_NOT_CONNECTED"; + break; + default: + GNUNET_break (0); + break; + } + return NULL; +} + +static int +change (struct NeighbourMapEntry * n, int state, int line) +{ + char * old = strdup(print_state(n->state)); + char * new = strdup(print_state(state)); + + /* allowed transitions */ + int allowed = GNUNET_NO; + switch (n->state) { + case S_NOT_CONNECTED: + if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) || + (state == S_DISCONNECT)) + { + allowed = GNUNET_YES; + break; + } + break; + case S_CONNECT_RECV: + if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) || + (state == S_CONNECTED)) + { + allowed = GNUNET_YES; + break; + } + break; + case S_CONNECT_SENT: + if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) || + (state == S_DISCONNECT) || /* FIXME SENT -> RECV ISSUE!*/ (state == S_CONNECT_RECV)) + { + allowed = GNUNET_YES; + break; + } + break; + case S_CONNECTED: + if (state == S_DISCONNECT) + { + allowed = GNUNET_YES; + break; + } + break; + case S_DISCONNECT: + /* + if (state == S_NOT_CONNECTED) + { + allowed = GNUNET_YES; + break; + }*/ + break; + default: + GNUNET_break (0); + break; + + } + + if (allowed == GNUNET_NO) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Illegal state transition from `%s' to `%s' in line %u \n", + old, new, line); + GNUNET_break (0); + GNUNET_free (old); + GNUNET_free (new); + return GNUNET_SYSERR; + } + + n->state = state; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n", + GNUNET_i2s (&n->id), n, old, new, line); + GNUNET_free (old); + GNUNET_free (new); + return GNUNET_OK; +} + +static ssize_t +send_with_plugin ( const struct GNUNET_PeerIdentity * target, + const char *msgbuf, + size_t msgbuf_size, + uint32_t priority, + struct GNUNET_TIME_Relative timeout, + struct Session * session, + const char * plugin_name, + const void *addr, + size_t addrlen, + int force_address, + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) + +{ + struct GNUNET_TRANSPORT_PluginFunctions *papi; + size_t ret = GNUNET_SYSERR; + + papi = GST_plugins_find (plugin_name); + if (papi == NULL) + { + if (cont != NULL) + cont (cont_cls, target, GNUNET_SYSERR); + return GNUNET_SYSERR; + } + + ret = papi->send (papi->cls, + target, + msgbuf, msgbuf_size, + 0, + timeout, + session, + addr, addrlen, + GNUNET_YES, + cont, cont_cls); + + if (ret == -1) + { + if (cont != NULL) + cont (cont_cls, target, GNUNET_SYSERR); + } + return ret; +} + +/** + * Task invoked to start a transmission to another peer. + * + * @param cls the 'struct NeighbourMapEntry' + * @param tc scheduler context + */ +static void +transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * We're done with our transmission attempt, continue processing. + * + * @param cls the 'struct MessageQueue' of the message + * @param receiver intended receiver + * @param success whether it worked or not + */ +static void +transmit_send_continuation (void *cls, + const struct GNUNET_PeerIdentity *receiver, + int success) +{ + struct MessageQueue *mq; + struct NeighbourMapEntry *n; + + mq = cls; + n = mq->n; + if (NULL != n) + { + GNUNET_assert (n->is_active == mq); + n->is_active = NULL; + if (success == GNUNET_YES) + { + GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); + } + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n", + ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type), + (success == GNUNET_OK) ? "successful" : "FAILED"); +#endif + if (NULL != mq->cont) + mq->cont (mq->cont_cls, success); + GNUNET_free (mq); +} + + +/** + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param n target peer for which to transmit + */ +static void +try_transmission_to_peer (struct NeighbourMapEntry *n) +{ + struct MessageQueue *mq; + struct GNUNET_TIME_Relative timeout; + ssize_t ret; + + if (n->is_active != NULL) + { + GNUNET_break (0); + return; /* transmission already pending */ + } + if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_break (0); + return; /* currently waiting for bandwidth */ + } + while (NULL != (mq = n->messages_head)) + { + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.rel_value > 0) + break; + GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); + n->is_active = mq; + mq->n = n; + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ + } + if (NULL == mq) + return; /* no more messages */ + + if (GST_plugins_find (n->plugin_name) == NULL) + { + GNUNET_break (0); + return; + } + GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); + n->is_active = mq; + mq->n = n; + + if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", + GNUNET_i2s (&n->id)); + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); + GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); + return; + } + + ret = send_with_plugin (&n->id, + mq->message_buf, mq->message_buf_size, 0, + timeout, + n->session, n->plugin_name, n->addr, n->addrlen, + GNUNET_YES, + &transmit_send_continuation, mq); + if (ret == -1) + { + /* failure, but 'send' would not call continuation in this case, + * so we need to do it here! */ + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); + } + +} + + +/** + * Task invoked to start a transmission to another peer. + * + * @param cls the 'struct NeighbourMapEntry' + * @param tc scheduler context + */ +static void +transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourMapEntry *n = cls; + GNUNET_assert (NULL != lookup_neighbour(&n->id)); + n->transmission_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); +} + + +/** + * Initialize the neighbours subsystem. + * + * @param cls closure for callbacks + * @param connect_cb function to call if we connect to a peer + * @param disconnect_cb function to call if we disconnect from a peer + */ +void +GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb, + GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb) +{ + callback_cls = cls; + connect_notify_cb = connect_cb; + disconnect_notify_cb = disconnect_cb; + neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); +} + +/* +static void +send_disconnect_cont (void *cls, + const struct GNUNET_PeerIdentity * target, + int result) +{ + struct NeighbourMapEntry *n = cls; + +}*/ + +static int +send_disconnect (struct NeighbourMapEntry *n) +{ + size_t ret; + struct SessionDisconnectMessage disconnect_msg; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending DISCONNECT message to peer `%4s'\n", + GNUNET_i2s (&n->id)); +#endif + + disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage)); + disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); + disconnect_msg.reserved = htonl (0); + disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + + sizeof (struct GNUNET_TIME_AbsoluteNBO) ); + disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); + disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + disconnect_msg.public_key = GST_my_public_key; + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_rsa_sign (GST_my_private_key, + &disconnect_msg.purpose, + &disconnect_msg.signature)); + + ret = send_with_plugin(&n->id, + (const char *) &disconnect_msg, sizeof (disconnect_msg), + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name, n->addr, n->addrlen, + GNUNET_YES, NULL, NULL); + + if (ret == GNUNET_SYSERR) + return GNUNET_SYSERR; + + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# peers disconnected due to external request"), 1, + GNUNET_NO); + return GNUNET_OK; +} + +/** + * Disconnect from the given neighbour, clean up the record. + * + * @param n neighbour to disconnect from + */ +static void +disconnect_neighbour (struct NeighbourMapEntry *n) +{ + struct MessageQueue *mq; + int was_connected = is_connected(n); + + if (is_disconnecting(n) == GNUNET_YES) + return; + + /* send DISCONNECT MESSAGE */ + if (is_connected(n) || is_connecting(n)) + { + if (GNUNET_OK == send_disconnect(n)) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n", + GNUNET_i2s (&n->id)); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not send DISCONNECT_MSG to `%s'\n", + GNUNET_i2s (&n->id)); + } + + + if (is_disconnecting(n)) + return; + change_state (n, S_DISCONNECT); + + while (NULL != (mq = n->messages_head)) + { + GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); + if (NULL != mq->cont) + mq->cont (mq->cont_cls, GNUNET_SYSERR); + GNUNET_free (mq); + } + if (NULL != n->is_active) + { + n->is_active->n = NULL; + n->is_active = NULL; + } + if (was_connected) + { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task); + GNUNET_SCHEDULER_cancel (n->keepalive_task); + n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (neighbours_connected > 0); + neighbours_connected--; + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1, + GNUNET_NO); + disconnect_notify_cb (callback_cls, &n->id); + } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (neighbours, + &n->id.hashPubKey, n)); + if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task) + { + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task) + { + GNUNET_SCHEDULER_cancel (n->transmission_task); + n->transmission_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != n->plugin_name) + { + GNUNET_free (n->plugin_name); + n->plugin_name = NULL; + } + if (NULL != n->addr) + { + GNUNET_free (n->addr); + n->addr = NULL; + n->addrlen = 0; + } + n->session = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n", + GNUNET_i2s (&n->id), n); + GNUNET_free (n); +} + + +/** + * Peer has been idle for too long. Disconnect. + * + * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle + * @param tc scheduler context + */ +static void +neighbour_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourMapEntry *n = cls; + + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# peers disconnected due to timeout"), 1, + GNUNET_NO); + disconnect_neighbour (n); +} + + +/** + * Send another keepalive message. + * + * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle + * @param tc scheduler context + */ +static void +neighbour_keepalive_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourMapEntry *n = cls; + struct GNUNET_MessageHeader m; + + n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, + &neighbour_keepalive_task, + n); + GNUNET_assert (is_connected(n)); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# keepalives sent"), 1, + GNUNET_NO); + m.size = htons (sizeof (struct GNUNET_MessageHeader)); + m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE); + + send_with_plugin(&n->id, (const void *) &m, + sizeof (m), + UINT32_MAX /* priority */ , + GNUNET_TIME_UNIT_FOREVER_REL, + n->session, n->plugin_name, n->addr, n->addrlen, + GNUNET_YES, NULL, NULL); +} + + +/** + * Disconnect from the given neighbour. + * + * @param cls unused + * @param key hash of neighbour's public key (not used) + * @param value the 'struct NeighbourMapEntry' of the neighbour + */ +static int +disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct NeighbourMapEntry *n = value; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n", + GNUNET_i2s (&n->id), "SHUTDOWN_TASK"); +#endif + if (is_connected(n)) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# peers disconnected due to global disconnect"), 1, + GNUNET_NO); + disconnect_neighbour (n); + return GNUNET_OK; +} + + +/** + * Cleanup the neighbours subsystem. + */ +void +GST_neighbours_stop () +{ + GNUNET_assert (neighbours != NULL); + + GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (neighbours); + GNUNET_assert (neighbours_connected == 0); + neighbours = NULL; + callback_cls = NULL; + connect_notify_cb = NULL; + disconnect_notify_cb = NULL; +} + + +/** + * We tried to send a SESSION_CONNECT message to another peer. If this + * succeeded, we change the state. If it failed, we should tell + * ATS to not use this address anymore (until it is re-validated). + * + * @param cls the 'struct NeighbourMapEntry' + * @param success GNUNET_OK on success + */ +static void +send_connect_continuation (void *cls, + const struct GNUNET_PeerIdentity * target, + int success) + +{ + struct NeighbourMapEntry *n = cls; + + GNUNET_assert (n != NULL); + GNUNET_assert (!is_connected(n)); + + if (is_disconnecting(n)) + return; /* neighbour is going away */ + if (GNUNET_YES != success) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n", + GNUNET_i2s (&n->id), n->plugin_name, + (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, + n->addr, + n->addrlen), + n->session); +#endif + + GNUNET_ATS_address_destroyed (GST_ats, + &n->id, + n->plugin_name, + n->addr, + n->addrlen, + NULL); + + GNUNET_ATS_suggest_address(GST_ats, &n->id); + return; + } + change_state(n, S_CONNECT_SENT); +} + + +/** + * We tried to switch addresses with an peer already connected. If it failed, + * we should tell ATS to not use this address anymore (until it is re-validated). + * + * @param cls the 'struct NeighbourMapEntry' + * @param success GNUNET_OK on success + */ +static void +send_switch_address_continuation (void *cls, + const struct GNUNET_PeerIdentity * target, + int success) + +{ + struct NeighbourMapEntry *n = cls; + + GNUNET_assert (n != NULL); + if (is_disconnecting(n)) + return; /* neighbour is going away */ + + GNUNET_assert (n->state == S_CONNECTED); + if (GNUNET_YES != success) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n", + GNUNET_i2s (&n->id), n->plugin_name, + (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, + n->addr, + n->addrlen), + n->session); +#endif + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Do not forget to fix this!\n"); + /* FIXME: We have to change the state away from connected: + * If ATS can not suggest another address we do not get a callback + * but we still think we are connected + */ + //change_state(n, S_NOT_CONNECTED); + + GNUNET_ATS_address_destroyed (GST_ats, + &n->id, + n->plugin_name, + n->addr, + n->addrlen, + NULL); + + GNUNET_ATS_suggest_address(GST_ats, &n->id); + return; + } +} + +/** + * We tried to send a SESSION_CONNECT message to another peer. If this + * succeeded, we change the state. If it failed, we should tell + * ATS to not use this address anymore (until it is re-validated). + * + * @param cls the 'struct NeighbourMapEntry' + * @param success GNUNET_OK on success + */ +static void +send_connect_ack_continuation (void *cls, + const struct GNUNET_PeerIdentity * target, + int success) + +{ + struct NeighbourMapEntry *n = cls; + + GNUNET_assert (n != NULL); + + if (GNUNET_YES == success) + return; /* sending successful */ + + /* sending failed, ask for next address */ +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n", + GNUNET_i2s (&n->id), n->plugin_name, + (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, + n->addr, + n->addrlen), + n->session); +#endif + change_state(n, S_NOT_CONNECTED); + + GNUNET_ATS_address_destroyed (GST_ats, + &n->id, + n->plugin_name, + n->addr, + n->addrlen, + NULL); + + GNUNET_ATS_suggest_address(GST_ats, &n->id); +} + +/** + * For an existing neighbour record, set the active connection to + * the given address. + * + * @param peer identity of the peer to switch the address for + * @param plugin_name name of transport that delivered the PONG + * @param address address of the other peer, NULL if other peer + * connected to us + * @param address_len number of bytes in address + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats + * @return GNUNET_YES if we are currently connected, GNUNET_NO if the + * connection is not up (yet) + */ +int +GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, const void *address, + size_t address_len, struct Session *session, + const struct GNUNET_ATS_Information + *ats, uint32_t ats_count, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out) +{ + struct NeighbourMapEntry *n; + struct SessionConnectMessage connect_msg; + size_t msg_len; + size_t ret; + + GNUNET_assert (neighbours != NULL); + n = lookup_neighbour (peer); + if (NULL == n) + { + if (NULL == session) + GNUNET_ATS_address_destroyed (GST_ats, + peer, + plugin_name, address, + address_len, NULL); + return GNUNET_NO; + } + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n", + plugin_name, + (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, + address, + address_len), + session, (is_connected(n) ? "CONNECTED" : "NOT CONNECTED"), + GNUNET_i2s (peer)); +#endif + + GNUNET_free_non_null (n->addr); + n->addr = GNUNET_malloc (address_len); + memcpy (n->addr, address, address_len); + n->bandwidth_in = bandwidth_in; + n->bandwidth_out = bandwidth_out; + n->addrlen = address_len; + n->session = session; + GNUNET_free_non_null (n->plugin_name); + n->plugin_name = GNUNET_strdup (plugin_name); + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + + if (n->state == S_DISCONNECT) + { + /* We are disconnecting, nothing to do here */ + return GNUNET_NO; + } + /* We are not connected/connecting and initiate a fresh connect */ + if (n->state == S_NOT_CONNECTED) + { + msg_len = sizeof (struct SessionConnectMessage); + connect_msg.header.size = htons (msg_len); + connect_msg.header.type = + htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); + connect_msg.reserved = htonl (0); + connect_msg.timestamp = + GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + + ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_continuation, n); + if (ret == GNUNET_SYSERR) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n", + GNUNET_i2s (peer), plugin_name, + (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, + address, + address_len), + session); + } + return GNUNET_NO; + } + /* We received a CONNECT message and asked ATS for an address */ + else if (n->state == S_CONNECT_RECV) + { + msg_len = sizeof (struct SessionConnectMessage); + connect_msg.header.size = htons (msg_len); + connect_msg.header.type = + htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK); + connect_msg.reserved = htonl (0); + connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + + ret = send_with_plugin(&n->id, (const void *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_ack_continuation, n); + if (ret == GNUNET_SYSERR) + { + change_state (n, S_NOT_CONNECTED); + GNUNET_break (0); + } + return GNUNET_NO; + } + /* connected peer is switching addresses */ + else if (n->state == S_CONNECTED) + { + msg_len = sizeof (struct SessionConnectMessage); + connect_msg.header.size = htons (msg_len); + connect_msg.header.type = + htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); + connect_msg.reserved = htonl (0); + connect_msg.timestamp = + GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + + ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_switch_address_continuation, n); + if (ret == GNUNET_SYSERR) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n", + GNUNET_i2s (peer), plugin_name, + (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, + address, + address_len), + session); + } + return GNUNET_NO; + } + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Invalid connection state to switch addresses %u ", n->state); + GNUNET_break_op (0); + return GNUNET_NO; +} + + +/** + * Create an entry in the neighbour map for the given peer + * + * @param peer peer to create an entry for + * @return new neighbour map entry + */ +static struct NeighbourMapEntry * +setup_neighbour (const struct GNUNET_PeerIdentity *peer) +{ + struct NeighbourMapEntry *n; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Unknown peer `%s', creating new neighbour\n", + GNUNET_i2s (peer)); +#endif + n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); + n->id = *peer; + n->state = S_NOT_CONNECTED; + GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + MAX_BANDWIDTH_CARRY_S); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (neighbours, + &n->id.hashPubKey, n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return n; +} + + +/** + * Try to create a connection to the given target (eventually). + * + * @param target peer to try to connect to + */ +void +GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) +{ + struct NeighbourMapEntry *n; + + GNUNET_assert (neighbours != NULL); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n", + GNUNET_i2s (target)); +#endif + GNUNET_assert (0 != + memcmp (target, &GST_my_identity, + sizeof (struct GNUNET_PeerIdentity))); + n = lookup_neighbour (target); + + if (NULL != n) + { + if ((is_connected(n)) || (is_connecting(n))) + return; /* already connecting or connected */ + if (is_disconnecting(n)) + change_state (n, S_NOT_CONNECTED); + } + + + if (n == NULL) + n = setup_neighbour (target); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking ATS for suggested address to connect to peer `%s'\n", + GNUNET_i2s (&n->id)); +#endif + GNUNET_ATS_suggest_address (GST_ats, &n->id); +} + + +/** + * Test if we're connected to the given peer. + * + * @param target peer to test + * @return GNUNET_YES if we are connected, GNUNET_NO if not + */ +int +GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) +{ + struct NeighbourMapEntry *n; + + GNUNET_assert (neighbours != NULL); + + n = lookup_neighbour (target); + + if ((NULL == n) || (!is_connected(n))) + return GNUNET_NO; /* not connected */ + return GNUNET_YES; +} + + +/** + * A session was terminated. Take note. + * + * @param peer identity of the peer where the session died + * @param session session that is gone + */ +void +GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, + struct Session *session) +{ + struct NeighbourMapEntry *n; + + GNUNET_assert (neighbours != NULL); + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Session %X to peer `%s' ended \n", + session, GNUNET_i2s (peer)); +#endif + + n = lookup_neighbour (peer); + if (NULL == n) + return; + if (session != n->session) + return; /* doesn't affect us */ + + n->session = NULL; + GNUNET_free (n->addr); + n->addr = NULL; + n->addrlen = 0; + + /* not connected anymore anyway, shouldn't matter */ + if ((!is_connected(n)) && (!is_connecting(n))) + return; + + // FIXME: switch address what is the state + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Do not forget to fix this!\n"); + + /* We are connected, so ask ATS to switch addresses */ + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT, + &neighbour_timeout_task, n); + /* try QUICKLY to re-establish a connection, reduce timeout! */ + GNUNET_ATS_suggest_address (GST_ats, peer); +} + + +/** + * Transmit a message to the given target using the active connection. + * + * @param target destination + * @param msg message to send + * @param msg_size number of bytes in msg + * @param timeout when to fail with timeout + * @param cont function to call when done + * @param cont_cls closure for 'cont' + */ +void +GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, + size_t msg_size, struct GNUNET_TIME_Relative timeout, + GST_NeighbourSendContinuation cont, void *cont_cls) +{ + struct NeighbourMapEntry *n; + struct MessageQueue *mq; + + GNUNET_assert (neighbours != NULL); + + n = lookup_neighbour (target); + if ((n == NULL) || (!is_connected(n))) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# messages not sent (no such peer or not connected)"), + 1, GNUNET_NO); +#if DEBUG_TRANSPORT + if (n == NULL) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not send message to peer `%s': unknown neighbour", + GNUNET_i2s (target)); + else if (!is_connected(n)) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not send message to peer `%s': not connected\n", + GNUNET_i2s (target)); +#endif + if (NULL != cont) + cont (cont_cls, GNUNET_SYSERR); + return; + } + + if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0)) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# messages not sent (no such peer or not connected)"), + 1, GNUNET_NO); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not send message to peer `%s': no address available\n", + GNUNET_i2s (target)); +#endif + + if (NULL != cont) + cont (cont_cls, GNUNET_SYSERR); + return; + } + + GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader)); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# bytes in message queue for other peers"), + msg_size, GNUNET_NO); + mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); + mq->cont = cont; + mq->cont_cls = cont_cls; + /* FIXME: this memcpy can be up to 7% of our total runtime! */ + memcpy (&mq[1], msg, msg_size); + mq->message_buf = (const char *) &mq[1]; + mq->message_buf_size = msg_size; + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); + GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); + + if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && + (NULL == n->is_active)) + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); +} + + +/** + * We have received a message from the given sender. How long should + * we delay before receiving more? (Also used to keep the peer marked + * as live). + * + * @param sender sender of the message + * @param size size of the message + * @param do_forward set to GNUNET_YES if the message should be forwarded to clients + * GNUNET_NO if the neighbour is not connected or violates the quota, + * GNUNET_SYSERR if the connection is not fully up yet + * @return how long to wait before reading more from this sender + */ +struct GNUNET_TIME_Relative +GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity + *sender, ssize_t size, int *do_forward) +{ + struct NeighbourMapEntry *n; + struct GNUNET_TIME_Relative ret; + + GNUNET_assert (neighbours != NULL); + + n = lookup_neighbour (sender); + if (n == NULL) + { + GST_neighbours_try_connect (sender); + n = lookup_neighbour (sender); + if (NULL == n) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# messages discarded due to lack of neighbour record"), + 1, GNUNET_NO); + *do_forward = GNUNET_NO; + return GNUNET_TIME_UNIT_ZERO; + } + } + if (!is_connected(n)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Plugin gave us %d bytes of data but somehow the session is not marked as UP yet!\n"), + (int) size); + *do_forward = GNUNET_SYSERR; + return GNUNET_TIME_UNIT_ZERO; + } + if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size)) + { + n->quota_violation_count++; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bandwidth quota (%u b/s) violation detected (total of %u).\n", + n->in_tracker.available_bytes_per_s__, + n->quota_violation_count); +#endif + /* Discount 32k per violation */ + GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024); + } + else + { + if (n->quota_violation_count > 0) + { + /* try to add 32k back */ + GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); + n->quota_violation_count--; + } + } + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# bandwidth quota violations by other peers"), + 1, GNUNET_NO); + *do_forward = GNUNET_NO; + return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; + } + *do_forward = GNUNET_YES; + ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); + if (ret.rel_value > 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n", + (unsigned long long) n->in_tracker. + consumption_since_last_update__, + (unsigned int) n->in_tracker.available_bytes_per_s__, + (unsigned long long) ret.rel_value); +#endif + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# ms throttling suggested"), + (int64_t) ret.rel_value, GNUNET_NO); + } + return ret; +} + + +/** + * Keep the connection to the given neighbour alive longer, + * we received a KEEPALIVE (or equivalent). + * + * @param neighbour neighbour to keep alive + */ +void +GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) +{ + struct NeighbourMapEntry *n; + + GNUNET_assert (neighbours != NULL); + + n = lookup_neighbour (neighbour); + if (NULL == n) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# KEEPALIVE messages discarded (not connected)"), + 1, GNUNET_NO); + return; + } + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); +} + + +/** + * Change the incoming quota for the given peer. + * + * @param neighbour identity of peer to change qutoa for + * @param quota new quota + */ +void +GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, + struct GNUNET_BANDWIDTH_Value32NBO quota) +{ + struct NeighbourMapEntry *n; + + GNUNET_assert (neighbours != NULL); + + n = lookup_neighbour (neighbour); + if (n == NULL) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# SET QUOTA messages ignored (no such peer)"), + 1, GNUNET_NO); + return; + } + GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); + if (0 != ntohl (quota.value__)) + return; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n", + GNUNET_i2s (&n->id), "SET_QUOTA"); +#endif + if (is_connected(n)) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# disconnects due to quota of 0"), 1, + GNUNET_NO); + disconnect_neighbour (n); +} + + +/** + * Closure for the neighbours_iterate function. + */ +struct IteratorContext +{ + /** + * Function to call on each connected neighbour. + */ + GST_NeighbourIterator cb; + + /** + * Closure for 'cb'. + */ + void *cb_cls; +}; + + +/** + * Call the callback from the closure for each connected neighbour. + * + * @param cls the 'struct IteratorContext' + * @param key the hash of the public key of the neighbour + * @param value the 'struct NeighbourMapEntry' + * @return GNUNET_OK (continue to iterate) + */ +static int +neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct IteratorContext *ic = cls; + struct NeighbourMapEntry *n = value; + + if (is_connected(n)) + return GNUNET_OK; + + ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen); + return GNUNET_OK; +} + + +/** + * Iterate over all connected neighbours. + * + * @param cb function to call + * @param cb_cls closure for cb + */ +void +GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) +{ + struct IteratorContext ic; + + GNUNET_assert (neighbours != NULL); + + ic.cb = cb; + ic.cb_cls = cb_cls; + GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic); +} + +/** + * If we have an active connection to the given target, it must be shutdown. + * + * @param target peer to disconnect from + */ +void +GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) +{ + struct NeighbourMapEntry *n; + + GNUNET_assert (neighbours != NULL); + + n = lookup_neighbour (target); + if (NULL == n) + return; /* not active */ + if (is_connected(n)) + { + send_disconnect(n); + + n = lookup_neighbour (target); + if (NULL == n) + return; /* gone already */ + } + disconnect_neighbour (n); +} + + +/** + * We received a disconnect message from the given peer, + * validate and process. + * + * @param peer sender of the message + * @param msg the disconnect message + */ +void +GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) +{ + struct NeighbourMapEntry *n; + const struct SessionDisconnectMessage *sdm; + GNUNET_HashCode hc; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received DISCONNECT message from peer `%s'\n", GNUNET_i2s (peer)); +#endif + + if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage)) + { + // GNUNET_break_op (0); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# disconnect messages ignored (old format)"), 1, + GNUNET_NO); + return; + } + sdm = (const struct SessionDisconnectMessage* ) msg; + n = lookup_neighbour (peer); + if (NULL == n) + return; /* gone already */ + if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= + n->connect_ts.abs_value) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# disconnect messages ignored (timestamp)"), 1, + GNUNET_NO); + return; + } + GNUNET_CRYPTO_hash (&sdm->public_key, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &hc); + if (0 != memcmp (peer, + &hc, + sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_break_op (0); + return; + } + if (ntohl (sdm->purpose.size) != + sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + + sizeof (struct GNUNET_TIME_AbsoluteNBO)) + { + GNUNET_break_op (0); + return; + } + if (GNUNET_OK != + GNUNET_CRYPTO_rsa_verify (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, + &sdm->purpose, + &sdm->signature, + &sdm->public_key)) + { + GNUNET_break_op (0); + return; + } + GST_neighbours_force_disconnect (peer); +} + +/** + * We received a 'SESSION_CONNECT_ACK' message from the other peer. + * Consider switching to it. + * + * @param message possibly a 'struct SessionConnectMessage' (check format) + * @param peer identity of the peer to switch the address for + * @param plugin_name name of transport that delivered the PONG + * @param address address of the other peer, NULL if other peer + * connected to us + * @param address_len number of bytes in address + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats + */ +void +GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const char *sender_address, uint16_t sender_address_len, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) +{ + const struct SessionConnectMessage *scm; + struct QuotaSetMessage q_msg; + struct GNUNET_MessageHeader msg; + struct GNUNET_TIME_Absolute ts; + struct NeighbourMapEntry *n; + size_t msg_len; + size_t ret; + +#if DEBUG_TRANSPORT +#endif + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Received CONNECT_ACK message from peer `%s'\n", GNUNET_i2s (peer)); + + + if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) + { + GNUNET_break_op (0); + return; + } + + scm = (const struct SessionConnectMessage *) message; + GNUNET_break_op (ntohl (scm->reserved) == 0); + ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); + n = lookup_neighbour (peer); + if (NULL == n) + n = setup_neighbour (peer); +/* + if (n->state != S_CONNECT_SENT) + { + GNUNET_break (0); + send_disconnect(n); + return; + } +*/ + if (NULL != session) + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "transport-ats", + "Giving ATS session %p of plugin %s for peer %s\n", + session, + plugin_name, + GNUNET_i2s (peer)); + GNUNET_ATS_address_update (GST_ats, + peer, + plugin_name, sender_address, sender_address_len, + session, ats, ats_count); + + change_state (n, S_CONNECTED); + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Setting inbound quota of %u for peer `%s' to \n", + ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id)); +#endif + GST_neighbours_set_incoming_quota(&n->id, n->bandwidth_in); + + n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, + &neighbour_keepalive_task, + n); + /* send ACK (ACK)*/ + msg_len = sizeof (msg); + msg.size = htons (msg_len); + msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); + + ret = send_with_plugin (&n->id, (const char *) &msg, msg_len, 0, + GNUNET_TIME_UNIT_FOREVER_REL, + n->session, n->plugin_name, n->addr, n->addrlen, + GNUNET_YES, NULL, NULL); + + if (ret == GNUNET_SYSERR) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n", + GNUNET_i2s (&n->id), n->plugin_name, + (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, + n->addr, + n->addrlen), + n->session); + + neighbours_connected++; + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, + GNUNET_NO); + connect_notify_cb (callback_cls, &n->id, ats, ats_count); + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending outbound quota of %u Bps for peer `%s' to all clients\n", + ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer)); +#endif + q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); + q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); + q_msg.quota = n->bandwidth_out; + q_msg.peer = (*peer); + GST_clients_broadcast (&q_msg.header, GNUNET_NO); +} + +void +GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const char *sender_address, uint16_t sender_address_len, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) +{ + struct NeighbourMapEntry *n; + struct QuotaSetMessage q_msg; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received ACK message from peer `%s'\n", GNUNET_i2s (peer)); +#endif + + if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return; + } + + n = lookup_neighbour (peer); + if (NULL == n) + { + send_disconnect(n); + GNUNET_break (0); + } +// FIXME check this +// if (n->state != S_CONNECT_RECV) + if (is_connecting(n)) + { + send_disconnect (n); + change_state (n, S_DISCONNECT); + GNUNET_break (0); + return; + } + + if (is_connected(n)) + return; + + if (NULL != session) + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "transport-ats", + "Giving ATS session %p of plugin %s for peer %s\n", + session, + plugin_name, + GNUNET_i2s (peer)); + GNUNET_ATS_address_update (GST_ats, + peer, + plugin_name, sender_address, sender_address_len, + session, ats, ats_count); + + change_state (n, S_CONNECTED); + + GST_neighbours_set_incoming_quota(&n->id, n->bandwidth_in); + + n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, + &neighbour_keepalive_task, + n); + + neighbours_connected++; + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, + GNUNET_NO); + connect_notify_cb (callback_cls, &n->id, ats, ats_count); + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending outbound quota of %u Bps for peer `%s' to all clients\n", + ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer)); +#endif + + q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); + q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); + q_msg.quota = n->bandwidth_out; + q_msg.peer = (*peer); + GST_clients_broadcast (&q_msg.header, GNUNET_NO); +} + +struct BlackListCheckContext +{ + struct GNUNET_ATS_Information *ats; + + uint32_t ats_count; + + struct Session *session; + + char *sender_address; + + uint16_t sender_address_len; + + char *plugin_name; + + struct GNUNET_TIME_Absolute ts; +}; + + +static void +handle_connect_blacklist_cont (void *cls, + const struct GNUNET_PeerIdentity + * peer, int result) +{ + struct NeighbourMapEntry *n; + struct BlackListCheckContext * bcc = cls; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Blacklist check due to CONNECT message: `%s'\n", GNUNET_i2s (peer), (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN"); +#endif + + /* not allowed */ + if (GNUNET_OK != result) + { + GNUNET_free (bcc); + return; + } + + n = lookup_neighbour (peer); + if (NULL == n) + n = setup_neighbour (peer); + + if (bcc->ts.abs_value > n->connect_ts.abs_value) + { + if (NULL != bcc->session) + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "transport-ats", + "Giving ATS session %p of plugin %s address `%s' for peer %s\n", + bcc->session, + bcc->plugin_name, + GST_plugins_a2s (bcc->plugin_name, bcc->sender_address, bcc->sender_address_len), + GNUNET_i2s (peer)); + GNUNET_ATS_address_update (GST_ats, + peer, + bcc->plugin_name, bcc->sender_address, bcc->sender_address_len, + bcc->session, bcc->ats, bcc->ats_count); + n->connect_ts = bcc->ts; + } + + GNUNET_free (bcc); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Blacklist check due to CONNECT message: `%s'\n"); +/* + if (n->state != S_NOT_CONNECTED) + return;*/ + change_state (n, S_CONNECT_RECV); + + /* Ask ATS for an address to connect via that address */ + GNUNET_ATS_suggest_address(GST_ats, peer); +} + +/** + * We received a 'SESSION_CONNECT' message from the other peer. + * Consider switching to it. + * + * @param message possibly a 'struct SessionConnectMessage' (check format) + * @param peer identity of the peer to switch the address for + * @param plugin_name name of transport that delivered the PONG + * @param address address of the other peer, NULL if other peer + * connected to us + * @param address_len number of bytes in address + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats (excluding 0-termination) + */ +void +GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const char *sender_address, uint16_t sender_address_len, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) +{ + const struct SessionConnectMessage *scm; + struct NeighbourMapEntry * n; + struct BlackListCheckContext * bcc = NULL; + +#if DEBUG_TRANSPORT +#endif + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer)); + + + if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) + { + GNUNET_break_op (0); + return; + } + + scm = (const struct SessionConnectMessage *) message; + GNUNET_break_op (ntohl (scm->reserved) == 0); + + n = lookup_neighbour(peer); + if (n != NULL) + { + /* connected peer switches addresses */ + if (is_connected(n)) + { + GNUNET_ATS_address_update(GST_ats, peer, plugin_name, sender_address, sender_address_len, session, ats, ats_count); + return; + } + } + + /* we are not connected to this peer */ + /* do blacklist check*/ + bcc = GNUNET_malloc (sizeof (struct BlackListCheckContext) + + sizeof (struct GNUNET_ATS_Information) * ats_count + + sender_address_len + + strlen (plugin_name)+1); + + bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); + + bcc->ats_count = ats_count; + bcc->sender_address_len = sender_address_len; + bcc->session = session; + + bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; + memcpy (bcc->ats, ats,sizeof (struct GNUNET_ATS_Information) * ats_count ); + + bcc->sender_address = (char *) &bcc->ats[ats_count]; + memcpy (bcc->sender_address, sender_address , sender_address_len); + + bcc->plugin_name = &bcc->sender_address[sender_address_len]; + strcpy (bcc->plugin_name, plugin_name); + + GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont, bcc); +} + + +/* end of file gnunet-service-transport_neighbours.c */ diff --git a/src/transport/gnunet-service-transport_neighbours_3way.h b/src/transport/gnunet-service-transport_neighbours_3way.h new file mode 100644 index 000000000..88790449e --- /dev/null +++ b/src/transport/gnunet-service-transport_neighbours_3way.h @@ -0,0 +1,272 @@ +/* + This file is part of GNUnet. + (C) 2010,2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file transport/gnunet-service-transport_neighbours.h + * @brief neighbour management API + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_TRANSPORT_NEIGHBOURS_H +#define GNUNET_SERVICE_TRANSPORT_NEIGHBOURS_H + +#include "gnunet_statistics_service.h" +#include "gnunet_transport_service.h" +#include "gnunet_transport_plugin.h" +#include "gnunet_util_lib.h" + +// TODO: +// - ATS and similar info is a bit lacking in the API right now... + + + +/** + * Initialize the neighbours subsystem. + * + * @param cls closure for callbacks + * @param connect_cb function to call if we connect to a peer + * @param disconnect_cb function to call if we disconnect from a peer + */ +void +GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb, + GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb); + + +/** + * Cleanup the neighbours subsystem. + */ +void +GST_neighbours_stop (void); + + +/** + * Try to create a connection to the given target (eventually). + * + * @param target peer to try to connect to + */ +void +GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target); + + +/** + * Test if we're connected to the given peer. + * + * @param target peer to test + * @return GNUNET_YES if we are connected, GNUNET_NO if not + */ +int +GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target); + + +/** + * Function called after the transmission is done. + * + * @param cls closure + * @param success GNUNET_OK on success, GNUNET_NO on failure, GNUNET_SYSERR if we're not connected + */ +typedef void (*GST_NeighbourSendContinuation) (void *cls, int success); + + +/** + * Transmit a message to the given target using the active connection. + * + * @param target destination + * @param msg message to send + * @param msg_size number of bytes in msg + * @param timeout when to fail with timeout + * @param cont function to call when done + * @param cont_cls closure for 'cont' + */ +void +GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, + size_t msg_size, struct GNUNET_TIME_Relative timeout, + GST_NeighbourSendContinuation cont, void *cont_cls); + + +/** + * We have received a message from the given sender. + * How long should we delay before receiving more? + * (Also used to keep the peer marked as live). + * + * @param sender sender of the message + * @param size size of the message + * @param do_forward set to GNUNET_YES if the message should be forwarded to clients + * GNUNET_NO if the neighbour is not connected or violates the quota + * @return how long to wait before reading more from this sender + */ +struct GNUNET_TIME_Relative +GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity + *sender, ssize_t size, int *do_forward); + + +/** + * Keep the connection to the given neighbour alive longer, + * we received a KEEPALIVE (or equivalent). + * + * @param neighbour neighbour to keep alive + */ +void +GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour); + + +/** + * Change the incoming quota for the given peer. + * + * @param neighbour identity of peer to change qutoa for + * @param quota new quota + */ +void +GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, + struct GNUNET_BANDWIDTH_Value32NBO quota); + + +/** + * If we have an active connection to the given target, it must be shutdown. + * + * @param target peer to disconnect from + */ +void +GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target); + + +/** + * Function called for each connected neighbour. + * + * @param cls closure + * @param neighbour identity of the neighbour + * @param ats performance data + * @param ats_count number of entries in ats (including 0-termination) + * @param transport plugin + * @param addr address + * @param addrlen address length + */ +typedef void (*GST_NeighbourIterator) (void *cls, + const struct GNUNET_PeerIdentity * + neighbour, + const struct + GNUNET_ATS_Information * ats, + uint32_t ats_count, + const char * transport, + const void * addr, + size_t addrlen); + + +/** + * Iterate over all connected neighbours. + * + * @param cb function to call + * @param cb_cls closure for cb + */ +void +GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls); + + +/** + * A session was terminated. Take note. + * + * @param peer identity of the peer where the session died + * @param session session that is gone + */ +void +GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, + struct Session *session); + + +/** + * For an existing neighbour record, set the active connection to + * use the given address. + * + * @param peer identity of the peer to switch the address for + * @param plugin_name name of transport that delivered the PONG + * @param address address of the other peer, NULL if other peer + * connected to us + * @param address_len number of bytes in address + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats + * @param bandwidth_in inbound quota to be used when connection is up + * @param bandwidth_out outbound quota to be used when connection is up + * @return GNUNET_YES if we are currently connected, GNUNET_NO if the + * connection is not up (yet) + */ +int +GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, const void *address, + size_t address_len, struct Session *session, + const struct GNUNET_ATS_Information + *ats, uint32_t ats_count, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out); + + +/** + * We received a 'SESSION_CONNECT' message from the other peer. + * Consider switching to it. + * + * @param message possibly a 'struct SessionConnectMessage' (check format) + * @param peer identity of the peer to switch the address for + * @param plugin_name name of transport that delivered the PONG + * @param address address of the other peer, NULL if other peer + * connected to us + * @param address_len number of bytes in address + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats (excluding 0-termination) + */ +void +GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const char *sender_address, uint16_t sender_address_len, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count); + +void +GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const char *sender_address, uint16_t sender_address_len, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count); + +void +GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *plugin_name, + const char *sender_address, uint16_t sender_address_len, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count); + +/** + * We received a disconnect message from the given peer, + * validate and process. + * + * @param peer sender of the message + * @param msg the disconnect message + */ +void +GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg); + + +#endif +/* end of file gnunet-service-transport_neighbours.h */ diff --git a/src/transport/gnunet-service-transport_neighbours_fsm.c b/src/transport/gnunet-service-transport_neighbours_fsm.c deleted file mode 100644 index fd3ad37f9..000000000 --- a/src/transport/gnunet-service-transport_neighbours_fsm.c +++ /dev/null @@ -1,2071 +0,0 @@ -/* - This file is part of GNUnet. - (C) 2010,2011 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ - -/** - * @file transport/gnunet-service-transport_neighbours.c - * @brief neighbour management - * @author Christian Grothoff - */ -#include "platform.h" -#include "gnunet_ats_service.h" -#include "gnunet-service-transport_neighbours.h" -#include "gnunet-service-transport_plugins.h" -#include "gnunet-service-transport_validation.h" -#include "gnunet-service-transport_clients.h" -#include "gnunet-service-transport.h" -#include "gnunet_peerinfo_service.h" -#include "gnunet-service-transport_blacklist.h" -#include "gnunet_constants.h" -#include "transport.h" - - -/** - * Size of the neighbour hash map. - */ -#define NEIGHBOUR_TABLE_SIZE 256 - -/** - * How often must a peer violate bandwidth quotas before we start - * to simply drop its messages? - */ -#define QUOTA_VIOLATION_DROP_THRESHOLD 10 - -/** - * How often do we send KEEPALIVE messages to each of our neighbours? - * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we - * send 3 keepalives in each interval, so 3 messages would need to be - * lost in a row for a disconnect). - */ -#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90) - - -/** - * Entry in neighbours. - */ -struct NeighbourMapEntry; - -/** - * Message a peer sends to another to indicate its - * preference for communicating via a particular - * session (and the desire to establish a real - * connection). - */ -struct SessionConnectMessage -{ - /** - * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT' - */ - struct GNUNET_MessageHeader header; - - /** - * Always zero. - */ - uint32_t reserved GNUNET_PACKED; - - /** - * Absolute time at the sender. Only the most recent connect - * message implies which session is preferred by the sender. - */ - struct GNUNET_TIME_AbsoluteNBO timestamp; - -}; - - -struct SessionDisconnectMessage -{ - /** - * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT' - */ - struct GNUNET_MessageHeader header; - - /** - * Always zero. - */ - uint32_t reserved GNUNET_PACKED; - - /** - * Purpose of the signature. Extends over the timestamp. - * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT. - */ - struct GNUNET_CRYPTO_RsaSignaturePurpose purpose; - - /** - * Absolute time at the sender. Only the most recent connect - * message implies which session is preferred by the sender. - */ - struct GNUNET_TIME_AbsoluteNBO timestamp; - - /** - * Public key of the sender. - */ - struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key; - - /** - * Signature of the peer that sends us the disconnect. Only - * valid if the timestamp is AFTER the timestamp from the - * corresponding 'CONNECT' message. - */ - struct GNUNET_CRYPTO_RsaSignature signature; - -}; - - -/** - * For each neighbour we keep a list of messages - * that we still want to transmit to the neighbour. - */ -struct MessageQueue -{ - - /** - * This is a doubly linked list. - */ - struct MessageQueue *next; - - /** - * This is a doubly linked list. - */ - struct MessageQueue *prev; - - /** - * Once this message is actively being transmitted, which - * neighbour is it associated with? - */ - struct NeighbourMapEntry *n; - - /** - * Function to call once we're done. - */ - GST_NeighbourSendContinuation cont; - - /** - * Closure for 'cont' - */ - void *cont_cls; - - /** - * The message(s) we want to transmit, GNUNET_MessageHeader(s) - * stuck together in memory. Allocated at the end of this struct. - */ - const char *message_buf; - - /** - * Size of the message buf - */ - size_t message_buf_size; - - /** - * At what time should we fail? - */ - struct GNUNET_TIME_Absolute timeout; - -}; - -enum State -{ - /* fresh peer or completely disconnected */ - S_NOT_CONNECTED = 0, - /* sent CONNECT message to other peer, waiting for CONNECT_ACK */ - S_CONNECT_SENT = 1, - /* received CONNECT message to other peer, sending CONNECT_ACK */ - S_CONNECT_RECV = 4, - /* sent CONNECT_ACK message to other peer, wait for ACK or payload */ - S_CONNECT_RECV_ACK_SENT = 8, - /* received ACK or payload */ - S_CONNECTED = 16, - /* Disconnect in progress */ - S_DISCONNECT = 32 -}; - -/** - * Entry in neighbours. - */ -struct NeighbourMapEntry -{ - - /** - * Head of list of messages we would like to send to this peer; - * must contain at most one message per client. - */ - struct MessageQueue *messages_head; - - /** - * Tail of list of messages we would like to send to this peer; must - * contain at most one message per client. - */ - struct MessageQueue *messages_tail; - - /** - * Performance data for the peer. - */ - //struct GNUNET_ATS_Information *ats; - - /** - * Are we currently trying to send a message? If so, which one? - */ - struct MessageQueue *is_active; - - /** - * Active session for communicating with the peer. - */ - struct Session *session; - - /** - * Name of the plugin we currently use. - */ - char *plugin_name; - - /** - * Address used for communicating with the peer, NULL for inbound connections. - */ - void *addr; - - /** - * Number of bytes in 'addr'. - */ - size_t addrlen; - - /** - * Identity of this neighbour. - */ - struct GNUNET_PeerIdentity id; - - /** - * ID of task scheduled to run when this peer is about to - * time out (will free resources associated with the peer). - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - /** - * ID of task scheduled to send keepalives. - */ - GNUNET_SCHEDULER_TaskIdentifier keepalive_task; - - /** - * ID of task scheduled to run when we should try transmitting - * the head of the message queue. - */ - GNUNET_SCHEDULER_TaskIdentifier transmission_task; - - /** - * Tracker for inbound bandwidth. - */ - struct GNUNET_BANDWIDTH_Tracker in_tracker; - - /** - * Inbound bandwidth from ATS, activated when connection is up - */ - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; - - /** - * Inbound bandwidth from ATS, activated when connection is up - */ - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; - - /** - * Timestamp of the 'SESSION_CONNECT' message we got from the other peer - */ - struct GNUNET_TIME_Absolute connect_ts; - - /** - * How often has the other peer (recently) violated the inbound - * traffic limit? Incremented by 10 per violation, decremented by 1 - * per non-violation (for each time interval). - */ - unsigned int quota_violation_count; - - /** - * Number of values in 'ats' array. - */ - //unsigned int ats_count; - - - /** - * Do we currently consider this neighbour connected? (as far as - * the connect/disconnect callbacks are concerned)? - */ - //int is_connected; - - int state; - -}; - - -/** - * All known neighbours and their HELLOs. - */ -static struct GNUNET_CONTAINER_MultiHashMap *neighbours; - -/** - * Closure for connect_notify_cb and disconnect_notify_cb - */ -static void *callback_cls; - -/** - * Function to call when we connected to a neighbour. - */ -static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb; - -/** - * Function to call when we disconnected from a neighbour. - */ -static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb; - -/** - * counter for connected neighbours - */ -static int neighbours_connected; - -/** - * Lookup a neighbour entry in the neighbours hash map. - * - * @param pid identity of the peer to look up - * @return the entry, NULL if there is no existing record - */ -static struct NeighbourMapEntry * -lookup_neighbour (const struct GNUNET_PeerIdentity *pid) -{ - return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey); -} - -#define change_state(n, state, ...) change (n, state, __LINE__) - -static int -is_connecting (struct NeighbourMapEntry * n) -{ - if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED)) - return GNUNET_YES; - return GNUNET_NO; -} - -static int -is_connected (struct NeighbourMapEntry * n) -{ - if (n->state == S_CONNECTED) - return GNUNET_YES; - return GNUNET_NO; -} - -static int -is_disconnecting (struct NeighbourMapEntry * n) -{ - if (n->state == S_DISCONNECT) - return GNUNET_YES; - return GNUNET_NO; -} - -static const char * -print_state (int state) -{ - switch (state) { - case S_CONNECTED: - return "S_CONNECTED"; - break; - case S_CONNECT_RECV: - return "S_CONNECT_RECV"; - break; - case S_CONNECT_RECV_ACK_SENT: - return"S_CONNECT_RECV_ACK_SENT"; - break; - case S_CONNECT_SENT: - return "S_CONNECT_SENT"; - break; - case S_DISCONNECT: - return "S_DISCONNECT"; - break; - case S_NOT_CONNECTED: - return "S_NOT_CONNECTED"; - break; - default: - GNUNET_break (0); - break; - } - return NULL; -} - -static int -change (struct NeighbourMapEntry * n, int state, int line) -{ - char * old = strdup(print_state(n->state)); - char * new = strdup(print_state(state)); - - /* allowed transitions */ - int allowed = GNUNET_NO; - switch (n->state) { - case S_NOT_CONNECTED: - if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) || - (state == S_DISCONNECT)) - { - allowed = GNUNET_YES; - break; - } - break; - case S_CONNECT_RECV: - if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) || - (state == S_CONNECTED)) - { - allowed = GNUNET_YES; - break; - } - break; - case S_CONNECT_SENT: - if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) || - (state == S_DISCONNECT) || /* FIXME SENT -> RECV ISSUE!*/ (state == S_CONNECT_RECV)) - { - allowed = GNUNET_YES; - break; - } - break; - case S_CONNECTED: - if (state == S_DISCONNECT) - { - allowed = GNUNET_YES; - break; - } - break; - case S_DISCONNECT: - /* - if (state == S_NOT_CONNECTED) - { - allowed = GNUNET_YES; - break; - }*/ - break; - default: - GNUNET_break (0); - break; - - } - - if (allowed == GNUNET_NO) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Illegal state transition from `%s' to `%s' in line %u \n", - old, new, line); - GNUNET_break (0); - GNUNET_free (old); - GNUNET_free (new); - return GNUNET_SYSERR; - } - - n->state = state; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n", - GNUNET_i2s (&n->id), n, old, new, line); - GNUNET_free (old); - GNUNET_free (new); - return GNUNET_OK; -} - -static ssize_t -send_with_plugin ( const struct GNUNET_PeerIdentity * target, - const char *msgbuf, - size_t msgbuf_size, - uint32_t priority, - struct GNUNET_TIME_Relative timeout, - struct Session * session, - const char * plugin_name, - const void *addr, - size_t addrlen, - int force_address, - GNUNET_TRANSPORT_TransmitContinuation cont, - void *cont_cls) - -{ - struct GNUNET_TRANSPORT_PluginFunctions *papi; - size_t ret = GNUNET_SYSERR; - - papi = GST_plugins_find (plugin_name); - if (papi == NULL) - { - if (cont != NULL) - cont (cont_cls, target, GNUNET_SYSERR); - return GNUNET_SYSERR; - } - - ret = papi->send (papi->cls, - target, - msgbuf, msgbuf_size, - 0, - timeout, - session, - addr, addrlen, - GNUNET_YES, - cont, cont_cls); - - if (ret == -1) - { - if (cont != NULL) - cont (cont_cls, target, GNUNET_SYSERR); - } - return ret; -} - -/** - * Task invoked to start a transmission to another peer. - * - * @param cls the 'struct NeighbourMapEntry' - * @param tc scheduler context - */ -static void -transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - -/** - * We're done with our transmission attempt, continue processing. - * - * @param cls the 'struct MessageQueue' of the message - * @param receiver intended receiver - * @param success whether it worked or not - */ -static void -transmit_send_continuation (void *cls, - const struct GNUNET_PeerIdentity *receiver, - int success) -{ - struct MessageQueue *mq; - struct NeighbourMapEntry *n; - - mq = cls; - n = mq->n; - if (NULL != n) - { - GNUNET_assert (n->is_active == mq); - n->is_active = NULL; - if (success == GNUNET_YES) - { - GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); - } - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n", - ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type), - (success == GNUNET_OK) ? "successful" : "FAILED"); -#endif - if (NULL != mq->cont) - mq->cont (mq->cont_cls, success); - GNUNET_free (mq); -} - - -/** - * Check the ready list for the given neighbour and if a plugin is - * ready for transmission (and if we have a message), do so! - * - * @param n target peer for which to transmit - */ -static void -try_transmission_to_peer (struct NeighbourMapEntry *n) -{ - struct MessageQueue *mq; - struct GNUNET_TIME_Relative timeout; - ssize_t ret; - - if (n->is_active != NULL) - { - GNUNET_break (0); - return; /* transmission already pending */ - } - if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_break (0); - return; /* currently waiting for bandwidth */ - } - while (NULL != (mq = n->messages_head)) - { - timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); - if (timeout.rel_value > 0) - break; - GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - n->is_active = mq; - mq->n = n; - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ - } - if (NULL == mq) - return; /* no more messages */ - - if (GST_plugins_find (n->plugin_name) == NULL) - { - GNUNET_break (0); - return; - } - GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - n->is_active = mq; - mq->n = n; - - if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", - GNUNET_i2s (&n->id)); - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); - GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); - return; - } - - ret = send_with_plugin (&n->id, - mq->message_buf, mq->message_buf_size, 0, - timeout, - n->session, n->plugin_name, n->addr, n->addrlen, - GNUNET_YES, - &transmit_send_continuation, mq); - if (ret == -1) - { - /* failure, but 'send' would not call continuation in this case, - * so we need to do it here! */ - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); - } - -} - - -/** - * Task invoked to start a transmission to another peer. - * - * @param cls the 'struct NeighbourMapEntry' - * @param tc scheduler context - */ -static void -transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighbourMapEntry *n = cls; - GNUNET_assert (NULL != lookup_neighbour(&n->id)); - n->transmission_task = GNUNET_SCHEDULER_NO_TASK; - try_transmission_to_peer (n); -} - - -/** - * Initialize the neighbours subsystem. - * - * @param cls closure for callbacks - * @param connect_cb function to call if we connect to a peer - * @param disconnect_cb function to call if we disconnect from a peer - */ -void -GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb, - GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb) -{ - callback_cls = cls; - connect_notify_cb = connect_cb; - disconnect_notify_cb = disconnect_cb; - neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); -} - -/* -static void -send_disconnect_cont (void *cls, - const struct GNUNET_PeerIdentity * target, - int result) -{ - struct NeighbourMapEntry *n = cls; - -}*/ - -static int -send_disconnect (struct NeighbourMapEntry *n) -{ - size_t ret; - struct SessionDisconnectMessage disconnect_msg; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending DISCONNECT message to peer `%4s'\n", - GNUNET_i2s (&n->id)); -#endif - - disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage)); - disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); - disconnect_msg.reserved = htonl (0); - disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + - sizeof (struct GNUNET_TIME_AbsoluteNBO) ); - disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); - disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - disconnect_msg.public_key = GST_my_public_key; - GNUNET_assert (GNUNET_OK == - GNUNET_CRYPTO_rsa_sign (GST_my_private_key, - &disconnect_msg.purpose, - &disconnect_msg.signature)); - - ret = send_with_plugin(&n->id, - (const char *) &disconnect_msg, sizeof (disconnect_msg), - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name, n->addr, n->addrlen, - GNUNET_YES, NULL, NULL); - - if (ret == GNUNET_SYSERR) - return GNUNET_SYSERR; - - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# peers disconnected due to external request"), 1, - GNUNET_NO); - return GNUNET_OK; -} - -/** - * Disconnect from the given neighbour, clean up the record. - * - * @param n neighbour to disconnect from - */ -static void -disconnect_neighbour (struct NeighbourMapEntry *n) -{ - struct MessageQueue *mq; - int was_connected = is_connected(n); - - if (is_disconnecting(n) == GNUNET_YES) - return; - - /* send DISCONNECT MESSAGE */ - if (is_connected(n) || is_connecting(n)) - { - if (GNUNET_OK == send_disconnect(n)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n", - GNUNET_i2s (&n->id)); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not send DISCONNECT_MSG to `%s'\n", - GNUNET_i2s (&n->id)); - } - - - if (is_disconnecting(n)) - return; - change_state (n, S_DISCONNECT); - - while (NULL != (mq = n->messages_head)) - { - GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - if (NULL != mq->cont) - mq->cont (mq->cont_cls, GNUNET_SYSERR); - GNUNET_free (mq); - } - if (NULL != n->is_active) - { - n->is_active->n = NULL; - n->is_active = NULL; - } - if (was_connected) - { - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task); - GNUNET_SCHEDULER_cancel (n->keepalive_task); - n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (neighbours_connected > 0); - neighbours_connected--; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1, - GNUNET_NO); - disconnect_notify_cb (callback_cls, &n->id); - } - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (neighbours, - &n->id.hashPubKey, n)); - if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task) - { - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } - if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task) - { - GNUNET_SCHEDULER_cancel (n->transmission_task); - n->transmission_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != n->plugin_name) - { - GNUNET_free (n->plugin_name); - n->plugin_name = NULL; - } - if (NULL != n->addr) - { - GNUNET_free (n->addr); - n->addr = NULL; - n->addrlen = 0; - } - n->session = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n", - GNUNET_i2s (&n->id), n); - GNUNET_free (n); -} - - -/** - * Peer has been idle for too long. Disconnect. - * - * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle - * @param tc scheduler context - */ -static void -neighbour_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighbourMapEntry *n = cls; - - n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# peers disconnected due to timeout"), 1, - GNUNET_NO); - disconnect_neighbour (n); -} - - -/** - * Send another keepalive message. - * - * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle - * @param tc scheduler context - */ -static void -neighbour_keepalive_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighbourMapEntry *n = cls; - struct GNUNET_MessageHeader m; - - n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, - &neighbour_keepalive_task, - n); - GNUNET_assert (is_connected(n)); - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# keepalives sent"), 1, - GNUNET_NO); - m.size = htons (sizeof (struct GNUNET_MessageHeader)); - m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE); - - send_with_plugin(&n->id, (const void *) &m, - sizeof (m), - UINT32_MAX /* priority */ , - GNUNET_TIME_UNIT_FOREVER_REL, - n->session, n->plugin_name, n->addr, n->addrlen, - GNUNET_YES, NULL, NULL); -} - - -/** - * Disconnect from the given neighbour. - * - * @param cls unused - * @param key hash of neighbour's public key (not used) - * @param value the 'struct NeighbourMapEntry' of the neighbour - */ -static int -disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct NeighbourMapEntry *n = value; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n", - GNUNET_i2s (&n->id), "SHUTDOWN_TASK"); -#endif - if (is_connected(n)) - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# peers disconnected due to global disconnect"), 1, - GNUNET_NO); - disconnect_neighbour (n); - return GNUNET_OK; -} - - -/** - * Cleanup the neighbours subsystem. - */ -void -GST_neighbours_stop () -{ - GNUNET_assert (neighbours != NULL); - - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (neighbours); - GNUNET_assert (neighbours_connected == 0); - neighbours = NULL; - callback_cls = NULL; - connect_notify_cb = NULL; - disconnect_notify_cb = NULL; -} - - -/** - * We tried to send a SESSION_CONNECT message to another peer. If this - * succeeded, we change the state. If it failed, we should tell - * ATS to not use this address anymore (until it is re-validated). - * - * @param cls the 'struct NeighbourMapEntry' - * @param success GNUNET_OK on success - */ -static void -send_connect_continuation (void *cls, - const struct GNUNET_PeerIdentity * target, - int success) - -{ - struct NeighbourMapEntry *n = cls; - - GNUNET_assert (n != NULL); - GNUNET_assert (!is_connected(n)); - - if (is_disconnecting(n)) - return; /* neighbour is going away */ - if (GNUNET_YES != success) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n", - GNUNET_i2s (&n->id), n->plugin_name, - (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, - n->addr, - n->addrlen), - n->session); -#endif - - GNUNET_ATS_address_destroyed (GST_ats, - &n->id, - n->plugin_name, - n->addr, - n->addrlen, - NULL); - - GNUNET_ATS_suggest_address(GST_ats, &n->id); - return; - } - change_state(n, S_CONNECT_SENT); -} - - -/** - * We tried to switch addresses with an peer already connected. If it failed, - * we should tell ATS to not use this address anymore (until it is re-validated). - * - * @param cls the 'struct NeighbourMapEntry' - * @param success GNUNET_OK on success - */ -static void -send_switch_address_continuation (void *cls, - const struct GNUNET_PeerIdentity * target, - int success) - -{ - struct NeighbourMapEntry *n = cls; - - GNUNET_assert (n != NULL); - if (is_disconnecting(n)) - return; /* neighbour is going away */ - - GNUNET_assert (n->state == S_CONNECTED); - if (GNUNET_YES != success) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n", - GNUNET_i2s (&n->id), n->plugin_name, - (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, - n->addr, - n->addrlen), - n->session); -#endif - - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Do not forget to fix this!\n"); - /* FIXME: We have to change the state away from connected: - * If ATS can not suggest another address we do not get a callback - * but we still think we are connected - */ - //change_state(n, S_NOT_CONNECTED); - - GNUNET_ATS_address_destroyed (GST_ats, - &n->id, - n->plugin_name, - n->addr, - n->addrlen, - NULL); - - GNUNET_ATS_suggest_address(GST_ats, &n->id); - return; - } -} - -/** - * We tried to send a SESSION_CONNECT message to another peer. If this - * succeeded, we change the state. If it failed, we should tell - * ATS to not use this address anymore (until it is re-validated). - * - * @param cls the 'struct NeighbourMapEntry' - * @param success GNUNET_OK on success - */ -static void -send_connect_ack_continuation (void *cls, - const struct GNUNET_PeerIdentity * target, - int success) - -{ - struct NeighbourMapEntry *n = cls; - - GNUNET_assert (n != NULL); - - if (GNUNET_YES == success) - return; /* sending successful */ - - /* sending failed, ask for next address */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n", - GNUNET_i2s (&n->id), n->plugin_name, - (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, - n->addr, - n->addrlen), - n->session); -#endif - change_state(n, S_NOT_CONNECTED); - - GNUNET_ATS_address_destroyed (GST_ats, - &n->id, - n->plugin_name, - n->addr, - n->addrlen, - NULL); - - GNUNET_ATS_suggest_address(GST_ats, &n->id); -} - -/** - * For an existing neighbour record, set the active connection to - * the given address. - * - * @param peer identity of the peer to switch the address for - * @param plugin_name name of transport that delivered the PONG - * @param address address of the other peer, NULL if other peer - * connected to us - * @param address_len number of bytes in address - * @param session session to use (or NULL) - * @param ats performance data - * @param ats_count number of entries in ats - * @return GNUNET_YES if we are currently connected, GNUNET_NO if the - * connection is not up (yet) - */ -int -GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, - const char *plugin_name, const void *address, - size_t address_len, struct Session *session, - const struct GNUNET_ATS_Information - *ats, uint32_t ats_count, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out) -{ - struct NeighbourMapEntry *n; - struct SessionConnectMessage connect_msg; - size_t msg_len; - size_t ret; - - GNUNET_assert (neighbours != NULL); - n = lookup_neighbour (peer); - if (NULL == n) - { - if (NULL == session) - GNUNET_ATS_address_destroyed (GST_ats, - peer, - plugin_name, address, - address_len, NULL); - return GNUNET_NO; - } - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n", - plugin_name, - (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, - address, - address_len), - session, (is_connected(n) ? "CONNECTED" : "NOT CONNECTED"), - GNUNET_i2s (peer)); -#endif - - GNUNET_free_non_null (n->addr); - n->addr = GNUNET_malloc (address_len); - memcpy (n->addr, address, address_len); - n->bandwidth_in = bandwidth_in; - n->bandwidth_out = bandwidth_out; - n->addrlen = address_len; - n->session = session; - GNUNET_free_non_null (n->plugin_name); - n->plugin_name = GNUNET_strdup (plugin_name); - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - - if (n->state == S_DISCONNECT) - { - /* We are disconnecting, nothing to do here */ - return GNUNET_NO; - } - /* We are not connected/connecting and initiate a fresh connect */ - if (n->state == S_NOT_CONNECTED) - { - msg_len = sizeof (struct SessionConnectMessage); - connect_msg.header.size = htons (msg_len); - connect_msg.header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); - connect_msg.reserved = htonl (0); - connect_msg.timestamp = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - - ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_continuation, n); - if (ret == GNUNET_SYSERR) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n", - GNUNET_i2s (peer), plugin_name, - (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, - address, - address_len), - session); - } - return GNUNET_NO; - } - /* We received a CONNECT message and asked ATS for an address */ - else if (n->state == S_CONNECT_RECV) - { - msg_len = sizeof (struct SessionConnectMessage); - connect_msg.header.size = htons (msg_len); - connect_msg.header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK); - connect_msg.reserved = htonl (0); - connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - - ret = send_with_plugin(&n->id, (const void *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_ack_continuation, n); - if (ret == GNUNET_SYSERR) - { - change_state (n, S_NOT_CONNECTED); - GNUNET_break (0); - } - return GNUNET_NO; - } - /* connected peer is switching addresses */ - else if (n->state == S_CONNECTED) - { - msg_len = sizeof (struct SessionConnectMessage); - connect_msg.header.size = htons (msg_len); - connect_msg.header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); - connect_msg.reserved = htonl (0); - connect_msg.timestamp = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - - ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_switch_address_continuation, n); - if (ret == GNUNET_SYSERR) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n", - GNUNET_i2s (peer), plugin_name, - (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, - address, - address_len), - session); - } - return GNUNET_NO; - } - - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Invalid connection state to switch addresses %u ", n->state); - GNUNET_break_op (0); - return GNUNET_NO; -} - - -/** - * Create an entry in the neighbour map for the given peer - * - * @param peer peer to create an entry for - * @return new neighbour map entry - */ -static struct NeighbourMapEntry * -setup_neighbour (const struct GNUNET_PeerIdentity *peer) -{ - struct NeighbourMapEntry *n; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Unknown peer `%s', creating new neighbour\n", - GNUNET_i2s (peer)); -#endif - n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); - n->id = *peer; - n->state = S_NOT_CONNECTED; - GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, - MAX_BANDWIDTH_CARRY_S); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (neighbours, - &n->id.hashPubKey, n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - return n; -} - - -/** - * Try to create a connection to the given target (eventually). - * - * @param target peer to try to connect to - */ -void -GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) -{ - struct NeighbourMapEntry *n; - - GNUNET_assert (neighbours != NULL); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n", - GNUNET_i2s (target)); -#endif - GNUNET_assert (0 != - memcmp (target, &GST_my_identity, - sizeof (struct GNUNET_PeerIdentity))); - n = lookup_neighbour (target); - - if (NULL != n) - { - if ((is_connected(n)) || (is_connecting(n))) - return; /* already connecting or connected */ - if (is_disconnecting(n)) - change_state (n, S_NOT_CONNECTED); - } - - - if (n == NULL) - n = setup_neighbour (target); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking ATS for suggested address to connect to peer `%s'\n", - GNUNET_i2s (&n->id)); -#endif - GNUNET_ATS_suggest_address (GST_ats, &n->id); -} - - -/** - * Test if we're connected to the given peer. - * - * @param target peer to test - * @return GNUNET_YES if we are connected, GNUNET_NO if not - */ -int -GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) -{ - struct NeighbourMapEntry *n; - - GNUNET_assert (neighbours != NULL); - - n = lookup_neighbour (target); - - if ((NULL == n) || (!is_connected(n))) - return GNUNET_NO; /* not connected */ - return GNUNET_YES; -} - - -/** - * A session was terminated. Take note. - * - * @param peer identity of the peer where the session died - * @param session session that is gone - */ -void -GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, - struct Session *session) -{ - struct NeighbourMapEntry *n; - - GNUNET_assert (neighbours != NULL); - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Session %X to peer `%s' ended \n", - session, GNUNET_i2s (peer)); -#endif - - n = lookup_neighbour (peer); - if (NULL == n) - return; - if (session != n->session) - return; /* doesn't affect us */ - - n->session = NULL; - GNUNET_free (n->addr); - n->addr = NULL; - n->addrlen = 0; - - /* not connected anymore anyway, shouldn't matter */ - if ((!is_connected(n)) && (!is_connecting(n))) - return; - - // FIXME: switch address what is the state - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Do not forget to fix this!\n"); - - /* We are connected, so ask ATS to switch addresses */ - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT, - &neighbour_timeout_task, n); - /* try QUICKLY to re-establish a connection, reduce timeout! */ - GNUNET_ATS_suggest_address (GST_ats, peer); -} - - -/** - * Transmit a message to the given target using the active connection. - * - * @param target destination - * @param msg message to send - * @param msg_size number of bytes in msg - * @param timeout when to fail with timeout - * @param cont function to call when done - * @param cont_cls closure for 'cont' - */ -void -GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, - size_t msg_size, struct GNUNET_TIME_Relative timeout, - GST_NeighbourSendContinuation cont, void *cont_cls) -{ - struct NeighbourMapEntry *n; - struct MessageQueue *mq; - - GNUNET_assert (neighbours != NULL); - - n = lookup_neighbour (target); - if ((n == NULL) || (!is_connected(n))) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages not sent (no such peer or not connected)"), - 1, GNUNET_NO); -#if DEBUG_TRANSPORT - if (n == NULL) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send message to peer `%s': unknown neighbour", - GNUNET_i2s (target)); - else if (!is_connected(n)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send message to peer `%s': not connected\n", - GNUNET_i2s (target)); -#endif - if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR); - return; - } - - if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0)) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages not sent (no such peer or not connected)"), - 1, GNUNET_NO); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send message to peer `%s': no address available\n", - GNUNET_i2s (target)); -#endif - - if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR); - return; - } - - GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader)); - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# bytes in message queue for other peers"), - msg_size, GNUNET_NO); - mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); - mq->cont = cont; - mq->cont_cls = cont_cls; - /* FIXME: this memcpy can be up to 7% of our total runtime! */ - memcpy (&mq[1], msg, msg_size); - mq->message_buf = (const char *) &mq[1]; - mq->message_buf_size = msg_size; - mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); - GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); - - if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && - (NULL == n->is_active)) - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); -} - - -/** - * We have received a message from the given sender. How long should - * we delay before receiving more? (Also used to keep the peer marked - * as live). - * - * @param sender sender of the message - * @param size size of the message - * @param do_forward set to GNUNET_YES if the message should be forwarded to clients - * GNUNET_NO if the neighbour is not connected or violates the quota, - * GNUNET_SYSERR if the connection is not fully up yet - * @return how long to wait before reading more from this sender - */ -struct GNUNET_TIME_Relative -GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity - *sender, ssize_t size, int *do_forward) -{ - struct NeighbourMapEntry *n; - struct GNUNET_TIME_Relative ret; - - GNUNET_assert (neighbours != NULL); - - n = lookup_neighbour (sender); - if (n == NULL) - { - GST_neighbours_try_connect (sender); - n = lookup_neighbour (sender); - if (NULL == n) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages discarded due to lack of neighbour record"), - 1, GNUNET_NO); - *do_forward = GNUNET_NO; - return GNUNET_TIME_UNIT_ZERO; - } - } - if (!is_connected(n)) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Plugin gave us %d bytes of data but somehow the session is not marked as UP yet!\n"), - (int) size); - *do_forward = GNUNET_SYSERR; - return GNUNET_TIME_UNIT_ZERO; - } - if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size)) - { - n->quota_violation_count++; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Bandwidth quota (%u b/s) violation detected (total of %u).\n", - n->in_tracker.available_bytes_per_s__, - n->quota_violation_count); -#endif - /* Discount 32k per violation */ - GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024); - } - else - { - if (n->quota_violation_count > 0) - { - /* try to add 32k back */ - GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); - n->quota_violation_count--; - } - } - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# bandwidth quota violations by other peers"), - 1, GNUNET_NO); - *do_forward = GNUNET_NO; - return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; - } - *do_forward = GNUNET_YES; - ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); - if (ret.rel_value > 0) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n", - (unsigned long long) n->in_tracker. - consumption_since_last_update__, - (unsigned int) n->in_tracker.available_bytes_per_s__, - (unsigned long long) ret.rel_value); -#endif - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# ms throttling suggested"), - (int64_t) ret.rel_value, GNUNET_NO); - } - return ret; -} - - -/** - * Keep the connection to the given neighbour alive longer, - * we received a KEEPALIVE (or equivalent). - * - * @param neighbour neighbour to keep alive - */ -void -GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) -{ - struct NeighbourMapEntry *n; - - GNUNET_assert (neighbours != NULL); - - n = lookup_neighbour (neighbour); - if (NULL == n) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# KEEPALIVE messages discarded (not connected)"), - 1, GNUNET_NO); - return; - } - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); -} - - -/** - * Change the incoming quota for the given peer. - * - * @param neighbour identity of peer to change qutoa for - * @param quota new quota - */ -void -GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, - struct GNUNET_BANDWIDTH_Value32NBO quota) -{ - struct NeighbourMapEntry *n; - - GNUNET_assert (neighbours != NULL); - - n = lookup_neighbour (neighbour); - if (n == NULL) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# SET QUOTA messages ignored (no such peer)"), - 1, GNUNET_NO); - return; - } - GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); - if (0 != ntohl (quota.value__)) - return; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n", - GNUNET_i2s (&n->id), "SET_QUOTA"); -#endif - if (is_connected(n)) - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# disconnects due to quota of 0"), 1, - GNUNET_NO); - disconnect_neighbour (n); -} - - -/** - * Closure for the neighbours_iterate function. - */ -struct IteratorContext -{ - /** - * Function to call on each connected neighbour. - */ - GST_NeighbourIterator cb; - - /** - * Closure for 'cb'. - */ - void *cb_cls; -}; - - -/** - * Call the callback from the closure for each connected neighbour. - * - * @param cls the 'struct IteratorContext' - * @param key the hash of the public key of the neighbour - * @param value the 'struct NeighbourMapEntry' - * @return GNUNET_OK (continue to iterate) - */ -static int -neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct IteratorContext *ic = cls; - struct NeighbourMapEntry *n = value; - - if (is_connected(n)) - return GNUNET_OK; - - ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen); - return GNUNET_OK; -} - - -/** - * Iterate over all connected neighbours. - * - * @param cb function to call - * @param cb_cls closure for cb - */ -void -GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) -{ - struct IteratorContext ic; - - GNUNET_assert (neighbours != NULL); - - ic.cb = cb; - ic.cb_cls = cb_cls; - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic); -} - -/** - * If we have an active connection to the given target, it must be shutdown. - * - * @param target peer to disconnect from - */ -void -GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) -{ - struct NeighbourMapEntry *n; - - GNUNET_assert (neighbours != NULL); - - n = lookup_neighbour (target); - if (NULL == n) - return; /* not active */ - if (is_connected(n)) - { - send_disconnect(n); - - n = lookup_neighbour (target); - if (NULL == n) - return; /* gone already */ - } - disconnect_neighbour (n); -} - - -/** - * We received a disconnect message from the given peer, - * validate and process. - * - * @param peer sender of the message - * @param msg the disconnect message - */ -void -GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *msg) -{ - struct NeighbourMapEntry *n; - const struct SessionDisconnectMessage *sdm; - GNUNET_HashCode hc; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DISCONNECT message from peer `%s'\n", GNUNET_i2s (peer)); -#endif - - if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage)) - { - // GNUNET_break_op (0); - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# disconnect messages ignored (old format)"), 1, - GNUNET_NO); - return; - } - sdm = (const struct SessionDisconnectMessage* ) msg; - n = lookup_neighbour (peer); - if (NULL == n) - return; /* gone already */ - if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= - n->connect_ts.abs_value) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# disconnect messages ignored (timestamp)"), 1, - GNUNET_NO); - return; - } - GNUNET_CRYPTO_hash (&sdm->public_key, - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &hc); - if (0 != memcmp (peer, - &hc, - sizeof (struct GNUNET_PeerIdentity))) - { - GNUNET_break_op (0); - return; - } - if (ntohl (sdm->purpose.size) != - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + - sizeof (struct GNUNET_TIME_AbsoluteNBO)) - { - GNUNET_break_op (0); - return; - } - if (GNUNET_OK != - GNUNET_CRYPTO_rsa_verify (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, - &sdm->purpose, - &sdm->signature, - &sdm->public_key)) - { - GNUNET_break_op (0); - return; - } - GST_neighbours_force_disconnect (peer); -} - -/** - * We received a 'SESSION_CONNECT_ACK' message from the other peer. - * Consider switching to it. - * - * @param message possibly a 'struct SessionConnectMessage' (check format) - * @param peer identity of the peer to switch the address for - * @param plugin_name name of transport that delivered the PONG - * @param address address of the other peer, NULL if other peer - * connected to us - * @param address_len number of bytes in address - * @param session session to use (or NULL) - * @param ats performance data - * @param ats_count number of entries in ats - */ -void -GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const char *plugin_name, - const char *sender_address, uint16_t sender_address_len, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count) -{ - const struct SessionConnectMessage *scm; - struct QuotaSetMessage q_msg; - struct GNUNET_MessageHeader msg; - struct GNUNET_TIME_Absolute ts; - struct NeighbourMapEntry *n; - size_t msg_len; - size_t ret; - -#if DEBUG_TRANSPORT -#endif - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Received CONNECT_ACK message from peer `%s'\n", GNUNET_i2s (peer)); - - - if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) - { - GNUNET_break_op (0); - return; - } - - scm = (const struct SessionConnectMessage *) message; - GNUNET_break_op (ntohl (scm->reserved) == 0); - ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); - n = lookup_neighbour (peer); - if (NULL == n) - n = setup_neighbour (peer); -/* - if (n->state != S_CONNECT_SENT) - { - GNUNET_break (0); - send_disconnect(n); - return; - } -*/ - if (NULL != session) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "transport-ats", - "Giving ATS session %p of plugin %s for peer %s\n", - session, - plugin_name, - GNUNET_i2s (peer)); - GNUNET_ATS_address_update (GST_ats, - peer, - plugin_name, sender_address, sender_address_len, - session, ats, ats_count); - - change_state (n, S_CONNECTED); - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Setting inbound quota of %u for peer `%s' to \n", - ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id)); -#endif - GST_neighbours_set_incoming_quota(&n->id, n->bandwidth_in); - - n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, - &neighbour_keepalive_task, - n); - /* send ACK (ACK)*/ - msg_len = sizeof (msg); - msg.size = htons (msg_len); - msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); - - ret = send_with_plugin (&n->id, (const char *) &msg, msg_len, 0, - GNUNET_TIME_UNIT_FOREVER_REL, - n->session, n->plugin_name, n->addr, n->addrlen, - GNUNET_YES, NULL, NULL); - - if (ret == GNUNET_SYSERR) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n", - GNUNET_i2s (&n->id), n->plugin_name, - (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, - n->addr, - n->addrlen), - n->session); - - neighbours_connected++; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, - GNUNET_NO); - connect_notify_cb (callback_cls, &n->id, ats, ats_count); - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending outbound quota of %u Bps for peer `%s' to all clients\n", - ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer)); -#endif - q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); - q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); - q_msg.quota = n->bandwidth_out; - q_msg.peer = (*peer); - GST_clients_broadcast (&q_msg.header, GNUNET_NO); -} - -void -GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const char *plugin_name, - const char *sender_address, uint16_t sender_address_len, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count) -{ - struct NeighbourMapEntry *n; - struct QuotaSetMessage q_msg; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received ACK message from peer `%s'\n", GNUNET_i2s (peer)); -#endif - - if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return; - } - - n = lookup_neighbour (peer); - if (NULL == n) - { - send_disconnect(n); - GNUNET_break (0); - } -// FIXME check this -// if (n->state != S_CONNECT_RECV) - if (is_connecting(n)) - { - send_disconnect (n); - change_state (n, S_DISCONNECT); - GNUNET_break (0); - return; - } - - if (is_connected(n)) - return; - - if (NULL != session) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "transport-ats", - "Giving ATS session %p of plugin %s for peer %s\n", - session, - plugin_name, - GNUNET_i2s (peer)); - GNUNET_ATS_address_update (GST_ats, - peer, - plugin_name, sender_address, sender_address_len, - session, ats, ats_count); - - change_state (n, S_CONNECTED); - - GST_neighbours_set_incoming_quota(&n->id, n->bandwidth_in); - - n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, - &neighbour_keepalive_task, - n); - - neighbours_connected++; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, - GNUNET_NO); - connect_notify_cb (callback_cls, &n->id, ats, ats_count); - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending outbound quota of %u Bps for peer `%s' to all clients\n", - ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer)); -#endif - - q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); - q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); - q_msg.quota = n->bandwidth_out; - q_msg.peer = (*peer); - GST_clients_broadcast (&q_msg.header, GNUNET_NO); -} - -struct BlackListCheckContext -{ - struct GNUNET_ATS_Information *ats; - - uint32_t ats_count; - - struct Session *session; - - char *sender_address; - - uint16_t sender_address_len; - - char *plugin_name; - - struct GNUNET_TIME_Absolute ts; -}; - - -static void -handle_connect_blacklist_cont (void *cls, - const struct GNUNET_PeerIdentity - * peer, int result) -{ - struct NeighbourMapEntry *n; - struct BlackListCheckContext * bcc = cls; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Blacklist check due to CONNECT message: `%s'\n", GNUNET_i2s (peer), (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN"); -#endif - - /* not allowed */ - if (GNUNET_OK != result) - { - GNUNET_free (bcc); - return; - } - - n = lookup_neighbour (peer); - if (NULL == n) - n = setup_neighbour (peer); - - if (bcc->ts.abs_value > n->connect_ts.abs_value) - { - if (NULL != bcc->session) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "transport-ats", - "Giving ATS session %p of plugin %s address `%s' for peer %s\n", - bcc->session, - bcc->plugin_name, - GST_plugins_a2s (bcc->plugin_name, bcc->sender_address, bcc->sender_address_len), - GNUNET_i2s (peer)); - GNUNET_ATS_address_update (GST_ats, - peer, - bcc->plugin_name, bcc->sender_address, bcc->sender_address_len, - bcc->session, bcc->ats, bcc->ats_count); - n->connect_ts = bcc->ts; - } - - GNUNET_free (bcc); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Blacklist check due to CONNECT message: `%s'\n"); -/* - if (n->state != S_NOT_CONNECTED) - return;*/ - change_state (n, S_CONNECT_RECV); - - /* Ask ATS for an address to connect via that address */ - GNUNET_ATS_suggest_address(GST_ats, peer); -} - -/** - * We received a 'SESSION_CONNECT' message from the other peer. - * Consider switching to it. - * - * @param message possibly a 'struct SessionConnectMessage' (check format) - * @param peer identity of the peer to switch the address for - * @param plugin_name name of transport that delivered the PONG - * @param address address of the other peer, NULL if other peer - * connected to us - * @param address_len number of bytes in address - * @param session session to use (or NULL) - * @param ats performance data - * @param ats_count number of entries in ats (excluding 0-termination) - */ -void -GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const char *plugin_name, - const char *sender_address, uint16_t sender_address_len, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count) -{ - const struct SessionConnectMessage *scm; - struct NeighbourMapEntry * n; - struct BlackListCheckContext * bcc = NULL; - -#if DEBUG_TRANSPORT -#endif - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer)); - - - if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) - { - GNUNET_break_op (0); - return; - } - - scm = (const struct SessionConnectMessage *) message; - GNUNET_break_op (ntohl (scm->reserved) == 0); - - n = lookup_neighbour(peer); - if (n != NULL) - { - /* connected peer switches addresses */ - if (is_connected(n)) - { - GNUNET_ATS_address_update(GST_ats, peer, plugin_name, sender_address, sender_address_len, session, ats, ats_count); - return; - } - } - - /* we are not connected to this peer */ - /* do blacklist check*/ - bcc = GNUNET_malloc (sizeof (struct BlackListCheckContext) + - sizeof (struct GNUNET_ATS_Information) * ats_count + - sender_address_len + - strlen (plugin_name)+1); - - bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); - - bcc->ats_count = ats_count; - bcc->sender_address_len = sender_address_len; - bcc->session = session; - - bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; - memcpy (bcc->ats, ats,sizeof (struct GNUNET_ATS_Information) * ats_count ); - - bcc->sender_address = (char *) &bcc->ats[ats_count]; - memcpy (bcc->sender_address, sender_address , sender_address_len); - - bcc->plugin_name = &bcc->sender_address[sender_address_len]; - strcpy (bcc->plugin_name, plugin_name); - - GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont, bcc); -} - - -/* end of file gnunet-service-transport_neighbours.c */ -- cgit v1.2.3