summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_transport_communication_service.h2
-rw-r--r--src/transport/Makefile.am9
-rw-r--r--src/transport/gnunet-communicator-unix.c769
3 files changed, 363 insertions, 417 deletions
diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h
index b1a248e51..ab5d3742a 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -70,7 +70,7 @@ extern "C"
typedef int
(*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls,
const struct GNUNET_PeerIdentity *peer,
- const void *address);
+ const char *address);
/**
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index c6c02c6ed..92b53137f 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -140,6 +140,7 @@ endif
noinst_PROGRAMS = \
gnunet-transport-profiler \
+ gnunet-communicator-unix \
$(WLAN_BIN_SENDER) \
$(WLAN_BIN_RECEIVER)
@@ -219,6 +220,14 @@ gnunet_transport_certificate_creation_SOURCES = \
gnunet_transport_certificate_creation_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la
+gnunet_communicator_unix_SOURCES = \
+ gnunet-communicator-unix.c
+gnunet_communicator_unix_LDADD = \
+ libgnunettransportcommunicator.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
gnunet_helper_transport_wlan_SOURCES = \
gnunet-helper-transport-wlan.c
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
index f07975186..2879b1738 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -154,6 +154,11 @@ static unsigned long long delivering_messages;
static unsigned long long max_queue_length;
/**
+ * For logging statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
+/**
* Our environment.
*/
static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
@@ -194,12 +199,11 @@ static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
static void
queue_destroy (struct Queue *queue)
{
- struct Plugin *plugin = cls;
struct GNUNET_MQ_Handle *mq;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Disconnecting queue for peer `%s'\n",
- GNUNET_i2s (&queue->target));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Disconnecting queue for peer `%s'\n",
+ GNUNET_i2s (&queue->target));
if (0 != queue->bytes_in_queue)
{
GNUNET_CONTAINER_DLL_remove (queue_head,
@@ -253,11 +257,11 @@ queue_timeout (void *cls)
queue);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queue %p was idle for %s, disconnecting\n",
- queue,
- GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- GNUNET_YES));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue %p was idle for %s, disconnecting\n",
+ queue,
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ GNUNET_YES));
queue_destroy (queue);
}
@@ -288,8 +292,7 @@ reschedule_queue_timeout (struct Queue *queue)
*/
static struct sockaddr_un *
unix_address_to_sockaddr (const char *unixpath,
- socklen_t *sock_len,
- int is_abstract)
+ socklen_t *sock_len)
{
struct sockaddr_un *un;
size_t slen;
@@ -309,7 +312,7 @@ unix_address_to_sockaddr (const char *unixpath,
un->sun_len = (u_char) slen;
#endif
(*sock_len) = slen;
- if (GNUNET_YES == is_abstract)
+ if ('@' == un->sun_path[0])
un->sun_path[0] = '\0';
return un;
}
@@ -328,7 +331,7 @@ struct LookupCtx
/**
* Address we are looking for.
*/
- const sockaddr_un *un;
+ const struct sockaddr_un *un;
/**
* Number of bytes in @a un
@@ -347,7 +350,7 @@ struct LookupCtx
*/
static int
lookup_queue_it (void *cls,
- const struct GNUNET_PeerIdentity * key,
+ const struct GNUNET_PeerIdentity *key,
void *value)
{
struct LookupCtx *lctx = cls;
@@ -374,14 +377,14 @@ lookup_queue_it (void *cls,
*/
static struct Queue *
lookup_queue (const struct GNUNET_PeerIdentity *peer,
- const sockaddr_un *un,
+ const struct sockaddr_un *un,
socklen_t un_len)
{
struct LookupCtx lctx;
lctx.un = un;
lctx.un_len = un_len;
- GNUNET_CONTAINER_multipeermap_get_multiple (plugin->queue_map,
+ GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
peer,
&lookup_queue_it,
&lctx);
@@ -390,295 +393,6 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer,
/**
- * Creates a new outbound queue the transport service will use to send
- * data to another peer.
- *
- * @param peer the target peer
- * @param un the address
- * @param un_len number of bytes in @a un
- * @return the queue or NULL of max connections exceeded
- */
-static struct Queue *
-unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target,
- const struct sockaddr_un *un,
- socklen_t un_len)
-{
- struct Plugin *plugin = cls;
- struct Queue *queue;
- struct UnixAddress *ua;
- char * addrstr;
- uint32_t addr_str_len;
- uint32_t addr_option;
- char *foreign_addr;
- int is_abstract;
-
- if (is_abstract = ('\0' == un.sun_path[0]))
- un.sun_path[0] = '/';
- GNUNET_asprintf (&foreign_addr,
- "%s-%s#%d",
- COMMUNICATOR_NAME,
- un.sun_path,
- is_abstract);
-
-
- addrstr = (char *) &ua[1];
- addr_str_len = ntohl (ua->addrlen);
- addr_option = ntohl (ua->options);
-
- /* create a new queue */
- queue = GNUNET_new (struct Queue);
- queue->target = address->peer;
- queue->address = GNUNET_HELLO_address_copy (address);
- queue->plugin = plugin;
- queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- queue->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &queue_timeout,
- queue);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating a new queue %p for address `%s'\n",
- queue,
- unix_plugin_address_to_string (NULL,
- address->address,
- address->address_length));
- (void) GNUNET_CONTAINER_multipeermap_put (plugin->queue_map,
- &address->peer, queue,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_STATISTICS_set (plugin->env->stats,
- "# UNIX queues active",
- GNUNET_CONTAINER_multipeermap_size (queue_map),
- GNUNET_NO);
- return queue;
-}
-
-
-/**
- * Function that will be called whenever the transport service wants
- * to notify the plugin that a queue is still active and in use and
- * therefore the queue timeout for this queue has to be updated
- *
- * @param cls closure with the `struct Plugin *`
- * @param peer which peer was the queue for
- * @param queue which queue is being updated
- */
-static void
-unix_plugin_update_queue_timeout (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- struct Queue *queue)
-{
- struct Plugin *plugin = cls;
-
- if (GNUNET_OK !=
- GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map,
- &queue->target,
- queue))
- {
- GNUNET_break (0);
- return;
- }
- reschedule_queue_timeout (queue);
-}
-
-
-/**
- * Demultiplexer for UNIX messages
- *
- * @param plugin the main plugin for this transport
- * @param sender from which peer the message was received
- * @param currhdr pointer to the header of the message
- * @param ua address to look for
- * @param ua_len length of the address @a ua
- */
-static void
-unix_demultiplexer (struct Plugin *plugin,
- struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *currhdr,
- const struct UnixAddress *ua,
- size_t ua_len)
-{
- struct Queue *queue;
- struct GNUNET_HELLO_Address *address;
-
- GNUNET_assert (ua_len >= sizeof (struct UnixAddress));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message from %s\n",
- unix_plugin_address_to_string (NULL, ua, ua_len));
- GNUNET_STATISTICS_update (plugin->env->stats,
- "# bytes received via UNIX",
- ntohs (currhdr->size),
- GNUNET_NO);
-
- /* Look for existing queue */
- address = GNUNET_HELLO_address_allocate (sender,
- PLUGIN_NAME,
- ua, ua_len,
- GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" queues */
- queue = lookup_queue (plugin, address);
- if (NULL == queue)
- {
- queue = unix_plugin_get_queue (plugin, address);
- /* Notify transport and ATS about new inbound queue */
- plugin->env->queue_start (NULL,
- queue->address,
- queue,
- GNUNET_ATS_NET_LOOPBACK);
- }
- else
- {
- reschedule_queue_timeout (queue);
- }
- GNUNET_HELLO_address_free (address);
- plugin->env->receive (plugin->env->cls,
- queue->address,
- queue,
- currhdr);
-}
-
-
-/**
- * We have been notified that our socket has something to read. Do the
- * read and reschedule this function to be called again once more is
- * available.
- *
- * @param cls NULL
- */
-static void
-select_read_cb (void *cls);
-
-
-/**
- * Function called when message was successfully passed to
- * transport service. Continue read activity.
- *
- * @param cls NULL
- */
-static void
-receive_complete_cb (void *cls)
-{
- delivering_messages--;
- if ( (NULL == read_task) &&
- (delivering_messages < max_queue_length) )
- read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- unix_sock,
- &select_read_cb,
- NULL);
-}
-
-
-/**
- * We have been notified that our socket has something to read. Do the
- * read and reschedule this function to be called again once more is
- * available.
- *
- * @param cls NULL
- */
-static void
-select_read_cb (void *cls)
-{
- char buf[65536] GNUNET_ALIGN;
- struct Queue *queue;
- const struct UNIXMessage *msg;
- struct sockaddr_un un;
- socklen_t addrlen;
- ssize_t ret;
- uint16_t msize;
-
- read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- unix_sock,
- &select_read_cb,
- NULL);
- addrlen = sizeof (un);
- memset (&un,
- 0,
- sizeof (un));
- ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
- buf,
- sizeof (buf),
- (struct sockaddr *) &un,
- &addrlen);
- if ( (-1 == ret) &&
- ( (EAGAIN == errno) ||
- (ENOBUFS == errno) ) )
- return;
- if (-1 == ret)
- {
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
- "recvfrom");
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Read %d bytes from socket %s\n",
- (int) ret,
- un.sun_path);
- GNUNET_assert (AF_UNIX == (un.sun_family));
- msg = (struct UNIXMessage *) buf;
- msize = ntohs (msg->header.size);
- if ( (msize < sizeof (struct UNIXMessage)) ||
- (msize > ret) )
- {
- GNUNET_break_op (0);
- return;
- }
- queue = lookup_queue (&msg->sender,
- un,
- addrlen);
- if (NULL == queue)
- queue = setup_queue (&msg->sender,
- un,
- addrlen);
- if (NULL == queue)
- {
- GNUENT_log (GNUNET_ERROR_TYPE_ERROR,
- _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
- return;
- }
-
-
- {
- uint16_t offset = 0;
- uint16_t tsize = msize - sizeof (struct UNIXMessage);
- const char *msgbuf = (const char *) &msg[1];
-
- while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
- {
- const struct GNUNET_MessageHeader *currhdr;
- struct GNUNET_MessageHeader al_hdr;
- uint16_t csize;
-
- currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
- /* ensure aligned access */
- memcpy (&al_hdr,
- currhdr,
- sizeof (al_hdr));
- csize = ntohs (al_hdr.size);
- if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
- (csize > tsize - offset))
- {
- GNUNET_break_op (0);
- break;
- }
- ret = GNUNET_TRANSPORT_communicator_receive (ch,
- &msg->sender,
- currhdr,
- &receive_complete_cb,
- NULL);
- if (GNUNET_SYSERR == ret)
- return; /* transport not up */
- if (GNUNET_NO == ret)
- break;
- delivering_messages++;
- offset += csize;
- }
- }
- if (delivering_messages >= max_queue_length)
- {
- /* we should try to apply 'back pressure' */
- GNUNET_SCHEDULER_cancel (read_task);
- read_task = NULL;
- }
-}
-
-
-/**
* We have been notified that our socket is ready to write.
* Then reschedule this function to be called again once more is available.
*
@@ -712,16 +426,27 @@ select_write_cb (void *cls)
sent = GNUNET_NETWORK_socket_sendto (unix_sock,
queue->msg,
msg_size,
- (const struct sockaddr *) mq->address,
- mq->address_len);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "UNIX transmitted message to %s (%d/%u: %s)\n",
- GNUNET_i2s (&queue->target),
- (int) sent,
- (unsigned int) msg_size,
- (sent < 0) ? STRERROR (errno) : "ok");
+ (const struct sockaddr *) queue->address,
+ queue->address_len);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "UNIX transmitted message to %s (%d/%u: %s)\n",
+ GNUNET_i2s (&queue->target),
+ (int) sent,
+ (unsigned int) msg_size,
+ (sent < 0) ? STRERROR (errno) : "ok");
if (-1 != sent)
+ {
+ GNUNET_STATISTICS_update (stats,
+ "# bytes sent",
+ (long long) sent,
+ GNUNET_NO);
+ reschedule_queue_timeout (queue);
return; /* all good */
+ }
+ GNUNET_STATISTICS_update (stats,
+ "# network transmission failures",
+ 1,
+ GNUNET_NO);
switch (errno)
{
case EAGAIN:
@@ -747,11 +472,11 @@ select_write_cb (void *cls)
GNUNET_break (0);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Trying to increase socket buffer size from %u to %u for message size %u\n",
- (unsigned int) size,
- (unsigned int) m((msg_size / 1000) + 2) * 1000,
- (unsigned int) msg_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying to increase socket buffer size from %u to %u for message size %u\n",
+ (unsigned int) size,
+ (unsigned int) ((msg_size / 1000) + 2) * 1000,
+ (unsigned int) msg_size);
size = ((msg_size / 1000) + 2) * 1000;
if (GNUNET_OK ==
GNUNET_NETWORK_socket_setsockopt (unix_sock,
@@ -846,7 +571,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq,
{
struct Queue *queue = impl_state;
- // FIXME: TBD!
+ GNUNET_assert (NULL != queue->msg);
+ queue->msg = NULL;
+ GNUNET_CONTAINER_DLL_remove (queue_head,
+ queue_tail,
+ queue);
+ GNUNET_assert (NULL != write_task);
+ if (NULL == queue_head)
+ {
+ GNUNET_SCHEDULER_cancel (write_task);
+ write_task = NULL;
+ }
}
@@ -865,7 +600,230 @@ mq_error (void *cls,
{
struct Queue *queue = cls;
- // FIXME: TBD!
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "UNIX MQ error in queue to %s: %d\n",
+ GNUNET_i2s (&queue->target),
+ (int) error);
+ queue_destroy (queue);
+}
+
+
+/**
+ * Creates a new outbound queue the transport service will use to send
+ * data to another peer.
+ *
+ * @param peer the target peer
+ * @param un the address
+ * @param un_len number of bytes in @a un
+ * @return the queue or NULL of max connections exceeded
+ */
+static struct Queue *
+setup_queue (const struct GNUNET_PeerIdentity *target,
+ const struct sockaddr_un *un,
+ socklen_t un_len)
+{
+ struct Queue *queue;
+
+ queue = GNUNET_new (struct Queue);
+ queue->target = *target;
+ queue->address = GNUNET_memdup (un,
+ un_len);
+ queue->address_len = un_len;
+ (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
+ &queue->target,
+ queue,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_STATISTICS_set (stats,
+ "# queues active",
+ GNUNET_CONTAINER_multipeermap_size (queue_map),
+ GNUNET_NO);
+ queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ queue->timeout_task
+ = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &queue_timeout,
+ queue);
+ queue->mq
+ = GNUNET_MQ_queue_for_callbacks (&mq_send,
+ &mq_destroy,
+ &mq_cancel,
+ queue,
+ NULL,
+ &mq_error,
+ queue);
+ {
+ char *foreign_addr;
+
+ if ('\0' == un->sun_path[0])
+ GNUNET_asprintf (&foreign_addr,
+ "%s-@%s",
+ COMMUNICATOR_NAME,
+ &un->sun_path[1]);
+ else
+ GNUNET_asprintf (&foreign_addr,
+ "%s-%s",
+ COMMUNICATOR_NAME,
+ un->sun_path);
+ queue->qh
+ = GNUNET_TRANSPORT_communicator_mq_add (ch,
+ &queue->target,
+ foreign_addr,
+ GNUNET_ATS_NET_LOOPBACK,
+ queue->mq);
+ GNUNET_free (foreign_addr);
+ }
+ return queue;
+}
+
+
+/**
+ * We have been notified that our socket has something to read. Do the
+ * read and reschedule this function to be called again once more is
+ * available.
+ *
+ * @param cls NULL
+ */
+static void
+select_read_cb (void *cls);
+
+
+/**
+ * Function called when message was successfully passed to
+ * transport service. Continue read activity.
+ *
+ * @param cls NULL
+ * @param success #GNUNET_OK on success
+ */
+static void
+receive_complete_cb (void *cls,
+ int success)
+{
+ delivering_messages--;
+ if (GNUNET_OK != success)
+ GNUNET_STATISTICS_update (stats,
+ "# transport transmission failures",
+ 1,
+ GNUNET_NO);
+ if ( (NULL == read_task) &&
+ (delivering_messages < max_queue_length) )
+ read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ unix_sock,
+ &select_read_cb,
+ NULL);
+}
+
+
+/**
+ * We have been notified that our socket has something to read. Do the
+ * read and reschedule this function to be called again once more is
+ * available.
+ *
+ * @param cls NULL
+ */
+static void
+select_read_cb (void *cls)
+{
+ char buf[65536] GNUNET_ALIGN;
+ struct Queue *queue;
+ const struct UNIXMessage *msg;
+ struct sockaddr_un un;
+ socklen_t addrlen;
+ ssize_t ret;
+ uint16_t msize;
+
+ read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ unix_sock,
+ &select_read_cb,
+ NULL);
+ addrlen = sizeof (un);
+ memset (&un,
+ 0,
+ sizeof (un));
+ ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
+ buf,
+ sizeof (buf),
+ (struct sockaddr *) &un,
+ &addrlen);
+ if ( (-1 == ret) &&
+ ( (EAGAIN == errno) ||
+ (ENOBUFS == errno) ) )
+ return;
+ if (-1 == ret)
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+ "recvfrom");
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Read %d bytes from socket %s\n",
+ (int) ret,
+ un.sun_path);
+ GNUNET_assert (AF_UNIX == (un.sun_family));
+ msg = (struct UNIXMessage *) buf;
+ msize = ntohs (msg->header.size);
+ if ( (msize < sizeof (struct UNIXMessage)) ||
+ (msize > ret) )
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ queue = lookup_queue (&msg->sender,
+ &un,
+ addrlen);
+ if (NULL == queue)
+ queue = setup_queue (&msg->sender,
+ &un,
+ addrlen);
+ else
+ reschedule_queue_timeout (queue);
+ if (NULL == queue)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
+ return;
+ }
+
+ {
+ uint16_t offset = 0;
+ uint16_t tsize = msize - sizeof (struct UNIXMessage);
+ const char *msgbuf = (const char *) &msg[1];
+
+ while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
+ {
+ const struct GNUNET_MessageHeader *currhdr;
+ struct GNUNET_MessageHeader al_hdr;
+ uint16_t csize;
+
+ currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
+ /* ensure aligned access */
+ memcpy (&al_hdr,
+ currhdr,
+ sizeof (al_hdr));
+ csize = ntohs (al_hdr.size);
+ if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
+ (csize > tsize - offset))
+ {
+ GNUNET_break_op (0);
+ break;
+ }
+ ret = GNUNET_TRANSPORT_communicator_receive (ch,
+ &msg->sender,
+ currhdr,
+ &receive_complete_cb,
+ NULL);
+ if (GNUNET_SYSERR == ret)
+ return; /* transport not up */
+ if (GNUNET_NO == ret)
+ break;
+ delivering_messages++;
+ offset += csize;
+ }
+ }
+ if (delivering_messages >= max_queue_length)
+ {
+ /* we should try to apply 'back pressure' */
+ GNUNET_SCHEDULER_cancel (read_task);
+ read_task = NULL;
+ }
}
@@ -889,76 +847,69 @@ mq_error (void *cls,
static int
mq_init (void *cls,
const struct GNUNET_PeerIdentity *peer,
- const void *address)
+ const char *address)
{
struct Queue *queue;
- char *a;
- char *e;
- int is_abs;
- sockaddr_un *un;
+ const char *path;
+ struct sockaddr_un *un;
socklen_t un_len;
- if (NULL == strncmp (address,
- COMMUNICATOR_NAME "-",
- strlen (COMMUNICATOR_NAME "-")))
+ if (0 != strncmp (address,
+ COMMUNICATOR_NAME "-",
+ strlen (COMMUNICATOR_NAME "-")))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- a = GNUNET_strdup (&address[strlen (COMMUNICATOR_NAME "-")]);
- e = strchr (a,
- (unsigned char) '#');
- if (NULL == e)
- {
- GNUNET_free (a);
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- is_abs = ('1' == e[1]);
- *e = '\0';
- un = unix_address_to_sockaddr (a,
- &un_len,
- is_abs);
+ path = &address[strlen (COMMUNICATOR_NAME "-")];
+ un = unix_address_to_sockaddr (path,
+ &un_len);
queue = lookup_queue (peer,
un,
un_len);
if (NULL != queue)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Address `%s' ignored, queue exists\n",
- address);
+ "Address `%s' for %s ignored, queue exists\n",
+ path,
+ GNUNET_i2s (peer));
GNUNET_free (un);
return GNUNET_OK;
}
- queue = GNUNET_new (struct Queue);
- queue->target = *peer;
- queue->address = un;
- queue->address_len = un_len;
- (void) GNUNET_CONTAINER_multihashmap_put (queue_map,
- &queue->target,
- queue,
- GNUET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_STATISTICS_set (stats,
- "# UNIX queues active",
- GNUNET_CONTAINER_multipeermap_size (plugin->queue_map),
- GNUNET_NO);
- queue->timeout = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &queue_timeout,
- queue);
- queue->mq
- = GNUNET_MQ_queue_for_callbacks (&mq_send,
- &mq_destroy,
- &mq_cancel,
- queue,
- NULL,
- &mq_error,
- queue);
- queue->qh
- = GNUNET_TRANSPORT_communicator_mq_add (ch,
- &queue->target,
- address,
- ATS,
- queue->mq);
+ queue = setup_queue (peer,
+ un,
+ un_len);
+ GNUNET_free (un);
+ if (NULL == queue)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to setup queue to %s at `%s'\n",
+ GNUNET_i2s (peer),
+ path);
+ return GNUNET_NO;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Iterator over all message queues to clean up.
+ *
+ * @param cls NULL
+ * @param target unused
+ * @param value the queue to destroy
+ * @return #GNUNET_OK to continue to iterate
+ */
+static int
+get_queue_delete_it (void *cls,
+ const struct GNUNET_PeerIdentity *target,
+ void *value)
+{
+ struct Queue *queue = value;
+
+ (void) cls;
+ (void) target;
+ queue_destroy (queue);
return GNUNET_OK;
}
@@ -971,22 +922,6 @@ mq_init (void *cls,
static void
do_shutdown (void *cls)
{
- struct UNIXMessageWrapper *msgw;
-
- while (NULL != (msgw = msg_head))
- {
- GNUNET_CONTAINER_DLL_remove (msg_head,
- msg_tail,
- msgw);
- queue = msgw->queue;
- queue->msgs_in_queue--;
- GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
- queue->bytes_in_queue -= msgw->msgsize;
- GNUNET_assert (bytes_in_queue >= msgw->msgsize);
- bytes_in_queue -= msgw->msgsize;
- GNUNET_free (msgw->msg);
- GNUNET_free (msgw);
- }
if (NULL != read_task)
{
GNUNET_SCHEDULER_cancel (read_task);
@@ -1017,6 +952,12 @@ do_shutdown (void *cls)
GNUNET_TRANSPORT_communicator_disconnect (ch);
ch = NULL;
}
+ if (NULL != stats)
+ {
+ GNUNET_STATISTICS_destroy (stats,
+ GNUNET_NO);
+ stats = NULL;
+ }
}
@@ -1035,7 +976,6 @@ run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
char *unix_socket_path;
- int is_abstract;
struct sockaddr_un *un;
socklen_t un_len;
char *my_addr;
@@ -1059,18 +999,16 @@ run (void *cls,
&max_queue_length))
max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
-
- /* Initialize my flags */
- is_abstract = 0;
-#ifdef LINUX
- is_abstract
- = GNUNET_CONFIGURATION_get_value_yesno (cfg,
- "testing",
- "USE_ABSTRACT_SOCKETS");
-#endif
un = unix_address_to_sockaddr (unix_socket_path,
- &un_len,
- is_abstract);
+ &un_len);
+ if (NULL == un)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to setup UNIX domain socket address with path `%s'\n",
+ unix_socket_path);
+ GNUNET_free (unix_socket_path);
+ return;
+ }
unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
SOCK_DGRAM,
0);
@@ -1086,9 +1024,9 @@ run (void *cls,
(GNUNET_OK !=
GNUNET_DISK_directory_create_for_file (un->sun_path)) )
{
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Cannot create path to `%s'\n"),
- un->sun_path);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Cannot create path to `%s'\n"),
+ un->sun_path);
GNUNET_NETWORK_socket_close (unix_sock);
unix_sock = NULL;
GNUNET_free (un);
@@ -1100,11 +1038,9 @@ run (void *cls,
(const struct sockaddr *) un,
un_len))
{
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
- "bind");
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Cannot bind to `%s'\n"),
- un->sun_path);
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
+ "bind",
+ un->sun_path);
GNUNET_NETWORK_socket_close (unix_sock);
unix_sock = NULL;
GNUNET_free (un);
@@ -1112,14 +1048,16 @@ run (void *cls,
return;
}
GNUNET_free (un);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Bound to `%s'\n",
- unix_socket_path);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Bound to `%s'\n",
+ unix_socket_path);
+ stats = GNUNET_STATISTICS_create ("C-UNIX",
+ cfg);
GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
NULL);
read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
unix_sock,
- &unix_plugin_select_read,
+ &select_read_cb,
NULL);
queue_map = GNUNET_CONTAINER_multipeermap_create (10,
GNUNET_NO);
@@ -1136,10 +1074,9 @@ run (void *cls,
return;
}
GNUNET_asprintf (&my_addr,
- "%s-%s#%d",
+ "%s-%s",
COMMUNICATOR_NAME,
- unix_socket_path,
- is_abstract);
+ unix_socket_path);
ai = GNUNET_TRANSPORT_communicator_address_add (ch,
my_addr,
GNUNET_ATS_NET_LOOPBACK,