aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-communicator-unix.c1649
1 files changed, 1649 insertions, 0 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
new file mode 100644
index 000000000..373b74149
--- /dev/null
+++ b/src/transport/gnunet-communicator-unix.c
@@ -0,0 +1,1649 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/gnunet-communicator-unix.c
21 * @brief Transport plugin using unix domain sockets (!)
22 * Clearly, can only be used locally on Unix/Linux hosts...
23 * ONLY INTENDED FOR TESTING!!!
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet_transport_communication_service.h"
32
33/**
34 * Name of the communicator.
35 */
36#define COMMUNICATOR_NAME "unix"
37
38
39GNUNET_NETWORK_STRUCT_BEGIN
40
41/**
42 * UNIX Message-Packet header.
43 */
44struct UNIXMessage
45{
46 /**
47 * Message header.
48 */
49 struct GNUNET_MessageHeader header;
50
51 /**
52 * What is the identity of the sender (GNUNET_hash of public key)
53 */
54 struct GNUNET_PeerIdentity sender;
55
56};
57
58GNUNET_NETWORK_STRUCT_END
59
60
61/**
62 * Information we track for a message awaiting transmission.
63 */
64struct UNIXMessageWrapper
65{
66 /**
67 * We keep messages in a doubly linked list.
68 */
69 struct UNIXMessageWrapper *next;
70
71 /**
72 * We keep messages in a doubly linked list.
73 */
74 struct UNIXMessageWrapper *prev;
75
76 /**
77 * The actual payload (allocated separately right now).
78 */
79 struct UNIXMessage *msg;
80
81 /**
82 * Queue this message belongs to.
83 */
84 struct Queue *queue;
85
86 /**
87 * Function to call upon transmission.
88 */
89 GNUNET_TRANSPORT_TransmitContinuation cont;
90
91 /**
92 * Closure for @e cont.
93 */
94 void *cont_cls;
95
96 /**
97 * Timeout for this message.
98 */
99 struct GNUNET_TIME_Absolute timeout;
100
101 /**
102 * Number of bytes in @e msg.
103 */
104 size_t msgsize;
105
106 /**
107 * Number of bytes of payload encapsulated in @e msg.
108 */
109 size_t payload;
110
111 /**
112 * Priority of the message (ignored, just dragged along in UNIX).
113 */
114 unsigned int priority;
115};
116
117
118/**
119 * Handle for a queue.
120 */
121struct Queue
122{
123
124 /**
125 * Queues with pending messages (!) are kept in a DLL.
126 */
127 struct Queue *next;
128
129 /**
130 * Queues with pending messages (!) are kept in a DLL.
131 */
132 struct Queue *prev;
133
134 /**
135 * To whom are we talking to (set to our identity
136 * if we are still waiting for the welcome message).
137 *
138 * FIXME: information duplicated with 'peer' in address!
139 */
140 struct GNUNET_PeerIdentity target;
141
142 /**
143 * Address of the other peer.
144 */
145 struct sockaddr_un *address;
146
147 /**
148 * Length of the address.
149 */
150 socklen_t address_len;
151
152 /**
153 * Message queue we are providing for the #ch.
154 */
155 struct GNUNET_MQ_Handle *mq;
156
157 /**
158 * handle for this queue with the #ch.
159 */
160 struct GNUNET_TRANSPORT_QueueHandle *qh;
161
162 /**
163 * Number of bytes we currently have in our write queue.
164 */
165 unsigned long long bytes_in_queue;
166
167 /**
168 * Timeout for this queue.
169 */
170 struct GNUNET_TIME_Absolute timeout;
171
172 /**
173 * Queue timeout task.
174 */
175 struct GNUNET_SCHEDULER_Task * timeout_task;
176
177 /**
178 * Number of messages we currently have in our write queue.
179 */
180 unsigned int msgs_in_queue;
181
182};
183
184
185
186/**
187 * ID of read task
188 */
189static struct GNUNET_SCHEDULER_Task *read_task;
190
191/**
192 * ID of write task
193 */
194static struct GNUNET_SCHEDULER_Task *write_task;
195
196/**
197 * Number of bytes we currently have in our write queues.
198 */
199static unsigned long long bytes_in_queue;
200
201/**
202 * Our environment.
203 */
204static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
205
206/**
207 * Queues (map from peer identity to `struct Queue`)
208 */
209static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
210
211/**
212 * Head of queue of messages to transmit.
213 */
214static struct UNIXMessageWrapper *msg_head;
215
216/**
217 * Tail of queue of messages to transmit.
218 */
219static struct UNIXMessageWrapper *msg_tail;
220
221/**
222 * socket that we transmit all data with
223 */
224static struct GNUNET_NETWORK_Handle *unix_sock;
225
226/**
227 * Handle to the operation that publishes our address.
228 */
229static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
230
231
232/**
233 * If a queue monitor is attached, notify it about the new
234 * queue state.
235 *
236 * @param plugin our plugin
237 * @param queue queue that changed state
238 * @param state new state of the queue
239 */
240static void
241notify_queue_monitor (struct Plugin *plugin,
242 struct Queue *queue,
243 enum GNUNET_TRANSPORT_QueueState state)
244{
245 struct GNUNET_TRANSPORT_QueueInfo info;
246
247 if (NULL == plugin->sic)
248 return;
249 memset (&info, 0, sizeof (info));
250 info.state = state;
251 info.is_inbound = GNUNET_SYSERR; /* hard to say */
252 info.num_msg_pending = queue->msgs_in_queue;
253 info.num_bytes_pending = queue->bytes_in_queue;
254 /* info.receive_delay remains zero as this is not supported by UNIX
255 (cannot selectively not receive from 'some' peer while continuing
256 to receive from others) */
257 info.queue_timeout = queue->timeout;
258 info.address = queue->address;
259 plugin->sic (plugin->sic_cls,
260 queue,
261 &info);
262}
263
264
265/**
266 * Function called for a quick conversion of the binary address to
267 * a numeric address. Note that the caller must not free the
268 * address and that the next call to this function is allowed
269 * to override the address again.
270 *
271 * @param cls closure
272 * @param addr binary address
273 * @param addrlen length of the @a addr
274 * @return string representing the same address
275 */
276static const char *
277unix_plugin_address_to_string (void *cls,
278 const void *addr,
279 size_t addrlen)
280{
281 static char rbuf[1024];
282 struct UnixAddress *ua = (struct UnixAddress *) addr;
283 char *addrstr;
284 size_t addr_str_len;
285 unsigned int off;
286
287 if ((NULL == addr) || (sizeof (struct UnixAddress) > addrlen))
288 {
289 GNUNET_break(0);
290 return NULL;
291 }
292 addrstr = (char *) &ua[1];
293 addr_str_len = ntohl (ua->addrlen);
294
295 if (addr_str_len != addrlen - sizeof(struct UnixAddress))
296 {
297 GNUNET_break(0);
298 return NULL;
299 }
300 if ('\0' != addrstr[addr_str_len - 1])
301 {
302 GNUNET_break(0);
303 return NULL;
304 }
305 if (strlen (addrstr) + 1 != addr_str_len)
306 {
307 GNUNET_break(0);
308 return NULL;
309 }
310
311 off = 0;
312 if ('\0' == addrstr[0])
313 off++;
314 memset (rbuf, 0, sizeof (rbuf));
315 GNUNET_snprintf (rbuf,
316 sizeof (rbuf) - 1,
317 "%s.%u.%s%.*s",
318 PLUGIN_NAME,
319 ntohl (ua->options),
320 (off == 1) ? "@" : "",
321 (int) (addr_str_len - off),
322 &addrstr[off]);
323 return rbuf;
324}
325
326
327/**
328 * Functions with this signature are called whenever we need
329 * to close a queue due to a disconnect or failure to
330 * establish a connection.
331 *
332 * @param queue queue to close down
333 */
334static void
335unix_plugin_queue_disconnect (struct Queue *queue)
336{
337 struct Plugin *plugin = cls;
338 struct UNIXMessageWrapper *msgw;
339 struct UNIXMessageWrapper *next;
340
341 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 "Disconnecting queue for peer `%s'\n",
343 GNUNET_i2s (&queue->target));
344 plugin->env->queue_end (plugin->env->cls,
345 queue->address,
346 queue);
347 next = plugin->msg_head;
348 while (NULL != next)
349 {
350 msgw = next;
351 next = msgw->next;
352 if (msgw->queue != queue)
353 continue;
354 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
355 plugin->msg_tail,
356 msgw);
357 queue->msgs_in_queue--;
358 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
359 queue->bytes_in_queue -= msgw->msgsize;
360 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
361 plugin->bytes_in_queue -= msgw->msgsize;
362 if (NULL != msgw->cont)
363 msgw->cont (msgw->cont_cls,
364 &msgw->queue->target,
365 GNUNET_SYSERR,
366 msgw->payload, 0);
367 GNUNET_free (msgw->msg);
368 GNUNET_free (msgw);
369 }
370 GNUNET_assert (GNUNET_YES ==
371 GNUNET_CONTAINER_multipeermap_remove (plugin->queue_map,
372 &queue->target,
373 queue));
374 GNUNET_STATISTICS_set (stats,
375 "# UNIX queues active",
376 GNUNET_CONTAINER_multipeermap_size (plugin->queue_map),
377 GNUNET_NO);
378 if (NULL != queue->timeout_task)
379 {
380 GNUNET_SCHEDULER_cancel (queue->timeout_task);
381 queue->timeout_task = NULL;
382 queue->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
383 }
384 GNUNET_free (queue->address);
385 GNUNET_break (0 == queue->bytes_in_queue);
386 GNUNET_break (0 == queue->msgs_in_queue);
387 GNUNET_free (queue);
388}
389
390
391/**
392 * Queue was idle for too long, so disconnect it
393 *
394 * @param cls the `struct Queue *` to disconnect
395 */
396static void
397queue_timeout (void *cls)
398{
399 struct Queue *queue = cls;
400 struct GNUNET_TIME_Relative left;
401
402 queue->timeout_task = NULL;
403 left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
404 if (0 != left.rel_value_us)
405 {
406 /* not actually our turn yet, but let's at least update
407 the monitor, it may think we're about to die ... */
408 queue->timeout_task
409 = GNUNET_SCHEDULER_add_delayed (left,
410 &queue_timeout,
411 queue);
412 return;
413 }
414 LOG (GNUNET_ERROR_TYPE_DEBUG,
415 "Queue %p was idle for %s, disconnecting\n",
416 queue,
417 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
418 GNUNET_YES));
419 unix_plugin_queue_disconnect (queue);
420}
421
422
423/**
424 * Increment queue timeout due to activity. We do not immediately
425 * notify the monitor here as that might generate excessive
426 * signalling.
427 *
428 * @param queue queue for which the timeout should be rescheduled
429 */
430static void
431reschedule_queue_timeout (struct Queue *queue)
432{
433 GNUNET_assert (NULL != queue->timeout_task);
434 queue->timeout
435 = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
436}
437
438
439/**
440 * Convert unix path to a `struct sockaddr_un *`
441 *
442 * @param unixpath path to convert
443 * @param[out] sock_len set to the length of the address
444 * @param is_abstract is this an abstract @a unixpath
445 * @return converted unix path
446 */
447static struct sockaddr_un *
448unix_address_to_sockaddr (const char *unixpath,
449 socklen_t *sock_len,
450 int is_abstract)
451{
452 struct sockaddr_un *un;
453 size_t slen;
454
455 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
456 un = GNUNET_new (struct sockaddr_un);
457 un->sun_family = AF_UNIX;
458 slen = strlen (unixpath);
459 if (slen >= sizeof (un->sun_path))
460 slen = sizeof (un->sun_path) - 1;
461 GNUNET_memcpy (un->sun_path, unixpath, slen);
462 un->sun_path[slen] = '\0';
463 slen = sizeof (struct sockaddr_un);
464#if HAVE_SOCKADDR_UN_SUN_LEN
465 un->sun_len = (u_char) slen;
466#endif
467 (*sock_len) = slen;
468 if (GNUNET_YES == is_abstract)
469 un->sun_path[0] = '\0';
470 return un;
471}
472
473
474/**
475 * Closure to #lookup_queue_it().
476 */
477struct LookupCtx
478{
479 /**
480 * Location to store the queue, if found.
481 */
482 struct Queue *res;
483
484 /**
485 * Address we are looking for.
486 */
487 const sockaddr_un *un;
488
489 /**
490 * Number of bytes in @a un
491 */
492 socklen_t un_len;
493};
494
495
496/**
497 * Function called to find a queue by address.
498 *
499 * @param cls the `struct LookupCtx *`
500 * @param key peer we are looking for (unused)
501 * @param value a queue
502 * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
503 */
504static int
505lookup_queue_it (void *cls,
506 const struct GNUNET_PeerIdentity * key,
507 void *value)
508{
509 struct LookupCtx *lctx = cls;
510 struct Queue *queue = value;
511
512 if ( (queue->address_len = lctx->un_len) &&
513 (0 == memcmp (lctx->un,
514 queue->address,
515 queue->address_len)) )
516 {
517 lctx->res = queue;
518 return GNUNET_NO;
519 }
520 return GNUNET_YES;
521}
522
523
524/**
525 * Find an existing queue by address.
526 *
527 * @param plugin the plugin
528 * @param address the address to find
529 * @return NULL if queue was not found
530 */
531static struct Queue *
532lookup_queue (const struct GNUNET_PeerIdentity *peer,
533 const sockaddr_un *un,
534 socklen_t un_len)
535{
536 struct LookupCtx lctx;
537
538 lctx.un = un;
539 lctx.un_len = un_len;
540 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->queue_map,
541 peer,
542 &lookup_queue_it,
543 &lctx);
544 return lctx.res;
545}
546
547
548
549/**
550 * Actually send out the message, assume we've got the address and
551 * send_handle squared away!
552 *
553 * @param cls closure
554 * @param send_handle which handle to send message on
555 * @param target who should receive this message (ignored by UNIX)
556 * @param msgbuf one or more GNUNET_MessageHeader(s) strung together
557 * @param msgbuf_size the size of the @a msgbuf to send
558 * @param priority how important is the message (ignored by UNIX)
559 * @param timeout when should we time out (give up) if we can not transmit?
560 * @param addr the addr to send the message to, needs to be a sockaddr for us
561 * @param addrlen the len of @a addr
562 * @param payload bytes payload to send
563 * @param cont continuation to call once the message has
564 * been transmitted (or if the transport is ready
565 * for the next transmission call; or if the
566 * peer disconnected...)
567 * @param cont_cls closure for @a cont
568 * @return on success the number of bytes written, RETRY for retry, -1 on errors
569 */
570static ssize_t
571unix_real_send (void *cls,
572 struct GNUNET_NETWORK_Handle *send_handle,
573 const struct GNUNET_PeerIdentity *target,
574 const char *msgbuf,
575 size_t msgbuf_size,
576 unsigned int priority,
577 struct GNUNET_TIME_Absolute timeout,
578 const struct UnixAddress *addr,
579 size_t addrlen,
580 size_t payload,
581 GNUNET_TRANSPORT_TransmitContinuation cont,
582 void *cont_cls)
583{
584 struct Plugin *plugin = cls;
585 ssize_t sent;
586 struct sockaddr_un *un;
587 socklen_t un_len;
588 const char *unixpath;
589
590 if (NULL == send_handle)
591 {
592 GNUNET_break (0); /* We do not have a send handle */
593 return GNUNET_SYSERR;
594 }
595 if ((NULL == addr) || (0 == addrlen))
596 {
597 GNUNET_break (0); /* Can never send if we don't have an address */
598 return GNUNET_SYSERR;
599 }
600
601 /* Prepare address */
602 unixpath = (const char *) &addr[1];
603 if (NULL == (un = unix_address_to_sockaddr (unixpath,
604 &un_len)))
605 {
606 GNUNET_break (0);
607 return -1;
608 }
609
610 if ((GNUNET_YES == plugin->is_abstract) &&
611 (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) )
612 {
613 un->sun_path[0] = '\0';
614 }
615resend:
616 /* Send the data */
617 sent = GNUNET_NETWORK_socket_sendto (send_handle,
618 msgbuf,
619 msgbuf_size,
620 (const struct sockaddr *) un,
621 un_len);
622 if (GNUNET_SYSERR == sent)
623 {
624 if ( (EAGAIN == errno) ||
625 (ENOBUFS == errno) )
626 {
627 GNUNET_free (un);
628 return RETRY; /* We have to retry later */
629 }
630 if (EMSGSIZE == errno)
631 {
632 socklen_t size = 0;
633 socklen_t len = sizeof (size);
634
635 GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *)
636 send_handle, SOL_SOCKET, SO_SNDBUF, &size,
637 &len);
638 if (size < msgbuf_size)
639 {
640 LOG (GNUNET_ERROR_TYPE_DEBUG,
641 "Trying to increase socket buffer size from %u to %u for message size %u\n",
642 (unsigned int) size,
643 (unsigned int) ((msgbuf_size / 1000) + 2) * 1000,
644 (unsigned int) msgbuf_size);
645 size = ((msgbuf_size / 1000) + 2) * 1000;
646 if (GNUNET_OK ==
647 GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle,
648 SOL_SOCKET, SO_SNDBUF,
649 &size, sizeof (size)))
650 goto resend; /* Increased buffer size, retry sending */
651 else
652 {
653 /* Could not increase buffer size: error, no retry */
654 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
655 GNUNET_free (un);
656 return GNUNET_SYSERR;
657 }
658 }
659 else
660 {
661 /* Buffer is bigger than message: error, no retry
662 * This should never happen!*/
663 GNUNET_break (0);
664 GNUNET_free (un);
665 return GNUNET_SYSERR;
666 }
667 }
668 }
669
670 LOG (GNUNET_ERROR_TYPE_DEBUG,
671 "UNIX transmitted %u-byte message to %s (%d: %s)\n",
672 (unsigned int) msgbuf_size,
673 GNUNET_a2s ((const struct sockaddr *)un, un_len),
674 (int) sent,
675 (sent < 0) ? STRERROR (errno) : "ok");
676 GNUNET_free (un);
677 return sent;
678}
679
680
681/**
682 * Function obtain the network type for a queue
683 *
684 * @param cls closure ('struct Plugin*')
685 * @param queue the queue
686 * @return the network type in HBO or #GNUNET_SYSERR
687 */
688static enum GNUNET_ATS_Network_Type
689unix_plugin_get_network (void *cls,
690 struct Queue *queue)
691{
692 GNUNET_assert (NULL != queue);
693 return GNUNET_ATS_NET_LOOPBACK;
694}
695
696
697/**
698 * Function obtain the network type for a queue
699 *
700 * @param cls closure (`struct Plugin *`)
701 * @param address the address
702 * @return the network type
703 */
704static enum GNUNET_ATS_Network_Type
705unix_plugin_get_network_for_address (void *cls,
706 const struct GNUNET_HELLO_Address *address)
707
708{
709 return GNUNET_ATS_NET_LOOPBACK;
710}
711
712
713/**
714 * Creates a new outbound queue the transport service will use to send data to the
715 * peer
716 *
717 * @param cls the plugin
718 * @param address the address
719 * @return the queue or NULL of max connections exceeded
720 */
721static struct Queue *
722unix_plugin_get_queue (void *cls,
723 const struct GNUNET_HELLO_Address *address)
724{
725 struct Plugin *plugin = cls;
726 struct Queue *queue;
727 struct UnixAddress *ua;
728 char * addrstr;
729 uint32_t addr_str_len;
730 uint32_t addr_option;
731
732 ua = (struct UnixAddress *) address->address;
733 if ((NULL == address->address) || (0 == address->address_length) ||
734 (sizeof (struct UnixAddress) > address->address_length))
735 {
736 GNUNET_break (0);
737 return NULL;
738 }
739 addrstr = (char *) &ua[1];
740 addr_str_len = ntohl (ua->addrlen);
741 addr_option = ntohl (ua->options);
742
743 if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) &&
744 (GNUNET_NO == plugin->is_abstract))
745 {
746 return NULL;
747 }
748
749 if (addr_str_len != address->address_length - sizeof (struct UnixAddress))
750 {
751 return NULL; /* This can be a legacy address */
752 }
753
754 if ('\0' != addrstr[addr_str_len - 1])
755 {
756 GNUNET_break (0);
757 return NULL;
758 }
759 if (strlen (addrstr) + 1 != addr_str_len)
760 {
761 GNUNET_break (0);
762 return NULL;
763 }
764
765 /* Check if a queue for this address already exists */
766 if (NULL != (queue = lookup_queue (plugin,
767 address)))
768 {
769 LOG (GNUNET_ERROR_TYPE_DEBUG,
770 "Found existing queue %p for address `%s'\n",
771 queue,
772 unix_plugin_address_to_string (NULL,
773 address->address,
774 address->address_length));
775 return queue;
776 }
777
778 /* create a new queue */
779 queue = GNUNET_new (struct Queue);
780 queue->target = address->peer;
781 queue->address = GNUNET_HELLO_address_copy (address);
782 queue->plugin = plugin;
783 queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
784 queue->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
785 &queue_timeout,
786 queue);
787 LOG (GNUNET_ERROR_TYPE_DEBUG,
788 "Creating a new queue %p for address `%s'\n",
789 queue,
790 unix_plugin_address_to_string (NULL,
791 address->address,
792 address->address_length));
793 (void) GNUNET_CONTAINER_multipeermap_put (plugin->queue_map,
794 &address->peer, queue,
795 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
796 GNUNET_STATISTICS_set (plugin->env->stats,
797 "# UNIX queues active",
798 GNUNET_CONTAINER_multipeermap_size (plugin->queue_map),
799 GNUNET_NO);
800 notify_queue_monitor (plugin,
801 queue,
802 GNUNET_TRANSPORT_SS_INIT);
803 notify_queue_monitor (plugin,
804 queue,
805 GNUNET_TRANSPORT_SS_UP);
806 return queue;
807}
808
809
810/**
811 * Function that will be called whenever the transport service wants
812 * to notify the plugin that a queue is still active and in use and
813 * therefore the queue timeout for this queue has to be updated
814 *
815 * @param cls closure with the `struct Plugin *`
816 * @param peer which peer was the queue for
817 * @param queue which queue is being updated
818 */
819static void
820unix_plugin_update_queue_timeout (void *cls,
821 const struct GNUNET_PeerIdentity *peer,
822 struct Queue *queue)
823{
824 struct Plugin *plugin = cls;
825
826 if (GNUNET_OK !=
827 GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map,
828 &queue->target,
829 queue))
830 {
831 GNUNET_break (0);
832 return;
833 }
834 reschedule_queue_timeout (queue);
835}
836
837
838/**
839 * Demultiplexer for UNIX messages
840 *
841 * @param plugin the main plugin for this transport
842 * @param sender from which peer the message was received
843 * @param currhdr pointer to the header of the message
844 * @param ua address to look for
845 * @param ua_len length of the address @a ua
846 */
847static void
848unix_demultiplexer (struct Plugin *plugin,
849 struct GNUNET_PeerIdentity *sender,
850 const struct GNUNET_MessageHeader *currhdr,
851 const struct UnixAddress *ua,
852 size_t ua_len)
853{
854 struct Queue *queue;
855 struct GNUNET_HELLO_Address *address;
856
857 GNUNET_assert (ua_len >= sizeof (struct UnixAddress));
858 LOG (GNUNET_ERROR_TYPE_DEBUG,
859 "Received message from %s\n",
860 unix_plugin_address_to_string (NULL, ua, ua_len));
861 GNUNET_STATISTICS_update (plugin->env->stats,
862 "# bytes received via UNIX",
863 ntohs (currhdr->size),
864 GNUNET_NO);
865
866 /* Look for existing queue */
867 address = GNUNET_HELLO_address_allocate (sender,
868 PLUGIN_NAME,
869 ua, ua_len,
870 GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" queues */
871 queue = lookup_queue (plugin, address);
872 if (NULL == queue)
873 {
874 queue = unix_plugin_get_queue (plugin, address);
875 /* Notify transport and ATS about new inbound queue */
876 plugin->env->queue_start (NULL,
877 queue->address,
878 queue,
879 GNUNET_ATS_NET_LOOPBACK);
880 }
881 else
882 {
883 reschedule_queue_timeout (queue);
884 }
885 GNUNET_HELLO_address_free (address);
886 plugin->env->receive (plugin->env->cls,
887 queue->address,
888 queue,
889 currhdr);
890}
891
892
893/**
894 * Read from UNIX domain socket (it is ready).
895 *
896 * @param plugin the plugin
897 */
898static void
899unix_plugin_do_read (struct Plugin *plugin)
900{
901 char buf[65536] GNUNET_ALIGN;
902 struct UnixAddress *ua;
903 struct UNIXMessage *msg;
904 struct GNUNET_PeerIdentity sender;
905 struct sockaddr_un un;
906 socklen_t addrlen;
907 ssize_t ret;
908 int offset;
909 int tsize;
910 int is_abstract;
911 char *msgbuf;
912 const struct GNUNET_MessageHeader *currhdr;
913 uint16_t csize;
914 size_t ua_len;
915
916 addrlen = sizeof (un);
917 memset (&un, 0, sizeof (un));
918 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
919 buf, sizeof (buf),
920 (struct sockaddr *) &un,
921 &addrlen);
922 if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS)))
923 return;
924 if (GNUNET_SYSERR == ret)
925 {
926 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
927 "recvfrom");
928 return;
929 }
930 else
931 {
932 LOG (GNUNET_ERROR_TYPE_DEBUG,
933 "Read %d bytes from socket %s\n",
934 (int) ret,
935 un.sun_path);
936 }
937
938 GNUNET_assert (AF_UNIX == (un.sun_family));
939 is_abstract = GNUNET_NO;
940 if ('\0' == un.sun_path[0])
941 {
942 un.sun_path[0] = '@';
943 is_abstract = GNUNET_YES;
944 }
945
946 ua_len = sizeof (struct UnixAddress) + strlen (un.sun_path) + 1;
947 ua = GNUNET_malloc (ua_len);
948 ua->addrlen = htonl (strlen (&un.sun_path[0]) +1);
949 GNUNET_memcpy (&ua[1], &un.sun_path[0], strlen (un.sun_path) + 1);
950 if (is_abstract)
951 ua->options = htonl(UNIX_OPTIONS_USE_ABSTRACT_SOCKETS);
952 else
953 ua->options = htonl(UNIX_OPTIONS_NONE);
954
955 msg = (struct UNIXMessage *) buf;
956 csize = ntohs (msg->header.size);
957 if ((csize < sizeof (struct UNIXMessage)) || (csize > ret))
958 {
959 GNUNET_break_op (0);
960 GNUNET_free (ua);
961 return;
962 }
963 msgbuf = (char *) &msg[1];
964 GNUNET_memcpy (&sender,
965 &msg->sender,
966 sizeof (struct GNUNET_PeerIdentity));
967 offset = 0;
968 tsize = csize - sizeof (struct UNIXMessage);
969 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
970 {
971 currhdr = (struct GNUNET_MessageHeader *) &msgbuf[offset];
972 csize = ntohs (currhdr->size);
973 if ((csize < sizeof (struct GNUNET_MessageHeader)) ||
974 (csize > tsize - offset))
975 {
976 GNUNET_break_op (0);
977 break;
978 }
979 unix_demultiplexer (plugin, &sender, currhdr, ua, ua_len);
980 offset += csize;
981 }
982 GNUNET_free (ua);
983}
984
985
986/**
987 * Write to UNIX domain socket (it is ready).
988 *
989 * @param plugin handle to the plugin
990 */
991static void
992unix_plugin_do_write (struct Plugin *plugin)
993{
994 ssize_t sent = 0;
995 struct UNIXMessageWrapper *msgw;
996 struct Queue *queue;
997 int did_delete;
998
999 queue = NULL;
1000 did_delete = GNUNET_NO;
1001 while (NULL != (msgw = plugin->msg_head))
1002 {
1003 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0)
1004 break; /* Message is ready for sending */
1005 /* Message has a timeout */
1006 did_delete = GNUNET_YES;
1007 LOG (GNUNET_ERROR_TYPE_DEBUG,
1008 "Timeout for message with %u bytes \n",
1009 (unsigned int) msgw->msgsize);
1010 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1011 plugin->msg_tail,
1012 msgw);
1013 queue = msgw->queue;
1014 queue->msgs_in_queue--;
1015 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
1016 queue->bytes_in_queue -= msgw->msgsize;
1017 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1018 plugin->bytes_in_queue -= msgw->msgsize;
1019 GNUNET_STATISTICS_set (plugin->env->stats,
1020 "# bytes currently in UNIX buffers",
1021 plugin->bytes_in_queue,
1022 GNUNET_NO);
1023 GNUNET_STATISTICS_update (plugin->env->stats,
1024 "# UNIX bytes discarded",
1025 msgw->msgsize,
1026 GNUNET_NO);
1027 if (NULL != msgw->cont)
1028 msgw->cont (msgw->cont_cls,
1029 &msgw->queue->target,
1030 GNUNET_SYSERR,
1031 msgw->payload,
1032 0);
1033 GNUNET_free (msgw->msg);
1034 GNUNET_free (msgw);
1035 }
1036 if (NULL == msgw)
1037 {
1038 if (GNUNET_YES == did_delete)
1039 notify_queue_monitor (plugin,
1040 queue,
1041 GNUNET_TRANSPORT_SS_UPDATE);
1042 return; /* Nothing to send at the moment */
1043 }
1044 queue = msgw->queue;
1045 sent = unix_real_send (plugin,
1046 unix_sock,
1047 &queue->target,
1048 (const char *) msgw->msg,
1049 msgw->msgsize,
1050 msgw->priority,
1051 msgw->timeout,
1052 msgw->queue->address->address,
1053 msgw->queue->address->address_length,
1054 msgw->payload,
1055 msgw->cont, msgw->cont_cls);
1056 if (RETRY == sent)
1057 {
1058 GNUNET_STATISTICS_update (plugin->env->stats,
1059 "# UNIX retry attempts",
1060 1, GNUNET_NO);
1061 notify_queue_monitor (plugin,
1062 queue,
1063 GNUNET_TRANSPORT_SS_UPDATE);
1064 return;
1065 }
1066 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1067 plugin->msg_tail,
1068 msgw);
1069 queue->msgs_in_queue--;
1070 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
1071 queue->bytes_in_queue -= msgw->msgsize;
1072 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1073 plugin->bytes_in_queue -= msgw->msgsize;
1074 GNUNET_STATISTICS_set (plugin->env->stats,
1075 "# bytes currently in UNIX buffers",
1076 plugin->bytes_in_queue, GNUNET_NO);
1077 notify_queue_monitor (plugin,
1078 queue,
1079 GNUNET_TRANSPORT_SS_UPDATE);
1080 if (GNUNET_SYSERR == sent)
1081 {
1082 /* failed and no retry */
1083 if (NULL != msgw->cont)
1084 msgw->cont (msgw->cont_cls,
1085 &msgw->queue->target,
1086 GNUNET_SYSERR,
1087 msgw->payload, 0);
1088 GNUNET_STATISTICS_update (plugin->env->stats,
1089 "# UNIX bytes discarded",
1090 msgw->msgsize,
1091 GNUNET_NO);
1092 GNUNET_free (msgw->msg);
1093 GNUNET_free (msgw);
1094 return;
1095 }
1096 /* successfully sent bytes */
1097 GNUNET_break (sent > 0);
1098 GNUNET_STATISTICS_update (plugin->env->stats,
1099 "# bytes transmitted via UNIX",
1100 msgw->msgsize,
1101 GNUNET_NO);
1102 if (NULL != msgw->cont)
1103 msgw->cont (msgw->cont_cls,
1104 &msgw->queue->target,
1105 GNUNET_OK,
1106 msgw->payload,
1107 msgw->msgsize);
1108 GNUNET_free (msgw->msg);
1109 GNUNET_free (msgw);
1110}
1111
1112
1113/**
1114 * We have been notified that our socket has something to read.
1115 * Then reschedule this function to be called again once more is available.
1116 *
1117 * @param cls the plugin handle
1118 */
1119static void
1120unix_plugin_select_read (void *cls)
1121{
1122 struct Plugin *plugin = cls;
1123 const struct GNUNET_SCHEDULER_TaskContext *tc;
1124
1125 plugin->read_task = NULL;
1126 tc = GNUNET_SCHEDULER_get_task_context ();
1127 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
1128 unix_plugin_do_read (plugin);
1129 plugin->read_task =
1130 GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1131 unix_sock,
1132 &unix_plugin_select_read, plugin);
1133}
1134
1135
1136/**
1137 * We have been notified that our socket is ready to write.
1138 * Then reschedule this function to be called again once more is available.
1139 *
1140 * @param cls the plugin handle
1141 */
1142static void
1143unix_plugin_select_write (void *cls)
1144{
1145 struct Plugin *plugin = cls;
1146 const struct GNUNET_SCHEDULER_TaskContext *tc;
1147
1148 plugin->write_task = NULL;
1149 tc = GNUNET_SCHEDULER_get_task_context ();
1150 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
1151 unix_plugin_do_write (plugin);
1152 if (NULL == plugin->msg_head)
1153 return; /* write queue empty */
1154 plugin->write_task =
1155 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1156 unix_sock,
1157 &unix_plugin_select_write, plugin);
1158}
1159
1160
1161/**
1162 * Function that can be used by the transport service to transmit
1163 * a message using the plugin. Note that in the case of a
1164 * peer disconnecting, the continuation MUST be called
1165 * prior to the disconnect notification itself. This function
1166 * will be called with this peer's HELLO message to initiate
1167 * a fresh connection to another peer.
1168 *
1169 * @param cls closure
1170 * @param queue which queue must be used
1171 * @param msgbuf the message to transmit
1172 * @param msgbuf_size number of bytes in @a msgbuf
1173 * @param priority how important is the message (most plugins will
1174 * ignore message priority and just FIFO)
1175 * @param to how long to wait at most for the transmission (does not
1176 * require plugins to discard the message after the timeout,
1177 * just advisory for the desired delay; most plugins will ignore
1178 * this as well)
1179 * @param cont continuation to call once the message has
1180 * been transmitted (or if the transport is ready
1181 * for the next transmission call; or if the
1182 * peer disconnected...); can be NULL
1183 * @param cont_cls closure for @a cont
1184 * @return number of bytes used (on the physical network, with overheads);
1185 * -1 on hard errors (i.e. address invalid); 0 is a legal value
1186 * and does NOT mean that the message was not transmitted (DV)
1187 */
1188static ssize_t
1189unix_plugin_send (void *cls,
1190 struct Queue *queue,
1191 const char *msgbuf,
1192 size_t msgbuf_size,
1193 unsigned int priority,
1194 struct GNUNET_TIME_Relative to,
1195 GNUNET_TRANSPORT_TransmitContinuation cont,
1196 void *cont_cls)
1197{
1198 struct Plugin *plugin = cls;
1199 struct UNIXMessageWrapper *wrapper;
1200 struct UNIXMessage *message;
1201 int ssize;
1202
1203 if (GNUNET_OK !=
1204 GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map,
1205 &queue->target,
1206 queue))
1207 {
1208 LOG (GNUNET_ERROR_TYPE_ERROR,
1209 "Invalid queue for peer `%s' `%s'\n",
1210 GNUNET_i2s (&queue->target),
1211 unix_plugin_address_to_string (NULL,
1212 queue->address->address,
1213 queue->address->address_length));
1214 GNUNET_break (0);
1215 return GNUNET_SYSERR;
1216 }
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "Sending %u bytes with queue for peer `%s' `%s'\n",
1219 msgbuf_size,
1220 GNUNET_i2s (&queue->target),
1221 unix_plugin_address_to_string (NULL,
1222 queue->address->address,
1223 queue->address->address_length));
1224 ssize = sizeof (struct UNIXMessage) + msgbuf_size;
1225 message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
1226 message->header.size = htons (ssize);
1227 message->header.type = htons (0);
1228 GNUNET_memcpy (&message->sender, plugin->env->my_identity,
1229 sizeof (struct GNUNET_PeerIdentity));
1230 GNUNET_memcpy (&message[1], msgbuf, msgbuf_size);
1231 wrapper = GNUNET_new (struct UNIXMessageWrapper);
1232 wrapper->msg = message;
1233 wrapper->msgsize = ssize;
1234 wrapper->payload = msgbuf_size;
1235 wrapper->priority = priority;
1236 wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
1237 to);
1238 wrapper->cont = cont;
1239 wrapper->cont_cls = cont_cls;
1240 wrapper->queue = queue;
1241 GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head,
1242 plugin->msg_tail,
1243 wrapper);
1244 plugin->bytes_in_queue += ssize;
1245 queue->bytes_in_queue += ssize;
1246 queue->msgs_in_queue++;
1247 GNUNET_STATISTICS_set (plugin->env->stats,
1248 "# bytes currently in UNIX buffers",
1249 plugin->bytes_in_queue,
1250 GNUNET_NO);
1251 notify_queue_monitor (plugin,
1252 queue,
1253 GNUNET_TRANSPORT_SS_UPDATE);
1254 if (NULL == plugin->write_task)
1255 plugin->write_task =
1256 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1257 unix_sock,
1258 &unix_plugin_select_write, plugin);
1259 return ssize;
1260}
1261
1262
1263/**
1264 * Signature of functions implementing the
1265 * sending functionality of a message queue.
1266 *
1267 * @param mq the message queue
1268 * @param msg the message to send
1269 * @param impl_state state of the implementation
1270 */
1271static void
1272mq_send (struct GNUNET_MQ_Handle *mq,
1273 const struct GNUNET_MessageHeader *msg,
1274 void *impl_state)
1275{
1276}
1277
1278
1279/**
1280 * Signature of functions implementing the
1281 * destruction of a message queue.
1282 * Implementations must not free @a mq, but should
1283 * take care of @a impl_state.
1284 *
1285 * @param mq the message queue to destroy
1286 * @param impl_state state of the implementation
1287 */
1288static void
1289mq_destroy (struct GNUNET_MQ_Handle *mq,
1290 void *impl_state)
1291{
1292}
1293
1294
1295/**
1296 * Implementation function that cancels the currently sent message.
1297 *
1298 * @param mq message queue
1299 * @param impl_state state specific to the implementation
1300 */
1301static void
1302mq_cancel (struct GNUNET_MQ_Handle *mq,
1303 void *impl_state)
1304{
1305}
1306
1307
1308/**
1309 * Generic error handler, called with the appropriate
1310 * error code and the same closure specified at the creation of
1311 * the message queue.
1312 * Not every message queue implementation supports an error handler.
1313 *
1314 * @param cls closure
1315 * @param error error code
1316 */
1317static void
1318mq_error (void *cls,
1319 enum GNUNET_MQ_Error error)
1320{
1321}
1322
1323
1324
1325/**
1326 * Function called by the transport service to initialize a
1327 * message queue given address information about another peer.
1328 * If and when the communication channel is established, the
1329 * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
1330 * to notify the service that the channel is now up. It is
1331 * the responsibility of the communicator to manage sane
1332 * retries and timeouts for any @a peer/@a address combination
1333 * provided by the transport service. Timeouts and retries
1334 * do not need to be signalled to the transport service.
1335 *
1336 * @param cls closure
1337 * @param peer identity of the other peer
1338 * @param address where to send the message, human-readable
1339 * communicator-specific format, 0-terminated, UTF-8
1340 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
1341 */
1342static int
1343mq_init (void *cls,
1344 const struct GNUNET_PeerIdentity *peer,
1345 const void *address)
1346{
1347 struct Queue *queue;
1348 char *a;
1349 char *e;
1350 int is_abs;
1351 sockaddr_un *un;
1352 socklen_t un_len;
1353
1354 if (NULL == strncmp (address,
1355 COMMUNICATOR_NAME "-",
1356 strlen (COMMUNICATOR_NAME "-")))
1357 {
1358 GNUNET_break_op (0);
1359 return GNUNET_SYSERR;
1360 }
1361 a = GNUNET_strdup (&address[strlen (COMMUNICATOR_NAME "-")]);
1362 e = strchr (a,
1363 (unsigned char) '#');
1364 if (NULL == e)
1365 {
1366 GNUNET_free (a);
1367 GNUNET_break_op (0);
1368 return GNUNET_SYSERR;
1369 }
1370 is_abs = ('1' == e[1]);
1371 *e = '\0';
1372 un = unix_address_to_sockaddr (a,
1373 &un_len,
1374 is_abs);
1375 queue = lookup_queue (peer,
1376 un,
1377 un_len);
1378 if (NULL != queue)
1379 {
1380 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1381 "Address `%s' ignored, queue exists\n",
1382 address);
1383 GNUNET_free (un);
1384 return GNUNET_OK;
1385 }
1386 queue = GNUNET_new (struct Queue);
1387 queue->target = *peer;
1388 queue->address = un;
1389 queue->address_len = un_len;
1390 (void) GNUNET_CONTAINER_multihashmap_put (queue_map,
1391 &queue->target,
1392 queue,
1393 GNUET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1394 GNUNET_STATISTICS_set (stats,
1395 "# UNIX queues active",
1396 GNUNET_CONTAINER_multipeermap_size (plugin->queue_map),
1397 GNUNET_NO);
1398 queue->timeout = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1399 &queue_timeout,
1400 queue);
1401 queue->mq
1402 = GNUNET_MQ_queue_for_callbacks (&mq_send,
1403 &mq_destroy,
1404 &mq_cancel,
1405 queue,
1406 NULL,
1407 &mq_error,
1408 queue);
1409 queue->qh
1410 = GNUNET_TRANSPORT_communicator_mq_add (ch,
1411 &queue->target,
1412 address,
1413 ATS,
1414 queue->mq);
1415 return GNUNET_OK;
1416}
1417
1418
1419/**
1420 * Shutdown the UNIX communicator.
1421 *
1422 * @param cls NULL (always)
1423 */
1424static void
1425do_shutdown (void *cls)
1426{
1427 struct UNIXMessageWrapper *msgw;
1428
1429 while (NULL != (msgw = msg_head))
1430 {
1431 GNUNET_CONTAINER_DLL_remove (msg_head,
1432 msg_tail,
1433 msgw);
1434 queue = msgw->queue;
1435 queue->msgs_in_queue--;
1436 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
1437 queue->bytes_in_queue -= msgw->msgsize;
1438 GNUNET_assert (bytes_in_queue >= msgw->msgsize);
1439 bytes_in_queue -= msgw->msgsize;
1440 GNUNET_free (msgw->msg);
1441 GNUNET_free (msgw);
1442 }
1443 if (NULL != read_task)
1444 {
1445 GNUNET_SCHEDULER_cancel (read_task);
1446 read_task = NULL;
1447 }
1448 if (NULL != write_task)
1449 {
1450 GNUNET_SCHEDULER_cancel (write_task);
1451 write_task = NULL;
1452 }
1453 if (NULL != unix_sock)
1454 {
1455 GNUNET_break (GNUNET_OK ==
1456 GNUNET_NETWORK_socket_close (unix_sock));
1457 unix_sock = NULL;
1458 }
1459 GNUNET_CONTAINER_multipeermap_iterate (queue_map,
1460 &get_queue_delete_it,
1461 NULL);
1462 GNUNET_CONTAINER_multipeermap_destroy (queue_map);
1463 if (NULL != ai)
1464 {
1465 GNUNET_TRANSPORT_communicator_address_remove (ai);
1466 ai = NULL;
1467 }
1468 if (NULL != ch)
1469 {
1470 GNUNET_TRANSPORT_communicator_disconnect (ch);
1471 ch = NULL;
1472 }
1473 GNUNET_break (0 == bytes_in_queue);
1474}
1475
1476
1477/**
1478 * Setup communicator and launch network interactions.
1479 *
1480 * @param cls NULL (always)
1481 * @param args remaining command-line arguments
1482 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1483 * @param cfg configuration
1484 */
1485static void
1486run (void *cls,
1487 char *const *args,
1488 const char *cfgfile,
1489 const struct GNUNET_CONFIGURATION_Handle *cfg)
1490{
1491 char *unix_socket_path;
1492 int is_abstract;
1493 struct sockaddr_un *un;
1494 socklen_t un_len;
1495 char *my_addr;
1496 (void) cls;
1497
1498 if (GNUNET_OK !=
1499 GNUNET_CONFIGURATION_get_value_filename (cfg,
1500 "transport-unix",
1501 "UNIXPATH",
1502 &unix_socket_path))
1503 {
1504 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1505 "communicator-unix",
1506 "UNIXPATH");
1507 return;
1508 }
1509
1510 /* Initialize my flags */
1511 is_abstract = 0;
1512#ifdef LINUX
1513 is_abstract
1514 = GNUNET_CONFIGURATION_get_value_yesno (cfg,
1515 "testing",
1516 "USE_ABSTRACT_SOCKETS");
1517#endif
1518 un = unix_address_to_sockaddr (unix_socket_path,
1519 &un_len,
1520 is_abstract);
1521 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1522 SOCK_DGRAM,
1523 0);
1524 if (NULL == unix_sock)
1525 {
1526 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1527 "socket");
1528 GNUNET_free (un);
1529 GNUNET_free (unix_socket_path);
1530 return;
1531 }
1532 if ( ('\0' != un->sun_path[0]) &&
1533 (GNUNET_OK !=
1534 GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1535 {
1536 LOG (GNUNET_ERROR_TYPE_ERROR,
1537 _("Cannot create path to `%s'\n"),
1538 un->sun_path);
1539 GNUNET_NETWORK_socket_close (unix_sock);
1540 unix_sock = NULL;
1541 GNUNET_free (un);
1542 GNUNET_free (unix_socket_path);
1543 return;
1544 }
1545 if (GNUNET_OK !=
1546 GNUNET_NETWORK_socket_bind (unix_sock,
1547 (const struct sockaddr *) un,
1548 un_len))
1549 {
1550 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1551 "bind");
1552 LOG (GNUNET_ERROR_TYPE_ERROR,
1553 _("Cannot bind to `%s'\n"),
1554 un->sun_path);
1555 GNUNET_NETWORK_socket_close (unix_sock);
1556 unix_sock = NULL;
1557 GNUNET_free (un);
1558 GNUNET_free (unix_socket_path);
1559 return;
1560 }
1561 GNUNET_free (un);
1562 LOG (GNUNET_ERROR_TYPE_DEBUG,
1563 "Bound to `%s'\n",
1564 unix_socket_path);
1565 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1566 NULL);
1567 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1568 unix_sock,
1569 &unix_plugin_select_read,
1570 NULL);
1571 queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1572 GNUNET_NO);
1573 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1574 "unix",
1575 65535,
1576 &mq_init,
1577 NULL);
1578 if (NULL == ch)
1579 {
1580 GNUNET_break (0);
1581 GNUNET_SCHEDULER_shutdown ();
1582 GNUNET_free (unix_socket_path);
1583 return;
1584 }
1585 GNUNET_asprintf (&my_addr,
1586 "%s-%s#%d",
1587 COMMUNICATOR_NAME,
1588 unix_socket_path,
1589 is_abstract);
1590
1591 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1592 my_addr,
1593 GNUNET_ATS_NET_LOOPBACK,
1594 GNUNET_TIME_UNIT_FOREVER_REL);
1595 GNUNET_free (my_addr);
1596 GNUNET_free (unix_socket_path);
1597}
1598
1599
1600/**
1601 * The main function for the UNIX communicator.
1602 *
1603 * @param argc number of arguments from the command line
1604 * @param argv command line arguments
1605 * @return 0 ok, 1 on error
1606 */
1607int
1608main (int argc,
1609 char *const *argv)
1610{
1611 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1612 GNUNET_GETOPT_OPTION_END
1613 };
1614 int ret;
1615
1616 if (GNUNET_OK !=
1617 GNUNET_STRINGS_get_utf8_args (argc, argv,
1618 &argc, &argv))
1619 return 2;
1620
1621 ret =
1622 (GNUNET_OK ==
1623 GNUNET_PROGRAM_run (argc, argv,
1624 "gnunet-communicator-unix",
1625 _("GNUnet UNIX domain socket communicator"),
1626 options,
1627 &run,
1628 NULL)) ? 0 : 1;
1629 GNUNET_free ((void*) argv);
1630 return ret;
1631}
1632
1633
1634#if defined(LINUX) && defined(__GLIBC__)
1635#include <malloc.h>
1636
1637/**
1638 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1639 */
1640void __attribute__ ((constructor))
1641GNUNET_ARM_memory_init ()
1642{
1643 mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1644 mallopt (M_TOP_PAD, 1 * 1024);
1645 malloc_trim (0);
1646}
1647#endif
1648
1649/* end of gnunet-communicator-unix.c */