aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-communicator-unix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-communicator-unix.c')
-rw-r--r--src/transport/gnunet-communicator-unix.c1167
1 files changed, 0 insertions, 1167 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
deleted file mode 100644
index d7e18f87a..000000000
--- a/src/transport/gnunet-communicator-unix.c
+++ /dev/null
@@ -1,1167 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/gnunet-communicator-unix.c
23 * @brief Transport plugin using unix domain sockets (!)
24 * Clearly, can only be used locally on Unix/Linux hosts...
25 * ONLY INTENDED FOR TESTING!!!
26 * @author Christian Grothoff
27 * @author Nathan Evans
28 */
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_protocols.h"
32#include "gnunet_constants.h"
33#include "gnunet_nt_lib.h"
34#include "gnunet_statistics_service.h"
35#include "gnunet_transport_communication_service.h"
36
37/**
38 * How many messages do we keep at most in the queue to the
39 * transport service before we start to drop (default,
40 * can be changed via the configuration file).
41 * Should be _below_ the level of the communicator API, as
42 * otherwise we may read messages just to have them dropped
43 * by the communicator API.
44 */
45#define DEFAULT_MAX_QUEUE_LENGTH 8000
46
47/**
48 * Address prefix used by the communicator.
49 */
50#define COMMUNICATOR_ADDRESS_PREFIX "unix"
51
52/**
53 * Configuration section used by the communicator.
54 */
55#define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
56
57/**
58 * Our MTU.
59 */
60#ifndef DARWIN
61#define UNIX_MTU UINT16_MAX
62#else
63#define UNIX_MTU 2048
64#endif
65
66GNUNET_NETWORK_STRUCT_BEGIN
67
68/**
69 * UNIX Message-Packet header.
70 */
71struct UNIXMessage
72{
73 /**
74 * Message header.
75 */
76 struct GNUNET_MessageHeader header;
77
78 /**
79 * What is the identity of the sender (GNUNET_hash of public key)
80 */
81 struct GNUNET_PeerIdentity sender;
82};
83
84GNUNET_NETWORK_STRUCT_END
85
86
87/**
88 * Handle for a queue.
89 */
90struct Queue
91{
92 /**
93 * Queues with pending messages (!) are kept in a DLL.
94 */
95 struct Queue *next;
96
97 /**
98 * Queues with pending messages (!) are kept in a DLL.
99 */
100 struct Queue *prev;
101
102 /**
103 * To whom are we talking to.
104 */
105 struct GNUNET_PeerIdentity target;
106
107 /**
108 * Address of the other peer.
109 */
110 struct sockaddr_un *address;
111
112 /**
113 * Length of the address.
114 */
115 socklen_t address_len;
116
117 /**
118 * Message currently scheduled for transmission, non-NULL if and only
119 * if this queue is in the #queue_head DLL.
120 */
121 struct UNIXMessage *msg;
122
123 /**
124 * Message queue we are providing for the #ch.
125 */
126 struct GNUNET_MQ_Handle *mq;
127
128 /**
129 * handle for this queue with the #ch.
130 */
131 struct GNUNET_TRANSPORT_QueueHandle *qh;
132
133 /**
134 * Number of bytes we currently have in our write queue.
135 */
136 unsigned long long bytes_in_queue;
137
138 /**
139 * Timeout for this queue.
140 */
141 struct GNUNET_TIME_Absolute timeout;
142
143 /**
144 * Queue timeout task.
145 */
146 struct GNUNET_SCHEDULER_Task *timeout_task;
147};
148
149/**
150 * My Peer Identity
151 */
152static struct GNUNET_PeerIdentity my_identity;
153
154/**
155 * ID of read task
156 */
157static struct GNUNET_SCHEDULER_Task *read_task;
158
159/**
160 * ID of write task
161 */
162static struct GNUNET_SCHEDULER_Task *write_task;
163
164/**
165 * Number of messages we currently have in our queues towards the transport service.
166 */
167static unsigned long long delivering_messages;
168
169/**
170 * Maximum queue length before we stop reading towards the transport service.
171 */
172static unsigned long long max_queue_length;
173
174/**
175 * For logging statistics.
176 */
177static struct GNUNET_STATISTICS_Handle *stats;
178
179/**
180 * Our environment.
181 */
182static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
183
184/**
185 * Queues (map from peer identity to `struct Queue`)
186 */
187static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
188
189/**
190 * Head of queue of messages to transmit.
191 */
192static struct Queue *queue_head;
193
194/**
195 * Tail of queue of messages to transmit.
196 */
197static struct Queue *queue_tail;
198
199/**
200 * socket that we transmit all data with
201 */
202static struct GNUNET_NETWORK_Handle *unix_sock;
203
204/**
205 * Handle to the operation that publishes our address.
206 */
207static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
208
209
210/**
211 * Functions with this signature are called whenever we need
212 * to close a queue due to a disconnect or failure to
213 * establish a connection.
214 *
215 * @param queue queue to close down
216 */
217static void
218queue_destroy (struct Queue *queue)
219{
220 struct GNUNET_MQ_Handle *mq;
221
222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
223 "Disconnecting queue for peer `%s'\n",
224 GNUNET_i2s (&queue->target));
225 if (0 != queue->bytes_in_queue)
226 {
227 GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
228 queue->bytes_in_queue = 0;
229 }
230 if (NULL != (mq = queue->mq))
231 {
232 queue->mq = NULL;
233 GNUNET_MQ_destroy (mq);
234 }
235 GNUNET_assert (
236 GNUNET_YES ==
237 GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue));
238 GNUNET_STATISTICS_set (stats,
239 "# queues active",
240 GNUNET_CONTAINER_multipeermap_size (queue_map),
241 GNUNET_NO);
242 if (NULL != queue->timeout_task)
243 {
244 GNUNET_SCHEDULER_cancel (queue->timeout_task);
245 queue->timeout_task = NULL;
246 }
247 GNUNET_free (queue->address);
248 GNUNET_free (queue);
249}
250
251
252/**
253 * Queue was idle for too long, so disconnect it
254 *
255 * @param cls the `struct Queue *` to disconnect
256 */
257static void
258queue_timeout (void *cls)
259{
260 struct Queue *queue = cls;
261 struct GNUNET_TIME_Relative left;
262
263 queue->timeout_task = NULL;
264 left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
265 if (0 != left.rel_value_us)
266 {
267 /* not actually our turn yet, but let's at least update
268 the monitor, it may think we're about to die ... */
269 queue->timeout_task =
270 GNUNET_SCHEDULER_add_delayed (left, &queue_timeout, queue);
271 return;
272 }
273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
274 "Queue %p was idle for %s, disconnecting\n",
275 queue,
276 GNUNET_STRINGS_relative_time_to_string (
277 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
278 GNUNET_YES));
279 queue_destroy (queue);
280}
281
282
283/**
284 * Increment queue timeout due to activity. We do not immediately
285 * notify the monitor here as that might generate excessive
286 * signalling.
287 *
288 * @param queue queue for which the timeout should be rescheduled
289 */
290static void
291reschedule_queue_timeout (struct Queue *queue)
292{
293 GNUNET_assert (NULL != queue->timeout_task);
294 queue->timeout =
295 GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
296}
297
298
299/**
300 * Convert unix path to a `struct sockaddr_un *`
301 *
302 * @param unixpath path to convert
303 * @param[out] sock_len set to the length of the address
304 * @param is_abstract is this an abstract @a unixpath
305 * @return converted unix path
306 */
307static struct sockaddr_un *
308unix_address_to_sockaddr (const char *unixpath, socklen_t *sock_len)
309{
310 struct sockaddr_un *un;
311 size_t slen;
312
313 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
314 un = GNUNET_new (struct sockaddr_un);
315 un->sun_family = AF_UNIX;
316 slen = strlen (unixpath);
317 if (slen >= sizeof(un->sun_path))
318 slen = sizeof(un->sun_path) - 1;
319 GNUNET_memcpy (un->sun_path, unixpath, slen);
320 un->sun_path[slen] = '\0';
321 slen = sizeof(struct sockaddr_un);
322#if HAVE_SOCKADDR_UN_SUN_LEN
323 un->sun_len = (u_char) slen;
324#endif
325 (*sock_len) = slen;
326 if ('@' == un->sun_path[0])
327 un->sun_path[0] = '\0';
328 return un;
329}
330
331
332/**
333 * Closure to #lookup_queue_it().
334 */
335struct LookupCtx
336{
337 /**
338 * Location to store the queue, if found.
339 */
340 struct Queue *res;
341
342 /**
343 * Address we are looking for.
344 */
345 const struct sockaddr_un *un;
346
347 /**
348 * Number of bytes in @a un
349 */
350 socklen_t un_len;
351};
352
353
354/**
355 * Function called to find a queue by address.
356 *
357 * @param cls the `struct LookupCtx *`
358 * @param key peer we are looking for (unused)
359 * @param value a queue
360 * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
361 */
362static int
363lookup_queue_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
364{
365 struct LookupCtx *lctx = cls;
366 struct Queue *queue = value;
367
368 if ((queue->address_len == lctx->un_len) &&
369 (0 == memcmp (lctx->un, queue->address, queue->address_len)))
370 {
371 lctx->res = queue;
372 return GNUNET_NO;
373 }
374 return GNUNET_YES;
375}
376
377
378/**
379 * Find an existing queue by address.
380 *
381 * @param plugin the plugin
382 * @param address the address to find
383 * @return NULL if queue was not found
384 */
385static struct Queue *
386lookup_queue (const struct GNUNET_PeerIdentity *peer,
387 const struct sockaddr_un *un,
388 socklen_t un_len)
389{
390 struct LookupCtx lctx;
391
392 lctx.un = un;
393 lctx.un_len = un_len;
394 lctx.res = NULL;
395 GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
396 peer,
397 &lookup_queue_it,
398 &lctx);
399 return lctx.res;
400}
401
402
403/**
404 * We have been notified that our socket is ready to write.
405 * Then reschedule this function to be called again once more is available.
406 *
407 * @param cls NULL
408 */
409static void
410select_write_cb (void *cls)
411{
412 struct Queue *queue = queue_tail;
413 const struct GNUNET_MessageHeader *msg = &queue->msg->header;
414 size_t msg_size = ntohs (msg->size);
415 ssize_t sent;
416
417 /* take queue of the ready list */
418 write_task = NULL;
419resend:
420 /* Send the data */
421 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
422 msg,
423 msg_size,
424 (const struct sockaddr *) queue->address,
425 queue->address_len);
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "UNIX transmitted message to %s (%d/%u: %s)\n",
428 GNUNET_i2s (&queue->target),
429 (int) sent,
430 (unsigned int) msg_size,
431 (sent < 0) ? strerror (errno) : "ok");
432 if (-1 != sent)
433 {
434 GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
435 if (NULL != queue_head)
436 write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
437 unix_sock,
438 &select_write_cb,
439 NULL);
440
441 /* send 'msg' */
442 GNUNET_free (queue->msg);
443 queue->msg = NULL;
444 GNUNET_MQ_impl_send_continue (queue->mq);
445 GNUNET_STATISTICS_update (stats,
446 "# bytes sent",
447 (long long) sent,
448 GNUNET_NO);
449 reschedule_queue_timeout (queue);
450 return; /* all good */
451 }
452 GNUNET_STATISTICS_update (stats,
453 "# network transmission failures",
454 1,
455 GNUNET_NO);
456 write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
457 unix_sock,
458 &select_write_cb,
459 NULL);
460 switch (errno)
461 {
462 case EAGAIN:
463 case ENOBUFS:
464 /* We should retry later... */
465 GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
466 return;
467
468 case EMSGSIZE: {
469 socklen_t size = 0;
470 socklen_t len = sizeof(size);
471
472 GNUNET_NETWORK_socket_getsockopt (unix_sock,
473 SOL_SOCKET,
474 SO_SNDBUF,
475 &size,
476 &len);
477 if (size > ntohs (msg->size))
478 {
479 /* Buffer is bigger than message: error, no retry
480 * This should never happen!*/
481 GNUNET_break (0);
482 return;
483 }
484 GNUNET_log (
485 GNUNET_ERROR_TYPE_WARNING,
486 "Trying to increase socket buffer size from %u to %u for message size %u\n",
487 (unsigned int) size,
488 (unsigned int) ((msg_size / 1000) + 2) * 1000,
489 (unsigned int) msg_size);
490 size = ((msg_size / 1000) + 2) * 1000;
491 if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
492 SOL_SOCKET,
493 SO_SNDBUF,
494 &size,
495 sizeof(size)))
496 goto resend; /* Increased buffer size, retry sending */
497 /* Ok, then just try very modest increase */
498 size = msg_size;
499 if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
500 SOL_SOCKET,
501 SO_SNDBUF,
502 &size,
503 sizeof(size)))
504 goto resend; /* Increased buffer size, retry sending */
505 /* Could not increase buffer size: error, no retry */
506 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
507 return;
508 }
509
510 default:
511 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "send");
512 return;
513 }
514}
515
516
517/**
518 * Signature of functions implementing the sending functionality of a
519 * message queue.
520 *
521 * @param mq the message queue
522 * @param msg the message to send
523 * @param impl_state our `struct Queue`
524 */
525static void
526mq_send (struct GNUNET_MQ_Handle *mq,
527 const struct GNUNET_MessageHeader *msg,
528 void *impl_state)
529{
530 struct Queue *queue = impl_state;
531 size_t msize = ntohs (msg->size);
532
533 GNUNET_assert (mq == queue->mq);
534 GNUNET_assert (NULL == queue->msg);
535 // Convert to UNIXMessage
536 queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage));
537 queue->msg->header.size = htons (msize + sizeof (struct UNIXMessage));
538 queue->msg->sender = my_identity;
539 memcpy (&queue->msg[1], msg, msize);
540 GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
541 GNUNET_assert (NULL != unix_sock);
542 if (NULL == write_task)
543 write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
544 unix_sock,
545 &select_write_cb,
546 NULL);
547}
548
549
550/**
551 * Signature of functions implementing the destruction of a message
552 * queue. Implementations must not free @a mq, but should take care
553 * of @a impl_state.
554 *
555 * @param mq the message queue to destroy
556 * @param impl_state our `struct Queue`
557 */
558static void
559mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
560{
561 struct Queue *queue = impl_state;
562
563 if (mq == queue->mq)
564 {
565 queue->mq = NULL;
566 queue_destroy (queue);
567 }
568}
569
570
571/**
572 * Implementation function that cancels the currently sent message.
573 *
574 * @param mq message queue
575 * @param impl_state our `struct Queue`
576 */
577static void
578mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
579{
580 struct Queue *queue = impl_state;
581
582 GNUNET_assert (NULL != queue->msg);
583 queue->msg = NULL;
584 GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
585 GNUNET_assert (NULL != write_task);
586 if (NULL == queue_head)
587 {
588 GNUNET_SCHEDULER_cancel (write_task);
589 write_task = NULL;
590 }
591}
592
593
594/**
595 * Generic error handler, called with the appropriate
596 * error code and the same closure specified at the creation of
597 * the message queue.
598 * Not every message queue implementation supports an error handler.
599 *
600 * @param cls our `struct Queue`
601 * @param error error code
602 */
603static void
604mq_error (void *cls, enum GNUNET_MQ_Error error)
605{
606 struct Queue *queue = cls;
607
608 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
609 "UNIX MQ error in queue to %s: %d\n",
610 GNUNET_i2s (&queue->target),
611 (int) error);
612 queue_destroy (queue);
613}
614
615
616/**
617 * Creates a new outbound queue the transport service will use to send
618 * data to another peer.
619 *
620 * @param peer the target peer
621 * @param cs inbound or outbound queue
622 * @param un the address
623 * @param un_len number of bytes in @a un
624 * @return the queue or NULL of max connections exceeded
625 */
626static struct Queue *
627setup_queue (const struct GNUNET_PeerIdentity *target,
628 enum GNUNET_TRANSPORT_ConnectionStatus cs,
629 const struct sockaddr_un *un,
630 socklen_t un_len)
631{
632 struct Queue *queue;
633
634 queue = GNUNET_new (struct Queue);
635 queue->target = *target;
636 queue->address = GNUNET_memdup (un, un_len);
637 queue->address_len = un_len;
638 (void) GNUNET_CONTAINER_multipeermap_put (
639 queue_map,
640 &queue->target,
641 queue,
642 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
643 GNUNET_STATISTICS_set (stats,
644 "# queues active",
645 GNUNET_CONTAINER_multipeermap_size (queue_map),
646 GNUNET_NO);
647 queue->timeout =
648 GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
649 queue->timeout_task =
650 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
651 &queue_timeout,
652 queue);
653 queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
654 &mq_destroy,
655 &mq_cancel,
656 queue,
657 NULL,
658 &mq_error,
659 queue);
660 {
661 char *foreign_addr;
662
663 if ('\0' == un->sun_path[0])
664 GNUNET_asprintf (&foreign_addr,
665 "%s-@%s",
666 COMMUNICATOR_ADDRESS_PREFIX,
667 &un->sun_path[1]);
668 else
669 GNUNET_asprintf (&foreign_addr,
670 "%s-%s",
671 COMMUNICATOR_ADDRESS_PREFIX,
672 un->sun_path);
673 queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
674 &queue->target,
675 foreign_addr,
676 UNIX_MTU - sizeof (struct
677 UNIXMessage),
678 GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
679 0,
680 GNUNET_NT_LOOPBACK,
681 cs,
682 queue->mq);
683 GNUNET_free (foreign_addr);
684 }
685 return queue;
686}
687
688
689/**
690 * We have been notified that our socket has something to read. Do the
691 * read and reschedule this function to be called again once more is
692 * available.
693 *
694 * @param cls NULL
695 */
696static void
697select_read_cb (void *cls);
698
699
700/**
701 * Function called when message was successfully passed to
702 * transport service. Continue read activity.
703 *
704 * @param cls NULL
705 * @param success #GNUNET_OK on success
706 */
707static void
708receive_complete_cb (void *cls, int success)
709{
710 (void) cls;
711 delivering_messages--;
712 if (GNUNET_OK != success)
713 GNUNET_STATISTICS_update (stats,
714 "# transport transmission failures",
715 1,
716 GNUNET_NO);
717 if ((NULL == read_task) && (delivering_messages < max_queue_length) &&
718 (NULL != unix_sock))
719 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
720 unix_sock,
721 &select_read_cb,
722 NULL);
723}
724
725
726/**
727 * We have been notified that our socket has something to read. Do the
728 * read and reschedule this function to be called again once more is
729 * available.
730 *
731 * @param cls NULL
732 */
733static void
734select_read_cb (void *cls)
735{
736 char buf[65536] GNUNET_ALIGN;
737 struct Queue *queue;
738 const struct UNIXMessage *msg;
739 struct sockaddr_un un;
740 socklen_t addrlen;
741 ssize_t ret;
742 uint16_t msize;
743
744 GNUNET_assert (NULL != unix_sock);
745 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
746 unix_sock,
747 &select_read_cb,
748 NULL);
749 addrlen = sizeof(un);
750 memset (&un, 0, sizeof(un));
751 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
752 buf,
753 sizeof(buf),
754 (struct sockaddr *) &un,
755 &addrlen);
756 if ((-1 == ret) && ((EAGAIN == errno) || (ENOBUFS == errno)))
757 return;
758 if (-1 == ret)
759 {
760 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
761 return;
762 }
763 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764 "Read %d bytes from socket %s\n",
765 (int) ret,
766 un.sun_path);
767 GNUNET_assert (AF_UNIX == (un.sun_family));
768 msg = (struct UNIXMessage *) buf;
769 msize = ntohs (msg->header.size);
770 if ((msize < sizeof(struct UNIXMessage)) || (msize > ret))
771 {
772 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
773 "Wrong message size: %d bytes\n",
774 msize);
775 GNUNET_break_op (0);
776 return;
777 }
778 queue = lookup_queue (&msg->sender, &un, addrlen);
779 if (NULL == queue)
780 queue =
781 setup_queue (&msg->sender, GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen);
782 else
783 reschedule_queue_timeout (queue);
784 if (NULL == queue)
785 {
786 GNUNET_log (
787 GNUNET_ERROR_TYPE_ERROR,
788 _ (
789 "Maximum number of UNIX connections exceeded, dropping incoming message\n"));
790 return;
791 }
792
793 {
794 uint16_t tsize = msize - sizeof(struct UNIXMessage);
795
796 const struct GNUNET_MessageHeader *currhdr;
797 struct GNUNET_MessageHeader al_hdr;
798
799 currhdr = (const struct GNUNET_MessageHeader *) &msg[1];
800 /* ensure aligned access */
801 memcpy (&al_hdr, currhdr, sizeof(al_hdr));
802 if ((tsize < sizeof(struct GNUNET_MessageHeader)) ||
803 (tsize != ntohs (al_hdr.size)))
804 {
805 GNUNET_break_op (0);
806 return;
807 }
808 ret = GNUNET_TRANSPORT_communicator_receive (ch,
809 &msg->sender,
810 currhdr,
811 GNUNET_TIME_UNIT_FOREVER_REL,
812 &receive_complete_cb,
813 NULL);
814 if (GNUNET_SYSERR == ret)
815 {
816 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
817 "Transport not up!\n");
818 return; /* transport not up */
819 }
820 if (GNUNET_NO == ret)
821 {
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Error sending message to transport\n");
824 return;
825 }
826 delivering_messages++;
827 }
828 if (delivering_messages >= max_queue_length)
829 {
830 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
831 "Back pressure %llu\n", delivering_messages);
832
833 /* we should try to apply 'back pressure' */
834 GNUNET_SCHEDULER_cancel (read_task);
835 read_task = NULL;
836 }
837}
838
839
840/**
841 * Function called by the transport service to initialize a
842 * message queue given address information about another peer.
843 * If and when the communication channel is established, the
844 * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
845 * to notify the service that the channel is now up. It is
846 * the responsibility of the communicator to manage sane
847 * retries and timeouts for any @a peer/@a address combination
848 * provided by the transport service. Timeouts and retries
849 * do not need to be signalled to the transport service.
850 *
851 * @param cls closure
852 * @param peer identity of the other peer
853 * @param address where to send the message, human-readable
854 * communicator-specific format, 0-terminated, UTF-8
855 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
856 */
857static int
858mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
859{
860 struct Queue *queue;
861 const char *path;
862 struct sockaddr_un *un;
863 socklen_t un_len;
864
865 (void) cls;
866 if (0 != strncmp (address,
867 COMMUNICATOR_ADDRESS_PREFIX "-",
868 strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
869 {
870 GNUNET_break_op (0);
871 return GNUNET_SYSERR;
872 }
873 path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
874 un = unix_address_to_sockaddr (path, &un_len);
875 queue = lookup_queue (peer, un, un_len);
876 if (NULL != queue)
877 {
878 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
879 "Address `%s' for %s ignored, queue exists\n",
880 path,
881 GNUNET_i2s (peer));
882 GNUNET_free (un);
883 return GNUNET_OK;
884 }
885 queue = setup_queue (peer, GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len);
886 GNUNET_free (un);
887 if (NULL == queue)
888 {
889 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
890 "Failed to setup queue to %s at `%s'\n",
891 GNUNET_i2s (peer),
892 path);
893 return GNUNET_NO;
894 }
895 return GNUNET_OK;
896}
897
898
899/**
900 * Iterator over all message queues to clean up.
901 *
902 * @param cls NULL
903 * @param target unused
904 * @param value the queue to destroy
905 * @return #GNUNET_OK to continue to iterate
906 */
907static int
908get_queue_delete_it (void *cls,
909 const struct GNUNET_PeerIdentity *target,
910 void *value)
911{
912 struct Queue *queue = value;
913
914 (void) cls;
915 (void) target;
916 queue_destroy (queue);
917 return GNUNET_OK;
918}
919
920
921/**
922 * Shutdown the UNIX communicator.
923 *
924 * @param cls NULL (always)
925 */
926static void
927do_shutdown (void *cls)
928{
929 if (NULL != read_task)
930 {
931 GNUNET_SCHEDULER_cancel (read_task);
932 read_task = NULL;
933 }
934 if (NULL != write_task)
935 {
936 GNUNET_SCHEDULER_cancel (write_task);
937 write_task = NULL;
938 }
939 if (NULL != unix_sock)
940 {
941 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (unix_sock));
942 unix_sock = NULL;
943 }
944 GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL);
945 GNUNET_CONTAINER_multipeermap_destroy (queue_map);
946 if (NULL != ai)
947 {
948 GNUNET_TRANSPORT_communicator_address_remove (ai);
949 ai = NULL;
950 }
951 if (NULL != ch)
952 {
953 GNUNET_TRANSPORT_communicator_disconnect (ch);
954 ch = NULL;
955 }
956 if (NULL != stats)
957 {
958 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
959 stats = NULL;
960 }
961}
962
963
964/**
965 * Function called when the transport service has received an
966 * acknowledgement for this communicator (!) via a different return
967 * path.
968 *
969 * Not applicable for UNIX.
970 *
971 * @param cls closure
972 * @param sender which peer sent the notification
973 * @param msg payload
974 */
975static void
976enc_notify_cb (void *cls,
977 const struct GNUNET_PeerIdentity *sender,
978 const struct GNUNET_MessageHeader *msg)
979{
980 (void) cls;
981 (void) sender;
982 (void) msg;
983 GNUNET_break_op (0);
984}
985
986
987/**
988 * Setup communicator and launch network interactions.
989 *
990 * @param cls NULL (always)
991 * @param args remaining command-line arguments
992 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
993 * @param cfg configuration
994 */
995static void
996run (void *cls,
997 char *const *args,
998 const char *cfgfile,
999 const struct GNUNET_CONFIGURATION_Handle *cfg)
1000{
1001 char *unix_socket_path;
1002 struct sockaddr_un *un;
1003 socklen_t un_len;
1004 char *my_addr;
1005 struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
1006
1007 (void) cls;
1008 delivering_messages = 0;
1009
1010 my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1011 if (NULL == my_private_key)
1012 {
1013 GNUNET_log (
1014 GNUNET_ERROR_TYPE_ERROR,
1015 _ (
1016 "UNIX communicator is lacking key configuration settings. Exiting.\n"));
1017 GNUNET_SCHEDULER_shutdown ();
1018 return;
1019 }
1020 GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, &my_identity.public_key);
1021
1022 if (GNUNET_OK !=
1023 GNUNET_CONFIGURATION_get_value_filename (cfg,
1024 COMMUNICATOR_CONFIG_SECTION,
1025 "UNIXPATH",
1026 &unix_socket_path))
1027 {
1028 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1029 COMMUNICATOR_CONFIG_SECTION,
1030 "UNIXPATH");
1031 return;
1032 }
1033 if (GNUNET_OK !=
1034 GNUNET_CONFIGURATION_get_value_number (cfg,
1035 COMMUNICATOR_CONFIG_SECTION,
1036 "MAX_QUEUE_LENGTH",
1037 &max_queue_length))
1038 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1039
1040 un = unix_address_to_sockaddr (unix_socket_path, &un_len);
1041 if (NULL == un)
1042 {
1043 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1044 "Failed to setup UNIX domain socket address with path `%s'\n",
1045 unix_socket_path);
1046 GNUNET_free (unix_socket_path);
1047 return;
1048 }
1049 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_DGRAM, 0);
1050 if (NULL == unix_sock)
1051 {
1052 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
1053 GNUNET_free (un);
1054 GNUNET_free (unix_socket_path);
1055 return;
1056 }
1057 if (('\0' != un->sun_path[0]) &&
1058 (GNUNET_OK != GNUNET_DISK_directory_create_for_file (un->sun_path)))
1059 {
1060 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1061 _ ("Cannot create path to `%s'\n"),
1062 un->sun_path);
1063 GNUNET_NETWORK_socket_close (unix_sock);
1064 unix_sock = NULL;
1065 GNUNET_free (un);
1066 GNUNET_free (unix_socket_path);
1067 return;
1068 }
1069 if (GNUNET_OK != GNUNET_NETWORK_socket_bind (unix_sock,
1070 (const struct sockaddr *) un,
1071 un_len))
1072 {
1073 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "bind", un->sun_path);
1074 GNUNET_NETWORK_socket_close (unix_sock);
1075 unix_sock = NULL;
1076 GNUNET_free (un);
1077 GNUNET_free (unix_socket_path);
1078 return;
1079 }
1080 GNUNET_free (un);
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", unix_socket_path);
1082 stats = GNUNET_STATISTICS_create ("C-UNIX", cfg);
1083 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
1084 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1085 unix_sock,
1086 &select_read_cb,
1087 NULL);
1088 queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1089 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1090 COMMUNICATOR_CONFIG_SECTION,
1091 COMMUNICATOR_ADDRESS_PREFIX,
1092 GNUNET_TRANSPORT_CC_RELIABLE,
1093 &mq_init,
1094 NULL,
1095 &enc_notify_cb,
1096 NULL);
1097 if (NULL == ch)
1098 {
1099 GNUNET_break (0);
1100 GNUNET_SCHEDULER_shutdown ();
1101 GNUNET_free (unix_socket_path);
1102 return;
1103 }
1104 GNUNET_asprintf (&my_addr,
1105 "%s-%s",
1106 COMMUNICATOR_ADDRESS_PREFIX,
1107 unix_socket_path);
1108 GNUNET_free (unix_socket_path);
1109 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1110 my_addr,
1111 GNUNET_NT_LOOPBACK,
1112 GNUNET_TIME_UNIT_FOREVER_REL);
1113 GNUNET_free (my_addr);
1114}
1115
1116
1117/**
1118 * The main function for the UNIX communicator.
1119 *
1120 * @param argc number of arguments from the command line
1121 * @param argv command line arguments
1122 * @return 0 ok, 1 on error
1123 */
1124int
1125main (int argc, char *const *argv)
1126{
1127 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1128 GNUNET_GETOPT_OPTION_END
1129 };
1130 int ret;
1131
1132 if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1133 return 2;
1134
1135 ret = (GNUNET_OK ==
1136 GNUNET_PROGRAM_run (argc,
1137 argv,
1138 "gnunet-communicator-unix",
1139 _ ("GNUnet UNIX domain socket communicator"),
1140 options,
1141 &run,
1142 NULL))
1143 ? 0
1144 : 1;
1145 GNUNET_free_nz ((void *) argv);
1146 return ret;
1147}
1148
1149
1150#if defined(__linux__) && defined(__GLIBC__)
1151#include <malloc.h>
1152
1153/**
1154 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1155 */
1156void __attribute__ ((constructor))
1157GNUNET_ARM_memory_init ()
1158{
1159 mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1160 mallopt (M_TOP_PAD, 1 * 1024);
1161 malloc_trim (0);
1162}
1163
1164
1165#endif
1166
1167/* end of gnunet-communicator-unix.c */