aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-communicator-unix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-communicator-unix.c')
-rw-r--r--src/transport/gnunet-communicator-unix.c1148
1 files changed, 1148 insertions, 0 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
new file mode 100644
index 000000000..cd3ae5dce
--- /dev/null
+++ b/src/transport/gnunet-communicator-unix.c
@@ -0,0 +1,1148 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/gnunet-communicator-unix.c
21 * @brief Transport plugin using unix domain sockets (!)
22 * Clearly, can only be used locally on Unix/Linux hosts...
23 * ONLY INTENDED FOR TESTING!!!
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet_transport_communication_service.h"
32
33/**
34 * How many messages do we keep at most in the queue to the
35 * transport service before we start to drop (default,
36 * can be changed via the configuration file).
37 * Should be _below_ the level of the communicator API, as
38 * otherwise we may read messages just to have them dropped
39 * by the communicator API.
40 */
41#define DEFAULT_MAX_QUEUE_LENGTH 8
42
43/**
44 * Address prefix used by the communicator.
45 */
46#define COMMUNICATOR_ADDRESS_PREFIX "unix"
47
48/**
49 * Configuration section used by the communicator.
50 */
51#define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
52
53
54GNUNET_NETWORK_STRUCT_BEGIN
55
56/**
57 * UNIX Message-Packet header.
58 */
59struct UNIXMessage
60{
61 /**
62 * Message header.
63 */
64 struct GNUNET_MessageHeader header;
65
66 /**
67 * What is the identity of the sender (GNUNET_hash of public key)
68 */
69 struct GNUNET_PeerIdentity sender;
70
71};
72
73GNUNET_NETWORK_STRUCT_END
74
75
76/**
77 * Handle for a queue.
78 */
79struct Queue
80{
81
82 /**
83 * Queues with pending messages (!) are kept in a DLL.
84 */
85 struct Queue *next;
86
87 /**
88 * Queues with pending messages (!) are kept in a DLL.
89 */
90 struct Queue *prev;
91
92 /**
93 * To whom are we talking to.
94 */
95 struct GNUNET_PeerIdentity target;
96
97 /**
98 * Address of the other peer.
99 */
100 struct sockaddr_un *address;
101
102 /**
103 * Length of the address.
104 */
105 socklen_t address_len;
106
107 /**
108 * Message currently scheduled for transmission, non-NULL if and only
109 * if this queue is in the #queue_head DLL.
110 */
111 const struct GNUNET_MessageHeader *msg;
112
113 /**
114 * Message queue we are providing for the #ch.
115 */
116 struct GNUNET_MQ_Handle *mq;
117
118 /**
119 * handle for this queue with the #ch.
120 */
121 struct GNUNET_TRANSPORT_QueueHandle *qh;
122
123 /**
124 * Number of bytes we currently have in our write queue.
125 */
126 unsigned long long bytes_in_queue;
127
128 /**
129 * Timeout for this queue.
130 */
131 struct GNUNET_TIME_Absolute timeout;
132
133 /**
134 * Queue timeout task.
135 */
136 struct GNUNET_SCHEDULER_Task *timeout_task;
137
138};
139
140
141/**
142 * ID of read task
143 */
144static struct GNUNET_SCHEDULER_Task *read_task;
145
146/**
147 * ID of write task
148 */
149static struct GNUNET_SCHEDULER_Task *write_task;
150
151/**
152 * Number of messages we currently have in our queues towards the transport service.
153 */
154static unsigned long long delivering_messages;
155
156/**
157 * Maximum queue length before we stop reading towards the transport service.
158 */
159static unsigned long long max_queue_length;
160
161/**
162 * For logging statistics.
163 */
164static struct GNUNET_STATISTICS_Handle *stats;
165
166/**
167 * Our environment.
168 */
169static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
170
171/**
172 * Queues (map from peer identity to `struct Queue`)
173 */
174static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
175
176/**
177 * Head of queue of messages to transmit.
178 */
179static struct Queue *queue_head;
180
181/**
182 * Tail of queue of messages to transmit.
183 */
184static struct Queue *queue_tail;
185
186/**
187 * socket that we transmit all data with
188 */
189static struct GNUNET_NETWORK_Handle *unix_sock;
190
191/**
192 * Handle to the operation that publishes our address.
193 */
194static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
195
196
197/**
198 * Functions with this signature are called whenever we need
199 * to close a queue due to a disconnect or failure to
200 * establish a connection.
201 *
202 * @param queue queue to close down
203 */
204static void
205queue_destroy (struct Queue *queue)
206{
207 struct GNUNET_MQ_Handle *mq;
208
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210 "Disconnecting queue for peer `%s'\n",
211 GNUNET_i2s (&queue->target));
212 if (0 != queue->bytes_in_queue)
213 {
214 GNUNET_CONTAINER_DLL_remove (queue_head,
215 queue_tail,
216 queue);
217 queue->bytes_in_queue = 0;
218 }
219 if (NULL != (mq = queue->mq))
220 {
221 queue->mq = NULL;
222 GNUNET_MQ_destroy (mq);
223 }
224 GNUNET_assert (GNUNET_YES ==
225 GNUNET_CONTAINER_multipeermap_remove (queue_map,
226 &queue->target,
227 queue));
228 GNUNET_STATISTICS_set (stats,
229 "# UNIX queues active",
230 GNUNET_CONTAINER_multipeermap_size (queue_map),
231 GNUNET_NO);
232 if (NULL != queue->timeout_task)
233 {
234 GNUNET_SCHEDULER_cancel (queue->timeout_task);
235 queue->timeout_task = NULL;
236 }
237 GNUNET_free (queue->address);
238 GNUNET_free (queue);
239}
240
241
242/**
243 * Queue was idle for too long, so disconnect it
244 *
245 * @param cls the `struct Queue *` to disconnect
246 */
247static void
248queue_timeout (void *cls)
249{
250 struct Queue *queue = cls;
251 struct GNUNET_TIME_Relative left;
252
253 queue->timeout_task = NULL;
254 left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
255 if (0 != left.rel_value_us)
256 {
257 /* not actually our turn yet, but let's at least update
258 the monitor, it may think we're about to die ... */
259 queue->timeout_task
260 = GNUNET_SCHEDULER_add_delayed (left,
261 &queue_timeout,
262 queue);
263 return;
264 }
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "Queue %p was idle for %s, disconnecting\n",
267 queue,
268 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
269 GNUNET_YES));
270 queue_destroy (queue);
271}
272
273
274/**
275 * Increment queue timeout due to activity. We do not immediately
276 * notify the monitor here as that might generate excessive
277 * signalling.
278 *
279 * @param queue queue for which the timeout should be rescheduled
280 */
281static void
282reschedule_queue_timeout (struct Queue *queue)
283{
284 GNUNET_assert (NULL != queue->timeout_task);
285 queue->timeout
286 = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
287}
288
289
290/**
291 * Convert unix path to a `struct sockaddr_un *`
292 *
293 * @param unixpath path to convert
294 * @param[out] sock_len set to the length of the address
295 * @param is_abstract is this an abstract @a unixpath
296 * @return converted unix path
297 */
298static struct sockaddr_un *
299unix_address_to_sockaddr (const char *unixpath,
300 socklen_t *sock_len)
301{
302 struct sockaddr_un *un;
303 size_t slen;
304
305 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
306 un = GNUNET_new (struct sockaddr_un);
307 un->sun_family = AF_UNIX;
308 slen = strlen (unixpath);
309 if (slen >= sizeof (un->sun_path))
310 slen = sizeof (un->sun_path) - 1;
311 GNUNET_memcpy (un->sun_path,
312 unixpath,
313 slen);
314 un->sun_path[slen] = '\0';
315 slen = sizeof (struct sockaddr_un);
316#if HAVE_SOCKADDR_UN_SUN_LEN
317 un->sun_len = (u_char) slen;
318#endif
319 (*sock_len) = slen;
320 if ('@' == un->sun_path[0])
321 un->sun_path[0] = '\0';
322 return un;
323}
324
325
326/**
327 * Closure to #lookup_queue_it().
328 */
329struct LookupCtx
330{
331 /**
332 * Location to store the queue, if found.
333 */
334 struct Queue *res;
335
336 /**
337 * Address we are looking for.
338 */
339 const struct sockaddr_un *un;
340
341 /**
342 * Number of bytes in @a un
343 */
344 socklen_t un_len;
345};
346
347
348/**
349 * Function called to find a queue by address.
350 *
351 * @param cls the `struct LookupCtx *`
352 * @param key peer we are looking for (unused)
353 * @param value a queue
354 * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
355 */
356static int
357lookup_queue_it (void *cls,
358 const struct GNUNET_PeerIdentity *key,
359 void *value)
360{
361 struct LookupCtx *lctx = cls;
362 struct Queue *queue = value;
363
364 if ( (queue->address_len = lctx->un_len) &&
365 (0 == memcmp (lctx->un,
366 queue->address,
367 queue->address_len)) )
368 {
369 lctx->res = queue;
370 return GNUNET_NO;
371 }
372 return GNUNET_YES;
373}
374
375
376/**
377 * Find an existing queue by address.
378 *
379 * @param plugin the plugin
380 * @param address the address to find
381 * @return NULL if queue was not found
382 */
383static struct Queue *
384lookup_queue (const struct GNUNET_PeerIdentity *peer,
385 const struct sockaddr_un *un,
386 socklen_t un_len)
387{
388 struct LookupCtx lctx;
389
390 lctx.un = un;
391 lctx.un_len = un_len;
392 GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
393 peer,
394 &lookup_queue_it,
395 &lctx);
396 return lctx.res;
397}
398
399
400/**
401 * We have been notified that our socket is ready to write.
402 * Then reschedule this function to be called again once more is available.
403 *
404 * @param cls NULL
405 */
406static void
407select_write_cb (void *cls)
408{
409 struct Queue *queue = queue_tail;
410 const struct GNUNET_MessageHeader *msg = queue->msg;
411 size_t msg_size = ntohs (msg->size);
412 ssize_t sent;
413
414 /* take queue of the ready list */
415 write_task = NULL;
416 GNUNET_CONTAINER_DLL_remove (queue_head,
417 queue_tail,
418 queue);
419 if (NULL != queue_head)
420 write_task =
421 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
422 unix_sock,
423 &select_write_cb,
424 NULL);
425
426 /* send 'msg' */
427 queue->msg = NULL;
428 GNUNET_MQ_impl_send_continue (queue->mq);
429 resend:
430 /* Send the data */
431 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
432 queue->msg,
433 msg_size,
434 (const struct sockaddr *) queue->address,
435 queue->address_len);
436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437 "UNIX transmitted message to %s (%d/%u: %s)\n",
438 GNUNET_i2s (&queue->target),
439 (int) sent,
440 (unsigned int) msg_size,
441 (sent < 0) ? STRERROR (errno) : "ok");
442 if (-1 != sent)
443 {
444 GNUNET_STATISTICS_update (stats,
445 "# bytes sent",
446 (long long) sent,
447 GNUNET_NO);
448 reschedule_queue_timeout (queue);
449 return; /* all good */
450 }
451 GNUNET_STATISTICS_update (stats,
452 "# network transmission failures",
453 1,
454 GNUNET_NO);
455 switch (errno)
456 {
457 case EAGAIN:
458 case ENOBUFS:
459 /* We should retry later... */
460 GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
461 "send");
462 return;
463 case EMSGSIZE:
464 {
465 socklen_t size = 0;
466 socklen_t len = sizeof (size);
467
468 GNUNET_NETWORK_socket_getsockopt (unix_sock,
469 SOL_SOCKET,
470 SO_SNDBUF,
471 &size,
472 &len);
473 if (size > ntohs (msg->size))
474 {
475 /* Buffer is bigger than message: error, no retry
476 * This should never happen!*/
477 GNUNET_break (0);
478 return;
479 }
480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481 "Trying to increase socket buffer size from %u to %u for message size %u\n",
482 (unsigned int) size,
483 (unsigned int) ((msg_size / 1000) + 2) * 1000,
484 (unsigned int) msg_size);
485 size = ((msg_size / 1000) + 2) * 1000;
486 if (GNUNET_OK ==
487 GNUNET_NETWORK_socket_setsockopt (unix_sock,
488 SOL_SOCKET,
489 SO_SNDBUF,
490 &size,
491 sizeof (size)))
492 goto resend; /* Increased buffer size, retry sending */
493 /* Ok, then just try very modest increase */
494 size = msg_size;
495 if (GNUNET_OK ==
496 GNUNET_NETWORK_socket_setsockopt (unix_sock,
497 SOL_SOCKET,
498 SO_SNDBUF,
499 &size,
500 sizeof (size)))
501 goto resend; /* Increased buffer size, retry sending */
502 /* Could not increase buffer size: error, no retry */
503 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
504 "setsockopt");
505 return;
506 }
507 default:
508 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
509 "send");
510 return;
511 }
512}
513
514
515/**
516 * Signature of functions implementing the sending functionality of a
517 * message queue.
518 *
519 * @param mq the message queue
520 * @param msg the message to send
521 * @param impl_state our `struct Queue`
522 */
523static void
524mq_send (struct GNUNET_MQ_Handle *mq,
525 const struct GNUNET_MessageHeader *msg,
526 void *impl_state)
527{
528 struct Queue *queue = impl_state;
529
530 GNUNET_assert (mq == queue->mq);
531 GNUNET_assert (NULL == queue->msg);
532 queue->msg = msg;
533 GNUNET_CONTAINER_DLL_insert (queue_head,
534 queue_tail,
535 queue);
536 if (NULL == write_task)
537 write_task =
538 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
539 unix_sock,
540 &select_write_cb,
541 NULL);
542}
543
544
545/**
546 * Signature of functions implementing the destruction of a message
547 * queue. Implementations must not free @a mq, but should take care
548 * of @a impl_state.
549 *
550 * @param mq the message queue to destroy
551 * @param impl_state our `struct Queue`
552 */
553static void
554mq_destroy (struct GNUNET_MQ_Handle *mq,
555 void *impl_state)
556{
557 struct Queue *queue = impl_state;
558
559 if (mq == queue->mq)
560 {
561 queue->mq = NULL;
562 queue_destroy (queue);
563 }
564}
565
566
567/**
568 * Implementation function that cancels the currently sent message.
569 *
570 * @param mq message queue
571 * @param impl_state our `struct Queue`
572 */
573static void
574mq_cancel (struct GNUNET_MQ_Handle *mq,
575 void *impl_state)
576{
577 struct Queue *queue = impl_state;
578
579 GNUNET_assert (NULL != queue->msg);
580 queue->msg = NULL;
581 GNUNET_CONTAINER_DLL_remove (queue_head,
582 queue_tail,
583 queue);
584 GNUNET_assert (NULL != write_task);
585 if (NULL == queue_head)
586 {
587 GNUNET_SCHEDULER_cancel (write_task);
588 write_task = NULL;
589 }
590}
591
592
593/**
594 * Generic error handler, called with the appropriate
595 * error code and the same closure specified at the creation of
596 * the message queue.
597 * Not every message queue implementation supports an error handler.
598 *
599 * @param cls our `struct Queue`
600 * @param error error code
601 */
602static void
603mq_error (void *cls,
604 enum GNUNET_MQ_Error error)
605{
606 struct Queue *queue = cls;
607
608 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
609 "UNIX MQ error in queue to %s: %d\n",
610 GNUNET_i2s (&queue->target),
611 (int) error);
612 queue_destroy (queue);
613}
614
615
616/**
617 * Creates a new outbound queue the transport service will use to send
618 * data to another peer.
619 *
620 * @param peer the target peer
621 * @param un the address
622 * @param un_len number of bytes in @a un
623 * @return the queue or NULL of max connections exceeded
624 */
625static struct Queue *
626setup_queue (const struct GNUNET_PeerIdentity *target,
627 const struct sockaddr_un *un,
628 socklen_t un_len)
629{
630 struct Queue *queue;
631
632 queue = GNUNET_new (struct Queue);
633 queue->target = *target;
634 queue->address = GNUNET_memdup (un,
635 un_len);
636 queue->address_len = un_len;
637 (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
638 &queue->target,
639 queue,
640 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
641 GNUNET_STATISTICS_set (stats,
642 "# queues active",
643 GNUNET_CONTAINER_multipeermap_size (queue_map),
644 GNUNET_NO);
645 queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
646 queue->timeout_task
647 = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
648 &queue_timeout,
649 queue);
650 queue->mq
651 = GNUNET_MQ_queue_for_callbacks (&mq_send,
652 &mq_destroy,
653 &mq_cancel,
654 queue,
655 NULL,
656 &mq_error,
657 queue);
658 {
659 char *foreign_addr;
660
661 if ('\0' == un->sun_path[0])
662 GNUNET_asprintf (&foreign_addr,
663 "%s-@%s",
664 COMMUNICATOR_ADDRESS_PREFIX,
665 &un->sun_path[1]);
666 else
667 GNUNET_asprintf (&foreign_addr,
668 "%s-%s",
669 COMMUNICATOR_ADDRESS_PREFIX,
670 un->sun_path);
671 queue->qh
672 = GNUNET_TRANSPORT_communicator_mq_add (ch,
673 &queue->target,
674 foreign_addr,
675 GNUNET_ATS_NET_LOOPBACK,
676 queue->mq);
677 GNUNET_free (foreign_addr);
678 }
679 return queue;
680}
681
682
683/**
684 * We have been notified that our socket has something to read. Do the
685 * read and reschedule this function to be called again once more is
686 * available.
687 *
688 * @param cls NULL
689 */
690static void
691select_read_cb (void *cls);
692
693
694/**
695 * Function called when message was successfully passed to
696 * transport service. Continue read activity.
697 *
698 * @param cls NULL
699 * @param success #GNUNET_OK on success
700 */
701static void
702receive_complete_cb (void *cls,
703 int success)
704{
705 delivering_messages--;
706 if (GNUNET_OK != success)
707 GNUNET_STATISTICS_update (stats,
708 "# transport transmission failures",
709 1,
710 GNUNET_NO);
711 if ( (NULL == read_task) &&
712 (delivering_messages < max_queue_length) )
713 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
714 unix_sock,
715 &select_read_cb,
716 NULL);
717}
718
719
720/**
721 * We have been notified that our socket has something to read. Do the
722 * read and reschedule this function to be called again once more is
723 * available.
724 *
725 * @param cls NULL
726 */
727static void
728select_read_cb (void *cls)
729{
730 char buf[65536] GNUNET_ALIGN;
731 struct Queue *queue;
732 const struct UNIXMessage *msg;
733 struct sockaddr_un un;
734 socklen_t addrlen;
735 ssize_t ret;
736 uint16_t msize;
737
738 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
739 unix_sock,
740 &select_read_cb,
741 NULL);
742 addrlen = sizeof (un);
743 memset (&un,
744 0,
745 sizeof (un));
746 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
747 buf,
748 sizeof (buf),
749 (struct sockaddr *) &un,
750 &addrlen);
751 if ( (-1 == ret) &&
752 ( (EAGAIN == errno) ||
753 (ENOBUFS == errno) ) )
754 return;
755 if (-1 == ret)
756 {
757 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
758 "recvfrom");
759 return;
760 }
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "Read %d bytes from socket %s\n",
763 (int) ret,
764 un.sun_path);
765 GNUNET_assert (AF_UNIX == (un.sun_family));
766 msg = (struct UNIXMessage *) buf;
767 msize = ntohs (msg->header.size);
768 if ( (msize < sizeof (struct UNIXMessage)) ||
769 (msize > ret) )
770 {
771 GNUNET_break_op (0);
772 return;
773 }
774 queue = lookup_queue (&msg->sender,
775 &un,
776 addrlen);
777 if (NULL == queue)
778 queue = setup_queue (&msg->sender,
779 &un,
780 addrlen);
781 else
782 reschedule_queue_timeout (queue);
783 if (NULL == queue)
784 {
785 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
786 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
787 return;
788 }
789
790 {
791 uint16_t offset = 0;
792 uint16_t tsize = msize - sizeof (struct UNIXMessage);
793 const char *msgbuf = (const char *) &msg[1];
794
795 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
796 {
797 const struct GNUNET_MessageHeader *currhdr;
798 struct GNUNET_MessageHeader al_hdr;
799 uint16_t csize;
800
801 currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
802 /* ensure aligned access */
803 memcpy (&al_hdr,
804 currhdr,
805 sizeof (al_hdr));
806 csize = ntohs (al_hdr.size);
807 if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
808 (csize > tsize - offset))
809 {
810 GNUNET_break_op (0);
811 break;
812 }
813 ret = GNUNET_TRANSPORT_communicator_receive (ch,
814 &msg->sender,
815 currhdr,
816 &receive_complete_cb,
817 NULL);
818 if (GNUNET_SYSERR == ret)
819 return; /* transport not up */
820 if (GNUNET_NO == ret)
821 break;
822 delivering_messages++;
823 offset += csize;
824 }
825 }
826 if (delivering_messages >= max_queue_length)
827 {
828 /* we should try to apply 'back pressure' */
829 GNUNET_SCHEDULER_cancel (read_task);
830 read_task = NULL;
831 }
832}
833
834
835/**
836 * Function called by the transport service to initialize a
837 * message queue given address information about another peer.
838 * If and when the communication channel is established, the
839 * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
840 * to notify the service that the channel is now up. It is
841 * the responsibility of the communicator to manage sane
842 * retries and timeouts for any @a peer/@a address combination
843 * provided by the transport service. Timeouts and retries
844 * do not need to be signalled to the transport service.
845 *
846 * @param cls closure
847 * @param peer identity of the other peer
848 * @param address where to send the message, human-readable
849 * communicator-specific format, 0-terminated, UTF-8
850 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
851 */
852static int
853mq_init (void *cls,
854 const struct GNUNET_PeerIdentity *peer,
855 const char *address)
856{
857 struct Queue *queue;
858 const char *path;
859 struct sockaddr_un *un;
860 socklen_t un_len;
861
862 if (0 != strncmp (address,
863 COMMUNICATOR_ADDRESS_PREFIX "-",
864 strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
865 {
866 GNUNET_break_op (0);
867 return GNUNET_SYSERR;
868 }
869 path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
870 un = unix_address_to_sockaddr (path,
871 &un_len);
872 queue = lookup_queue (peer,
873 un,
874 un_len);
875 if (NULL != queue)
876 {
877 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
878 "Address `%s' for %s ignored, queue exists\n",
879 path,
880 GNUNET_i2s (peer));
881 GNUNET_free (un);
882 return GNUNET_OK;
883 }
884 queue = setup_queue (peer,
885 un,
886 un_len);
887 GNUNET_free (un);
888 if (NULL == queue)
889 {
890 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
891 "Failed to setup queue to %s at `%s'\n",
892 GNUNET_i2s (peer),
893 path);
894 return GNUNET_NO;
895 }
896 return GNUNET_OK;
897}
898
899
900/**
901 * Iterator over all message queues to clean up.
902 *
903 * @param cls NULL
904 * @param target unused
905 * @param value the queue to destroy
906 * @return #GNUNET_OK to continue to iterate
907 */
908static int
909get_queue_delete_it (void *cls,
910 const struct GNUNET_PeerIdentity *target,
911 void *value)
912{
913 struct Queue *queue = value;
914
915 (void) cls;
916 (void) target;
917 queue_destroy (queue);
918 return GNUNET_OK;
919}
920
921
922/**
923 * Shutdown the UNIX communicator.
924 *
925 * @param cls NULL (always)
926 */
927static void
928do_shutdown (void *cls)
929{
930 if (NULL != read_task)
931 {
932 GNUNET_SCHEDULER_cancel (read_task);
933 read_task = NULL;
934 }
935 if (NULL != write_task)
936 {
937 GNUNET_SCHEDULER_cancel (write_task);
938 write_task = NULL;
939 }
940 if (NULL != unix_sock)
941 {
942 GNUNET_break (GNUNET_OK ==
943 GNUNET_NETWORK_socket_close (unix_sock));
944 unix_sock = NULL;
945 }
946 GNUNET_CONTAINER_multipeermap_iterate (queue_map,
947 &get_queue_delete_it,
948 NULL);
949 GNUNET_CONTAINER_multipeermap_destroy (queue_map);
950 if (NULL != ai)
951 {
952 GNUNET_TRANSPORT_communicator_address_remove (ai);
953 ai = NULL;
954 }
955 if (NULL != ch)
956 {
957 GNUNET_TRANSPORT_communicator_disconnect (ch);
958 ch = NULL;
959 }
960 if (NULL != stats)
961 {
962 GNUNET_STATISTICS_destroy (stats,
963 GNUNET_NO);
964 stats = NULL;
965 }
966}
967
968
969/**
970 * Setup communicator and launch network interactions.
971 *
972 * @param cls NULL (always)
973 * @param args remaining command-line arguments
974 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
975 * @param cfg configuration
976 */
977static void
978run (void *cls,
979 char *const *args,
980 const char *cfgfile,
981 const struct GNUNET_CONFIGURATION_Handle *cfg)
982{
983 char *unix_socket_path;
984 struct sockaddr_un *un;
985 socklen_t un_len;
986 char *my_addr;
987 (void) cls;
988
989 if (GNUNET_OK !=
990 GNUNET_CONFIGURATION_get_value_filename (cfg,
991 COMMUNICATOR_CONFIG_SECTION,
992 "UNIXPATH",
993 &unix_socket_path))
994 {
995 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
996 COMMUNICATOR_CONFIG_SECTION,
997 "UNIXPATH");
998 return;
999 }
1000 if (GNUNET_OK !=
1001 GNUNET_CONFIGURATION_get_value_number (cfg,
1002 COMMUNICATOR_CONFIG_SECTION,
1003 "MAX_QUEUE_LENGTH",
1004 &max_queue_length))
1005 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1006
1007 un = unix_address_to_sockaddr (unix_socket_path,
1008 &un_len);
1009 if (NULL == un)
1010 {
1011 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1012 "Failed to setup UNIX domain socket address with path `%s'\n",
1013 unix_socket_path);
1014 GNUNET_free (unix_socket_path);
1015 return;
1016 }
1017 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1018 SOCK_DGRAM,
1019 0);
1020 if (NULL == unix_sock)
1021 {
1022 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1023 "socket");
1024 GNUNET_free (un);
1025 GNUNET_free (unix_socket_path);
1026 return;
1027 }
1028 if ( ('\0' != un->sun_path[0]) &&
1029 (GNUNET_OK !=
1030 GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1031 {
1032 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1033 _("Cannot create path to `%s'\n"),
1034 un->sun_path);
1035 GNUNET_NETWORK_socket_close (unix_sock);
1036 unix_sock = NULL;
1037 GNUNET_free (un);
1038 GNUNET_free (unix_socket_path);
1039 return;
1040 }
1041 if (GNUNET_OK !=
1042 GNUNET_NETWORK_socket_bind (unix_sock,
1043 (const struct sockaddr *) un,
1044 un_len))
1045 {
1046 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
1047 "bind",
1048 un->sun_path);
1049 GNUNET_NETWORK_socket_close (unix_sock);
1050 unix_sock = NULL;
1051 GNUNET_free (un);
1052 GNUNET_free (unix_socket_path);
1053 return;
1054 }
1055 GNUNET_free (un);
1056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057 "Bound to `%s'\n",
1058 unix_socket_path);
1059 stats = GNUNET_STATISTICS_create ("C-UNIX",
1060 cfg);
1061 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1062 NULL);
1063 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1064 unix_sock,
1065 &select_read_cb,
1066 NULL);
1067 queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1068 GNUNET_NO);
1069 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1070 COMMUNICATOR_CONFIG_SECTION,
1071 COMMUNICATOR_ADDRESS_PREFIX,
1072 65535,
1073 &mq_init,
1074 NULL);
1075 if (NULL == ch)
1076 {
1077 GNUNET_break (0);
1078 GNUNET_SCHEDULER_shutdown ();
1079 GNUNET_free (unix_socket_path);
1080 return;
1081 }
1082 GNUNET_asprintf (&my_addr,
1083 "%s-%s",
1084 COMMUNICATOR_ADDRESS_PREFIX,
1085 unix_socket_path);
1086 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1087 my_addr,
1088 GNUNET_ATS_NET_LOOPBACK,
1089 GNUNET_TIME_UNIT_FOREVER_REL);
1090 GNUNET_free (my_addr);
1091 GNUNET_free (unix_socket_path);
1092 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1093 unix_sock,
1094 &select_read_cb,
1095 NULL);
1096}
1097
1098
1099/**
1100 * The main function for the UNIX communicator.
1101 *
1102 * @param argc number of arguments from the command line
1103 * @param argv command line arguments
1104 * @return 0 ok, 1 on error
1105 */
1106int
1107main (int argc,
1108 char *const *argv)
1109{
1110 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1111 GNUNET_GETOPT_OPTION_END
1112 };
1113 int ret;
1114
1115 if (GNUNET_OK !=
1116 GNUNET_STRINGS_get_utf8_args (argc, argv,
1117 &argc, &argv))
1118 return 2;
1119
1120 ret =
1121 (GNUNET_OK ==
1122 GNUNET_PROGRAM_run (argc, argv,
1123 "gnunet-communicator-unix",
1124 _("GNUnet UNIX domain socket communicator"),
1125 options,
1126 &run,
1127 NULL)) ? 0 : 1;
1128 GNUNET_free ((void*) argv);
1129 return ret;
1130}
1131
1132
1133#if defined(LINUX) && defined(__GLIBC__)
1134#include <malloc.h>
1135
1136/**
1137 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1138 */
1139void __attribute__ ((constructor))
1140GNUNET_ARM_memory_init ()
1141{
1142 mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1143 mallopt (M_TOP_PAD, 1 * 1024);
1144 malloc_trim (0);
1145}
1146#endif
1147
1148/* end of gnunet-communicator-unix.c */