/* This file is part of GNUnet. (C) 2009 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ /** * @file transport/gnunet-service-transport.c * @brief low-level P2P messaging * @author Christian Grothoff * * TODO: * - if we do not receive an ACK in response to our * HELLO, retransmit HELLO! */ #include "platform.h" #include "gnunet_client_lib.h" #include "gnunet_constants.h" #include "gnunet_getopt_lib.h" #include "gnunet_hello_lib.h" #include "gnunet_os_lib.h" #include "gnunet_peerinfo_service.h" #include "gnunet_plugin_lib.h" #include "gnunet_protocols.h" #include "gnunet_service_lib.h" #include "gnunet_signatures.h" #include "plugin_transport.h" #include "transport.h" /** * How many messages can we have pending for a given client process * before we start to drop incoming messages? We typically should * have only one client and so this would be the primary buffer for * messages, so the number should be chosen rather generously. * * The expectation here is that most of the time the queue is large * enough so that a drop is virtually never required. */ #define MAX_PENDING 128 /** * How often should we try to reconnect to a peer using a particular * transport plugin before giving up? Note that the plugin may be * added back to the list after PLUGIN_RETRY_FREQUENCY expires. */ #define MAX_CONNECT_RETRY 3 /** * How often must a peer violate bandwidth quotas before we start * to simply drop its messages? */ #define QUOTA_VIOLATION_DROP_THRESHOLD 100 /** * How long until a HELLO verification attempt should time out? */ #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_UNIT_MINUTES /** * How often do we re-add (cheaper) plugins to our list of plugins * to try for a given connected peer? */ #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) /** * After how long do we expire an address in a HELLO * that we just validated? This value is also used * for our own addresses when we create a HELLO. */ #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12) /** * Entry in linked list of network addresses. */ struct AddressList { /** * This is a linked list. */ struct AddressList *next; /** * The address, actually a pointer to the end * of this struct. Do not free! */ void *addr; /** * How long until we auto-expire this address (unless it is * re-confirmed by the transport)? */ struct GNUNET_TIME_Absolute expires; /** * Length of addr. */ size_t addrlen; }; /** * Entry in linked list of all of our plugins. */ struct TransportPlugin { /** * This is a linked list. */ struct TransportPlugin *next; /** * API of the transport as returned by the plugin's * initialization function. */ struct GNUNET_TRANSPORT_PluginFunctions *api; /** * Short name for the plugin (i.e. "tcp"). */ char *short_name; /** * Name of the library (i.e. "gnunet_plugin_transport_tcp"). */ char *lib_name; /** * List of our known addresses for this transport. */ struct AddressList *addresses; /** * Environment this transport service is using * for this plugin. */ struct GNUNET_TRANSPORT_PluginEnvironment env; /** * ID of task that is used to clean up expired addresses. */ GNUNET_SCHEDULER_TaskIdentifier address_update_task; /** * Set to GNUNET_YES if we need to scrap the existing * list of "addresses" and start fresh when we receive * the next address update from a transport. Set to * GNUNET_NO if we should just add the new address * to the list and wait for the commit call. */ int rebuild; }; struct NeighbourList; /** * For each neighbour we keep a list of messages * that we still want to transmit to the neighbour. */ struct MessageQueue { /** * This is a linked list. */ struct MessageQueue *next; /** * The message we want to transmit. */ struct GNUNET_MessageHeader *message; /** * Client responsible for queueing the message; * used to check that a client has not two messages * pending for the same target. Can be NULL. */ struct TransportClient *client; /** * Neighbour this entry belongs to. */ struct NeighbourList *neighbour; /** * Plugin that we used for the transmission. * NULL until we scheduled a transmission. */ struct TransportPlugin *plugin; /** * Internal message of the transport system that should not be * included in the usual SEND-SEND_OK transmission confirmation * traffic management scheme. Typically, "internal_msg" will * be set whenever "client" is NULL (but it is not strictly * required). */ int internal_msg; /** * How important is the message? */ unsigned int priority; }; /** * For a given Neighbour, which plugins are available * to talk to this peer and what are their costs? */ struct ReadyList { /** * This is a linked list. */ struct ReadyList *next; /** * Which of our transport plugins does this entry * represent? */ struct TransportPlugin *plugin; /** * Neighbour this entry belongs to. */ struct NeighbourList *neighbour; /** * Opaque handle (specific to the plugin) for the * connection to our target; can be NULL. */ void *plugin_handle; /** * What was the last latency observed for this plugin * and peer? Invalid if connected is GNUNET_NO. */ struct GNUNET_TIME_Relative latency; /** * If we did not successfully transmit a message to the * given peer via this connection during the specified * time, we should consider the connection to be dead. * This is used in the case that a TCP transport simply * stalls writing to the stream but does not formerly * get a signal that the other peer died. */ struct GNUNET_TIME_Absolute timeout; /** * Is this plugin currently connected? The first time * we transmit or send data to a peer via a particular * plugin, we set this to GNUNET_YES. If we later get * an error (disconnect notification or transmission * failure), we set it back to GNUNET_NO. Each time the * value is set to GNUNET_YES, we increment the * "connect_attempts" counter. If that one reaches a * particular threshold, we consider the plugin to not * be working properly at this time for the given peer * and remove it from the eligible list. */ int connected; /** * How often have we tried to connect using this plugin? */ unsigned int connect_attempts; /** * Is this plugin ready to transmit to the specific * target? GNUNET_NO if not. Initially, all plugins * are marked ready. If a transmission is in progress, * "transmit_ready" is set to GNUNET_NO. */ int transmit_ready; }; /** * Entry in linked list of all of our current neighbours. */ struct NeighbourList { /** * This is a linked list. */ struct NeighbourList *next; /** * Which of our transports is connected to this peer * and what is their status? */ struct ReadyList *plugins; /** * List of messages we would like to send to this peer; * must contain at most one message per client. */ struct MessageQueue *messages; /** * Identity of this neighbour. */ struct GNUNET_PeerIdentity id; /** * ID of task scheduled to run when this peer is about to * time out (will free resources associated with the peer). */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; /** * How long until we should consider this peer dead * (if we don't receive another message in the * meantime)? */ struct GNUNET_TIME_Absolute peer_timeout; /** * At what time did we reset last_received last? */ struct GNUNET_TIME_Absolute last_quota_update; /** * At what time should we try to again add plugins to * our ready list? */ struct GNUNET_TIME_Absolute retry_plugins_time; /** * How many bytes have we received since the "last_quota_update" * timestamp? */ uint64_t last_received; /** * Global quota for inbound traffic for the neighbour in bytes/ms. */ uint32_t quota_in; /** * How often has the other peer (recently) violated the * inbound traffic limit? Incremented by 10 per violation, * decremented by 1 per non-violation (for each * time interval). */ unsigned int quota_violation_count; /** * Have we seen an ACK from this neighbour in the past? * (used to make up a fake ACK for clients connecting after * the neighbour connected to us). */ int saw_ack; }; /** * Linked list of messages to be transmitted to * the client. Each entry is followed by the * actual message. */ struct ClientMessageQueueEntry { /** * This is a linked list. */ struct ClientMessageQueueEntry *next; }; /** * Client connected to the transport service. */ struct TransportClient { /** * This is a linked list. */ struct TransportClient *next; /** * Handle to the client. */ struct GNUNET_SERVER_Client *client; /** * Linked list of messages yet to be transmitted to * the client. */ struct ClientMessageQueueEntry *message_queue_head; /** * Tail of linked list of messages yet to be transmitted to the * client. */ struct ClientMessageQueueEntry *message_queue_tail; /** * Is a call to "transmit_send_continuation" pending? If so, we * must not free this struct (even if the corresponding client * disconnects) and instead only remove it from the linked list and * set the "client" field to NULL. */ int tcs_pending; /** * Length of the list of messages pending for this client. */ unsigned int message_count; }; /** * For each HELLO, we may have to validate multiple addresses; * each address gets its own request entry. */ struct ValidationAddress { /** * This is a linked list. */ struct ValidationAddress *next; /** * Name of the transport. */ char *transport_name; /** * When should this validated address expire? */ struct GNUNET_TIME_Absolute expiration; /** * Length of the address we are validating. */ size_t addr_len; /** * Challenge number we used. */ uint32_t challenge; /** * Set to GNUNET_YES if the challenge was met, * GNUNET_SYSERR if we know it failed, GNUNET_NO * if we are waiting on a response. */ int ok; }; /** * Entry in linked list of all HELLOs awaiting validation. */ struct ValidationList { /** * This is a linked list. */ struct ValidationList *next; /** * Linked list with one entry per address from the HELLO * that needs to be validated. */ struct ValidationAddress *addresses; /** * The public key of the peer. */ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; /** * When does this record time-out? (assuming the * challenge goes unanswered) */ struct GNUNET_TIME_Absolute timeout; }; /** * HELLOs awaiting validation. */ static struct ValidationList *pending_validations; /** * Our HELLO message. */ static struct GNUNET_HELLO_Message *our_hello; /** * "version" of "our_hello". Used to see if a given * neighbour has already been sent the latest version * of our HELLO message. */ static unsigned int our_hello_version; /** * Our public key. */ static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key; /** * Our identity. */ static struct GNUNET_PeerIdentity my_identity; /** * Our private key. */ static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key; /** * Our scheduler. */ struct GNUNET_SCHEDULER_Handle *sched; /** * Our configuration. */ const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Linked list of all clients to this service. */ static struct TransportClient *clients; /** * All loaded plugins. */ static struct TransportPlugin *plugins; /** * Our server. */ static struct GNUNET_SERVER_Handle *server; /** * All known neighbours and their HELLOs. */ static struct NeighbourList *neighbours; /** * Number of neighbours we'd like to have. */ static uint32_t max_connect_per_transport; /** * Find an entry in the neighbour list for a particular peer. * * @return NULL if not found. */ static struct NeighbourList * find_neighbour (const struct GNUNET_PeerIdentity *key) { struct NeighbourList *head = neighbours; while ((head != NULL) && (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) head = head->next; return head; } /** * Find an entry in the transport list for a particular transport. * * @return NULL if not found. */ static struct TransportPlugin * find_transport (const char *short_name) { struct TransportPlugin *head = plugins; while ((head != NULL) && (0 != strcmp (short_name, head->short_name))) head = head->next; return head; } /** * Update the quota values for the given neighbour now. */ static void update_quota (struct NeighbourList *n) { struct GNUNET_TIME_Relative delta; uint64_t allowed; uint64_t remaining; delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); if (delta.value < MIN_QUOTA_REFRESH_TIME) return; /* not enough time passed for doing quota update */ allowed = delta.value * n->quota_in; if (n->last_received < allowed) { remaining = allowed - n->last_received; if (n->quota_in > 0) remaining /= n->quota_in; else remaining = 0; if (remaining > MAX_BANDWIDTH_CARRY) remaining = MAX_BANDWIDTH_CARRY; n->last_received = 0; n->last_quota_update = GNUNET_TIME_absolute_get (); n->last_quota_update.value -= remaining; if (n->quota_violation_count > 0) n->quota_violation_count--; } else { n->last_received -= allowed; n->last_quota_update = GNUNET_TIME_absolute_get (); if (n->last_received > allowed) { /* more than twice the allowed rate! */ n->quota_violation_count += 10; } } } /** * Function called to notify a client about the socket * being ready to queue more data. "buf" will be * NULL and "size" zero if the socket was closed for * writing in the meantime. * * @param cls closure * @param size number of bytes available in buf * @param buf where the callee should write the message * @return number of bytes written to buf */ static size_t transmit_to_client_callback (void *cls, size_t size, void *buf) { struct TransportClient *client = cls; struct ClientMessageQueueEntry *q; uint16_t msize; size_t tsize; const struct GNUNET_MessageHeader *msg; struct GNUNET_CONNECTION_TransmitHandle *th; char *cbuf; if (buf == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission to client failed, closing connection.\n"); /* fatal error with client, free message queue! */ while (NULL != (q = client->message_queue_head)) { client->message_queue_head = q->next; GNUNET_free (q); } client->message_queue_tail = NULL; client->message_count = 0; return 0; } cbuf = buf; tsize = 0; while (NULL != (q = client->message_queue_head)) { msg = (const struct GNUNET_MessageHeader *) &q[1]; msize = ntohs (msg->size); if (msize + tsize > size) break; client->message_queue_head = q->next; if (q->next == NULL) client->message_queue_tail = NULL; memcpy (&cbuf[tsize], msg, msize); tsize += msize; GNUNET_free (q); client->message_count--; } GNUNET_assert (tsize > 0); if (NULL != q) { th = GNUNET_SERVER_notify_transmit_ready (client->client, msize, GNUNET_TIME_UNIT_FOREVER_REL, &transmit_to_client_callback, client); GNUNET_assert (th != NULL); } return tsize; } /** * Send the specified message to the specified client. Since multiple * messages may be pending for the same client at a time, this code * makes sure that no message is lost. * * @param client client to transmit the message to * @param msg the message to send * @param may_drop can this message be dropped if the * message queue for this client is getting far too large? */ static void transmit_to_client (struct TransportClient *client, const struct GNUNET_MessageHeader *msg, int may_drop) { struct ClientMessageQueueEntry *q; uint16_t msize; struct GNUNET_CONNECTION_TransmitHandle *th; if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Dropping message, have %u messages pending (%u is the soft limit)\n"), client->message_count, MAX_PENDING); /* TODO: call to statistics... */ return; } client->message_count++; msize = ntohs (msg->size); q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize); memcpy (&q[1], msg, msize); /* append to message queue */ if (client->message_queue_tail == NULL) { client->message_queue_tail = q; } else { client->message_queue_tail->next = q; client->message_queue_tail = q; } if (client->message_queue_head == NULL) { client->message_queue_head = q; th = GNUNET_SERVER_notify_transmit_ready (client->client, msize, GNUNET_TIME_UNIT_FOREVER_REL, &transmit_to_client_callback, client); GNUNET_assert (th != NULL); } } /** * Find alternative plugins for communication. * * @param neighbour for which neighbour should we try to find * more plugins? */ static void try_alternative_plugins (struct NeighbourList *neighbour) { struct ReadyList *rl; if ((neighbour->plugins != NULL) && (neighbour->retry_plugins_time.value > GNUNET_TIME_absolute_get ().value)) return; /* don't try right now */ neighbour->retry_plugins_time = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); rl = neighbour->plugins; while (rl != NULL) { if (rl->connect_attempts > 0) rl->connect_attempts--; /* amnesty */ rl = rl->next; } } /** * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This * function updates our state and do the necessary notifications. * Also notifies our clients that the neighbour is now officially * gone. * * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ static void disconnect_neighbour (struct NeighbourList *n, int check); /** * Check the ready list for the given neighbour and * if a plugin is ready for transmission (and if we * have a message), do so! * * @param neighbour target peer for which to check the plugins */ static void try_transmission_to_peer (struct NeighbourList *neighbour); /** * Function called by the GNUNET_TRANSPORT_TransmitFunction * upon "completion" of a send request. This tells the API * that it is now legal to send another message to the given * peer. * * @param cls closure, identifies the entry on the * message queue that was transmitted and the * client responsible for queueing the message * @param rl identifies plugin used for the transmission for * this neighbour; needs to be re-enabled for * future transmissions * @param target the peer receiving the message * @param result GNUNET_OK on success, if the transmission * failed, we should not tell the client to transmit * more messages */ static void transmit_send_continuation (void *cls, struct ReadyList *rl, const struct GNUNET_PeerIdentity *target, int result) { struct MessageQueue *mq = cls; struct SendOkMessage send_ok_msg; struct NeighbourList *n; GNUNET_assert (mq != NULL); n = mq->neighbour; GNUNET_assert (n != NULL); GNUNET_assert (0 == memcmp (&n->id, target, sizeof (struct GNUNET_PeerIdentity))); if (rl == NULL) { rl = n->plugins; while ((rl != NULL) && (rl->plugin != mq->plugin)) rl = rl->next; GNUNET_assert (rl != NULL); } if (result == GNUNET_OK) { rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); } else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission to peer `%s' failed, marking connection as down.\n", GNUNET_i2s(target)); rl->connected = GNUNET_NO; rl->plugin_handle = NULL; } if (!mq->internal_msg) rl->transmit_ready = GNUNET_YES; if (mq->client != NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Notifying client %p about failed transission to peer `%4s'.\n", mq->client, GNUNET_i2s(target)); send_ok_msg.header.size = htons (sizeof (send_ok_msg)); send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); send_ok_msg.success = htonl (result); send_ok_msg.peer = n->id; transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO); } GNUNET_free (mq->message); GNUNET_free (mq); /* one plugin just became ready again, try transmitting another message (if available) */ if (result == GNUNET_OK) try_transmission_to_peer (n); else disconnect_neighbour (n, GNUNET_YES); } /** * Check the ready list for the given neighbour and * if a plugin is ready for transmission (and if we * have a message), do so! */ static void try_transmission_to_peer (struct NeighbourList *neighbour) { struct ReadyList *pos; struct GNUNET_TIME_Relative min_latency; struct ReadyList *rl; struct MessageQueue *mq; struct GNUNET_TIME_Absolute now; if (neighbour->messages == NULL) return; /* nothing to do */ try_alternative_plugins (neighbour); min_latency = GNUNET_TIME_UNIT_FOREVER_REL; rl = NULL; mq = neighbour->messages; now = GNUNET_TIME_absolute_get (); pos = neighbour->plugins; while (pos != NULL) { /* set plugins that are inactive for a long time back to disconnected */ if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES)) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Marking long-time inactive connection to `%4s' as down.\n", GNUNET_i2s (&neighbour->id)); #endif pos->connected = GNUNET_NO; } if (((GNUNET_YES == pos->transmit_ready) || (mq->internal_msg)) && (pos->connect_attempts < MAX_CONNECT_RETRY) && ((rl == NULL) || (min_latency.value > pos->latency.value))) { rl = pos; min_latency = pos->latency; } pos = pos->next; } if (rl == NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No plugin ready to transmit message\n"); #endif return; /* nobody ready */ } if (GNUNET_NO == rl->connected) { rl->connect_attempts++; rl->connected = GNUNET_YES; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Establishing fresh connection with `%4s' via plugin `%s'\n", GNUNET_i2s (&neighbour->id), rl->plugin->short_name); #endif } neighbour->messages = mq->next; mq->plugin = rl->plugin; if (!mq->internal_msg) rl->transmit_ready = GNUNET_NO; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Giving message of type `%u' for `%4s' to plugin `%s'\n", ntohs (mq->message->type), GNUNET_i2s (&neighbour->id), rl->plugin->short_name); #endif rl->plugin_handle = rl->plugin->api->send (rl->plugin->api->cls, rl->plugin_handle, rl, &neighbour->id, mq->priority, mq->message, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &transmit_send_continuation, mq); } /** * Send the specified message to the specified peer. * * @param client source of the transmission request (can be NULL) * @param priority how important is the message * @param msg message to send * @param is_internal is this an internal message * @param neighbour handle to the neighbour for transmission */ static void transmit_to_peer (struct TransportClient *client, unsigned int priority, const struct GNUNET_MessageHeader *msg, int is_internal, struct NeighbourList *neighbour) { struct MessageQueue *mq; struct MessageQueue *mqe; struct GNUNET_MessageHeader *m; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _("Sending message of type %u to peer `%4s'\n"), ntohs (msg->type), GNUNET_i2s (&neighbour->id)); #endif if (client != NULL) { /* check for duplicate submission */ mq = neighbour->messages; while (NULL != mq) { if (mq->client == client) { /* client transmitted to same peer twice before getting SendOk! */ GNUNET_break (0); return; } mq = mq->next; } } mq = GNUNET_malloc (sizeof (struct MessageQueue)); mq->client = client; m = GNUNET_malloc (ntohs (msg->size)); memcpy (m, msg, ntohs (msg->size)); mq->message = m; mq->neighbour = neighbour; mq->internal_msg = is_internal; mq->priority = priority; /* find tail */ mqe = neighbour->messages; if (mqe != NULL) while (mqe->next != NULL) mqe = mqe->next; if (mqe == NULL) { /* new head */ neighbour->messages = mq; try_transmission_to_peer (neighbour); } else { /* append */ mqe->next = mq; } } /** * FIXME: document. */ struct GeneratorContext { struct TransportPlugin *plug_pos; struct AddressList *addr_pos; struct GNUNET_TIME_Absolute expiration; }; /** * FIXME: document. */ static size_t address_generator (void *cls, size_t max, void *buf) { struct GeneratorContext *gc = cls; size_t ret; while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL)) { gc->plug_pos = gc->plug_pos->next; gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL; } if (NULL == gc->plug_pos) return 0; ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name, gc->expiration, gc->addr_pos->addr, gc->addr_pos->addrlen, buf, max); gc->addr_pos = gc->addr_pos->next; return ret; } /** * Construct our HELLO message from all of the addresses of * all of the transports. */ static void refresh_hello () { struct GNUNET_HELLO_Message *hello; struct TransportClient *cpos; struct NeighbourList *npos; struct GeneratorContext gc; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Refreshing my `%s'\n", "HELLO"); #endif gc.plug_pos = plugins; gc.addr_pos = plugins != NULL ? plugins->addresses : NULL; gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc); cpos = clients; while (cpos != NULL) { transmit_to_client (cpos, (const struct GNUNET_MessageHeader *) hello, GNUNET_NO); cpos = cpos->next; } GNUNET_free_non_null (our_hello); our_hello = hello; our_hello_version++; npos = neighbours; while (npos != NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Transmitting updated `%s' to neighbour `%4s'\n", "HELLO", GNUNET_i2s(&npos->id)); #endif transmit_to_peer (NULL, 0, (const struct GNUNET_MessageHeader *) our_hello, GNUNET_YES, npos); npos = npos->next; } } /** * Task used to clean up expired addresses for a plugin. * * @param cls closure * @param tc context */ static void expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); /** * Update the list of addresses for this plugin, * expiring those that are past their expiration date. * * @param plugin addresses of which plugin should be recomputed? * @param fresh set to GNUNET_YES if a new address was added * and we need to regenerate the HELLO even if nobody * expired */ static void update_addresses (struct TransportPlugin *plugin, int fresh) { struct GNUNET_TIME_Relative min_remaining; struct GNUNET_TIME_Relative remaining; struct GNUNET_TIME_Absolute now; struct AddressList *pos; struct AddressList *prev; struct AddressList *next; int expired; if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK) GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task); plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK; now = GNUNET_TIME_absolute_get (); min_remaining = GNUNET_TIME_UNIT_FOREVER_REL; expired = GNUNET_NO; prev = NULL; pos = plugin->addresses; while (pos != NULL) { next = pos->next; if (pos->expires.value < now.value) { expired = GNUNET_YES; if (prev == NULL) plugin->addresses = pos->next; else prev->next = pos->next; GNUNET_free (pos); } else { remaining = GNUNET_TIME_absolute_get_remaining (pos->expires); if (remaining.value < min_remaining.value) min_remaining = remaining; prev = pos; } pos = next; } if (expired || fresh) refresh_hello (); if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value) plugin->address_update_task = GNUNET_SCHEDULER_add_delayed (plugin->env.sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_IDLE, GNUNET_SCHEDULER_NO_TASK, min_remaining, &expire_address_task, plugin); } /** * Task used to clean up expired addresses for a plugin. * * @param cls closure * @param tc context */ static void expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct TransportPlugin *plugin = cls; plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK; update_addresses (plugin, GNUNET_NO); } /** * Function that must be called by each plugin to notify the * transport service about the addresses under which the transport * provided by the plugin can be reached. * * @param cls closure * @param name name of the transport that generated the address * @param addr one of the addresses of the host, NULL for the last address * the specific address format depends on the transport * @param addrlen length of the address * @param expires when should this address automatically expire? */ static void plugin_env_notify_address (void *cls, const char *name, const void *addr, size_t addrlen, struct GNUNET_TIME_Relative expires) { struct TransportPlugin *p = cls; struct AddressList *al; struct GNUNET_TIME_Absolute abex; abex = GNUNET_TIME_relative_to_absolute (expires); GNUNET_assert (p == find_transport (name)); al = p->addresses; while (al != NULL) { if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen))) { if (al->expires.value < abex.value) al->expires = abex; return; } al = al->next; } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Plugin `%s' informs us about a new address `%s'\n", name, GNUNET_a2s(addr, addrlen)); #endif al = GNUNET_malloc (sizeof (struct AddressList) + addrlen); al->addr = &al[1]; al->next = p->addresses; p->addresses = al; al->expires = abex; al->addrlen = addrlen; memcpy (&al[1], addr, addrlen); update_addresses (p, GNUNET_YES); } /** * FIXME: document. */ struct LookupHelloContext { GNUNET_TRANSPORT_AddressCallback iterator; void *iterator_cls; }; /** * FIXME: document. */ static int lookup_address_callback (void *cls, const char *tname, struct GNUNET_TIME_Absolute expiration, const void *addr, size_t addrlen) { struct LookupHelloContext *lhc = cls; lhc->iterator (lhc->iterator_cls, tname, addr, addrlen); return GNUNET_OK; } /** * FIXME: document. */ static void lookup_hello_callback (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_HELLO_Message *h, uint32_t trust) { struct LookupHelloContext *lhc = cls; if (peer == NULL) { lhc->iterator (lhc->iterator_cls, NULL, NULL, 0); GNUNET_free (lhc); return; } if (h == NULL) return; GNUNET_HELLO_iterate_addresses (h, GNUNET_NO, &lookup_address_callback, lhc); } /** * Function that allows a transport to query the known * network addresses for a given peer. * * @param cls closure * @param timeout after how long should we time out? * @param target which peer are we looking for? * @param iter function to call for each known address * @param iter_cls closure for iter */ static void plugin_env_lookup_address (void *cls, struct GNUNET_TIME_Relative timeout, const struct GNUNET_PeerIdentity *target, GNUNET_TRANSPORT_AddressCallback iter, void *iter_cls) { struct LookupHelloContext *lhc; lhc = GNUNET_malloc (sizeof (struct LookupHelloContext)); lhc->iterator = iter; lhc->iterator_cls = iter_cls; GNUNET_PEERINFO_for_all (cfg, sched, target, 0, timeout, &lookup_hello_callback, &lhc); } /** * Notify all of our clients about a peer connecting. */ static void notify_clients_connect (const struct GNUNET_PeerIdentity *peer, struct GNUNET_TIME_Relative latency) { struct ConnectInfoMessage cim; struct TransportClient *cpos; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Informing clients about peer `%4s' connecting to us\n", GNUNET_i2s (peer)); #endif cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000)); cim.latency = GNUNET_TIME_relative_hton (latency); memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; while (cpos != NULL) { transmit_to_client (cpos, &cim.header, GNUNET_NO); cpos = cpos->next; } } /** * Notify all of our clients about a peer disconnecting. */ static void notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) { struct DisconnectInfoMessage dim; struct TransportClient *cpos; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Informing clients about peer `%4s' disconnecting\n", GNUNET_i2s (peer)); #endif dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); dim.reserved = htonl (0); memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; while (cpos != NULL) { transmit_to_client (cpos, &dim.header, GNUNET_NO); cpos = cpos->next; } } /** * Copy any validated addresses to buf. * * @return 0 once all addresses have been * returned */ static size_t list_validated_addresses (void *cls, size_t max, void *buf) { struct ValidationAddress **va = cls; size_t ret; while ((NULL != *va) && ((*va)->ok != GNUNET_YES)) *va = (*va)->next; if (NULL == *va) return 0; ret = GNUNET_HELLO_add_address ((*va)->transport_name, (*va)->expiration, &(*va)[1], (*va)->addr_len, buf, max); *va = (*va)->next; return ret; } /** * HELLO validation cleanup task. */ static void cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct ValidationAddress *va; struct ValidationList *pos; struct ValidationList *prev; struct GNUNET_TIME_Absolute now; struct GNUNET_HELLO_Message *hello; struct GNUNET_PeerIdentity pid; struct NeighbourList *n; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "HELLO validation cleanup background task running...\n"); #endif now = GNUNET_TIME_absolute_get (); prev = NULL; pos = pending_validations; while (pos != NULL) { if (pos->timeout.value < now.value) { if (prev == NULL) pending_validations = pos->next; else prev->next = pos->next; va = pos->addresses; hello = GNUNET_HELLO_create (&pos->publicKey, &list_validated_addresses, &va); GNUNET_CRYPTO_hash (&pos->publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &pid.hashPubKey); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n", "HELLO", GNUNET_i2s (&pid)); #endif GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello); n = find_neighbour (&pid); if (NULL != n) try_transmission_to_peer (n); GNUNET_free (hello); while (NULL != (va = pos->addresses)) { pos->addresses = va->next; GNUNET_free (va->transport_name); GNUNET_free (va); } GNUNET_free (pos); if (prev == NULL) pos = pending_validations; else pos = prev->next; continue; } prev = pos; pos = pos->next; } /* finally, reschedule cleanup if needed; list is ordered by timeout, so we need the last element... */ pos = pending_validations; while ((pos != NULL) && (pos->next != NULL)) pos = pos->next; if (NULL != pos) GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_IDLE, GNUNET_SCHEDULER_NO_TASK, GNUNET_TIME_absolute_get_remaining (pos->timeout), &cleanup_validation, NULL); } /** * Function that will be called if we receive a validation * of an address challenge that we transmitted to another * peer. Note that the validation should only be considered * acceptable if the challenge matches AND if the sender * address is at least a plausible address for this peer * (otherwise we may be seeing a MiM attack). * * @param cls closure * @param name name of the transport that generated the address * @param peer who responded to our challenge * @param challenge the challenge number we presumably used * @param sender_addr string describing our sender address (as observed * by the other peer in human-readable format) */ static void plugin_env_notify_validation (void *cls, const char *name, const struct GNUNET_PeerIdentity *peer, uint32_t challenge, const char *sender_addr) { int all_done; int matched; struct ValidationList *pos; struct ValidationAddress *va; struct GNUNET_PeerIdentity id; pos = pending_validations; while (pos != NULL) { GNUNET_CRYPTO_hash (&pos->publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &id.hashPubKey); if (0 == memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity))) break; pos = pos->next; } if (pos == NULL) { /* TODO: call statistics (unmatched PONG) */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"), GNUNET_i2s(peer)); return; } all_done = GNUNET_YES; matched = GNUNET_NO; va = pos->addresses; while (va != NULL) { if (va->challenge == challenge) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Confirmed validity of peer address.\n"); #endif GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"), sender_addr, name); va->ok = GNUNET_YES; va->expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); matched = GNUNET_YES; } if (va->ok != GNUNET_YES) all_done = GNUNET_NO; va = va->next; } if (GNUNET_NO == matched) { /* TODO: call statistics (unmatched PONG) */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"), "PONG", "PING"); } if (GNUNET_YES == all_done) { pos->timeout.value = 0; GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_IDLE, GNUNET_SCHEDULER_NO_TASK, GNUNET_TIME_UNIT_ZERO, &cleanup_validation, NULL); } } struct CheckHelloValidatedContext { /** * Plugin for which we are validating. */ struct TransportPlugin *plugin; /** * Hello that we are validating. */ struct GNUNET_HELLO_Message *hello; /** * Validation list being build. */ struct ValidationList *e; }; /** * Append the given address to the list of entries * that need to be validated. */ static int run_validation (void *cls, const char *tname, struct GNUNET_TIME_Absolute expiration, const void *addr, size_t addrlen) { struct ValidationList *e = cls; struct TransportPlugin *tp; struct ValidationAddress *va; struct GNUNET_PeerIdentity id; tp = find_transport (tname); if (tp == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, _ ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"), tname); return GNUNET_OK; } GNUNET_CRYPTO_hash (&e->publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &id.hashPubKey); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling validation of address `%s' via `%s' for `%4s'\n", GNUNET_a2s(addr, addrlen), tname, GNUNET_i2s(&id)); va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen); va->next = e->addresses; e->addresses = va; va->transport_name = GNUNET_strdup (tname); va->addr_len = addrlen; va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (unsigned int) -1); memcpy (&va[1], addr, addrlen); return GNUNET_OK; } /** * Check if addresses in validated hello "h" overlap with * those in "chvc->hello" and update "chvc->hello" accordingly, * removing those addresses that have already been validated. */ static void check_hello_validated (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_HELLO_Message *h, uint32_t trust) { struct CheckHelloValidatedContext *chvc = cls; struct ValidationAddress *va; struct TransportPlugin *tp; int first_call; struct GNUNET_PeerIdentity apeer; first_call = GNUNET_NO; if (chvc->e == NULL) { first_call = GNUNET_YES; chvc->e = GNUNET_malloc (sizeof (struct ValidationList)); GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello, &chvc->e->publicKey)); chvc->e->timeout = GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT); chvc->e->next = pending_validations; pending_validations = chvc->e; } if (h != NULL) { GNUNET_HELLO_iterate_new_addresses (chvc->hello, h, GNUNET_TIME_absolute_get (), &run_validation, chvc->e); } else if (GNUNET_YES == first_call) { /* no existing HELLO, all addresses are new */ GNUNET_HELLO_iterate_addresses (chvc->hello, GNUNET_NO, &run_validation, chvc->e); } if (h != NULL) return; /* wait for next call */ /* finally, transmit validation attempts */ GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_id (chvc->hello, &apeer)); va = chvc->e->addresses; while (va != NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Establishing `%s' connection to validate `%s' of `%4s'\n", va->transport_name, "HELLO", GNUNET_i2s (&apeer)); #endif tp = find_transport (va->transport_name); GNUNET_assert (tp != NULL); if (GNUNET_OK != tp->api->validate (tp->api->cls, &apeer, va->challenge, HELLO_VERIFICATION_TIMEOUT, &va[1], va->addr_len)) va->ok = GNUNET_SYSERR; va = va->next; } if (chvc->e->next == NULL) GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_IDLE, GNUNET_SCHEDULER_NO_TASK, GNUNET_TIME_absolute_get_remaining (chvc->e->timeout), &cleanup_validation, NULL); GNUNET_free (chvc); } /** * Process HELLO-message. * * @param plugin transport involved, may be NULL * @param message the actual message * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise */ static int process_hello (struct TransportPlugin *plugin, const struct GNUNET_MessageHeader *message) { struct ValidationList *e; uint16_t hsize; struct GNUNET_PeerIdentity target; const struct GNUNET_HELLO_Message *hello; struct CheckHelloValidatedContext *chvc; struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; hsize = ntohs (message->size); if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) || (hsize < sizeof (struct GNUNET_MessageHeader))) { GNUNET_break (0); return GNUNET_SYSERR; } /* first, check if load is too high */ if (GNUNET_OS_load_cpu_get (cfg) > 100) { /* TODO: call to stats? */ return GNUNET_OK; } hello = (const struct GNUNET_HELLO_Message *) message; if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey)) { GNUNET_break_op (0); return GNUNET_SYSERR; } GNUNET_CRYPTO_hash (&publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &target.hashPubKey); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' message for `%4s'\n", "HELLO", GNUNET_i2s (&target)); #endif /* check if a HELLO for this peer is already on the validation list */ e = pending_validations; while (e != NULL) { if (0 == memcmp (&e->publicKey, &publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded))) { /* TODO: call to stats? */ #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s' message for peer `%4s' is already pending; ignoring new message\n", "HELLO", GNUNET_i2s (&target)); #endif return GNUNET_OK; } e = e->next; } chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize); chvc->plugin = plugin; chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1]; memcpy (chvc->hello, hello, hsize); /* finally, check if HELLO was previously validated (continuation will then schedule actual validation) */ GNUNET_PEERINFO_for_all (cfg, sched, &target, 0, HELLO_VERIFICATION_TIMEOUT, &check_hello_validated, chvc); return GNUNET_OK; } /** * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This * function updates our state and do the necessary notifications. * Also notifies our clients that the neighbour is now officially * gone. * * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ static void disconnect_neighbour (struct NeighbourList *n, int check) { struct ReadyList *rpos; struct NeighbourList *npos; struct NeighbourList *nprev; struct MessageQueue *mq; if (GNUNET_YES == check) { rpos = n->plugins; while (NULL != rpos) { if (GNUNET_YES == rpos->connected) return; /* still connected */ rpos = rpos->next; } } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Disconnecting from `%4s'\n", GNUNET_i2s(&n->id)); #endif /* remove n from neighbours list */ nprev = NULL; npos = neighbours; while ((npos != NULL) && (npos != n)) { nprev = npos; npos = npos->next; } GNUNET_assert (npos != NULL); if (nprev == NULL) neighbours = n->next; else nprev->next = n->next; /* notify all clients about disconnect */ notify_clients_disconnect (&n->id); /* clean up all plugins, cancel connections and pending transmissions */ while (NULL != (rpos = n->plugins)) { n->plugins = rpos->next; GNUNET_assert (rpos->neighbour == n); if (GNUNET_YES == rpos->connected) rpos->plugin->api->cancel (rpos->plugin->api->cls, rpos->plugin_handle, rpos, &n->id); GNUNET_free (rpos); } /* free all messages on the queue */ while (NULL != (mq = n->messages)) { n->messages = mq->next; GNUNET_assert (mq->neighbour == n); GNUNET_free (mq); } if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) GNUNET_SCHEDULER_cancel (sched, n->timeout_task); /* finally, free n itself */ GNUNET_free (n); } /** * Add an entry for each of our transport plugins * (that are able to send) to the list of plugins * for this neighbour. * * @param neighbour to initialize */ static void add_plugins (struct NeighbourList *neighbour) { struct TransportPlugin *tp; struct ReadyList *rl; neighbour->retry_plugins_time = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); tp = plugins; while (tp != NULL) { if (tp->api->send != NULL) { rl = GNUNET_malloc (sizeof (struct ReadyList)); rl->next = neighbour->plugins; neighbour->plugins = rl; rl->plugin = tp; rl->neighbour = neighbour; rl->transmit_ready = GNUNET_YES; } tp = tp->next; } } static void neighbour_timeout_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct NeighbourList *n = cls; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Neighbour `%4s' has timed out!\n", GNUNET_i2s(&n->id)); #endif n->timeout_task = GNUNET_SCHEDULER_NO_TASK; disconnect_neighbour (n, GNUNET_NO); } /** * Create a fresh entry in our neighbour list for the given peer. * Will try to transmit our current HELLO to the new neighbour. Also * notifies our clients about the new "connection". * * @param peer the peer for which we create the entry * @return the new neighbour list entry */ static struct NeighbourList * setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) { struct NeighbourList *n; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n", GNUNET_i2s (peer)); #endif GNUNET_assert (our_hello != NULL); n = GNUNET_malloc (sizeof (struct NeighbourList)); n->next = neighbours; neighbours = n; n->id = *peer; n->last_quota_update = GNUNET_TIME_absolute_get (); n->peer_timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); add_plugins (n); n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_IDLE, GNUNET_SCHEDULER_NO_TASK, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &neighbour_timeout_task, n); transmit_to_peer (NULL, 0, (const struct GNUNET_MessageHeader *) our_hello, GNUNET_YES, n); notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL); return n; } /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about * reducing the rate at which they read from the socket * and generally forward to our receive callback. * * @param plugin_context value to pass to this plugin * to respond to the given peer (use is optional, * but may speed up processing) * @param service_context value passed to the transport-service * to identify the neighbour; will be NULL on the first * call for a given peer * @param latency estimated latency for communicating with the * given peer * @param peer (claimed) identity of the other peer * @param message the message, NULL if peer was disconnected * @return the new service_context that the plugin should use * for future receive calls for messages from this * particular peer */ static struct ReadyList * plugin_env_receive (void *cls, void *plugin_context, struct ReadyList *service_context, struct GNUNET_TIME_Relative latency, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message) { const struct GNUNET_MessageHeader ack = { htons (sizeof (struct GNUNET_MessageHeader)), htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK) }; struct TransportPlugin *plugin = cls; struct TransportClient *cpos; struct InboundMessage *im; uint16_t msize; struct NeighbourList *n; if (service_context != NULL) { n = service_context->neighbour; GNUNET_assert (n != NULL); } else { n = find_neighbour (peer); if (n == NULL) { if (message == NULL) return NULL; /* disconnect of peer already marked down */ n = setup_new_neighbour (peer); } service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); } if (message == NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Receive failed from `%4s', triggering disconnect\n", GNUNET_i2s(&n->id)); #endif /* TODO: call stats */ if ((service_context != NULL) && (service_context->plugin_handle == plugin_context)) { service_context->connected = GNUNET_NO; service_context->plugin_handle = NULL; } disconnect_neighbour (n, GNUNET_YES); return NULL; } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Processing message of type `%u' received by plugin...\n", ntohs (message->type)); #endif if (service_context != NULL) { if (service_context->connected == GNUNET_NO) { service_context->connected = GNUNET_YES; service_context->transmit_ready = GNUNET_YES; service_context->connect_attempts++; } service_context->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); service_context->plugin_handle = plugin_context; service_context->latency = latency; } /* update traffic received amount ... */ msize = ntohs (message->size); n->last_received += msize; GNUNET_SCHEDULER_cancel (sched, n->timeout_task); n->peer_timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_IDLE, GNUNET_SCHEDULER_NO_TASK, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &neighbour_timeout_task, n); update_quota (n); if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) { /* dropping message due to frequent inbound volume violations! */ GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, _ ("Dropping incoming message due to repeated bandwidth quota violations.\n")); /* TODO: call stats */ GNUNET_assert ( (service_context == NULL) || (NULL != service_context->neighbour) ); return service_context; } switch (ntohs (message->type)) { case GNUNET_MESSAGE_TYPE_HELLO: #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message from `%4s'.\n", "HELLO", GNUNET_i2s(peer)); #endif process_hello (plugin, message); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to connecting peer `%4s'.\n", "ACK", GNUNET_i2s(peer)); #endif transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n); break; case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK: n->saw_ack = GNUNET_YES; /* intentional fall-through! */ default: #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %u from `%4s', sending to all clients.\n", ntohs (message->type), GNUNET_i2s(peer)); #endif /* transmit message to all clients */ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); im->header.size = htons (sizeof (struct InboundMessage) + msize); im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); im->latency = GNUNET_TIME_relative_hton (latency); im->peer = *peer; memcpy (&im[1], message, msize); cpos = clients; while (cpos != NULL) { transmit_to_client (cpos, &im->header, GNUNET_YES); cpos = cpos->next; } GNUNET_free (im); } GNUNET_assert ( (service_context == NULL) || (NULL != service_context->neighbour) ); return service_context; } /** * Handle START-message. This is the first message sent to us * by any client which causes us to add it to our list. * * @param cls closure (always NULL) * @param client identification of the client * @param message the actual message */ static void handle_start (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct TransportClient *c; struct ConnectInfoMessage cim; struct NeighbourList *n; struct InboundMessage *im; struct GNUNET_MessageHeader *ack; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request from client\n", "START"); #endif c = clients; while (c != NULL) { if (c->client == client) { /* client already on our list! */ GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } c = c->next; } c = GNUNET_malloc (sizeof (struct TransportClient)); c->next = clients; clients = c; c->client = client; if (our_hello != NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending our own `%s' to new client\n", "HELLO"); #endif transmit_to_client (c, (const struct GNUNET_MessageHeader *) our_hello, GNUNET_NO); /* tell new client about all existing connections */ cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ im = GNUNET_malloc (sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)); im->header.size = htons (sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)); im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ ack = (struct GNUNET_MessageHeader *) &im[1]; ack->size = htons (sizeof (struct GNUNET_MessageHeader)); ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK); for (n = neighbours; n != NULL; n = n->next) { cim.id = n->id; transmit_to_client (c, &cim.header, GNUNET_NO); if (n->saw_ack) { im->peer = n->id; transmit_to_client (c, &im->header, GNUNET_NO); } } GNUNET_free (im); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** * Handle HELLO-message. * * @param cls closure (always NULL) * @param client identification of the client * @param message the actual message */ static void handle_hello (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { int ret; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request from client\n", "HELLO"); #endif ret = process_hello (NULL, message); GNUNET_SERVER_receive_done (client, ret); } /** * Handle SEND-message. * * @param cls closure (always NULL) * @param client identification of the client * @param message the actual message */ static void handle_send (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct TransportClient *tc; struct NeighbourList *n; const struct OutboundMessage *obm; const struct GNUNET_MessageHeader *obmm; uint16_t size; uint16_t msize; size = ntohs (message->size); if (size < sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader)) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } obm = (const struct OutboundMessage *) message; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request from client with target `%4s'\n", "SEND", GNUNET_i2s (&obm->peer)); #endif obmm = (const struct GNUNET_MessageHeader *) &obm[1]; msize = ntohs (obmm->size); if (size != msize + sizeof (struct OutboundMessage)) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } n = find_neighbour (&obm->peer); if (n == NULL) n = setup_new_neighbour (&obm->peer); tc = clients; while ((tc != NULL) && (tc->client != client)) tc = tc->next; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client asked to transmit %u-byte message of type %u to `%4s'\n", ntohs (obmm->size), ntohs (obmm->type), GNUNET_i2s (&obm->peer)); #endif transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n); GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** * Handle SET_QUOTA-message. * * @param cls closure (always NULL) * @param client identification of the client * @param message the actual message */ static void handle_set_quota (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; struct NeighbourList *n; struct TransportPlugin *p; struct ReadyList *rl; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request from client for peer `%4s'\n", "SET_QUOTA", GNUNET_i2s (&qsm->peer)); #endif n = find_neighbour (&qsm->peer); if (n == NULL) { GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } update_quota (n); if (n->quota_in < ntohl (qsm->quota_in)) n->last_quota_update = GNUNET_TIME_absolute_get (); n->quota_in = ntohl (qsm->quota_in); rl = n->plugins; while (rl != NULL) { p = rl->plugin; p->api->set_receive_quota (p->api->cls, &qsm->peer, ntohl (qsm->quota_in)); rl = rl->next; } GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** * Handle TRY_CONNECT-message. * * @param cls closure (always NULL) * @param client identification of the client * @param message the actual message */ static void handle_try_connect (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { const struct TryConnectMessage *tcm; tcm = (const struct TryConnectMessage *) message; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request from client %p asking to connect to `%4s'\n", "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer)); #endif if (NULL == find_neighbour (&tcm->peer)) setup_new_neighbour (&tcm->peer); #if DEBUG_TRANSPORT else GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Client asked to connect to `%4s', but connection already exists\n", "TRY_CONNECT", GNUNET_i2s (&tcm->peer)); #endif GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** * List of handlers for the messages understood by this * service. */ static struct GNUNET_SERVER_MessageHandler handlers[] = { {&handle_start, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0}, {&handle_hello, NULL, GNUNET_MESSAGE_TYPE_HELLO, 0}, {&handle_send, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, {&handle_set_quota, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, {&handle_try_connect, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT, sizeof (struct TryConnectMessage)}, {NULL, NULL, 0, 0} }; /** * Setup the environment for this plugin. */ static void create_environment (struct TransportPlugin *plug) { plug->env.cfg = cfg; plug->env.sched = sched; plug->env.my_public_key = &my_public_key; plug->env.my_private_key = my_private_key; plug->env.my_identity = &my_identity; plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.lookup = &plugin_env_lookup_address; plug->env.notify_address = &plugin_env_notify_address; plug->env.notify_validation = &plugin_env_notify_validation; plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; } /** * Start the specified transport (load the plugin). */ static void start_transport (struct GNUNET_SERVER_Handle *server, const char *name) { struct TransportPlugin *plug; char *libname; GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' transport plugin\n"), name); GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name); plug = GNUNET_malloc (sizeof (struct TransportPlugin)); create_environment (plug); plug->short_name = GNUNET_strdup (name); plug->lib_name = libname; plug->next = plugins; plugins = plug; plug->api = GNUNET_PLUGIN_load (libname, &plug->env); if (plug->api == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to load transport plugin for `%s'\n"), name); GNUNET_free (plug->short_name); plugins = plug->next; GNUNET_free (libname); GNUNET_free (plug); } } /** * Called whenever a client is disconnected. Frees our * resources associated with that client. * * @param cls closure * @param client identification of the client */ static void client_disconnect_notification (void *cls, struct GNUNET_SERVER_Client *client) { struct TransportClient *pos; struct TransportClient *prev; struct ClientMessageQueueEntry *mqe; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Client disconnected, cleaning up.\n"); #endif prev = NULL; pos = clients; while ((pos != NULL) && (pos->client != client)) { prev = pos; pos = pos->next; } if (pos == NULL) return; while (NULL != (mqe = pos->message_queue_head)) { pos->message_queue_head = mqe->next; GNUNET_free (mqe); } pos->message_queue_head = NULL; if (prev == NULL) clients = pos->next; else prev->next = pos->next; if (GNUNET_YES == pos->tcs_pending) { pos->client = NULL; return; } GNUNET_free (pos); } /** * Initiate transport service. * * @param cls closure * @param s scheduler to use * @param serv the initialized server * @param c configuration to use */ static void run (void *cls, struct GNUNET_SCHEDULER_Handle *s, struct GNUNET_SERVER_Handle *serv, const struct GNUNET_CONFIGURATION_Handle *c) { char *plugs; char *pos; int no_transports; unsigned long long tneigh; char *keyfile; sched = s; cfg = c; /* parse configuration */ if ((GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, "TRANSPORT", "NEIGHBOUR_LIMIT", &tneigh)) || (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (c, "GNUNETD", "HOSTKEY", &keyfile))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Transport service is lacking key configuration settings. Exiting.\n")); GNUNET_SCHEDULER_shutdown (s); return; } max_connect_per_transport = (uint32_t) tneigh; my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile); GNUNET_free (keyfile); if (my_private_key == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Transport service could not access hostkey. Exiting.\n")); GNUNET_SCHEDULER_shutdown (s); return; } GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key); GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key), &my_identity.hashPubKey); /* setup notification */ server = serv; GNUNET_SERVER_disconnect_notify (server, &client_disconnect_notification, NULL); /* load plugins... */ no_transports = 1; if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_string (c, "TRANSPORT", "PLUGINS", &plugs)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting transport plugins `%s'\n"), plugs); pos = strtok (plugs, " "); while (pos != NULL) { start_transport (server, pos); no_transports = 0; pos = strtok (NULL, " "); } GNUNET_free (plugs); } if (no_transports) refresh_hello (); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Transport service ready.\n")); #endif /* process client requests */ GNUNET_SERVER_add_handlers (server, handlers); } /** * Function called when the service shuts * down. Unloads our plugins. * * @param cls closure * @param cfg configuration to use */ static void unload_plugins (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg) { struct TransportPlugin *plug; struct AddressList *al; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport service is unloading plugins...\n"); #endif while (NULL != (plug = plugins)) { plugins = plug->next; GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api)); GNUNET_free (plug->lib_name); GNUNET_free (plug->short_name); while (NULL != (al = plug->addresses)) { plug->addresses = al->next; GNUNET_free (al); } GNUNET_free (plug); } if (my_private_key != NULL) GNUNET_CRYPTO_rsa_key_free (my_private_key); GNUNET_free_non_null (our_hello); } /** * The main function for the transport service. * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, 1 on error */ int main (int argc, char *const *argv) { return (GNUNET_OK == GNUNET_SERVICE_run (argc, argv, "transport", &run, NULL, &unload_plugins, NULL)) ? 0 : 1; } /* end of gnunet-service-transport.c */