From 49b581dd1c00d769e97031c51b5865846e802f8f Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 7 Nov 2018 21:28:59 +0100 Subject: starting with communicator --- src/transport/gnunet-communicator-unix.c | 1649 ++++++++++++++++++++++++++++++ 1 file changed, 1649 insertions(+) create mode 100644 src/transport/gnunet-communicator-unix.c (limited to 'src/transport/gnunet-communicator-unix.c') diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c new file mode 100644 index 000000000..373b74149 --- /dev/null +++ b/src/transport/gnunet-communicator-unix.c @@ -0,0 +1,1649 @@ +/* + This file is part of GNUnet + Copyright (C) 2010-2014, 2018 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +/** + * @file transport/gnunet-communicator-unix.c + * @brief Transport plugin using unix domain sockets (!) + * Clearly, can only be used locally on Unix/Linux hosts... + * ONLY INTENDED FOR TESTING!!! + * @author Christian Grothoff + * @author Nathan Evans + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_statistics_service.h" +#include "gnunet_transport_communication_service.h" + +/** + * Name of the communicator. + */ +#define COMMUNICATOR_NAME "unix" + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * UNIX Message-Packet header. + */ +struct UNIXMessage +{ + /** + * Message header. + */ + struct GNUNET_MessageHeader header; + + /** + * What is the identity of the sender (GNUNET_hash of public key) + */ + struct GNUNET_PeerIdentity sender; + +}; + +GNUNET_NETWORK_STRUCT_END + + +/** + * Information we track for a message awaiting transmission. + */ +struct UNIXMessageWrapper +{ + /** + * We keep messages in a doubly linked list. + */ + struct UNIXMessageWrapper *next; + + /** + * We keep messages in a doubly linked list. + */ + struct UNIXMessageWrapper *prev; + + /** + * The actual payload (allocated separately right now). + */ + struct UNIXMessage *msg; + + /** + * Queue this message belongs to. + */ + struct Queue *queue; + + /** + * Function to call upon transmission. + */ + GNUNET_TRANSPORT_TransmitContinuation cont; + + /** + * Closure for @e cont. + */ + void *cont_cls; + + /** + * Timeout for this message. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Number of bytes in @e msg. + */ + size_t msgsize; + + /** + * Number of bytes of payload encapsulated in @e msg. + */ + size_t payload; + + /** + * Priority of the message (ignored, just dragged along in UNIX). + */ + unsigned int priority; +}; + + +/** + * Handle for a queue. + */ +struct Queue +{ + + /** + * Queues with pending messages (!) are kept in a DLL. + */ + struct Queue *next; + + /** + * Queues with pending messages (!) are kept in a DLL. + */ + struct Queue *prev; + + /** + * To whom are we talking to (set to our identity + * if we are still waiting for the welcome message). + * + * FIXME: information duplicated with 'peer' in address! + */ + struct GNUNET_PeerIdentity target; + + /** + * Address of the other peer. + */ + struct sockaddr_un *address; + + /** + * Length of the address. + */ + socklen_t address_len; + + /** + * Message queue we are providing for the #ch. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * handle for this queue with the #ch. + */ + struct GNUNET_TRANSPORT_QueueHandle *qh; + + /** + * Number of bytes we currently have in our write queue. + */ + unsigned long long bytes_in_queue; + + /** + * Timeout for this queue. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Queue timeout task. + */ + struct GNUNET_SCHEDULER_Task * timeout_task; + + /** + * Number of messages we currently have in our write queue. + */ + unsigned int msgs_in_queue; + +}; + + + +/** + * ID of read task + */ +static struct GNUNET_SCHEDULER_Task *read_task; + +/** + * ID of write task + */ +static struct GNUNET_SCHEDULER_Task *write_task; + +/** + * Number of bytes we currently have in our write queues. + */ +static unsigned long long bytes_in_queue; + +/** + * Our environment. + */ +static struct GNUNET_TRANSPORT_CommunicatorHandle *ch; + +/** + * Queues (map from peer identity to `struct Queue`) + */ +static struct GNUNET_CONTAINER_MultiPeerMap *queue_map; + +/** + * Head of queue of messages to transmit. + */ +static struct UNIXMessageWrapper *msg_head; + +/** + * Tail of queue of messages to transmit. + */ +static struct UNIXMessageWrapper *msg_tail; + +/** + * socket that we transmit all data with + */ +static struct GNUNET_NETWORK_Handle *unix_sock; + +/** + * Handle to the operation that publishes our address. + */ +static struct GNUNET_TRANSPORT_AddressIdentifier *ai; + + +/** + * If a queue monitor is attached, notify it about the new + * queue state. + * + * @param plugin our plugin + * @param queue queue that changed state + * @param state new state of the queue + */ +static void +notify_queue_monitor (struct Plugin *plugin, + struct Queue *queue, + enum GNUNET_TRANSPORT_QueueState state) +{ + struct GNUNET_TRANSPORT_QueueInfo info; + + if (NULL == plugin->sic) + return; + memset (&info, 0, sizeof (info)); + info.state = state; + info.is_inbound = GNUNET_SYSERR; /* hard to say */ + info.num_msg_pending = queue->msgs_in_queue; + info.num_bytes_pending = queue->bytes_in_queue; + /* info.receive_delay remains zero as this is not supported by UNIX + (cannot selectively not receive from 'some' peer while continuing + to receive from others) */ + info.queue_timeout = queue->timeout; + info.address = queue->address; + plugin->sic (plugin->sic_cls, + queue, + &info); +} + + +/** + * Function called for a quick conversion of the binary address to + * a numeric address. Note that the caller must not free the + * address and that the next call to this function is allowed + * to override the address again. + * + * @param cls closure + * @param addr binary address + * @param addrlen length of the @a addr + * @return string representing the same address + */ +static const char * +unix_plugin_address_to_string (void *cls, + const void *addr, + size_t addrlen) +{ + static char rbuf[1024]; + struct UnixAddress *ua = (struct UnixAddress *) addr; + char *addrstr; + size_t addr_str_len; + unsigned int off; + + if ((NULL == addr) || (sizeof (struct UnixAddress) > addrlen)) + { + GNUNET_break(0); + return NULL; + } + addrstr = (char *) &ua[1]; + addr_str_len = ntohl (ua->addrlen); + + if (addr_str_len != addrlen - sizeof(struct UnixAddress)) + { + GNUNET_break(0); + return NULL; + } + if ('\0' != addrstr[addr_str_len - 1]) + { + GNUNET_break(0); + return NULL; + } + if (strlen (addrstr) + 1 != addr_str_len) + { + GNUNET_break(0); + return NULL; + } + + off = 0; + if ('\0' == addrstr[0]) + off++; + memset (rbuf, 0, sizeof (rbuf)); + GNUNET_snprintf (rbuf, + sizeof (rbuf) - 1, + "%s.%u.%s%.*s", + PLUGIN_NAME, + ntohl (ua->options), + (off == 1) ? "@" : "", + (int) (addr_str_len - off), + &addrstr[off]); + return rbuf; +} + + +/** + * Functions with this signature are called whenever we need + * to close a queue due to a disconnect or failure to + * establish a connection. + * + * @param queue queue to close down + */ +static void +unix_plugin_queue_disconnect (struct Queue *queue) +{ + struct Plugin *plugin = cls; + struct UNIXMessageWrapper *msgw; + struct UNIXMessageWrapper *next; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting queue for peer `%s'\n", + GNUNET_i2s (&queue->target)); + plugin->env->queue_end (plugin->env->cls, + queue->address, + queue); + next = plugin->msg_head; + while (NULL != next) + { + msgw = next; + next = msgw->next; + if (msgw->queue != queue) + continue; + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + queue->msgs_in_queue--; + GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); + queue->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, + &msgw->queue->target, + GNUNET_SYSERR, + msgw->payload, 0); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); + } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (plugin->queue_map, + &queue->target, + queue)); + GNUNET_STATISTICS_set (stats, + "# UNIX queues active", + GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), + GNUNET_NO); + if (NULL != queue->timeout_task) + { + GNUNET_SCHEDULER_cancel (queue->timeout_task); + queue->timeout_task = NULL; + queue->timeout = GNUNET_TIME_UNIT_ZERO_ABS; + } + GNUNET_free (queue->address); + GNUNET_break (0 == queue->bytes_in_queue); + GNUNET_break (0 == queue->msgs_in_queue); + GNUNET_free (queue); +} + + +/** + * Queue was idle for too long, so disconnect it + * + * @param cls the `struct Queue *` to disconnect + */ +static void +queue_timeout (void *cls) +{ + struct Queue *queue = cls; + struct GNUNET_TIME_Relative left; + + queue->timeout_task = NULL; + left = GNUNET_TIME_absolute_get_remaining (queue->timeout); + if (0 != left.rel_value_us) + { + /* not actually our turn yet, but let's at least update + the monitor, it may think we're about to die ... */ + queue->timeout_task + = GNUNET_SCHEDULER_add_delayed (left, + &queue_timeout, + queue); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue %p was idle for %s, disconnecting\n", + queue, + GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + GNUNET_YES)); + unix_plugin_queue_disconnect (queue); +} + + +/** + * Increment queue timeout due to activity. We do not immediately + * notify the monitor here as that might generate excessive + * signalling. + * + * @param queue queue for which the timeout should be rescheduled + */ +static void +reschedule_queue_timeout (struct Queue *queue) +{ + GNUNET_assert (NULL != queue->timeout_task); + queue->timeout + = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); +} + + +/** + * Convert unix path to a `struct sockaddr_un *` + * + * @param unixpath path to convert + * @param[out] sock_len set to the length of the address + * @param is_abstract is this an abstract @a unixpath + * @return converted unix path + */ +static struct sockaddr_un * +unix_address_to_sockaddr (const char *unixpath, + socklen_t *sock_len, + int is_abstract) +{ + struct sockaddr_un *un; + size_t slen; + + GNUNET_assert (0 < strlen (unixpath)); /* sanity check */ + un = GNUNET_new (struct sockaddr_un); + un->sun_family = AF_UNIX; + slen = strlen (unixpath); + if (slen >= sizeof (un->sun_path)) + slen = sizeof (un->sun_path) - 1; + GNUNET_memcpy (un->sun_path, unixpath, slen); + un->sun_path[slen] = '\0'; + slen = sizeof (struct sockaddr_un); +#if HAVE_SOCKADDR_UN_SUN_LEN + un->sun_len = (u_char) slen; +#endif + (*sock_len) = slen; + if (GNUNET_YES == is_abstract) + un->sun_path[0] = '\0'; + return un; +} + + +/** + * Closure to #lookup_queue_it(). + */ +struct LookupCtx +{ + /** + * Location to store the queue, if found. + */ + struct Queue *res; + + /** + * Address we are looking for. + */ + const sockaddr_un *un; + + /** + * Number of bytes in @a un + */ + socklen_t un_len; +}; + + +/** + * Function called to find a queue by address. + * + * @param cls the `struct LookupCtx *` + * @param key peer we are looking for (unused) + * @param value a queue + * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success + */ +static int +lookup_queue_it (void *cls, + const struct GNUNET_PeerIdentity * key, + void *value) +{ + struct LookupCtx *lctx = cls; + struct Queue *queue = value; + + if ( (queue->address_len = lctx->un_len) && + (0 == memcmp (lctx->un, + queue->address, + queue->address_len)) ) + { + lctx->res = queue; + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * Find an existing queue by address. + * + * @param plugin the plugin + * @param address the address to find + * @return NULL if queue was not found + */ +static struct Queue * +lookup_queue (const struct GNUNET_PeerIdentity *peer, + const sockaddr_un *un, + socklen_t un_len) +{ + struct LookupCtx lctx; + + lctx.un = un; + lctx.un_len = un_len; + GNUNET_CONTAINER_multipeermap_get_multiple (plugin->queue_map, + peer, + &lookup_queue_it, + &lctx); + return lctx.res; +} + + + +/** + * Actually send out the message, assume we've got the address and + * send_handle squared away! + * + * @param cls closure + * @param send_handle which handle to send message on + * @param target who should receive this message (ignored by UNIX) + * @param msgbuf one or more GNUNET_MessageHeader(s) strung together + * @param msgbuf_size the size of the @a msgbuf to send + * @param priority how important is the message (ignored by UNIX) + * @param timeout when should we time out (give up) if we can not transmit? + * @param addr the addr to send the message to, needs to be a sockaddr for us + * @param addrlen the len of @a addr + * @param payload bytes payload to send + * @param cont continuation to call once the message has + * been transmitted (or if the transport is ready + * for the next transmission call; or if the + * peer disconnected...) + * @param cont_cls closure for @a cont + * @return on success the number of bytes written, RETRY for retry, -1 on errors + */ +static ssize_t +unix_real_send (void *cls, + struct GNUNET_NETWORK_Handle *send_handle, + const struct GNUNET_PeerIdentity *target, + const char *msgbuf, + size_t msgbuf_size, + unsigned int priority, + struct GNUNET_TIME_Absolute timeout, + const struct UnixAddress *addr, + size_t addrlen, + size_t payload, + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) +{ + struct Plugin *plugin = cls; + ssize_t sent; + struct sockaddr_un *un; + socklen_t un_len; + const char *unixpath; + + if (NULL == send_handle) + { + GNUNET_break (0); /* We do not have a send handle */ + return GNUNET_SYSERR; + } + if ((NULL == addr) || (0 == addrlen)) + { + GNUNET_break (0); /* Can never send if we don't have an address */ + return GNUNET_SYSERR; + } + + /* Prepare address */ + unixpath = (const char *) &addr[1]; + if (NULL == (un = unix_address_to_sockaddr (unixpath, + &un_len))) + { + GNUNET_break (0); + return -1; + } + + if ((GNUNET_YES == plugin->is_abstract) && + (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) ) + { + un->sun_path[0] = '\0'; + } +resend: + /* Send the data */ + sent = GNUNET_NETWORK_socket_sendto (send_handle, + msgbuf, + msgbuf_size, + (const struct sockaddr *) un, + un_len); + if (GNUNET_SYSERR == sent) + { + if ( (EAGAIN == errno) || + (ENOBUFS == errno) ) + { + GNUNET_free (un); + return RETRY; /* We have to retry later */ + } + if (EMSGSIZE == errno) + { + socklen_t size = 0; + socklen_t len = sizeof (size); + + GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *) + send_handle, SOL_SOCKET, SO_SNDBUF, &size, + &len); + if (size < msgbuf_size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Trying to increase socket buffer size from %u to %u for message size %u\n", + (unsigned int) size, + (unsigned int) ((msgbuf_size / 1000) + 2) * 1000, + (unsigned int) msgbuf_size); + size = ((msgbuf_size / 1000) + 2) * 1000; + if (GNUNET_OK == + GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle, + SOL_SOCKET, SO_SNDBUF, + &size, sizeof (size))) + goto resend; /* Increased buffer size, retry sending */ + else + { + /* Could not increase buffer size: error, no retry */ + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt"); + GNUNET_free (un); + return GNUNET_SYSERR; + } + } + else + { + /* Buffer is bigger than message: error, no retry + * This should never happen!*/ + GNUNET_break (0); + GNUNET_free (un); + return GNUNET_SYSERR; + } + } + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UNIX transmitted %u-byte message to %s (%d: %s)\n", + (unsigned int) msgbuf_size, + GNUNET_a2s ((const struct sockaddr *)un, un_len), + (int) sent, + (sent < 0) ? STRERROR (errno) : "ok"); + GNUNET_free (un); + return sent; +} + + +/** + * Function obtain the network type for a queue + * + * @param cls closure ('struct Plugin*') + * @param queue the queue + * @return the network type in HBO or #GNUNET_SYSERR + */ +static enum GNUNET_ATS_Network_Type +unix_plugin_get_network (void *cls, + struct Queue *queue) +{ + GNUNET_assert (NULL != queue); + return GNUNET_ATS_NET_LOOPBACK; +} + + +/** + * Function obtain the network type for a queue + * + * @param cls closure (`struct Plugin *`) + * @param address the address + * @return the network type + */ +static enum GNUNET_ATS_Network_Type +unix_plugin_get_network_for_address (void *cls, + const struct GNUNET_HELLO_Address *address) + +{ + return GNUNET_ATS_NET_LOOPBACK; +} + + +/** + * Creates a new outbound queue the transport service will use to send data to the + * peer + * + * @param cls the plugin + * @param address the address + * @return the queue or NULL of max connections exceeded + */ +static struct Queue * +unix_plugin_get_queue (void *cls, + const struct GNUNET_HELLO_Address *address) +{ + struct Plugin *plugin = cls; + struct Queue *queue; + struct UnixAddress *ua; + char * addrstr; + uint32_t addr_str_len; + uint32_t addr_option; + + ua = (struct UnixAddress *) address->address; + if ((NULL == address->address) || (0 == address->address_length) || + (sizeof (struct UnixAddress) > address->address_length)) + { + GNUNET_break (0); + return NULL; + } + addrstr = (char *) &ua[1]; + addr_str_len = ntohl (ua->addrlen); + addr_option = ntohl (ua->options); + + if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) && + (GNUNET_NO == plugin->is_abstract)) + { + return NULL; + } + + if (addr_str_len != address->address_length - sizeof (struct UnixAddress)) + { + return NULL; /* This can be a legacy address */ + } + + if ('\0' != addrstr[addr_str_len - 1]) + { + GNUNET_break (0); + return NULL; + } + if (strlen (addrstr) + 1 != addr_str_len) + { + GNUNET_break (0); + return NULL; + } + + /* Check if a queue for this address already exists */ + if (NULL != (queue = lookup_queue (plugin, + address))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Found existing queue %p for address `%s'\n", + queue, + unix_plugin_address_to_string (NULL, + address->address, + address->address_length)); + return queue; + } + + /* create a new queue */ + queue = GNUNET_new (struct Queue); + queue->target = address->peer; + queue->address = GNUNET_HELLO_address_copy (address); + queue->plugin = plugin; + queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + queue->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &queue_timeout, + queue); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating a new queue %p for address `%s'\n", + queue, + unix_plugin_address_to_string (NULL, + address->address, + address->address_length)); + (void) GNUNET_CONTAINER_multipeermap_put (plugin->queue_map, + &address->peer, queue, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_STATISTICS_set (plugin->env->stats, + "# UNIX queues active", + GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), + GNUNET_NO); + notify_queue_monitor (plugin, + queue, + GNUNET_TRANSPORT_SS_INIT); + notify_queue_monitor (plugin, + queue, + GNUNET_TRANSPORT_SS_UP); + return queue; +} + + +/** + * Function that will be called whenever the transport service wants + * to notify the plugin that a queue is still active and in use and + * therefore the queue timeout for this queue has to be updated + * + * @param cls closure with the `struct Plugin *` + * @param peer which peer was the queue for + * @param queue which queue is being updated + */ +static void +unix_plugin_update_queue_timeout (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct Queue *queue) +{ + struct Plugin *plugin = cls; + + if (GNUNET_OK != + GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map, + &queue->target, + queue)) + { + GNUNET_break (0); + return; + } + reschedule_queue_timeout (queue); +} + + +/** + * Demultiplexer for UNIX messages + * + * @param plugin the main plugin for this transport + * @param sender from which peer the message was received + * @param currhdr pointer to the header of the message + * @param ua address to look for + * @param ua_len length of the address @a ua + */ +static void +unix_demultiplexer (struct Plugin *plugin, + struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *currhdr, + const struct UnixAddress *ua, + size_t ua_len) +{ + struct Queue *queue; + struct GNUNET_HELLO_Address *address; + + GNUNET_assert (ua_len >= sizeof (struct UnixAddress)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message from %s\n", + unix_plugin_address_to_string (NULL, ua, ua_len)); + GNUNET_STATISTICS_update (plugin->env->stats, + "# bytes received via UNIX", + ntohs (currhdr->size), + GNUNET_NO); + + /* Look for existing queue */ + address = GNUNET_HELLO_address_allocate (sender, + PLUGIN_NAME, + ua, ua_len, + GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" queues */ + queue = lookup_queue (plugin, address); + if (NULL == queue) + { + queue = unix_plugin_get_queue (plugin, address); + /* Notify transport and ATS about new inbound queue */ + plugin->env->queue_start (NULL, + queue->address, + queue, + GNUNET_ATS_NET_LOOPBACK); + } + else + { + reschedule_queue_timeout (queue); + } + GNUNET_HELLO_address_free (address); + plugin->env->receive (plugin->env->cls, + queue->address, + queue, + currhdr); +} + + +/** + * Read from UNIX domain socket (it is ready). + * + * @param plugin the plugin + */ +static void +unix_plugin_do_read (struct Plugin *plugin) +{ + char buf[65536] GNUNET_ALIGN; + struct UnixAddress *ua; + struct UNIXMessage *msg; + struct GNUNET_PeerIdentity sender; + struct sockaddr_un un; + socklen_t addrlen; + ssize_t ret; + int offset; + int tsize; + int is_abstract; + char *msgbuf; + const struct GNUNET_MessageHeader *currhdr; + uint16_t csize; + size_t ua_len; + + addrlen = sizeof (un); + memset (&un, 0, sizeof (un)); + ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, + buf, sizeof (buf), + (struct sockaddr *) &un, + &addrlen); + if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS))) + return; + if (GNUNET_SYSERR == ret) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "recvfrom"); + return; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read %d bytes from socket %s\n", + (int) ret, + un.sun_path); + } + + GNUNET_assert (AF_UNIX == (un.sun_family)); + is_abstract = GNUNET_NO; + if ('\0' == un.sun_path[0]) + { + un.sun_path[0] = '@'; + is_abstract = GNUNET_YES; + } + + ua_len = sizeof (struct UnixAddress) + strlen (un.sun_path) + 1; + ua = GNUNET_malloc (ua_len); + ua->addrlen = htonl (strlen (&un.sun_path[0]) +1); + GNUNET_memcpy (&ua[1], &un.sun_path[0], strlen (un.sun_path) + 1); + if (is_abstract) + ua->options = htonl(UNIX_OPTIONS_USE_ABSTRACT_SOCKETS); + else + ua->options = htonl(UNIX_OPTIONS_NONE); + + msg = (struct UNIXMessage *) buf; + csize = ntohs (msg->header.size); + if ((csize < sizeof (struct UNIXMessage)) || (csize > ret)) + { + GNUNET_break_op (0); + GNUNET_free (ua); + return; + } + msgbuf = (char *) &msg[1]; + GNUNET_memcpy (&sender, + &msg->sender, + sizeof (struct GNUNET_PeerIdentity)); + offset = 0; + tsize = csize - sizeof (struct UNIXMessage); + while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) + { + currhdr = (struct GNUNET_MessageHeader *) &msgbuf[offset]; + csize = ntohs (currhdr->size); + if ((csize < sizeof (struct GNUNET_MessageHeader)) || + (csize > tsize - offset)) + { + GNUNET_break_op (0); + break; + } + unix_demultiplexer (plugin, &sender, currhdr, ua, ua_len); + offset += csize; + } + GNUNET_free (ua); +} + + +/** + * Write to UNIX domain socket (it is ready). + * + * @param plugin handle to the plugin + */ +static void +unix_plugin_do_write (struct Plugin *plugin) +{ + ssize_t sent = 0; + struct UNIXMessageWrapper *msgw; + struct Queue *queue; + int did_delete; + + queue = NULL; + did_delete = GNUNET_NO; + while (NULL != (msgw = plugin->msg_head)) + { + if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) + break; /* Message is ready for sending */ + /* Message has a timeout */ + did_delete = GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Timeout for message with %u bytes \n", + (unsigned int) msgw->msgsize); + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + queue = msgw->queue; + queue->msgs_in_queue--; + GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); + queue->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UNIX bytes discarded", + msgw->msgsize, + GNUNET_NO); + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, + &msgw->queue->target, + GNUNET_SYSERR, + msgw->payload, + 0); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); + } + if (NULL == msgw) + { + if (GNUNET_YES == did_delete) + notify_queue_monitor (plugin, + queue, + GNUNET_TRANSPORT_SS_UPDATE); + return; /* Nothing to send at the moment */ + } + queue = msgw->queue; + sent = unix_real_send (plugin, + unix_sock, + &queue->target, + (const char *) msgw->msg, + msgw->msgsize, + msgw->priority, + msgw->timeout, + msgw->queue->address->address, + msgw->queue->address->address_length, + msgw->payload, + msgw->cont, msgw->cont_cls); + if (RETRY == sent) + { + GNUNET_STATISTICS_update (plugin->env->stats, + "# UNIX retry attempts", + 1, GNUNET_NO); + notify_queue_monitor (plugin, + queue, + GNUNET_TRANSPORT_SS_UPDATE); + return; + } + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + queue->msgs_in_queue--; + GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); + queue->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, GNUNET_NO); + notify_queue_monitor (plugin, + queue, + GNUNET_TRANSPORT_SS_UPDATE); + if (GNUNET_SYSERR == sent) + { + /* failed and no retry */ + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, + &msgw->queue->target, + GNUNET_SYSERR, + msgw->payload, 0); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UNIX bytes discarded", + msgw->msgsize, + GNUNET_NO); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); + return; + } + /* successfully sent bytes */ + GNUNET_break (sent > 0); + GNUNET_STATISTICS_update (plugin->env->stats, + "# bytes transmitted via UNIX", + msgw->msgsize, + GNUNET_NO); + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, + &msgw->queue->target, + GNUNET_OK, + msgw->payload, + msgw->msgsize); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); +} + + +/** + * We have been notified that our socket has something to read. + * Then reschedule this function to be called again once more is available. + * + * @param cls the plugin handle + */ +static void +unix_plugin_select_read (void *cls) +{ + struct Plugin *plugin = cls; + const struct GNUNET_SCHEDULER_TaskContext *tc; + + plugin->read_task = NULL; + tc = GNUNET_SCHEDULER_get_task_context (); + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) + unix_plugin_do_read (plugin); + plugin->read_task = + GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &unix_plugin_select_read, plugin); +} + + +/** + * We have been notified that our socket is ready to write. + * Then reschedule this function to be called again once more is available. + * + * @param cls the plugin handle + */ +static void +unix_plugin_select_write (void *cls) +{ + struct Plugin *plugin = cls; + const struct GNUNET_SCHEDULER_TaskContext *tc; + + plugin->write_task = NULL; + tc = GNUNET_SCHEDULER_get_task_context (); + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) + unix_plugin_do_write (plugin); + if (NULL == plugin->msg_head) + return; /* write queue empty */ + plugin->write_task = + GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &unix_plugin_select_write, plugin); +} + + +/** + * Function that can be used by the transport service to transmit + * a message using the plugin. Note that in the case of a + * peer disconnecting, the continuation MUST be called + * prior to the disconnect notification itself. This function + * will be called with this peer's HELLO message to initiate + * a fresh connection to another peer. + * + * @param cls closure + * @param queue which queue must be used + * @param msgbuf the message to transmit + * @param msgbuf_size number of bytes in @a msgbuf + * @param priority how important is the message (most plugins will + * ignore message priority and just FIFO) + * @param to how long to wait at most for the transmission (does not + * require plugins to discard the message after the timeout, + * just advisory for the desired delay; most plugins will ignore + * this as well) + * @param cont continuation to call once the message has + * been transmitted (or if the transport is ready + * for the next transmission call; or if the + * peer disconnected...); can be NULL + * @param cont_cls closure for @a cont + * @return number of bytes used (on the physical network, with overheads); + * -1 on hard errors (i.e. address invalid); 0 is a legal value + * and does NOT mean that the message was not transmitted (DV) + */ +static ssize_t +unix_plugin_send (void *cls, + struct Queue *queue, + const char *msgbuf, + size_t msgbuf_size, + unsigned int priority, + struct GNUNET_TIME_Relative to, + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) +{ + struct Plugin *plugin = cls; + struct UNIXMessageWrapper *wrapper; + struct UNIXMessage *message; + int ssize; + + if (GNUNET_OK != + GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map, + &queue->target, + queue)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Invalid queue for peer `%s' `%s'\n", + GNUNET_i2s (&queue->target), + unix_plugin_address_to_string (NULL, + queue->address->address, + queue->address->address_length)); + GNUNET_break (0); + return GNUNET_SYSERR; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending %u bytes with queue for peer `%s' `%s'\n", + msgbuf_size, + GNUNET_i2s (&queue->target), + unix_plugin_address_to_string (NULL, + queue->address->address, + queue->address->address_length)); + ssize = sizeof (struct UNIXMessage) + msgbuf_size; + message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); + message->header.size = htons (ssize); + message->header.type = htons (0); + GNUNET_memcpy (&message->sender, plugin->env->my_identity, + sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (&message[1], msgbuf, msgbuf_size); + wrapper = GNUNET_new (struct UNIXMessageWrapper); + wrapper->msg = message; + wrapper->msgsize = ssize; + wrapper->payload = msgbuf_size; + wrapper->priority = priority; + wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), + to); + wrapper->cont = cont; + wrapper->cont_cls = cont_cls; + wrapper->queue = queue; + GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head, + plugin->msg_tail, + wrapper); + plugin->bytes_in_queue += ssize; + queue->bytes_in_queue += ssize; + queue->msgs_in_queue++; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, + GNUNET_NO); + notify_queue_monitor (plugin, + queue, + GNUNET_TRANSPORT_SS_UPDATE); + if (NULL == plugin->write_task) + plugin->write_task = + GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &unix_plugin_select_write, plugin); + return ssize; +} + + +/** + * Signature of functions implementing the + * sending functionality of a message queue. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +static void +mq_send (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ +} + + +/** + * Signature of functions implementing the + * destruction of a message queue. + * Implementations must not free @a mq, but should + * take care of @a impl_state. + * + * @param mq the message queue to destroy + * @param impl_state state of the implementation + */ +static void +mq_destroy (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ +} + + +/** + * Implementation function that cancels the currently sent message. + * + * @param mq message queue + * @param impl_state state specific to the implementation + */ +static void +mq_cancel (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ +} + + +/** + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure + * @param error error code + */ +static void +mq_error (void *cls, + enum GNUNET_MQ_Error error) +{ +} + + + +/** + * Function called by the transport service to initialize a + * message queue given address information about another peer. + * If and when the communication channel is established, the + * communicator must call #GNUNET_TRANSPORT_communicator_mq_add() + * to notify the service that the channel is now up. It is + * the responsibility of the communicator to manage sane + * retries and timeouts for any @a peer/@a address combination + * provided by the transport service. Timeouts and retries + * do not need to be signalled to the transport service. + * + * @param cls closure + * @param peer identity of the other peer + * @param address where to send the message, human-readable + * communicator-specific format, 0-terminated, UTF-8 + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid + */ +static int +mq_init (void *cls, + const struct GNUNET_PeerIdentity *peer, + const void *address) +{ + struct Queue *queue; + char *a; + char *e; + int is_abs; + sockaddr_un *un; + socklen_t un_len; + + if (NULL == strncmp (address, + COMMUNICATOR_NAME "-", + strlen (COMMUNICATOR_NAME "-"))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + a = GNUNET_strdup (&address[strlen (COMMUNICATOR_NAME "-")]); + e = strchr (a, + (unsigned char) '#'); + if (NULL == e) + { + GNUNET_free (a); + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + is_abs = ('1' == e[1]); + *e = '\0'; + un = unix_address_to_sockaddr (a, + &un_len, + is_abs); + queue = lookup_queue (peer, + un, + un_len); + if (NULL != queue) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Address `%s' ignored, queue exists\n", + address); + GNUNET_free (un); + return GNUNET_OK; + } + queue = GNUNET_new (struct Queue); + queue->target = *peer; + queue->address = un; + queue->address_len = un_len; + (void) GNUNET_CONTAINER_multihashmap_put (queue_map, + &queue->target, + queue, + GNUET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_STATISTICS_set (stats, + "# UNIX queues active", + GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), + GNUNET_NO); + queue->timeout = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &queue_timeout, + queue); + queue->mq + = GNUNET_MQ_queue_for_callbacks (&mq_send, + &mq_destroy, + &mq_cancel, + queue, + NULL, + &mq_error, + queue); + queue->qh + = GNUNET_TRANSPORT_communicator_mq_add (ch, + &queue->target, + address, + ATS, + queue->mq); + return GNUNET_OK; +} + + +/** + * Shutdown the UNIX communicator. + * + * @param cls NULL (always) + */ +static void +do_shutdown (void *cls) +{ + struct UNIXMessageWrapper *msgw; + + while (NULL != (msgw = msg_head)) + { + GNUNET_CONTAINER_DLL_remove (msg_head, + msg_tail, + msgw); + queue = msgw->queue; + queue->msgs_in_queue--; + GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); + queue->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (bytes_in_queue >= msgw->msgsize); + bytes_in_queue -= msgw->msgsize; + GNUNET_free (msgw->msg); + GNUNET_free (msgw); + } + if (NULL != read_task) + { + GNUNET_SCHEDULER_cancel (read_task); + read_task = NULL; + } + if (NULL != write_task) + { + GNUNET_SCHEDULER_cancel (write_task); + write_task = NULL; + } + if (NULL != unix_sock) + { + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (unix_sock)); + unix_sock = NULL; + } + GNUNET_CONTAINER_multipeermap_iterate (queue_map, + &get_queue_delete_it, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (queue_map); + if (NULL != ai) + { + GNUNET_TRANSPORT_communicator_address_remove (ai); + ai = NULL; + } + if (NULL != ch) + { + GNUNET_TRANSPORT_communicator_disconnect (ch); + ch = NULL; + } + GNUNET_break (0 == bytes_in_queue); +} + + +/** + * Setup communicator and launch network interactions. + * + * @param cls NULL (always) + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param cfg configuration + */ +static void +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + char *unix_socket_path; + int is_abstract; + struct sockaddr_un *un; + socklen_t un_len; + char *my_addr; + (void) cls; + + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_filename (cfg, + "transport-unix", + "UNIXPATH", + &unix_socket_path)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "communicator-unix", + "UNIXPATH"); + return; + } + + /* Initialize my flags */ + is_abstract = 0; +#ifdef LINUX + is_abstract + = GNUNET_CONFIGURATION_get_value_yesno (cfg, + "testing", + "USE_ABSTRACT_SOCKETS"); +#endif + un = unix_address_to_sockaddr (unix_socket_path, + &un_len, + is_abstract); + unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, + SOCK_DGRAM, + 0); + if (NULL == unix_sock) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "socket"); + GNUNET_free (un); + GNUNET_free (unix_socket_path); + return; + } + if ( ('\0' != un->sun_path[0]) && + (GNUNET_OK != + GNUNET_DISK_directory_create_for_file (un->sun_path)) ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Cannot create path to `%s'\n"), + un->sun_path); + GNUNET_NETWORK_socket_close (unix_sock); + unix_sock = NULL; + GNUNET_free (un); + GNUNET_free (unix_socket_path); + return; + } + if (GNUNET_OK != + GNUNET_NETWORK_socket_bind (unix_sock, + (const struct sockaddr *) un, + un_len)) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "bind"); + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Cannot bind to `%s'\n"), + un->sun_path); + GNUNET_NETWORK_socket_close (unix_sock); + unix_sock = NULL; + GNUNET_free (un); + GNUNET_free (unix_socket_path); + return; + } + GNUNET_free (un); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Bound to `%s'\n", + unix_socket_path); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, + NULL); + read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &unix_plugin_select_read, + NULL); + queue_map = GNUNET_CONTAINER_multipeermap_create (10, + GNUNET_NO); + ch = GNUNET_TRANSPORT_communicator_connect (cfg, + "unix", + 65535, + &mq_init, + NULL); + if (NULL == ch) + { + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (unix_socket_path); + return; + } + GNUNET_asprintf (&my_addr, + "%s-%s#%d", + COMMUNICATOR_NAME, + unix_socket_path, + is_abstract); + + ai = GNUNET_TRANSPORT_communicator_address_add (ch, + my_addr, + GNUNET_ATS_NET_LOOPBACK, + GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_free (my_addr); + GNUNET_free (unix_socket_path); +} + + +/** + * The main function for the UNIX communicator. + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, + char *const *argv) +{ + static const struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_OPTION_END + }; + int ret; + + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) + return 2; + + ret = + (GNUNET_OK == + GNUNET_PROGRAM_run (argc, argv, + "gnunet-communicator-unix", + _("GNUnet UNIX domain socket communicator"), + options, + &run, + NULL)) ? 0 : 1; + GNUNET_free ((void*) argv); + return ret; +} + + +#if defined(LINUX) && defined(__GLIBC__) +#include + +/** + * MINIMIZE heap size (way below 128k) since this process doesn't need much. + */ +void __attribute__ ((constructor)) +GNUNET_ARM_memory_init () +{ + mallopt (M_TRIM_THRESHOLD, 4 * 1024); + mallopt (M_TOP_PAD, 1 * 1024); + malloc_trim (0); +} +#endif + +/* end of gnunet-communicator-unix.c */ -- cgit v1.2.3