aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/.gitignore1
-rw-r--r--src/transport/Makefile.am21
-rw-r--r--src/transport/gnunet-communicator-unix.c1148
-rw-r--r--src/transport/gnunet-service-tng.c719
-rw-r--r--src/transport/transport.h63
-rw-r--r--src/transport/transport_api2_communication.c194
6 files changed, 2062 insertions, 84 deletions
diff --git a/src/transport/.gitignore b/src/transport/.gitignore
index d035b4011..90f908a47 100644
--- a/src/transport/.gitignore
+++ b/src/transport/.gitignore
@@ -83,3 +83,4 @@ test_transport_blacklisting_outbound_bl_full
83test_transport_blacklisting_outbound_bl_plugin 83test_transport_blacklisting_outbound_bl_plugin
84test_transport_testing_restart 84test_transport_testing_restart
85test_transport_testing_startstop 85test_transport_testing_startstop
86gnunet-communicator-unix
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index d0db6b141..92b53137f 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -140,6 +140,7 @@ endif
140 140
141noinst_PROGRAMS = \ 141noinst_PROGRAMS = \
142 gnunet-transport-profiler \ 142 gnunet-transport-profiler \
143 gnunet-communicator-unix \
143 $(WLAN_BIN_SENDER) \ 144 $(WLAN_BIN_SENDER) \
144 $(WLAN_BIN_RECEIVER) 145 $(WLAN_BIN_RECEIVER)
145 146
@@ -149,6 +150,7 @@ endif
149 150
150lib_LTLIBRARIES = \ 151lib_LTLIBRARIES = \
151 libgnunettransport.la \ 152 libgnunettransport.la \
153 libgnunettransportcommunicator.la \
152 $(TESTING_LIBS) 154 $(TESTING_LIBS)
153 155
154libgnunettransporttesting_la_SOURCES = \ 156libgnunettransporttesting_la_SOURCES = \
@@ -187,6 +189,17 @@ libgnunettransport_la_LDFLAGS = \
187 $(GN_LIB_LDFLAGS) $(WINFLAGS) \ 189 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
188 -version-info 4:0:2 190 -version-info 4:0:2
189 191
192
193
194libgnunettransportcommunicator_la_SOURCES = \
195 transport_api2_communication.c
196libgnunettransportcommunicator_la_LIBADD = \
197 $(top_builddir)/src/util/libgnunetutil.la \
198 $(GN_LIBINTL)
199libgnunettransportcommunicator_la_LDFLAGS = \
200 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
201 -version-info 0:0:0
202
190libexec_PROGRAMS = \ 203libexec_PROGRAMS = \
191 $(WLAN_BIN) \ 204 $(WLAN_BIN) \
192 $(WLAN_BIN_DUMMY) \ 205 $(WLAN_BIN_DUMMY) \
@@ -207,6 +220,14 @@ gnunet_transport_certificate_creation_SOURCES = \
207gnunet_transport_certificate_creation_LDADD = \ 220gnunet_transport_certificate_creation_LDADD = \
208 $(top_builddir)/src/util/libgnunetutil.la 221 $(top_builddir)/src/util/libgnunetutil.la
209 222
223gnunet_communicator_unix_SOURCES = \
224 gnunet-communicator-unix.c
225gnunet_communicator_unix_LDADD = \
226 libgnunettransportcommunicator.la \
227 $(top_builddir)/src/statistics/libgnunetstatistics.la \
228 $(top_builddir)/src/util/libgnunetutil.la
229
230
210gnunet_helper_transport_wlan_SOURCES = \ 231gnunet_helper_transport_wlan_SOURCES = \
211 gnunet-helper-transport-wlan.c 232 gnunet-helper-transport-wlan.c
212 233
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
new file mode 100644
index 000000000..cd3ae5dce
--- /dev/null
+++ b/src/transport/gnunet-communicator-unix.c
@@ -0,0 +1,1148 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/gnunet-communicator-unix.c
21 * @brief Transport plugin using unix domain sockets (!)
22 * Clearly, can only be used locally on Unix/Linux hosts...
23 * ONLY INTENDED FOR TESTING!!!
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet_transport_communication_service.h"
32
33/**
34 * How many messages do we keep at most in the queue to the
35 * transport service before we start to drop (default,
36 * can be changed via the configuration file).
37 * Should be _below_ the level of the communicator API, as
38 * otherwise we may read messages just to have them dropped
39 * by the communicator API.
40 */
41#define DEFAULT_MAX_QUEUE_LENGTH 8
42
43/**
44 * Address prefix used by the communicator.
45 */
46#define COMMUNICATOR_ADDRESS_PREFIX "unix"
47
48/**
49 * Configuration section used by the communicator.
50 */
51#define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
52
53
54GNUNET_NETWORK_STRUCT_BEGIN
55
56/**
57 * UNIX Message-Packet header.
58 */
59struct UNIXMessage
60{
61 /**
62 * Message header.
63 */
64 struct GNUNET_MessageHeader header;
65
66 /**
67 * What is the identity of the sender (GNUNET_hash of public key)
68 */
69 struct GNUNET_PeerIdentity sender;
70
71};
72
73GNUNET_NETWORK_STRUCT_END
74
75
76/**
77 * Handle for a queue.
78 */
79struct Queue
80{
81
82 /**
83 * Queues with pending messages (!) are kept in a DLL.
84 */
85 struct Queue *next;
86
87 /**
88 * Queues with pending messages (!) are kept in a DLL.
89 */
90 struct Queue *prev;
91
92 /**
93 * To whom are we talking to.
94 */
95 struct GNUNET_PeerIdentity target;
96
97 /**
98 * Address of the other peer.
99 */
100 struct sockaddr_un *address;
101
102 /**
103 * Length of the address.
104 */
105 socklen_t address_len;
106
107 /**
108 * Message currently scheduled for transmission, non-NULL if and only
109 * if this queue is in the #queue_head DLL.
110 */
111 const struct GNUNET_MessageHeader *msg;
112
113 /**
114 * Message queue we are providing for the #ch.
115 */
116 struct GNUNET_MQ_Handle *mq;
117
118 /**
119 * handle for this queue with the #ch.
120 */
121 struct GNUNET_TRANSPORT_QueueHandle *qh;
122
123 /**
124 * Number of bytes we currently have in our write queue.
125 */
126 unsigned long long bytes_in_queue;
127
128 /**
129 * Timeout for this queue.
130 */
131 struct GNUNET_TIME_Absolute timeout;
132
133 /**
134 * Queue timeout task.
135 */
136 struct GNUNET_SCHEDULER_Task *timeout_task;
137
138};
139
140
141/**
142 * ID of read task
143 */
144static struct GNUNET_SCHEDULER_Task *read_task;
145
146/**
147 * ID of write task
148 */
149static struct GNUNET_SCHEDULER_Task *write_task;
150
151/**
152 * Number of messages we currently have in our queues towards the transport service.
153 */
154static unsigned long long delivering_messages;
155
156/**
157 * Maximum queue length before we stop reading towards the transport service.
158 */
159static unsigned long long max_queue_length;
160
161/**
162 * For logging statistics.
163 */
164static struct GNUNET_STATISTICS_Handle *stats;
165
166/**
167 * Our environment.
168 */
169static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
170
171/**
172 * Queues (map from peer identity to `struct Queue`)
173 */
174static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
175
176/**
177 * Head of queue of messages to transmit.
178 */
179static struct Queue *queue_head;
180
181/**
182 * Tail of queue of messages to transmit.
183 */
184static struct Queue *queue_tail;
185
186/**
187 * socket that we transmit all data with
188 */
189static struct GNUNET_NETWORK_Handle *unix_sock;
190
191/**
192 * Handle to the operation that publishes our address.
193 */
194static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
195
196
197/**
198 * Functions with this signature are called whenever we need
199 * to close a queue due to a disconnect or failure to
200 * establish a connection.
201 *
202 * @param queue queue to close down
203 */
204static void
205queue_destroy (struct Queue *queue)
206{
207 struct GNUNET_MQ_Handle *mq;
208
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210 "Disconnecting queue for peer `%s'\n",
211 GNUNET_i2s (&queue->target));
212 if (0 != queue->bytes_in_queue)
213 {
214 GNUNET_CONTAINER_DLL_remove (queue_head,
215 queue_tail,
216 queue);
217 queue->bytes_in_queue = 0;
218 }
219 if (NULL != (mq = queue->mq))
220 {
221 queue->mq = NULL;
222 GNUNET_MQ_destroy (mq);
223 }
224 GNUNET_assert (GNUNET_YES ==
225 GNUNET_CONTAINER_multipeermap_remove (queue_map,
226 &queue->target,
227 queue));
228 GNUNET_STATISTICS_set (stats,
229 "# UNIX queues active",
230 GNUNET_CONTAINER_multipeermap_size (queue_map),
231 GNUNET_NO);
232 if (NULL != queue->timeout_task)
233 {
234 GNUNET_SCHEDULER_cancel (queue->timeout_task);
235 queue->timeout_task = NULL;
236 }
237 GNUNET_free (queue->address);
238 GNUNET_free (queue);
239}
240
241
242/**
243 * Queue was idle for too long, so disconnect it
244 *
245 * @param cls the `struct Queue *` to disconnect
246 */
247static void
248queue_timeout (void *cls)
249{
250 struct Queue *queue = cls;
251 struct GNUNET_TIME_Relative left;
252
253 queue->timeout_task = NULL;
254 left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
255 if (0 != left.rel_value_us)
256 {
257 /* not actually our turn yet, but let's at least update
258 the monitor, it may think we're about to die ... */
259 queue->timeout_task
260 = GNUNET_SCHEDULER_add_delayed (left,
261 &queue_timeout,
262 queue);
263 return;
264 }
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "Queue %p was idle for %s, disconnecting\n",
267 queue,
268 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
269 GNUNET_YES));
270 queue_destroy (queue);
271}
272
273
274/**
275 * Increment queue timeout due to activity. We do not immediately
276 * notify the monitor here as that might generate excessive
277 * signalling.
278 *
279 * @param queue queue for which the timeout should be rescheduled
280 */
281static void
282reschedule_queue_timeout (struct Queue *queue)
283{
284 GNUNET_assert (NULL != queue->timeout_task);
285 queue->timeout
286 = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
287}
288
289
290/**
291 * Convert unix path to a `struct sockaddr_un *`
292 *
293 * @param unixpath path to convert
294 * @param[out] sock_len set to the length of the address
295 * @param is_abstract is this an abstract @a unixpath
296 * @return converted unix path
297 */
298static struct sockaddr_un *
299unix_address_to_sockaddr (const char *unixpath,
300 socklen_t *sock_len)
301{
302 struct sockaddr_un *un;
303 size_t slen;
304
305 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
306 un = GNUNET_new (struct sockaddr_un);
307 un->sun_family = AF_UNIX;
308 slen = strlen (unixpath);
309 if (slen >= sizeof (un->sun_path))
310 slen = sizeof (un->sun_path) - 1;
311 GNUNET_memcpy (un->sun_path,
312 unixpath,
313 slen);
314 un->sun_path[slen] = '\0';
315 slen = sizeof (struct sockaddr_un);
316#if HAVE_SOCKADDR_UN_SUN_LEN
317 un->sun_len = (u_char) slen;
318#endif
319 (*sock_len) = slen;
320 if ('@' == un->sun_path[0])
321 un->sun_path[0] = '\0';
322 return un;
323}
324
325
326/**
327 * Closure to #lookup_queue_it().
328 */
329struct LookupCtx
330{
331 /**
332 * Location to store the queue, if found.
333 */
334 struct Queue *res;
335
336 /**
337 * Address we are looking for.
338 */
339 const struct sockaddr_un *un;
340
341 /**
342 * Number of bytes in @a un
343 */
344 socklen_t un_len;
345};
346
347
348/**
349 * Function called to find a queue by address.
350 *
351 * @param cls the `struct LookupCtx *`
352 * @param key peer we are looking for (unused)
353 * @param value a queue
354 * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
355 */
356static int
357lookup_queue_it (void *cls,
358 const struct GNUNET_PeerIdentity *key,
359 void *value)
360{
361 struct LookupCtx *lctx = cls;
362 struct Queue *queue = value;
363
364 if ( (queue->address_len = lctx->un_len) &&
365 (0 == memcmp (lctx->un,
366 queue->address,
367 queue->address_len)) )
368 {
369 lctx->res = queue;
370 return GNUNET_NO;
371 }
372 return GNUNET_YES;
373}
374
375
376/**
377 * Find an existing queue by address.
378 *
379 * @param plugin the plugin
380 * @param address the address to find
381 * @return NULL if queue was not found
382 */
383static struct Queue *
384lookup_queue (const struct GNUNET_PeerIdentity *peer,
385 const struct sockaddr_un *un,
386 socklen_t un_len)
387{
388 struct LookupCtx lctx;
389
390 lctx.un = un;
391 lctx.un_len = un_len;
392 GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
393 peer,
394 &lookup_queue_it,
395 &lctx);
396 return lctx.res;
397}
398
399
400/**
401 * We have been notified that our socket is ready to write.
402 * Then reschedule this function to be called again once more is available.
403 *
404 * @param cls NULL
405 */
406static void
407select_write_cb (void *cls)
408{
409 struct Queue *queue = queue_tail;
410 const struct GNUNET_MessageHeader *msg = queue->msg;
411 size_t msg_size = ntohs (msg->size);
412 ssize_t sent;
413
414 /* take queue of the ready list */
415 write_task = NULL;
416 GNUNET_CONTAINER_DLL_remove (queue_head,
417 queue_tail,
418 queue);
419 if (NULL != queue_head)
420 write_task =
421 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
422 unix_sock,
423 &select_write_cb,
424 NULL);
425
426 /* send 'msg' */
427 queue->msg = NULL;
428 GNUNET_MQ_impl_send_continue (queue->mq);
429 resend:
430 /* Send the data */
431 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
432 queue->msg,
433 msg_size,
434 (const struct sockaddr *) queue->address,
435 queue->address_len);
436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437 "UNIX transmitted message to %s (%d/%u: %s)\n",
438 GNUNET_i2s (&queue->target),
439 (int) sent,
440 (unsigned int) msg_size,
441 (sent < 0) ? STRERROR (errno) : "ok");
442 if (-1 != sent)
443 {
444 GNUNET_STATISTICS_update (stats,
445 "# bytes sent",
446 (long long) sent,
447 GNUNET_NO);
448 reschedule_queue_timeout (queue);
449 return; /* all good */
450 }
451 GNUNET_STATISTICS_update (stats,
452 "# network transmission failures",
453 1,
454 GNUNET_NO);
455 switch (errno)
456 {
457 case EAGAIN:
458 case ENOBUFS:
459 /* We should retry later... */
460 GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
461 "send");
462 return;
463 case EMSGSIZE:
464 {
465 socklen_t size = 0;
466 socklen_t len = sizeof (size);
467
468 GNUNET_NETWORK_socket_getsockopt (unix_sock,
469 SOL_SOCKET,
470 SO_SNDBUF,
471 &size,
472 &len);
473 if (size > ntohs (msg->size))
474 {
475 /* Buffer is bigger than message: error, no retry
476 * This should never happen!*/
477 GNUNET_break (0);
478 return;
479 }
480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481 "Trying to increase socket buffer size from %u to %u for message size %u\n",
482 (unsigned int) size,
483 (unsigned int) ((msg_size / 1000) + 2) * 1000,
484 (unsigned int) msg_size);
485 size = ((msg_size / 1000) + 2) * 1000;
486 if (GNUNET_OK ==
487 GNUNET_NETWORK_socket_setsockopt (unix_sock,
488 SOL_SOCKET,
489 SO_SNDBUF,
490 &size,
491 sizeof (size)))
492 goto resend; /* Increased buffer size, retry sending */
493 /* Ok, then just try very modest increase */
494 size = msg_size;
495 if (GNUNET_OK ==
496 GNUNET_NETWORK_socket_setsockopt (unix_sock,
497 SOL_SOCKET,
498 SO_SNDBUF,
499 &size,
500 sizeof (size)))
501 goto resend; /* Increased buffer size, retry sending */
502 /* Could not increase buffer size: error, no retry */
503 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
504 "setsockopt");
505 return;
506 }
507 default:
508 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
509 "send");
510 return;
511 }
512}
513
514
515/**
516 * Signature of functions implementing the sending functionality of a
517 * message queue.
518 *
519 * @param mq the message queue
520 * @param msg the message to send
521 * @param impl_state our `struct Queue`
522 */
523static void
524mq_send (struct GNUNET_MQ_Handle *mq,
525 const struct GNUNET_MessageHeader *msg,
526 void *impl_state)
527{
528 struct Queue *queue = impl_state;
529
530 GNUNET_assert (mq == queue->mq);
531 GNUNET_assert (NULL == queue->msg);
532 queue->msg = msg;
533 GNUNET_CONTAINER_DLL_insert (queue_head,
534 queue_tail,
535 queue);
536 if (NULL == write_task)
537 write_task =
538 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
539 unix_sock,
540 &select_write_cb,
541 NULL);
542}
543
544
545/**
546 * Signature of functions implementing the destruction of a message
547 * queue. Implementations must not free @a mq, but should take care
548 * of @a impl_state.
549 *
550 * @param mq the message queue to destroy
551 * @param impl_state our `struct Queue`
552 */
553static void
554mq_destroy (struct GNUNET_MQ_Handle *mq,
555 void *impl_state)
556{
557 struct Queue *queue = impl_state;
558
559 if (mq == queue->mq)
560 {
561 queue->mq = NULL;
562 queue_destroy (queue);
563 }
564}
565
566
567/**
568 * Implementation function that cancels the currently sent message.
569 *
570 * @param mq message queue
571 * @param impl_state our `struct Queue`
572 */
573static void
574mq_cancel (struct GNUNET_MQ_Handle *mq,
575 void *impl_state)
576{
577 struct Queue *queue = impl_state;
578
579 GNUNET_assert (NULL != queue->msg);
580 queue->msg = NULL;
581 GNUNET_CONTAINER_DLL_remove (queue_head,
582 queue_tail,
583 queue);
584 GNUNET_assert (NULL != write_task);
585 if (NULL == queue_head)
586 {
587 GNUNET_SCHEDULER_cancel (write_task);
588 write_task = NULL;
589 }
590}
591
592
593/**
594 * Generic error handler, called with the appropriate
595 * error code and the same closure specified at the creation of
596 * the message queue.
597 * Not every message queue implementation supports an error handler.
598 *
599 * @param cls our `struct Queue`
600 * @param error error code
601 */
602static void
603mq_error (void *cls,
604 enum GNUNET_MQ_Error error)
605{
606 struct Queue *queue = cls;
607
608 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
609 "UNIX MQ error in queue to %s: %d\n",
610 GNUNET_i2s (&queue->target),
611 (int) error);
612 queue_destroy (queue);
613}
614
615
616/**
617 * Creates a new outbound queue the transport service will use to send
618 * data to another peer.
619 *
620 * @param peer the target peer
621 * @param un the address
622 * @param un_len number of bytes in @a un
623 * @return the queue or NULL of max connections exceeded
624 */
625static struct Queue *
626setup_queue (const struct GNUNET_PeerIdentity *target,
627 const struct sockaddr_un *un,
628 socklen_t un_len)
629{
630 struct Queue *queue;
631
632 queue = GNUNET_new (struct Queue);
633 queue->target = *target;
634 queue->address = GNUNET_memdup (un,
635 un_len);
636 queue->address_len = un_len;
637 (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
638 &queue->target,
639 queue,
640 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
641 GNUNET_STATISTICS_set (stats,
642 "# queues active",
643 GNUNET_CONTAINER_multipeermap_size (queue_map),
644 GNUNET_NO);
645 queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
646 queue->timeout_task
647 = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
648 &queue_timeout,
649 queue);
650 queue->mq
651 = GNUNET_MQ_queue_for_callbacks (&mq_send,
652 &mq_destroy,
653 &mq_cancel,
654 queue,
655 NULL,
656 &mq_error,
657 queue);
658 {
659 char *foreign_addr;
660
661 if ('\0' == un->sun_path[0])
662 GNUNET_asprintf (&foreign_addr,
663 "%s-@%s",
664 COMMUNICATOR_ADDRESS_PREFIX,
665 &un->sun_path[1]);
666 else
667 GNUNET_asprintf (&foreign_addr,
668 "%s-%s",
669 COMMUNICATOR_ADDRESS_PREFIX,
670 un->sun_path);
671 queue->qh
672 = GNUNET_TRANSPORT_communicator_mq_add (ch,
673 &queue->target,
674 foreign_addr,
675 GNUNET_ATS_NET_LOOPBACK,
676 queue->mq);
677 GNUNET_free (foreign_addr);
678 }
679 return queue;
680}
681
682
683/**
684 * We have been notified that our socket has something to read. Do the
685 * read and reschedule this function to be called again once more is
686 * available.
687 *
688 * @param cls NULL
689 */
690static void
691select_read_cb (void *cls);
692
693
694/**
695 * Function called when message was successfully passed to
696 * transport service. Continue read activity.
697 *
698 * @param cls NULL
699 * @param success #GNUNET_OK on success
700 */
701static void
702receive_complete_cb (void *cls,
703 int success)
704{
705 delivering_messages--;
706 if (GNUNET_OK != success)
707 GNUNET_STATISTICS_update (stats,
708 "# transport transmission failures",
709 1,
710 GNUNET_NO);
711 if ( (NULL == read_task) &&
712 (delivering_messages < max_queue_length) )
713 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
714 unix_sock,
715 &select_read_cb,
716 NULL);
717}
718
719
720/**
721 * We have been notified that our socket has something to read. Do the
722 * read and reschedule this function to be called again once more is
723 * available.
724 *
725 * @param cls NULL
726 */
727static void
728select_read_cb (void *cls)
729{
730 char buf[65536] GNUNET_ALIGN;
731 struct Queue *queue;
732 const struct UNIXMessage *msg;
733 struct sockaddr_un un;
734 socklen_t addrlen;
735 ssize_t ret;
736 uint16_t msize;
737
738 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
739 unix_sock,
740 &select_read_cb,
741 NULL);
742 addrlen = sizeof (un);
743 memset (&un,
744 0,
745 sizeof (un));
746 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
747 buf,
748 sizeof (buf),
749 (struct sockaddr *) &un,
750 &addrlen);
751 if ( (-1 == ret) &&
752 ( (EAGAIN == errno) ||
753 (ENOBUFS == errno) ) )
754 return;
755 if (-1 == ret)
756 {
757 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
758 "recvfrom");
759 return;
760 }
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "Read %d bytes from socket %s\n",
763 (int) ret,
764 un.sun_path);
765 GNUNET_assert (AF_UNIX == (un.sun_family));
766 msg = (struct UNIXMessage *) buf;
767 msize = ntohs (msg->header.size);
768 if ( (msize < sizeof (struct UNIXMessage)) ||
769 (msize > ret) )
770 {
771 GNUNET_break_op (0);
772 return;
773 }
774 queue = lookup_queue (&msg->sender,
775 &un,
776 addrlen);
777 if (NULL == queue)
778 queue = setup_queue (&msg->sender,
779 &un,
780 addrlen);
781 else
782 reschedule_queue_timeout (queue);
783 if (NULL == queue)
784 {
785 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
786 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
787 return;
788 }
789
790 {
791 uint16_t offset = 0;
792 uint16_t tsize = msize - sizeof (struct UNIXMessage);
793 const char *msgbuf = (const char *) &msg[1];
794
795 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
796 {
797 const struct GNUNET_MessageHeader *currhdr;
798 struct GNUNET_MessageHeader al_hdr;
799 uint16_t csize;
800
801 currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
802 /* ensure aligned access */
803 memcpy (&al_hdr,
804 currhdr,
805 sizeof (al_hdr));
806 csize = ntohs (al_hdr.size);
807 if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
808 (csize > tsize - offset))
809 {
810 GNUNET_break_op (0);
811 break;
812 }
813 ret = GNUNET_TRANSPORT_communicator_receive (ch,
814 &msg->sender,
815 currhdr,
816 &receive_complete_cb,
817 NULL);
818 if (GNUNET_SYSERR == ret)
819 return; /* transport not up */
820 if (GNUNET_NO == ret)
821 break;
822 delivering_messages++;
823 offset += csize;
824 }
825 }
826 if (delivering_messages >= max_queue_length)
827 {
828 /* we should try to apply 'back pressure' */
829 GNUNET_SCHEDULER_cancel (read_task);
830 read_task = NULL;
831 }
832}
833
834
835/**
836 * Function called by the transport service to initialize a
837 * message queue given address information about another peer.
838 * If and when the communication channel is established, the
839 * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
840 * to notify the service that the channel is now up. It is
841 * the responsibility of the communicator to manage sane
842 * retries and timeouts for any @a peer/@a address combination
843 * provided by the transport service. Timeouts and retries
844 * do not need to be signalled to the transport service.
845 *
846 * @param cls closure
847 * @param peer identity of the other peer
848 * @param address where to send the message, human-readable
849 * communicator-specific format, 0-terminated, UTF-8
850 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
851 */
852static int
853mq_init (void *cls,
854 const struct GNUNET_PeerIdentity *peer,
855 const char *address)
856{
857 struct Queue *queue;
858 const char *path;
859 struct sockaddr_un *un;
860 socklen_t un_len;
861
862 if (0 != strncmp (address,
863 COMMUNICATOR_ADDRESS_PREFIX "-",
864 strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
865 {
866 GNUNET_break_op (0);
867 return GNUNET_SYSERR;
868 }
869 path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
870 un = unix_address_to_sockaddr (path,
871 &un_len);
872 queue = lookup_queue (peer,
873 un,
874 un_len);
875 if (NULL != queue)
876 {
877 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
878 "Address `%s' for %s ignored, queue exists\n",
879 path,
880 GNUNET_i2s (peer));
881 GNUNET_free (un);
882 return GNUNET_OK;
883 }
884 queue = setup_queue (peer,
885 un,
886 un_len);
887 GNUNET_free (un);
888 if (NULL == queue)
889 {
890 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
891 "Failed to setup queue to %s at `%s'\n",
892 GNUNET_i2s (peer),
893 path);
894 return GNUNET_NO;
895 }
896 return GNUNET_OK;
897}
898
899
900/**
901 * Iterator over all message queues to clean up.
902 *
903 * @param cls NULL
904 * @param target unused
905 * @param value the queue to destroy
906 * @return #GNUNET_OK to continue to iterate
907 */
908static int
909get_queue_delete_it (void *cls,
910 const struct GNUNET_PeerIdentity *target,
911 void *value)
912{
913 struct Queue *queue = value;
914
915 (void) cls;
916 (void) target;
917 queue_destroy (queue);
918 return GNUNET_OK;
919}
920
921
922/**
923 * Shutdown the UNIX communicator.
924 *
925 * @param cls NULL (always)
926 */
927static void
928do_shutdown (void *cls)
929{
930 if (NULL != read_task)
931 {
932 GNUNET_SCHEDULER_cancel (read_task);
933 read_task = NULL;
934 }
935 if (NULL != write_task)
936 {
937 GNUNET_SCHEDULER_cancel (write_task);
938 write_task = NULL;
939 }
940 if (NULL != unix_sock)
941 {
942 GNUNET_break (GNUNET_OK ==
943 GNUNET_NETWORK_socket_close (unix_sock));
944 unix_sock = NULL;
945 }
946 GNUNET_CONTAINER_multipeermap_iterate (queue_map,
947 &get_queue_delete_it,
948 NULL);
949 GNUNET_CONTAINER_multipeermap_destroy (queue_map);
950 if (NULL != ai)
951 {
952 GNUNET_TRANSPORT_communicator_address_remove (ai);
953 ai = NULL;
954 }
955 if (NULL != ch)
956 {
957 GNUNET_TRANSPORT_communicator_disconnect (ch);
958 ch = NULL;
959 }
960 if (NULL != stats)
961 {
962 GNUNET_STATISTICS_destroy (stats,
963 GNUNET_NO);
964 stats = NULL;
965 }
966}
967
968
969/**
970 * Setup communicator and launch network interactions.
971 *
972 * @param cls NULL (always)
973 * @param args remaining command-line arguments
974 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
975 * @param cfg configuration
976 */
977static void
978run (void *cls,
979 char *const *args,
980 const char *cfgfile,
981 const struct GNUNET_CONFIGURATION_Handle *cfg)
982{
983 char *unix_socket_path;
984 struct sockaddr_un *un;
985 socklen_t un_len;
986 char *my_addr;
987 (void) cls;
988
989 if (GNUNET_OK !=
990 GNUNET_CONFIGURATION_get_value_filename (cfg,
991 COMMUNICATOR_CONFIG_SECTION,
992 "UNIXPATH",
993 &unix_socket_path))
994 {
995 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
996 COMMUNICATOR_CONFIG_SECTION,
997 "UNIXPATH");
998 return;
999 }
1000 if (GNUNET_OK !=
1001 GNUNET_CONFIGURATION_get_value_number (cfg,
1002 COMMUNICATOR_CONFIG_SECTION,
1003 "MAX_QUEUE_LENGTH",
1004 &max_queue_length))
1005 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1006
1007 un = unix_address_to_sockaddr (unix_socket_path,
1008 &un_len);
1009 if (NULL == un)
1010 {
1011 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1012 "Failed to setup UNIX domain socket address with path `%s'\n",
1013 unix_socket_path);
1014 GNUNET_free (unix_socket_path);
1015 return;
1016 }
1017 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1018 SOCK_DGRAM,
1019 0);
1020 if (NULL == unix_sock)
1021 {
1022 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1023 "socket");
1024 GNUNET_free (un);
1025 GNUNET_free (unix_socket_path);
1026 return;
1027 }
1028 if ( ('\0' != un->sun_path[0]) &&
1029 (GNUNET_OK !=
1030 GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1031 {
1032 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1033 _("Cannot create path to `%s'\n"),
1034 un->sun_path);
1035 GNUNET_NETWORK_socket_close (unix_sock);
1036 unix_sock = NULL;
1037 GNUNET_free (un);
1038 GNUNET_free (unix_socket_path);
1039 return;
1040 }
1041 if (GNUNET_OK !=
1042 GNUNET_NETWORK_socket_bind (unix_sock,
1043 (const struct sockaddr *) un,
1044 un_len))
1045 {
1046 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
1047 "bind",
1048 un->sun_path);
1049 GNUNET_NETWORK_socket_close (unix_sock);
1050 unix_sock = NULL;
1051 GNUNET_free (un);
1052 GNUNET_free (unix_socket_path);
1053 return;
1054 }
1055 GNUNET_free (un);
1056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057 "Bound to `%s'\n",
1058 unix_socket_path);
1059 stats = GNUNET_STATISTICS_create ("C-UNIX",
1060 cfg);
1061 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1062 NULL);
1063 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1064 unix_sock,
1065 &select_read_cb,
1066 NULL);
1067 queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1068 GNUNET_NO);
1069 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1070 COMMUNICATOR_CONFIG_SECTION,
1071 COMMUNICATOR_ADDRESS_PREFIX,
1072 65535,
1073 &mq_init,
1074 NULL);
1075 if (NULL == ch)
1076 {
1077 GNUNET_break (0);
1078 GNUNET_SCHEDULER_shutdown ();
1079 GNUNET_free (unix_socket_path);
1080 return;
1081 }
1082 GNUNET_asprintf (&my_addr,
1083 "%s-%s",
1084 COMMUNICATOR_ADDRESS_PREFIX,
1085 unix_socket_path);
1086 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1087 my_addr,
1088 GNUNET_ATS_NET_LOOPBACK,
1089 GNUNET_TIME_UNIT_FOREVER_REL);
1090 GNUNET_free (my_addr);
1091 GNUNET_free (unix_socket_path);
1092 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1093 unix_sock,
1094 &select_read_cb,
1095 NULL);
1096}
1097
1098
1099/**
1100 * The main function for the UNIX communicator.
1101 *
1102 * @param argc number of arguments from the command line
1103 * @param argv command line arguments
1104 * @return 0 ok, 1 on error
1105 */
1106int
1107main (int argc,
1108 char *const *argv)
1109{
1110 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1111 GNUNET_GETOPT_OPTION_END
1112 };
1113 int ret;
1114
1115 if (GNUNET_OK !=
1116 GNUNET_STRINGS_get_utf8_args (argc, argv,
1117 &argc, &argv))
1118 return 2;
1119
1120 ret =
1121 (GNUNET_OK ==
1122 GNUNET_PROGRAM_run (argc, argv,
1123 "gnunet-communicator-unix",
1124 _("GNUnet UNIX domain socket communicator"),
1125 options,
1126 &run,
1127 NULL)) ? 0 : 1;
1128 GNUNET_free ((void*) argv);
1129 return ret;
1130}
1131
1132
1133#if defined(LINUX) && defined(__GLIBC__)
1134#include <malloc.h>
1135
1136/**
1137 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1138 */
1139void __attribute__ ((constructor))
1140GNUNET_ARM_memory_init ()
1141{
1142 mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1143 mallopt (M_TOP_PAD, 1 * 1024);
1144 malloc_trim (0);
1145}
1146#endif
1147
1148/* end of gnunet-communicator-unix.c */
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
new file mode 100644
index 000000000..8cbca3188
--- /dev/null
+++ b/src/transport/gnunet-service-tng.c
@@ -0,0 +1,719 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18/**
19 * @file transport/gnunet-service-transport.c
20 * @brief main for gnunet-service-transport
21 * @author Christian Grothoff
22 */
23#include "platform.h"
24#include "gnunet_util_lib.h"
25#include "gnunet_statistics_service.h"
26#include "gnunet_transport_service.h"
27#include "gnunet_peerinfo_service.h"
28#include "gnunet_ats_service.h"
29#include "gnunet-service-transport.h"
30#include "transport.h"
31
32
33/**
34 * How many messages can we have pending for a given client process
35 * before we start to drop incoming messages? We typically should
36 * have only one client and so this would be the primary buffer for
37 * messages, so the number should be chosen rather generously.
38 *
39 * The expectation here is that most of the time the queue is large
40 * enough so that a drop is virtually never required. Note that
41 * this value must be about as large as 'TOTAL_MSGS' in the
42 * 'test_transport_api_reliability.c', otherwise that testcase may
43 * fail.
44 */
45#define MAX_PENDING (128 * 1024)
46
47
48/**
49 * What type of client is the `struct TransportClient` about?
50 */
51enum ClientType
52{
53 /**
54 * We do not know yet (client is fresh).
55 */
56 CT_NONE = 0,
57
58 /**
59 * Is the CORE service, we need to forward traffic to it.
60 */
61 CT_CORE = 1,
62
63 /**
64 * It is a monitor, forward monitor data.
65 */
66 CT_MONITOR = 2,
67
68 /**
69 * It is a communicator, use for communication.
70 */
71 CT_COMMUNICATOR = 3
72};
73
74
75/**
76 * Client connected to the transport service.
77 */
78struct TransportClient
79{
80
81 /**
82 * Kept in a DLL.
83 */
84 struct TransportClient *next;
85
86 /**
87 * Kept in a DLL.
88 */
89 struct TransportClient *prev;
90
91 /**
92 * Handle to the client.
93 */
94 struct GNUNET_SERVICE_Client *client;
95
96 /**
97 * Message queue to the client.
98 */
99 struct GNUNET_MQ_Handle *mq;
100
101 /**
102 * What type of client is this?
103 */
104 enum ClientType type;
105
106 union
107 {
108
109 /**
110 * Peer identity to monitor the addresses of.
111 * Zero to monitor all neighbours. Valid if
112 * @e type is #CT_MONITOR.
113 */
114 struct GNUNET_PeerIdentity monitor_peer;
115
116 /**
117 * If @e type is #CT_COMMUNICATOR, this communicator
118 * supports communicating using these addresses.
119 */
120 const char *address_prefix;
121
122 } details;
123
124};
125
126
127/**
128 * Head of linked list of all clients to this service.
129 */
130static struct TransportClient *clients_head;
131
132/**
133 * Tail of linked list of all clients to this service.
134 */
135static struct TransportClient *clients_tail;
136
137/**
138 * Statistics handle.
139 */
140struct GNUNET_STATISTICS_Handle *GST_stats;
141
142/**
143 * Configuration handle.
144 */
145const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
146
147/**
148 * Configuration handle.
149 */
150struct GNUNET_PeerIdentity GST_my_identity;
151
152/**
153 * Handle to peerinfo service.
154 */
155struct GNUNET_PEERINFO_Handle *GST_peerinfo;
156
157/**
158 * Our private key.
159 */
160struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
161
162
163/**
164 * Called whenever a client connects. Allocates our
165 * data structures associated with that client.
166 *
167 * @param cls closure, NULL
168 * @param client identification of the client
169 * @param mq message queue for the client
170 * @return our `struct TransportClient`
171 */
172static void *
173client_connect_cb (void *cls,
174 struct GNUNET_SERVICE_Client *client,
175 struct GNUNET_MQ_Handle *mq)
176{
177 struct TransportClient *tc;
178
179 tc = GNUNET_new (struct TransportClient);
180 tc->client = client;
181 tc->mq = mq;
182 GNUNET_CONTAINER_DLL_insert (clients_head,
183 clients_tail,
184 tc);
185 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
186 "Client %p connected\n",
187 tc);
188 return tc;
189}
190
191
192/**
193 * Called whenever a client is disconnected. Frees our
194 * resources associated with that client.
195 *
196 * @param cls closure, NULL
197 * @param client identification of the client
198 * @param app_ctx our `struct TransportClient`
199 */
200static void
201client_disconnect_cb (void *cls,
202 struct GNUNET_SERVICE_Client *client,
203 void *app_ctx)
204{
205 struct TransportClient *tc = app_ctx;
206
207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
208 "Client %p disconnected, cleaning up.\n",
209 tc);
210 GNUNET_CONTAINER_DLL_remove (clients_head,
211 clients_tail,
212 tc);
213 switch (tc->type)
214 {
215 case CT_NONE:
216 break;
217 case CT_CORE:
218 break;
219 case CT_MONITOR:
220 break;
221 case CT_COMMUNICATOR:
222 break;
223 }
224 GNUNET_free (tc);
225}
226
227
228/**
229 * Initialize a "CORE" client. We got a start message from this
230 * client, so add it to the list of clients for broadcasting of
231 * inbound messages.
232 *
233 * @param cls the client
234 * @param start the start message that was sent
235 */
236static void
237handle_client_start (void *cls,
238 const struct StartMessage *start)
239{
240 struct TransportClient *tc = cls;
241 const struct GNUNET_MessageHeader *hello;
242 uint32_t options;
243
244 options = ntohl (start->options);
245 if ( (0 != (1 & options)) &&
246 (0 !=
247 memcmp (&start->self,
248 &GST_my_identity,
249 sizeof (struct GNUNET_PeerIdentity)) ) )
250 {
251 /* client thinks this is a different peer, reject */
252 GNUNET_break (0);
253 GNUNET_SERVICE_client_drop (tc->client);
254 return;
255 }
256 if (CT_NONE != tc->type)
257 {
258 GNUNET_break (0);
259 GNUNET_SERVICE_client_drop (tc->client);
260 return;
261 }
262 tc->type = CT_CORE;
263#if 0
264 hello = GST_hello_get ();
265 if (NULL != hello)
266 unicast (tc,
267 hello,
268 GNUNET_NO);
269#endif
270 GNUNET_SERVICE_client_continue (tc->client);
271}
272
273
274/**
275 * Client sent us a HELLO. Check the request.
276 *
277 * @param cls the client
278 * @param message the HELLO message
279 */
280static int
281check_client_hello (void *cls,
282 const struct GNUNET_MessageHeader *message)
283{
284 (void) cls;
285 return GNUNET_OK; /* FIXME: check here? */
286}
287
288
289/**
290 * Client sent us a HELLO. Process the request.
291 *
292 * @param cls the client
293 * @param message the HELLO message
294 */
295static void
296handle_client_hello (void *cls,
297 const struct GNUNET_MessageHeader *message)
298{
299 struct TransportClient *tc = cls;
300
301 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
302 "Received HELLO message\n");
303 GNUNET_SERVICE_client_continue (tc->client);
304}
305
306
307/**
308 * Client asked for transmission to a peer. Process the request.
309 *
310 * @param cls the client
311 * @param obm the send message that was sent
312 */
313static int
314check_client_send (void *cls,
315 const struct OutboundMessage *obm)
316{
317 uint16_t size;
318 const struct GNUNET_MessageHeader *obmm;
319
320 (void) cls;
321 size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
322 if (size < sizeof (struct GNUNET_MessageHeader))
323 {
324 GNUNET_break (0);
325 return GNUNET_SYSERR;
326 }
327 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
328 if (size != ntohs (obmm->size))
329 {
330 GNUNET_break (0);
331 return GNUNET_SYSERR;
332 }
333 return GNUNET_OK;
334}
335
336
337/**
338 * Client asked for transmission to a peer. Process the request.
339 *
340 * @param cls the client
341 * @param obm the send message that was sent
342 */
343static void
344handle_client_send (void *cls,
345 const struct OutboundMessage *obm)
346{
347 struct TransportClient *tc = cls;
348 const struct GNUNET_MessageHeader *obmm;
349
350 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
351}
352
353
354/**
355 * Communicator started. Test message is well-formed.
356 *
357 * @param cls the client
358 * @param cam the send message that was sent
359 */
360static int
361check_communicator_available (void *cls,
362 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
363{
364 const char *addr;
365 uint16_t size;
366
367 (void) cls;
368 size = ntohs (cam->header.size) - sizeof (*cam);
369 if (0 == size)
370 return GNUNET_OK; /* receive-only communicator */
371 addr = (const char *) &cam[1];
372 if ('\0' != addr[size-1])
373 {
374 GNUNET_break (0);
375 return GNUNET_SYSERR;
376 }
377 return GNUNET_OK;
378}
379
380
381/**
382 * Communicator started. Process the request.
383 *
384 * @param cls the client
385 * @param cam the send message that was sent
386 */
387static void
388handle_communicator_available (void *cls,
389 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
390{
391 struct TransportClient *tc = cls;
392 uint16_t size;
393
394 if (CT_NONE != tc->type)
395 {
396 GNUNET_break (0);
397 GNUNET_SERVICE_client_drop (tc->client);
398 return;
399 }
400 tc->type = CT_COMMUNICATOR;
401 size = ntohs (cam->header.size) - sizeof (*cam);
402 if (0 == size)
403 return GNUNET_OK; /* receive-only communicator */
404 tc->details.address_prefix = GNUNET_strdup ((const char *) &cam[1]);
405 GNUNET_SERVICE_client_continue (tc->client);
406}
407
408
409/**
410 * Address of our peer added. Test message is well-formed.
411 *
412 * @param cls the client
413 * @param aam the send message that was sent
414 */
415static int
416check_add_address (void *cls,
417 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
418{
419 const char *addr;
420 uint16_t size;
421
422 (void) cls;
423 size = ntohs (aam->header.size) - sizeof (*aam);
424 if (0 == size)
425 {
426 GNUNET_break (0);
427 return GNUNET_SYSERR;
428 }
429 addr = (const char *) &cam[1];
430 if ('\0' != addr[size-1])
431 {
432 GNUNET_break (0);
433 return GNUNET_SYSERR;
434 }
435 return GNUNET_OK;
436}
437
438
439/**
440 * Address of our peer added. Process the request.
441 *
442 * @param cls the client
443 * @param aam the send message that was sent
444 */
445static void
446handle_add_address (void *cls,
447 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
448{
449 struct TransportClient *tc = cls;
450
451 GNUNET_SERVICE_client_continue (tc->client);
452}
453
454
455/**
456 * Address of our peer deleted. Process the request.
457 *
458 * @param cls the client
459 * @param dam the send message that was sent
460 */
461static void
462handle_del_address (void *cls,
463 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
464{
465 struct TransportClient *tc = cls;
466
467 GNUNET_SERVICE_client_continue (tc->client);
468}
469
470
471/**
472 * Client asked for transmission to a peer. Process the request.
473 *
474 * @param cls the client
475 * @param obm the send message that was sent
476 */
477static int
478check_incoming_msg (void *cls,
479 const struct GNUNET_TRANSPORT_IncomingMessage *im)
480{
481 uint16_t size;
482 const struct GNUNET_MessageHeader *obmm;
483
484 (void) cls;
485 size = ntohs (im->header.size) - sizeof (*im);
486 if (size < sizeof (struct GNUNET_MessageHeader))
487 {
488 GNUNET_break (0);
489 return GNUNET_SYSERR;
490 }
491 obmm = (const struct GNUNET_MessageHeader *) &im[1];
492 if (size != ntohs (obmm->size))
493 {
494 GNUNET_break (0);
495 return GNUNET_SYSERR;
496 }
497 return GNUNET_OK;
498}
499
500
501/**
502 * Incoming meessage. Process the request.
503 *
504 * @param cls the client
505 * @param im the send message that was received
506 */
507static void
508handle_incoming_msg (void *cls,
509 const struct GNUNET_TRANSPORT_IncomingMessage *im)
510{
511 struct TransportClient *tc = cls;
512
513 GNUNET_SERVICE_client_continue (tc->client);
514}
515
516
517/**
518 * New queue became available. Check message.
519 *
520 * @param cls the client
521 * @param aqm the send message that was sent
522 */
523static int
524check_add_queue_message (void *cls,
525 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
526{
527 const char *addr;
528 uint16_t size;
529
530 (void) cls;
531 size = ntohs (aqm->header.size) - sizeof (*aqm);
532 if (0 == size)
533 {
534 GNUNET_break (0);
535 return GNUNET_SYSERR;
536 }
537 addr = (const char *) &aqm[1];
538 if ('\0' != addr[size-1])
539 {
540 GNUNET_break (0);
541 return GNUNET_SYSERR;
542 }
543 return GNUNET_OK;
544}
545
546
547/**
548 * New queue became available. Process the request.
549 *
550 * @param cls the client
551 * @param aqm the send message that was sent
552 */
553static void
554handle_add_queue_message (void *cls,
555 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
556{
557 struct TransportClient *tc = cls;
558
559 GNUNET_SERVICE_client_continue (tc->client);
560}
561
562
563/**
564 * Queue to a peer went down. Process the request.
565 *
566 * @param cls the client
567 * @param dqm the send message that was sent
568 */
569static void
570handle_del_queue_message (void *cls,
571 const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
572{
573 struct TransportClient *tc = cls;
574
575 GNUNET_SERVICE_client_continue (tc->client);
576}
577
578
579/**
580 * Message was transmitted. Process the request.
581 *
582 * @param cls the client
583 * @param sma the send message that was sent
584 */
585static void
586handle_send_message_ack (void *cls,
587 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
588{
589 struct TransportClient *tc = cls;
590
591 GNUNET_SERVICE_client_continue (tc->client);
592}
593
594
595/**
596 * Function called when the service shuts down. Unloads our plugins
597 * and cancels pending validations.
598 *
599 * @param cls closure, unused
600 */
601static void
602shutdown_task (void *cls)
603{
604 (void) cls;
605
606 if (NULL != GST_stats)
607 {
608 GNUNET_STATISTICS_destroy (GST_stats,
609 GNUNET_NO);
610 GST_stats = NULL;
611 }
612 if (NULL != GST_my_private_key)
613 {
614 GNUNET_free (GST_my_private_key);
615 GST_my_private_key = NULL;
616 }
617}
618
619
620/**
621 * Initiate transport service.
622 *
623 * @param cls closure
624 * @param c configuration to use
625 * @param service the initialized service
626 */
627static void
628run (void *cls,
629 const struct GNUNET_CONFIGURATION_Handle *c,
630 struct GNUNET_SERVICE_Handle *service)
631{
632 /* setup globals */
633 GST_cfg = c;
634 if (GNUNET_OK !=
635 GNUNET_CONFIGURATION_get_value_time (c,
636 "transport",
637 "HELLO_EXPIRATION",
638 &hello_expiration))
639 {
640 hello_expiration = GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION;
641 }
642 GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
643 if (NULL == GST_my_private_key)
644 {
645 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
646 _("Transport service is lacking key configuration settings. Exiting.\n"));
647 GNUNET_SCHEDULER_shutdown ();
648 return;
649 }
650 GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
651 &GST_my_identity.public_key);
652 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
653 "My identity is `%s'\n",
654 GNUNET_i2s_full (&GST_my_identity));
655
656 GST_stats = GNUNET_STATISTICS_create ("transport",
657 GST_cfg);
658 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
659 NULL);
660 /* start subsystems */
661}
662
663
664/**
665 * Define "main" method using service macro.
666 */
667GNUNET_SERVICE_MAIN
668("transport",
669 GNUNET_SERVICE_OPTION_NONE,
670 &run,
671 &client_connect_cb,
672 &client_disconnect_cb,
673 NULL,
674 /* communication with core */
675 GNUNET_MQ_hd_fixed_size (client_start,
676 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
677 struct StartMessage,
678 NULL),
679 GNUNET_MQ_hd_var_size (client_hello,
680 GNUNET_MESSAGE_TYPE_HELLO,
681 struct GNUNET_MessageHeader,
682 NULL),
683 GNUNET_MQ_hd_var_size (client_send,
684 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
685 struct OutboundMessage,
686 NULL),
687 /* communication with communicators */
688 GNUNET_MQ_hd_var_size (communicator_available,
689 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
690 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
691 NULL),
692 GNUNET_MQ_hd_var_size (add_address,
693 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
694 struct GNUNET_TRANSPORT_AddAddressMessage,
695 NULL),
696 GNUNET_MQ_hd_fixed_size (del_address,
697 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
698 struct GNUNET_TRANSPORT_DelAddressMessage,
699 NULL),
700 GNUNET_MQ_hd_var_size (incoming_msg,
701 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
702 struct GNUNET_TRANSPORT_IncomingMessage,
703 NULL),
704 GNUNET_MQ_hd_var_size (add_queue_message,
705 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
706 struct GNUNET_TRANSPORT_AddQueueMessage,
707 NULL),
708 GNUNET_MQ_hd_fixed_size (del_queue_message,
709 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
710 struct GNUNET_TRANSPORT_DelQueueMessage,
711 NULL),
712 GNUNET_MQ_hd_fixed_size (send_message_ack,
713 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
714 struct GNUNET_TRANSPORT_SendMessageToAck,
715 NULL),
716 GNUNET_MQ_handler_end ());
717
718
719/* end of file gnunet-service-transport.c */
diff --git a/src/transport/transport.h b/src/transport/transport.h
index e68536bcc..1b46213cf 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/ 17*/
@@ -94,7 +94,7 @@ struct StartMessage
94 94
95 /** 95 /**
96 * 0: no options 96 * 0: no options
97 * 1: The 'self' field should be checked 97 * 1: The @e self field should be checked
98 * 2: this client is interested in payload traffic 98 * 2: this client is interested in payload traffic
99 */ 99 */
100 uint32_t options; 100 uint32_t options;
@@ -404,6 +404,7 @@ struct ValidationIterateResponseMessage
404 struct GNUNET_TIME_AbsoluteNBO next_validation; 404 struct GNUNET_TIME_AbsoluteNBO next_validation;
405}; 405};
406 406
407
407/** 408/**
408 * Message from the library to the transport service 409 * Message from the library to the transport service
409 * asking for binary addresses known for a peer. 410 * asking for binary addresses known for a peer.
@@ -654,6 +655,22 @@ struct TransportPluginMonitorMessage
654/* *********************** TNG messages ***************** */ 655/* *********************** TNG messages ***************** */
655 656
656/** 657/**
658 * Communicator goes online. Note which addresses it can
659 * work with.
660 */
661struct GNUNET_TRANSPORT_CommunicatorAvailableMessage
662{
663
664 /**
665 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR.
666 */
667 struct GNUNET_MessageHeader header;
668
669 /* Followed by the address prefix of the communicator */
670};
671
672
673/**
657 * Add address to the list. 674 * Add address to the list.
658 */ 675 */
659struct GNUNET_TRANSPORT_AddAddressMessage 676struct GNUNET_TRANSPORT_AddAddressMessage
@@ -678,7 +695,7 @@ struct GNUNET_TRANSPORT_AddAddressMessage
678 * An `enum GNUNET_ATS_Network_Type` in NBO. 695 * An `enum GNUNET_ATS_Network_Type` in NBO.
679 */ 696 */
680 uint32_t nt; 697 uint32_t nt;
681 698
682 /* followed by UTF-8 encoded, 0-terminated human-readable address */ 699 /* followed by UTF-8 encoded, 0-terminated human-readable address */
683}; 700};
684 701
@@ -717,12 +734,12 @@ struct GNUNET_TRANSPORT_IncomingMessage
717 * Do we use flow control or not? 734 * Do we use flow control or not?
718 */ 735 */
719 uint32_t fc_on GNUNET_PACKED; 736 uint32_t fc_on GNUNET_PACKED;
720 737
721 /** 738 /**
722 * 64-bit number to identify the matching ACK. 739 * 64-bit number to identify the matching ACK.
723 */ 740 */
724 uint64_t fc_id GNUNET_PACKED; 741 uint64_t fc_id GNUNET_PACKED;
725 742
726 /** 743 /**
727 * Sender identifier. 744 * Sender identifier.
728 */ 745 */
@@ -748,12 +765,12 @@ struct GNUNET_TRANSPORT_IncomingMessageAck
748 * Reserved (0) 765 * Reserved (0)
749 */ 766 */
750 uint32_t reserved GNUNET_PACKED; 767 uint32_t reserved GNUNET_PACKED;
751 768
752 /** 769 /**
753 * Which message is being ACKed? 770 * Which message is being ACKed?
754 */ 771 */
755 uint64_t fc_id GNUNET_PACKED; 772 uint64_t fc_id GNUNET_PACKED;
756 773
757 /** 774 /**
758 * Sender identifier of the original message. 775 * Sender identifier of the original message.
759 */ 776 */
@@ -769,7 +786,7 @@ struct GNUNET_TRANSPORT_AddQueueMessage
769{ 786{
770 787
771 /** 788 /**
772 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE. 789 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP.
773 */ 790 */
774 struct GNUNET_MessageHeader header; 791 struct GNUNET_MessageHeader header;
775 792
@@ -787,7 +804,7 @@ struct GNUNET_TRANSPORT_AddQueueMessage
787 * An `enum GNUNET_ATS_Network_Type` in NBO. 804 * An `enum GNUNET_ATS_Network_Type` in NBO.
788 */ 805 */
789 uint32_t nt; 806 uint32_t nt;
790 807
791 /* followed by UTF-8 encoded, 0-terminated human-readable address */ 808 /* followed by UTF-8 encoded, 0-terminated human-readable address */
792}; 809};
793 810
@@ -799,7 +816,7 @@ struct GNUNET_TRANSPORT_DelQueueMessage
799{ 816{
800 817
801 /** 818 /**
802 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE. 819 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN.
803 */ 820 */
804 struct GNUNET_MessageHeader header; 821 struct GNUNET_MessageHeader header;
805 822
@@ -828,9 +845,9 @@ struct GNUNET_TRANSPORT_CreateQueue
828 struct GNUNET_MessageHeader header; 845 struct GNUNET_MessageHeader header;
829 846
830 /** 847 /**
831 * Always zero. 848 * Unique ID for the request.
832 */ 849 */
833 uint32_t reserved GNUNET_PACKED; 850 uint32_t request_id GNUNET_PACKED;
834 851
835 /** 852 /**
836 * Receiver that can be addressed via the queue. 853 * Receiver that can be addressed via the queue.
@@ -842,6 +859,24 @@ struct GNUNET_TRANSPORT_CreateQueue
842 859
843 860
844/** 861/**
862 * Transport tells communicator that it wants a new queue.
863 */
864struct GNUNET_TRANSPORT_CreateQueueResponse
865{
866
867 /**
868 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK or #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL.
869 */
870 struct GNUNET_MessageHeader header;
871
872 /**
873 * Unique ID for the request.
874 */
875 uint32_t request_id GNUNET_PACKED;
876};
877
878
879/**
845 * Inform communicator about transport's desire to send a message. 880 * Inform communicator about transport's desire to send a message.
846 */ 881 */
847struct GNUNET_TRANSPORT_SendMessageTo 882struct GNUNET_TRANSPORT_SendMessageTo
@@ -861,7 +896,7 @@ struct GNUNET_TRANSPORT_SendMessageTo
861 * Message ID, used for flow control. 896 * Message ID, used for flow control.
862 */ 897 */
863 uint64_t mid GNUNET_PACKED; 898 uint64_t mid GNUNET_PACKED;
864 899
865 /** 900 /**
866 * Receiver identifier. 901 * Receiver identifier.
867 */ 902 */
@@ -891,7 +926,7 @@ struct GNUNET_TRANSPORT_SendMessageToAck
891 * Message ID of the original message. 926 * Message ID of the original message.
892 */ 927 */
893 uint64_t mid GNUNET_PACKED; 928 uint64_t mid GNUNET_PACKED;
894 929
895 /** 930 /**
896 * Receiver identifier. 931 * Receiver identifier.
897 */ 932 */
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index d446516bd..3a68c6eba 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -90,6 +90,11 @@ struct AckPending
90 struct AckPending *prev; 90 struct AckPending *prev;
91 91
92 /** 92 /**
93 * Communicator this entry belongs to.
94 */
95 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
96
97 /**
93 * Which peer is this about? 98 * Which peer is this about?
94 */ 99 */
95 struct GNUNET_PeerIdentity receiver; 100 struct GNUNET_PeerIdentity receiver;
@@ -134,17 +139,17 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
134 /** 139 /**
135 * DLL of messages awaiting transmission confirmation (ack). 140 * DLL of messages awaiting transmission confirmation (ack).
136 */ 141 */
137 struct AckPending *ac_tail; 142 struct AckPending *ap_tail;
138 143
139 /** 144 /**
140 * DLL of queues we offer. 145 * DLL of queues we offer.
141 */ 146 */
142 struct QueueHandle *queue_head; 147 struct GNUNET_TRANSPORT_QueueHandle *queue_head;
143 148
144 /** 149 /**
145 * DLL of queues we offer. 150 * DLL of queues we offer.
146 */ 151 */
147 struct QueueHandle *queue_tail; 152 struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
148 153
149 /** 154 /**
150 * Our configuration. 155 * Our configuration.
@@ -152,9 +157,14 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
152 const struct GNUNET_CONFIGURATION_Handle *cfg; 157 const struct GNUNET_CONFIGURATION_Handle *cfg;
153 158
154 /** 159 /**
155 * Name of the communicator. 160 * Config section to use.
161 */
162 const char *config_section;
163
164 /**
165 * Address prefix to use.
156 */ 166 */
157 const char *name; 167 const char *addr_prefix;
158 168
159 /** 169 /**
160 * Function to call when the transport service wants us to initiate 170 * Function to call when the transport service wants us to initiate
@@ -168,6 +178,11 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
168 void *mq_init_cls; 178 void *mq_init_cls;
169 179
170 /** 180 /**
181 * Queue to talk to the transport service.
182 */
183 struct GNUNET_MQ_Handle *mq;
184
185 /**
171 * Maximum permissable queue length. 186 * Maximum permissable queue length.
172 */ 187 */
173 unsigned long long max_queue_length; 188 unsigned long long max_queue_length;
@@ -202,6 +217,17 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
202 */ 217 */
203struct GNUNET_TRANSPORT_QueueHandle 218struct GNUNET_TRANSPORT_QueueHandle
204{ 219{
220
221 /**
222 * Kept in a DLL.
223 */
224 struct GNUNET_TRANSPORT_QueueHandle *next;
225
226 /**
227 * Kept in a DLL.
228 */
229 struct GNUNET_TRANSPORT_QueueHandle *prev;
230
205 /** 231 /**
206 * Handle this queue belongs to. 232 * Handle this queue belongs to.
207 */ 233 */
@@ -308,7 +334,7 @@ send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
308 env = GNUNET_MQ_msg_extra (aam, 334 env = GNUNET_MQ_msg_extra (aam,
309 strlen (ai->address) + 1, 335 strlen (ai->address) + 1,
310 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); 336 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
311 aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration); 337 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
312 aam->nt = htonl ((uint32_t) ai->nt); 338 aam->nt = htonl ((uint32_t) ai->nt);
313 memcpy (&aam[1], 339 memcpy (&aam[1],
314 ai->address, 340 ai->address,
@@ -334,7 +360,7 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
334 return; 360 return;
335 env = GNUNET_MQ_msg (dam, 361 env = GNUNET_MQ_msg (dam,
336 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); 362 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
337 dam.aid = htonl (ai->aid); 363 dam->aid = htonl (ai->aid);
338 GNUNET_MQ_send (ai->ch->mq, 364 GNUNET_MQ_send (ai->ch->mq,
339 env); 365 env);
340} 366}
@@ -352,18 +378,18 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
352 struct GNUNET_MQ_Envelope *env; 378 struct GNUNET_MQ_Envelope *env;
353 struct GNUNET_TRANSPORT_AddQueueMessage *aqm; 379 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
354 380
355 if (NULL == ai->ch->mq) 381 if (NULL == qh->ch->mq)
356 return; 382 return;
357 env = GNUNET_MQ_msg_extra (aqm, 383 env = GNUNET_MQ_msg_extra (aqm,
358 strlen (ai->address) + 1, 384 strlen (qh->address) + 1,
359 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE); 385 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
360 aqm.receiver = qh->peer; 386 aqm->receiver = qh->peer;
361 aqm.nt = htonl ((uint32_t) qh->nt); 387 aqm->nt = htonl ((uint32_t) qh->nt);
362 aqm.qid = htonl (qh->qid); 388 aqm->qid = htonl (qh->queue_id);
363 memcpy (&aqm[1], 389 memcpy (&aqm[1],
364 ai->address, 390 qh->address,
365 strlen (ai->address) + 1); 391 strlen (qh->address) + 1);
366 GNUNET_MQ_send (ai->ch->mq, 392 GNUNET_MQ_send (qh->ch->mq,
367 env); 393 env);
368} 394}
369 395
@@ -380,13 +406,13 @@ send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
380 struct GNUNET_MQ_Envelope *env; 406 struct GNUNET_MQ_Envelope *env;
381 struct GNUNET_TRANSPORT_DelQueueMessage *dqm; 407 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
382 408
383 if (NULL == ai->ch->mq) 409 if (NULL == qh->ch->mq)
384 return; 410 return;
385 env = GNUNET_MQ_msg (dqm, 411 env = GNUNET_MQ_msg (dqm,
386 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE); 412 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
387 dqm.qid = htonl (qh->qid); 413 dqm->qid = htonl (qh->queue_id);
388 dqm.receiver = qh->peer; 414 dqm->receiver = qh->peer;
389 GNUNET_MQ_send (ai->ch->mq, 415 GNUNET_MQ_send (qh->ch->mq,
390 env); 416 env);
391} 417}
392 418
@@ -444,7 +470,8 @@ error_handler (void *cls,
444 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 470 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
445 471
446 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 472 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447 "MQ failure, reconnecting to transport service.\n"); 473 "MQ failure %d, reconnecting to transport service.\n",
474 error);
448 disconnect (ch); 475 disconnect (ch);
449 /* TODO: maybe do this with exponential backoff/delay */ 476 /* TODO: maybe do this with exponential backoff/delay */
450 reconnect (ch); 477 reconnect (ch);
@@ -460,7 +487,7 @@ error_handler (void *cls,
460 */ 487 */
461static void 488static void
462handle_incoming_ack (void *cls, 489handle_incoming_ack (void *cls,
463 struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) 490 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
464{ 491{
465 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 492 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
466 493
@@ -470,7 +497,7 @@ handle_incoming_ack (void *cls,
470 { 497 {
471 if ( (fc->id == incoming_ack->fc_id) && 498 if ( (fc->id == incoming_ack->fc_id) &&
472 (0 == memcmp (&fc->sender, 499 (0 == memcmp (&fc->sender,
473 incoming_ack->sender, 500 &incoming_ack->sender,
474 sizeof (struct GNUNET_PeerIdentity))) ) 501 sizeof (struct GNUNET_PeerIdentity))) )
475 { 502 {
476 GNUNET_CONTAINER_DLL_remove (ch->fc_head, 503 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
@@ -499,11 +526,12 @@ handle_incoming_ack (void *cls,
499 */ 526 */
500static int 527static int
501check_create_queue (void *cls, 528check_create_queue (void *cls,
502 struct GNUNET_TRANSPORT_CreateQueue *cq) 529 const struct GNUNET_TRANSPORT_CreateQueue *cq)
503{ 530{
504 uint16_t len = ntohs (cq->header.size) - sizeof (*cq); 531 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505 const char *addr = (const char *) &cq[1]; 532 const char *addr = (const char *) &cq[1];
506 533
534 (void) cls;
507 if ( (0 == len) || 535 if ( (0 == len) ||
508 ('\0' != addr[len-1]) ) 536 ('\0' != addr[len-1]) )
509 { 537 {
@@ -522,11 +550,13 @@ check_create_queue (void *cls,
522 */ 550 */
523static void 551static void
524handle_create_queue (void *cls, 552handle_create_queue (void *cls,
525 struct GNUNET_TRANSPORT_CreateQueue *cq) 553 const struct GNUNET_TRANSPORT_CreateQueue *cq)
526{ 554{
527 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 555 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528 const char *addr = (const char *) &cq[1]; 556 const char *addr = (const char *) &cq[1];
529 557 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
558 struct GNUNET_MQ_Envelope *env;
559
530 if (GNUNET_OK != 560 if (GNUNET_OK !=
531 ch->mq_init (ch->mq_init_cls, 561 ch->mq_init (ch->mq_init_cls,
532 &cq->receiver, 562 &cq->receiver,
@@ -535,8 +565,17 @@ handle_create_queue (void *cls,
535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 565 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536 "Address `%s' invalid for this communicator\n", 566 "Address `%s' invalid for this communicator\n",
537 addr); 567 addr);
538 // TODO: do we notify the transport!? 568 env = GNUNET_MQ_msg (cqr,
569 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
539 } 570 }
571 else
572 {
573 env = GNUNET_MQ_msg (cqr,
574 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
575 }
576 cqr->request_id = cq->request_id;
577 GNUNET_MQ_send (ch->mq,
578 env);
540} 579}
541 580
542 581
@@ -550,11 +589,12 @@ handle_create_queue (void *cls,
550 */ 589 */
551static int 590static int
552check_send_msg (void *cls, 591check_send_msg (void *cls,
553 struct GNUNET_TRANSPORT_SendMessageTo *smt) 592 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
554{ 593{
555 uint16_t len = ntohs (smt->header.size) - sizeof (*smt); 594 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; 595 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
557 596
597 (void) cls;
558 if (ntohs (mh->size) != len) 598 if (ntohs (mh->size) != len)
559 { 599 {
560 GNUNET_break (0); 600 GNUNET_break (0);
@@ -584,9 +624,9 @@ send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
584 624
585 env = GNUNET_MQ_msg (ack, 625 env = GNUNET_MQ_msg (ack,
586 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); 626 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587 ack->status = htonl (GNUNET_OK); 627 ack->status = htonl (status);
588 ack->mid = ap->mid; 628 ack->mid = mid;
589 ack->receiver = ap->receiver; 629 ack->receiver = *receiver;
590 GNUNET_MQ_send (ch->mq, 630 GNUNET_MQ_send (ch->mq,
591 env); 631 env);
592} 632}
@@ -623,18 +663,18 @@ send_ack_cb (void *cls)
623 */ 663 */
624static void 664static void
625handle_send_msg (void *cls, 665handle_send_msg (void *cls,
626 struct GNUNET_TRANSPORT_SendMessageTo *smt) 666 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
627{ 667{
628 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 668 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629 const struct GNUNET_MessageHeader *mh; 669 const struct GNUNET_MessageHeader *mh;
630 struct GNUNET_MQ_Envelope *env; 670 struct GNUNET_MQ_Envelope *env;
631 struct AckPending *ap; 671 struct AckPending *ap;
632 struct QueueHandle *qh; 672 struct GNUNET_TRANSPORT_QueueHandle *qh;
633 673
634 for (qh = ch->queue_head;NULL != qh; qh = qh->next) 674 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
635 if ( (qh->queue_id == smt->qid) && 675 if ( (qh->queue_id == smt->qid) &&
636 (0 == memcmp (&qh->peer, 676 (0 == memcmp (&qh->peer,
637 &smt->target, 677 &smt->receiver,
638 sizeof (struct GNUNET_PeerIdentity))) ) 678 sizeof (struct GNUNET_PeerIdentity))) )
639 break; 679 break;
640 if (NULL == qh) 680 if (NULL == qh)
@@ -653,7 +693,7 @@ handle_send_msg (void *cls,
653 ap->receiver = smt->receiver; 693 ap->receiver = smt->receiver;
654 ap->mid = smt->mid; 694 ap->mid = smt->mid;
655 GNUNET_CONTAINER_DLL_insert (ch->ap_head, 695 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
656 cp->ap_tail, 696 ch->ap_tail,
657 ap); 697 ap);
658 mh = (const struct GNUNET_MessageHeader *) &smt[1]; 698 mh = (const struct GNUNET_MessageHeader *) &smt[1];
659 env = GNUNET_MQ_msg_copy (mh); 699 env = GNUNET_MQ_msg_copy (mh);
@@ -679,7 +719,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
679 struct GNUNET_TRANSPORT_IncomingMessageAck, 719 struct GNUNET_TRANSPORT_IncomingMessageAck,
680 ch), 720 ch),
681 GNUNET_MQ_hd_var_size (create_queue, 721 GNUNET_MQ_hd_var_size (create_queue,
682 GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE, 722 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
683 struct GNUNET_TRANSPORT_CreateQueue, 723 struct GNUNET_TRANSPORT_CreateQueue,
684 ch), 724 ch),
685 GNUNET_MQ_hd_var_size (send_msg, 725 GNUNET_MQ_hd_var_size (send_msg,
@@ -688,12 +728,24 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
688 ch), 728 ch),
689 GNUNET_MQ_handler_end() 729 GNUNET_MQ_handler_end()
690 }; 730 };
731 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
732 struct GNUNET_MQ_Envelope *env;
691 733
692 ch->mq = GNUNET_CLIENT_connect (cfg, 734 ch->mq = GNUNET_CLIENT_connect (ch->cfg,
693 "transport", 735 "transport",
694 handlers, 736 handlers,
695 &error_handler, 737 &error_handler,
696 ch); 738 ch);
739 if (NULL == ch->mq)
740 return;
741 env = GNUNET_MQ_msg_extra (cam,
742 strlen (ch->addr_prefix) + 1,
743 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
744 memcpy (&cam[1],
745 ch->addr_prefix,
746 strlen (ch->addr_prefix) + 1);
747 GNUNET_MQ_send (ch->mq,
748 env);
697 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; 749 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
698 NULL != ai; 750 NULL != ai;
699 ai = ai->next) 751 ai = ai->next)
@@ -709,7 +761,9 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
709 * Connect to the transport service. 761 * Connect to the transport service.
710 * 762 *
711 * @param cfg configuration to use 763 * @param cfg configuration to use
712 * @param name name of the communicator that is connecting 764 * @param config_section section of the configuration to use for options
765 * @param addr_prefix address prefix for addresses supported by this
766 * communicator, could be NULL for incoming-only communicators
713 * @param mtu maximum message size supported by communicator, 0 if 767 * @param mtu maximum message size supported by communicator, 0 if
714 * sending is not supported, SIZE_MAX for no MTU 768 * sending is not supported, SIZE_MAX for no MTU
715 * @param mq_init function to call to initialize a message queue given 769 * @param mq_init function to call to initialize a message queue given
@@ -720,7 +774,8 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
720 */ 774 */
721struct GNUNET_TRANSPORT_CommunicatorHandle * 775struct GNUNET_TRANSPORT_CommunicatorHandle *
722GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 776GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
723 const char *name, 777 const char *config_section,
778 const char *addr_prefix,
724 size_t mtu, 779 size_t mtu,
725 GNUNET_TRANSPORT_CommunicatorMqInit mq_init, 780 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
726 void *mq_init_cls) 781 void *mq_init_cls)
@@ -729,14 +784,15 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
729 784
730 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); 785 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
731 ch->cfg = cfg; 786 ch->cfg = cfg;
732 ch->name = name; 787 ch->config_section = config_section;
788 ch->addr_prefix = addr_prefix;
733 ch->mtu = mtu; 789 ch->mtu = mtu;
734 ch->mq_init = mq_init; 790 ch->mq_init = mq_init;
735 ch->mq_init_cls = mq_init_cls; 791 ch->mq_init_cls = mq_init_cls;
736 reconnect (ch); 792 reconnect (ch);
737 if (GNUNET_OK != 793 if (GNUNET_OK !=
738 GNUNET_CONFIGURATION_get_value_number (cfg, 794 GNUNET_CONFIGURATION_get_value_number (cfg,
739 name, 795 config_section,
740 "MAX_QUEUE_LENGTH", 796 "MAX_QUEUE_LENGTH",
741 &ch->max_queue_length)) 797 &ch->max_queue_length))
742 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; 798 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
@@ -798,32 +854,15 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
798 struct GNUNET_TRANSPORT_IncomingMessage *im; 854 struct GNUNET_TRANSPORT_IncomingMessage *im;
799 uint16_t msize; 855 uint16_t msize;
800 856
801 if (NULL == ai->ch->mq) 857 if (NULL == ch->mq)
802 return GNUNET_SYSERR; 858 return GNUNET_SYSERR;
803 if (NULL != cb) 859 if ( (NULL == cb) &&
860 (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
804 { 861 {
805 struct FlowControl *fc; 862 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
806 863 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
807 im->fc_on = htonl (GNUNET_YES); 864 ch->max_queue_length);
808 im->fc_id = ai->ch->fc_gen++; 865 return GNUNET_NO;
809 fc = GNUNET_new (struct FlowControl);
810 fc->sender = *sender;
811 fc->id = im->fc_id;
812 fc->cb = cb;
813 fc->cb_cls = cb_cls;
814 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
815 ch->fc_tail,
816 fc);
817 }
818 else
819 {
820 if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
821 {
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Dropping message: transprot is too slow, queue length %u exceeded\n",
824 ch->max_queue_length);
825 return GNUNET_NO;
826 }
827 } 866 }
828 867
829 msize = ntohs (msg->size); 868 msize = ntohs (msg->size);
@@ -839,7 +878,22 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
839 memcpy (&im[1], 878 memcpy (&im[1],
840 msg, 879 msg,
841 msize); 880 msize);
842 GNUNET_MQ_send (ai->ch->mq, 881 if (NULL != cb)
882 {
883 struct FlowControl *fc;
884
885 im->fc_on = htonl (GNUNET_YES);
886 im->fc_id = ch->fc_gen++;
887 fc = GNUNET_new (struct FlowControl);
888 fc->sender = *sender;
889 fc->id = im->fc_id;
890 fc->cb = cb;
891 fc->cb_cls = cb_cls;
892 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
893 ch->fc_tail,
894 fc);
895 }
896 GNUNET_MQ_send (ch->mq,
843 env); 897 env);
844 return GNUNET_OK; 898 return GNUNET_OK;
845} 899}
@@ -927,9 +981,9 @@ GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorH
927 ai->address = GNUNET_strdup (address); 981 ai->address = GNUNET_strdup (address);
928 ai->nt = nt; 982 ai->nt = nt;
929 ai->expiration = expiration; 983 ai->expiration = expiration;
930 ai->aid = handle->aid_gen++; 984 ai->aid = ch->aid_gen++;
931 GNUNET_CONTAINER_DLL_insert (handle->ai_head, 985 GNUNET_CONTAINER_DLL_insert (ch->ai_head,
932 handle->ai_tail, 986 ch->ai_tail,
933 ai); 987 ai);
934 send_add_address (ai); 988 send_add_address (ai);
935 return ai; 989 return ai;