From a18d1f2587ca5df5a9c6e47c012bfbaf3f19098c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 8 Nov 2018 11:32:03 +0100 Subject: work on UNIX communicator --- src/transport/gnunet-communicator-unix.c | 1072 ++++++++------------------ src/transport/transport_api2_communication.c | 1 + 2 files changed, 315 insertions(+), 758 deletions(-) (limited to 'src') diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index 373b74149..f07975186 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c @@ -30,6 +30,16 @@ #include "gnunet_statistics_service.h" #include "gnunet_transport_communication_service.h" +/** + * How many messages do we keep at most in the queue to the + * transport service before we start to drop (default, + * can be changed via the configuration file). + * Should be _below_ the level of the communicator API, as + * otherwise we may read messages just to have them dropped + * by the communicator API. + */ +#define DEFAULT_MAX_QUEUE_LENGTH 8 + /** * Name of the communicator. */ @@ -58,63 +68,6 @@ struct UNIXMessage GNUNET_NETWORK_STRUCT_END -/** - * Information we track for a message awaiting transmission. - */ -struct UNIXMessageWrapper -{ - /** - * We keep messages in a doubly linked list. - */ - struct UNIXMessageWrapper *next; - - /** - * We keep messages in a doubly linked list. - */ - struct UNIXMessageWrapper *prev; - - /** - * The actual payload (allocated separately right now). - */ - struct UNIXMessage *msg; - - /** - * Queue this message belongs to. - */ - struct Queue *queue; - - /** - * Function to call upon transmission. - */ - GNUNET_TRANSPORT_TransmitContinuation cont; - - /** - * Closure for @e cont. - */ - void *cont_cls; - - /** - * Timeout for this message. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Number of bytes in @e msg. - */ - size_t msgsize; - - /** - * Number of bytes of payload encapsulated in @e msg. - */ - size_t payload; - - /** - * Priority of the message (ignored, just dragged along in UNIX). - */ - unsigned int priority; -}; - - /** * Handle for a queue. */ @@ -132,10 +85,7 @@ struct Queue struct Queue *prev; /** - * To whom are we talking to (set to our identity - * if we are still waiting for the welcome message). - * - * FIXME: information duplicated with 'peer' in address! + * To whom are we talking to. */ struct GNUNET_PeerIdentity target; @@ -149,6 +99,12 @@ struct Queue */ socklen_t address_len; + /** + * Message currently scheduled for transmission, non-NULL if and only + * if this queue is in the #queue_head DLL. + */ + const struct GNUNET_MessageHeader *msg; + /** * Message queue we are providing for the #ch. */ @@ -172,17 +128,11 @@ struct Queue /** * Queue timeout task. */ - struct GNUNET_SCHEDULER_Task * timeout_task; - - /** - * Number of messages we currently have in our write queue. - */ - unsigned int msgs_in_queue; + struct GNUNET_SCHEDULER_Task *timeout_task; }; - /** * ID of read task */ @@ -194,9 +144,14 @@ static struct GNUNET_SCHEDULER_Task *read_task; static struct GNUNET_SCHEDULER_Task *write_task; /** - * Number of bytes we currently have in our write queues. + * Number of messages we currently have in our queues towards the transport service. */ -static unsigned long long bytes_in_queue; +static unsigned long long delivering_messages; + +/** + * Maximum queue length before we stop reading towards the transport service. + */ +static unsigned long long max_queue_length; /** * Our environment. @@ -211,12 +166,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map; /** * Head of queue of messages to transmit. */ -static struct UNIXMessageWrapper *msg_head; +static struct Queue *queue_head; /** * Tail of queue of messages to transmit. */ -static struct UNIXMessageWrapper *msg_tail; +static struct Queue *queue_tail; /** * socket that we transmit all data with @@ -229,101 +184,6 @@ static struct GNUNET_NETWORK_Handle *unix_sock; static struct GNUNET_TRANSPORT_AddressIdentifier *ai; -/** - * If a queue monitor is attached, notify it about the new - * queue state. - * - * @param plugin our plugin - * @param queue queue that changed state - * @param state new state of the queue - */ -static void -notify_queue_monitor (struct Plugin *plugin, - struct Queue *queue, - enum GNUNET_TRANSPORT_QueueState state) -{ - struct GNUNET_TRANSPORT_QueueInfo info; - - if (NULL == plugin->sic) - return; - memset (&info, 0, sizeof (info)); - info.state = state; - info.is_inbound = GNUNET_SYSERR; /* hard to say */ - info.num_msg_pending = queue->msgs_in_queue; - info.num_bytes_pending = queue->bytes_in_queue; - /* info.receive_delay remains zero as this is not supported by UNIX - (cannot selectively not receive from 'some' peer while continuing - to receive from others) */ - info.queue_timeout = queue->timeout; - info.address = queue->address; - plugin->sic (plugin->sic_cls, - queue, - &info); -} - - -/** - * Function called for a quick conversion of the binary address to - * a numeric address. Note that the caller must not free the - * address and that the next call to this function is allowed - * to override the address again. - * - * @param cls closure - * @param addr binary address - * @param addrlen length of the @a addr - * @return string representing the same address - */ -static const char * -unix_plugin_address_to_string (void *cls, - const void *addr, - size_t addrlen) -{ - static char rbuf[1024]; - struct UnixAddress *ua = (struct UnixAddress *) addr; - char *addrstr; - size_t addr_str_len; - unsigned int off; - - if ((NULL == addr) || (sizeof (struct UnixAddress) > addrlen)) - { - GNUNET_break(0); - return NULL; - } - addrstr = (char *) &ua[1]; - addr_str_len = ntohl (ua->addrlen); - - if (addr_str_len != addrlen - sizeof(struct UnixAddress)) - { - GNUNET_break(0); - return NULL; - } - if ('\0' != addrstr[addr_str_len - 1]) - { - GNUNET_break(0); - return NULL; - } - if (strlen (addrstr) + 1 != addr_str_len) - { - GNUNET_break(0); - return NULL; - } - - off = 0; - if ('\0' == addrstr[0]) - off++; - memset (rbuf, 0, sizeof (rbuf)); - GNUNET_snprintf (rbuf, - sizeof (rbuf) - 1, - "%s.%u.%s%.*s", - PLUGIN_NAME, - ntohl (ua->options), - (off == 1) ? "@" : "", - (int) (addr_str_len - off), - &addrstr[off]); - return rbuf; -} - - /** * Functions with this signature are called whenever we need * to close a queue due to a disconnect or failure to @@ -332,58 +192,40 @@ unix_plugin_address_to_string (void *cls, * @param queue queue to close down */ static void -unix_plugin_queue_disconnect (struct Queue *queue) +queue_destroy (struct Queue *queue) { struct Plugin *plugin = cls; - struct UNIXMessageWrapper *msgw; - struct UNIXMessageWrapper *next; + struct GNUNET_MQ_Handle *mq; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting queue for peer `%s'\n", + "Disconnecting queue for peer `%s'\n", GNUNET_i2s (&queue->target)); - plugin->env->queue_end (plugin->env->cls, - queue->address, - queue); - next = plugin->msg_head; - while (NULL != next) + if (0 != queue->bytes_in_queue) { - msgw = next; - next = msgw->next; - if (msgw->queue != queue) - continue; - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, - plugin->msg_tail, - msgw); - queue->msgs_in_queue--; - GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); - queue->bytes_in_queue -= msgw->msgsize; - GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); - plugin->bytes_in_queue -= msgw->msgsize; - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, - &msgw->queue->target, - GNUNET_SYSERR, - msgw->payload, 0); - GNUNET_free (msgw->msg); - GNUNET_free (msgw); + GNUNET_CONTAINER_DLL_remove (queue_head, + queue_tail, + queue); + queue->bytes_in_queue = 0; + } + if (NULL != (mq = queue->mq)) + { + queue->mq = NULL; + GNUNET_MQ_destroy (mq); } GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (plugin->queue_map, + GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue)); GNUNET_STATISTICS_set (stats, "# UNIX queues active", - GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), + GNUNET_CONTAINER_multipeermap_size (queue_map), GNUNET_NO); if (NULL != queue->timeout_task) { GNUNET_SCHEDULER_cancel (queue->timeout_task); queue->timeout_task = NULL; - queue->timeout = GNUNET_TIME_UNIT_ZERO_ABS; } GNUNET_free (queue->address); - GNUNET_break (0 == queue->bytes_in_queue); - GNUNET_break (0 == queue->msgs_in_queue); GNUNET_free (queue); } @@ -416,7 +258,7 @@ queue_timeout (void *cls) queue, GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES)); - unix_plugin_queue_disconnect (queue); + queue_destroy (queue); } @@ -458,7 +300,9 @@ unix_address_to_sockaddr (const char *unixpath, slen = strlen (unixpath); if (slen >= sizeof (un->sun_path)) slen = sizeof (un->sun_path) - 1; - GNUNET_memcpy (un->sun_path, unixpath, slen); + GNUNET_memcpy (un->sun_path, + unixpath, + slen); un->sun_path[slen] = '\0'; slen = sizeof (struct sockaddr_un); #if HAVE_SOCKADDR_UN_SUN_LEN @@ -545,182 +389,19 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer, } - -/** - * Actually send out the message, assume we've got the address and - * send_handle squared away! - * - * @param cls closure - * @param send_handle which handle to send message on - * @param target who should receive this message (ignored by UNIX) - * @param msgbuf one or more GNUNET_MessageHeader(s) strung together - * @param msgbuf_size the size of the @a msgbuf to send - * @param priority how important is the message (ignored by UNIX) - * @param timeout when should we time out (give up) if we can not transmit? - * @param addr the addr to send the message to, needs to be a sockaddr for us - * @param addrlen the len of @a addr - * @param payload bytes payload to send - * @param cont continuation to call once the message has - * been transmitted (or if the transport is ready - * for the next transmission call; or if the - * peer disconnected...) - * @param cont_cls closure for @a cont - * @return on success the number of bytes written, RETRY for retry, -1 on errors - */ -static ssize_t -unix_real_send (void *cls, - struct GNUNET_NETWORK_Handle *send_handle, - const struct GNUNET_PeerIdentity *target, - const char *msgbuf, - size_t msgbuf_size, - unsigned int priority, - struct GNUNET_TIME_Absolute timeout, - const struct UnixAddress *addr, - size_t addrlen, - size_t payload, - GNUNET_TRANSPORT_TransmitContinuation cont, - void *cont_cls) -{ - struct Plugin *plugin = cls; - ssize_t sent; - struct sockaddr_un *un; - socklen_t un_len; - const char *unixpath; - - if (NULL == send_handle) - { - GNUNET_break (0); /* We do not have a send handle */ - return GNUNET_SYSERR; - } - if ((NULL == addr) || (0 == addrlen)) - { - GNUNET_break (0); /* Can never send if we don't have an address */ - return GNUNET_SYSERR; - } - - /* Prepare address */ - unixpath = (const char *) &addr[1]; - if (NULL == (un = unix_address_to_sockaddr (unixpath, - &un_len))) - { - GNUNET_break (0); - return -1; - } - - if ((GNUNET_YES == plugin->is_abstract) && - (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) ) - { - un->sun_path[0] = '\0'; - } -resend: - /* Send the data */ - sent = GNUNET_NETWORK_socket_sendto (send_handle, - msgbuf, - msgbuf_size, - (const struct sockaddr *) un, - un_len); - if (GNUNET_SYSERR == sent) - { - if ( (EAGAIN == errno) || - (ENOBUFS == errno) ) - { - GNUNET_free (un); - return RETRY; /* We have to retry later */ - } - if (EMSGSIZE == errno) - { - socklen_t size = 0; - socklen_t len = sizeof (size); - - GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *) - send_handle, SOL_SOCKET, SO_SNDBUF, &size, - &len); - if (size < msgbuf_size) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Trying to increase socket buffer size from %u to %u for message size %u\n", - (unsigned int) size, - (unsigned int) ((msgbuf_size / 1000) + 2) * 1000, - (unsigned int) msgbuf_size); - size = ((msgbuf_size / 1000) + 2) * 1000; - if (GNUNET_OK == - GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle, - SOL_SOCKET, SO_SNDBUF, - &size, sizeof (size))) - goto resend; /* Increased buffer size, retry sending */ - else - { - /* Could not increase buffer size: error, no retry */ - GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt"); - GNUNET_free (un); - return GNUNET_SYSERR; - } - } - else - { - /* Buffer is bigger than message: error, no retry - * This should never happen!*/ - GNUNET_break (0); - GNUNET_free (un); - return GNUNET_SYSERR; - } - } - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "UNIX transmitted %u-byte message to %s (%d: %s)\n", - (unsigned int) msgbuf_size, - GNUNET_a2s ((const struct sockaddr *)un, un_len), - (int) sent, - (sent < 0) ? STRERROR (errno) : "ok"); - GNUNET_free (un); - return sent; -} - - /** - * Function obtain the network type for a queue + * Creates a new outbound queue the transport service will use to send + * data to another peer. * - * @param cls closure ('struct Plugin*') - * @param queue the queue - * @return the network type in HBO or #GNUNET_SYSERR - */ -static enum GNUNET_ATS_Network_Type -unix_plugin_get_network (void *cls, - struct Queue *queue) -{ - GNUNET_assert (NULL != queue); - return GNUNET_ATS_NET_LOOPBACK; -} - - -/** - * Function obtain the network type for a queue - * - * @param cls closure (`struct Plugin *`) - * @param address the address - * @return the network type - */ -static enum GNUNET_ATS_Network_Type -unix_plugin_get_network_for_address (void *cls, - const struct GNUNET_HELLO_Address *address) - -{ - return GNUNET_ATS_NET_LOOPBACK; -} - - -/** - * Creates a new outbound queue the transport service will use to send data to the - * peer - * - * @param cls the plugin - * @param address the address + * @param peer the target peer + * @param un the address + * @param un_len number of bytes in @a un * @return the queue or NULL of max connections exceeded */ static struct Queue * -unix_plugin_get_queue (void *cls, - const struct GNUNET_HELLO_Address *address) +unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target, + const struct sockaddr_un *un, + socklen_t un_len) { struct Plugin *plugin = cls; struct Queue *queue; @@ -728,53 +409,22 @@ unix_plugin_get_queue (void *cls, char * addrstr; uint32_t addr_str_len; uint32_t addr_option; + char *foreign_addr; + int is_abstract; + + if (is_abstract = ('\0' == un.sun_path[0])) + un.sun_path[0] = '/'; + GNUNET_asprintf (&foreign_addr, + "%s-%s#%d", + COMMUNICATOR_NAME, + un.sun_path, + is_abstract); + - ua = (struct UnixAddress *) address->address; - if ((NULL == address->address) || (0 == address->address_length) || - (sizeof (struct UnixAddress) > address->address_length)) - { - GNUNET_break (0); - return NULL; - } addrstr = (char *) &ua[1]; addr_str_len = ntohl (ua->addrlen); addr_option = ntohl (ua->options); - if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) && - (GNUNET_NO == plugin->is_abstract)) - { - return NULL; - } - - if (addr_str_len != address->address_length - sizeof (struct UnixAddress)) - { - return NULL; /* This can be a legacy address */ - } - - if ('\0' != addrstr[addr_str_len - 1]) - { - GNUNET_break (0); - return NULL; - } - if (strlen (addrstr) + 1 != addr_str_len) - { - GNUNET_break (0); - return NULL; - } - - /* Check if a queue for this address already exists */ - if (NULL != (queue = lookup_queue (plugin, - address))) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Found existing queue %p for address `%s'\n", - queue, - unix_plugin_address_to_string (NULL, - address->address, - address->address_length)); - return queue; - } - /* create a new queue */ queue = GNUNET_new (struct Queue); queue->target = address->peer; @@ -795,14 +445,8 @@ unix_plugin_get_queue (void *cls, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_STATISTICS_set (plugin->env->stats, "# UNIX queues active", - GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), + GNUNET_CONTAINER_multipeermap_size (queue_map), GNUNET_NO); - notify_queue_monitor (plugin, - queue, - GNUNET_TRANSPORT_SS_INIT); - notify_queue_monitor (plugin, - queue, - GNUNET_TRANSPORT_SS_UP); return queue; } @@ -891,245 +535,146 @@ unix_demultiplexer (struct Plugin *plugin, /** - * Read from UNIX domain socket (it is ready). + * We have been notified that our socket has something to read. Do the + * read and reschedule this function to be called again once more is + * available. * - * @param plugin the plugin + * @param cls NULL + */ +static void +select_read_cb (void *cls); + + +/** + * Function called when message was successfully passed to + * transport service. Continue read activity. + * + * @param cls NULL + */ +static void +receive_complete_cb (void *cls) +{ + delivering_messages--; + if ( (NULL == read_task) && + (delivering_messages < max_queue_length) ) + read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_read_cb, + NULL); +} + + +/** + * We have been notified that our socket has something to read. Do the + * read and reschedule this function to be called again once more is + * available. + * + * @param cls NULL */ static void -unix_plugin_do_read (struct Plugin *plugin) +select_read_cb (void *cls) { char buf[65536] GNUNET_ALIGN; - struct UnixAddress *ua; - struct UNIXMessage *msg; - struct GNUNET_PeerIdentity sender; + struct Queue *queue; + const struct UNIXMessage *msg; struct sockaddr_un un; socklen_t addrlen; ssize_t ret; - int offset; - int tsize; - int is_abstract; - char *msgbuf; - const struct GNUNET_MessageHeader *currhdr; - uint16_t csize; - size_t ua_len; + uint16_t msize; + read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_read_cb, + NULL); addrlen = sizeof (un); - memset (&un, 0, sizeof (un)); + memset (&un, + 0, + sizeof (un)); ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, - buf, sizeof (buf), + buf, + sizeof (buf), (struct sockaddr *) &un, &addrlen); - if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS))) + if ( (-1 == ret) && + ( (EAGAIN == errno) || + (ENOBUFS == errno) ) ) return; - if (GNUNET_SYSERR == ret) + if (-1 == ret) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom"); return; } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Read %d bytes from socket %s\n", - (int) ret, - un.sun_path); - } - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read %d bytes from socket %s\n", + (int) ret, + un.sun_path); GNUNET_assert (AF_UNIX == (un.sun_family)); - is_abstract = GNUNET_NO; - if ('\0' == un.sun_path[0]) - { - un.sun_path[0] = '@'; - is_abstract = GNUNET_YES; - } - - ua_len = sizeof (struct UnixAddress) + strlen (un.sun_path) + 1; - ua = GNUNET_malloc (ua_len); - ua->addrlen = htonl (strlen (&un.sun_path[0]) +1); - GNUNET_memcpy (&ua[1], &un.sun_path[0], strlen (un.sun_path) + 1); - if (is_abstract) - ua->options = htonl(UNIX_OPTIONS_USE_ABSTRACT_SOCKETS); - else - ua->options = htonl(UNIX_OPTIONS_NONE); - msg = (struct UNIXMessage *) buf; - csize = ntohs (msg->header.size); - if ((csize < sizeof (struct UNIXMessage)) || (csize > ret)) + msize = ntohs (msg->header.size); + if ( (msize < sizeof (struct UNIXMessage)) || + (msize > ret) ) { GNUNET_break_op (0); - GNUNET_free (ua); return; } - msgbuf = (char *) &msg[1]; - GNUNET_memcpy (&sender, - &msg->sender, - sizeof (struct GNUNET_PeerIdentity)); - offset = 0; - tsize = csize - sizeof (struct UNIXMessage); - while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) + queue = lookup_queue (&msg->sender, + un, + addrlen); + if (NULL == queue) + queue = setup_queue (&msg->sender, + un, + addrlen); + if (NULL == queue) { - currhdr = (struct GNUNET_MessageHeader *) &msgbuf[offset]; - csize = ntohs (currhdr->size); - if ((csize < sizeof (struct GNUNET_MessageHeader)) || - (csize > tsize - offset)) - { - GNUNET_break_op (0); - break; - } - unix_demultiplexer (plugin, &sender, currhdr, ua, ua_len); - offset += csize; + GNUENT_log (GNUNET_ERROR_TYPE_ERROR, + _("Maximum number of UNIX connections exceeded, dropping incoming message\n")); + return; } - GNUNET_free (ua); -} - - -/** - * Write to UNIX domain socket (it is ready). - * - * @param plugin handle to the plugin - */ -static void -unix_plugin_do_write (struct Plugin *plugin) -{ - ssize_t sent = 0; - struct UNIXMessageWrapper *msgw; - struct Queue *queue; - int did_delete; + - queue = NULL; - did_delete = GNUNET_NO; - while (NULL != (msgw = plugin->msg_head)) - { - if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) - break; /* Message is ready for sending */ - /* Message has a timeout */ - did_delete = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout for message with %u bytes \n", - (unsigned int) msgw->msgsize); - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, - plugin->msg_tail, - msgw); - queue = msgw->queue; - queue->msgs_in_queue--; - GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); - queue->bytes_in_queue -= msgw->msgsize; - GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); - plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats, - "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UNIX bytes discarded", - msgw->msgsize, - GNUNET_NO); - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, - &msgw->queue->target, - GNUNET_SYSERR, - msgw->payload, - 0); - GNUNET_free (msgw->msg); - GNUNET_free (msgw); - } - if (NULL == msgw) { - if (GNUNET_YES == did_delete) - notify_queue_monitor (plugin, - queue, - GNUNET_TRANSPORT_SS_UPDATE); - return; /* Nothing to send at the moment */ - } - queue = msgw->queue; - sent = unix_real_send (plugin, - unix_sock, - &queue->target, - (const char *) msgw->msg, - msgw->msgsize, - msgw->priority, - msgw->timeout, - msgw->queue->address->address, - msgw->queue->address->address_length, - msgw->payload, - msgw->cont, msgw->cont_cls); - if (RETRY == sent) - { - GNUNET_STATISTICS_update (plugin->env->stats, - "# UNIX retry attempts", - 1, GNUNET_NO); - notify_queue_monitor (plugin, - queue, - GNUNET_TRANSPORT_SS_UPDATE); - return; + uint16_t offset = 0; + uint16_t tsize = msize - sizeof (struct UNIXMessage); + const char *msgbuf = (const char *) &msg[1]; + + while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) + { + const struct GNUNET_MessageHeader *currhdr; + struct GNUNET_MessageHeader al_hdr; + uint16_t csize; + + currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset]; + /* ensure aligned access */ + memcpy (&al_hdr, + currhdr, + sizeof (al_hdr)); + csize = ntohs (al_hdr.size); + if ( (csize < sizeof (struct GNUNET_MessageHeader)) || + (csize > tsize - offset)) + { + GNUNET_break_op (0); + break; + } + ret = GNUNET_TRANSPORT_communicator_receive (ch, + &msg->sender, + currhdr, + &receive_complete_cb, + NULL); + if (GNUNET_SYSERR == ret) + return; /* transport not up */ + if (GNUNET_NO == ret) + break; + delivering_messages++; + offset += csize; + } } - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, - plugin->msg_tail, - msgw); - queue->msgs_in_queue--; - GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); - queue->bytes_in_queue -= msgw->msgsize; - GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); - plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats, - "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); - notify_queue_monitor (plugin, - queue, - GNUNET_TRANSPORT_SS_UPDATE); - if (GNUNET_SYSERR == sent) + if (delivering_messages >= max_queue_length) { - /* failed and no retry */ - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, - &msgw->queue->target, - GNUNET_SYSERR, - msgw->payload, 0); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UNIX bytes discarded", - msgw->msgsize, - GNUNET_NO); - GNUNET_free (msgw->msg); - GNUNET_free (msgw); - return; + /* we should try to apply 'back pressure' */ + GNUNET_SCHEDULER_cancel (read_task); + read_task = NULL; } - /* successfully sent bytes */ - GNUNET_break (sent > 0); - GNUNET_STATISTICS_update (plugin->env->stats, - "# bytes transmitted via UNIX", - msgw->msgsize, - GNUNET_NO); - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, - &msgw->queue->target, - GNUNET_OK, - msgw->payload, - msgw->msgsize); - GNUNET_free (msgw->msg); - GNUNET_free (msgw); -} - - -/** - * We have been notified that our socket has something to read. - * Then reschedule this function to be called again once more is available. - * - * @param cls the plugin handle - */ -static void -unix_plugin_select_read (void *cls) -{ - struct Plugin *plugin = cls; - const struct GNUNET_SCHEDULER_TaskContext *tc; - - plugin->read_task = NULL; - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) - unix_plugin_do_read (plugin); - plugin->read_task = - GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, - unix_sock, - &unix_plugin_select_read, plugin); } @@ -1137,158 +682,155 @@ unix_plugin_select_read (void *cls) * We have been notified that our socket is ready to write. * Then reschedule this function to be called again once more is available. * - * @param cls the plugin handle + * @param cls NULL */ static void -unix_plugin_select_write (void *cls) +select_write_cb (void *cls) { - struct Plugin *plugin = cls; - const struct GNUNET_SCHEDULER_TaskContext *tc; - - plugin->write_task = NULL; - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) - unix_plugin_do_write (plugin); - if (NULL == plugin->msg_head) - return; /* write queue empty */ - plugin->write_task = - GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, - unix_sock, - &unix_plugin_select_write, plugin); -} - + struct Queue *queue = queue_tail; + const struct GNUNET_MessageHeader *msg = queue->msg; + size_t msg_size = ntohs (msg->size); + ssize_t sent; -/** - * Function that can be used by the transport service to transmit - * a message using the plugin. Note that in the case of a - * peer disconnecting, the continuation MUST be called - * prior to the disconnect notification itself. This function - * will be called with this peer's HELLO message to initiate - * a fresh connection to another peer. - * - * @param cls closure - * @param queue which queue must be used - * @param msgbuf the message to transmit - * @param msgbuf_size number of bytes in @a msgbuf - * @param priority how important is the message (most plugins will - * ignore message priority and just FIFO) - * @param to how long to wait at most for the transmission (does not - * require plugins to discard the message after the timeout, - * just advisory for the desired delay; most plugins will ignore - * this as well) - * @param cont continuation to call once the message has - * been transmitted (or if the transport is ready - * for the next transmission call; or if the - * peer disconnected...); can be NULL - * @param cont_cls closure for @a cont - * @return number of bytes used (on the physical network, with overheads); - * -1 on hard errors (i.e. address invalid); 0 is a legal value - * and does NOT mean that the message was not transmitted (DV) - */ -static ssize_t -unix_plugin_send (void *cls, - struct Queue *queue, - const char *msgbuf, - size_t msgbuf_size, - unsigned int priority, - struct GNUNET_TIME_Relative to, - GNUNET_TRANSPORT_TransmitContinuation cont, - void *cont_cls) -{ - struct Plugin *plugin = cls; - struct UNIXMessageWrapper *wrapper; - struct UNIXMessage *message; - int ssize; + /* take queue of the ready list */ + write_task = NULL; + GNUNET_CONTAINER_DLL_remove (queue_head, + queue_tail, + queue); + if (NULL != queue_head) + write_task = + GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_write_cb, + NULL); - if (GNUNET_OK != - GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map, - &queue->target, - queue)) + /* send 'msg' */ + queue->msg = NULL; + GNUNET_MQ_impl_send_continue (queue->mq); + resend: + /* Send the data */ + sent = GNUNET_NETWORK_socket_sendto (unix_sock, + queue->msg, + msg_size, + (const struct sockaddr *) mq->address, + mq->address_len); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UNIX transmitted message to %s (%d/%u: %s)\n", + GNUNET_i2s (&queue->target), + (int) sent, + (unsigned int) msg_size, + (sent < 0) ? STRERROR (errno) : "ok"); + if (-1 != sent) + return; /* all good */ + switch (errno) { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Invalid queue for peer `%s' `%s'\n", - GNUNET_i2s (&queue->target), - unix_plugin_address_to_string (NULL, - queue->address->address, - queue->address->address_length)); - GNUNET_break (0); - return GNUNET_SYSERR; + case EAGAIN: + case ENOBUFS: + /* We should retry later... */ + GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, + "send"); + return; + case EMSGSIZE: + { + socklen_t size = 0; + socklen_t len = sizeof (size); + + GNUNET_NETWORK_socket_getsockopt (unix_sock, + SOL_SOCKET, + SO_SNDBUF, + &size, + &len); + if (size > ntohs (msg->size)) + { + /* Buffer is bigger than message: error, no retry + * This should never happen!*/ + GNUNET_break (0); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Trying to increase socket buffer size from %u to %u for message size %u\n", + (unsigned int) size, + (unsigned int) m((msg_size / 1000) + 2) * 1000, + (unsigned int) msg_size); + size = ((msg_size / 1000) + 2) * 1000; + if (GNUNET_OK == + GNUNET_NETWORK_socket_setsockopt (unix_sock, + SOL_SOCKET, + SO_SNDBUF, + &size, + sizeof (size))) + goto resend; /* Increased buffer size, retry sending */ + /* Ok, then just try very modest increase */ + size = msg_size; + if (GNUNET_OK == + GNUNET_NETWORK_socket_setsockopt (unix_sock, + SOL_SOCKET, + SO_SNDBUF, + &size, + sizeof (size))) + goto resend; /* Increased buffer size, retry sending */ + /* Could not increase buffer size: error, no retry */ + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "setsockopt"); + return; + } + default: + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "send"); + return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes with queue for peer `%s' `%s'\n", - msgbuf_size, - GNUNET_i2s (&queue->target), - unix_plugin_address_to_string (NULL, - queue->address->address, - queue->address->address_length)); - ssize = sizeof (struct UNIXMessage) + msgbuf_size; - message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); - message->header.size = htons (ssize); - message->header.type = htons (0); - GNUNET_memcpy (&message->sender, plugin->env->my_identity, - sizeof (struct GNUNET_PeerIdentity)); - GNUNET_memcpy (&message[1], msgbuf, msgbuf_size); - wrapper = GNUNET_new (struct UNIXMessageWrapper); - wrapper->msg = message; - wrapper->msgsize = ssize; - wrapper->payload = msgbuf_size; - wrapper->priority = priority; - wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), - to); - wrapper->cont = cont; - wrapper->cont_cls = cont_cls; - wrapper->queue = queue; - GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head, - plugin->msg_tail, - wrapper); - plugin->bytes_in_queue += ssize; - queue->bytes_in_queue += ssize; - queue->msgs_in_queue++; - GNUNET_STATISTICS_set (plugin->env->stats, - "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, - GNUNET_NO); - notify_queue_monitor (plugin, - queue, - GNUNET_TRANSPORT_SS_UPDATE); - if (NULL == plugin->write_task) - plugin->write_task = - GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, - unix_sock, - &unix_plugin_select_write, plugin); - return ssize; } /** - * Signature of functions implementing the - * sending functionality of a message queue. + * Signature of functions implementing the sending functionality of a + * message queue. * * @param mq the message queue * @param msg the message to send - * @param impl_state state of the implementation + * @param impl_state our `struct Queue` */ static void mq_send (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state) { + struct Queue *queue = impl_state; + + GNUNET_assert (mq == queue->mq); + GNUNET_assert (NULL == queue->msg); + queue->msg = msg; + GNUNET_CONTAINER_DLL_insert (queue_head, + queue_tail, + queue); + if (NULL == write_task) + write_task = + GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_write_cb, + NULL); } /** - * Signature of functions implementing the - * destruction of a message queue. - * Implementations must not free @a mq, but should - * take care of @a impl_state. + * Signature of functions implementing the destruction of a message + * queue. Implementations must not free @a mq, but should take care + * of @a impl_state. * * @param mq the message queue to destroy - * @param impl_state state of the implementation + * @param impl_state our `struct Queue` */ static void mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) { + struct Queue *queue = impl_state; + + if (mq == queue->mq) + { + queue->mq = NULL; + queue_destroy (queue); + } } @@ -1296,12 +838,15 @@ mq_destroy (struct GNUNET_MQ_Handle *mq, * Implementation function that cancels the currently sent message. * * @param mq message queue - * @param impl_state state specific to the implementation + * @param impl_state our `struct Queue` */ static void mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state) { + struct Queue *queue = impl_state; + + // FIXME: TBD! } @@ -1311,15 +856,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq, * the message queue. * Not every message queue implementation supports an error handler. * - * @param cls closure + * @param cls our `struct Queue` * @param error error code */ static void mq_error (void *cls, enum GNUNET_MQ_Error error) { -} + struct Queue *queue = cls; + // FIXME: TBD! +} /** @@ -1470,7 +1017,6 @@ do_shutdown (void *cls) GNUNET_TRANSPORT_communicator_disconnect (ch); ch = NULL; } - GNUNET_break (0 == bytes_in_queue); } @@ -1497,7 +1043,7 @@ run (void *cls, if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, - "transport-unix", + "communicator-unix", "UNIXPATH", &unix_socket_path)) { @@ -1506,7 +1052,14 @@ run (void *cls, "UNIXPATH"); return; } - + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (cfg, + "communicator-unix", + "MAX_QUEUE_LENGTH", + &max_queue_length)) + max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; + + /* Initialize my flags */ is_abstract = 0; #ifdef LINUX @@ -1571,7 +1124,7 @@ run (void *cls, queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); ch = GNUNET_TRANSPORT_communicator_connect (cfg, - "unix", + COMMUNICATOR_NAME, 65535, &mq_init, NULL); @@ -1587,13 +1140,16 @@ run (void *cls, COMMUNICATOR_NAME, unix_socket_path, is_abstract); - ai = GNUNET_TRANSPORT_communicator_address_add (ch, my_addr, GNUNET_ATS_NET_LOOPBACK, GNUNET_TIME_UNIT_FOREVER_REL); GNUNET_free (my_addr); GNUNET_free (unix_socket_path); + read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_read_cb, + NULL); } diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index e5be53150..434138e19 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -729,6 +729,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) handlers, &error_handler, ch); + // FIXME: must notify transport that we are responsible for 'ch->name' addresses!!! for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai; ai = ai->next) -- cgit v1.2.3