aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/.gitignore1
-rw-r--r--src/transport/Makefile.am56
-rw-r--r--src/transport/communicator-unix.conf2
-rw-r--r--src/transport/gnunet-communicator-unix.c1147
-rw-r--r--src/transport/gnunet-service-tng.c1263
-rw-r--r--src/transport/gnunet-service-transport.c1
-rw-r--r--src/transport/gnunet-service-transport_neighbours.c3
-rw-r--r--src/transport/gnunet-transport.c2
-rw-r--r--src/transport/transport-testing.c4
-rw-r--r--src/transport/transport-testing.h8
-rw-r--r--src/transport/transport.h161
-rw-r--r--src/transport/transport_api2_communication.c194
-rw-r--r--src/transport/transport_api2_core.c938
-rw-r--r--src/transport/transport_api2_monitor.c313
-rw-r--r--src/transport/transport_api_core.c17
15 files changed, 3998 insertions, 112 deletions
diff --git a/src/transport/.gitignore b/src/transport/.gitignore
index d035b4011..90f908a47 100644
--- a/src/transport/.gitignore
+++ b/src/transport/.gitignore
@@ -83,3 +83,4 @@ test_transport_blacklisting_outbound_bl_full
83test_transport_blacklisting_outbound_bl_plugin 83test_transport_blacklisting_outbound_bl_plugin
84test_transport_testing_restart 84test_transport_testing_restart
85test_transport_testing_startstop 85test_transport_testing_startstop
86gnunet-communicator-unix
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index d0db6b141..deeb39b48 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -8,7 +8,8 @@ pkgcfgdir= $(pkgdatadir)/config.d/
8libexecdir= $(pkglibdir)/libexec/ 8libexecdir= $(pkglibdir)/libexec/
9 9
10pkgcfg_DATA = \ 10pkgcfg_DATA = \
11 transport.conf 11 transport.conf \
12 communicator-unix.conf
12 13
13if HAVE_MHD 14if HAVE_MHD
14 GN_LIBMHD = -lmicrohttpd 15 GN_LIBMHD = -lmicrohttpd
@@ -140,6 +141,8 @@ endif
140 141
141noinst_PROGRAMS = \ 142noinst_PROGRAMS = \
142 gnunet-transport-profiler \ 143 gnunet-transport-profiler \
144 gnunet-communicator-unix \
145 gnunet-service-tng \
143 $(WLAN_BIN_SENDER) \ 146 $(WLAN_BIN_SENDER) \
144 $(WLAN_BIN_RECEIVER) 147 $(WLAN_BIN_RECEIVER)
145 148
@@ -149,6 +152,9 @@ endif
149 152
150lib_LTLIBRARIES = \ 153lib_LTLIBRARIES = \
151 libgnunettransport.la \ 154 libgnunettransport.la \
155 libgnunettransportcore.la \
156 libgnunettransportcommunicator.la \
157 libgnunettransportmonitor.la \
152 $(TESTING_LIBS) 158 $(TESTING_LIBS)
153 159
154libgnunettransporttesting_la_SOURCES = \ 160libgnunettransporttesting_la_SOURCES = \
@@ -187,6 +193,37 @@ libgnunettransport_la_LDFLAGS = \
187 $(GN_LIB_LDFLAGS) $(WINFLAGS) \ 193 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
188 -version-info 4:0:2 194 -version-info 4:0:2
189 195
196
197
198libgnunettransportcore_la_SOURCES = \
199 transport_api2_core.c
200libgnunettransportcore_la_LIBADD = \
201 $(top_builddir)/src/util/libgnunetutil.la \
202 $(GN_LIBINTL)
203libgnunettransportcore_la_LDFLAGS = \
204 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
205 -version-info 0:0:0
206
207libgnunettransportcommunicator_la_SOURCES = \
208 transport_api2_communication.c
209libgnunettransportcommunicator_la_LIBADD = \
210 $(top_builddir)/src/util/libgnunetutil.la \
211 $(GN_LIBINTL)
212libgnunettransportcommunicator_la_LDFLAGS = \
213 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
214 -version-info 0:0:0
215
216
217libgnunettransportmonitor_la_SOURCES = \
218 transport_api2_monitor.c
219libgnunettransportmonitor_la_LIBADD = \
220 $(top_builddir)/src/util/libgnunetutil.la \
221 $(GN_LIBINTL)
222libgnunettransportmonitor_la_LDFLAGS = \
223 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
224 -version-info 0:0:0
225
226
190libexec_PROGRAMS = \ 227libexec_PROGRAMS = \
191 $(WLAN_BIN) \ 228 $(WLAN_BIN) \
192 $(WLAN_BIN_DUMMY) \ 229 $(WLAN_BIN_DUMMY) \
@@ -207,6 +244,14 @@ gnunet_transport_certificate_creation_SOURCES = \
207gnunet_transport_certificate_creation_LDADD = \ 244gnunet_transport_certificate_creation_LDADD = \
208 $(top_builddir)/src/util/libgnunetutil.la 245 $(top_builddir)/src/util/libgnunetutil.la
209 246
247gnunet_communicator_unix_SOURCES = \
248 gnunet-communicator-unix.c
249gnunet_communicator_unix_LDADD = \
250 libgnunettransportcommunicator.la \
251 $(top_builddir)/src/statistics/libgnunetstatistics.la \
252 $(top_builddir)/src/util/libgnunetutil.la
253
254
210gnunet_helper_transport_wlan_SOURCES = \ 255gnunet_helper_transport_wlan_SOURCES = \
211 gnunet-helper-transport-wlan.c 256 gnunet-helper-transport-wlan.c
212 257
@@ -278,6 +323,15 @@ gnunet_service_transport_CFLAGS = \
278 $(CFLAGS) 323 $(CFLAGS)
279# -DANALYZE 324# -DANALYZE
280 325
326
327gnunet_service_tng_SOURCES = \
328 gnunet-service-tng.c
329gnunet_service_tng_LDADD = \
330 $(top_builddir)/src/ats/libgnunetats.la \
331 $(top_builddir)/src/statistics/libgnunetstatistics.la \
332 $(top_builddir)/src/util/libgnunetutil.la \
333 $(GN_LIBINTL)
334
281plugin_LTLIBRARIES = \ 335plugin_LTLIBRARIES = \
282 libgnunet_plugin_transport_tcp.la \ 336 libgnunet_plugin_transport_tcp.la \
283 libgnunet_plugin_transport_udp.la \ 337 libgnunet_plugin_transport_udp.la \
diff --git a/src/transport/communicator-unix.conf b/src/transport/communicator-unix.conf
new file mode 100644
index 000000000..ad92616c6
--- /dev/null
+++ b/src/transport/communicator-unix.conf
@@ -0,0 +1,2 @@
1[communicator-unix]
2UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-communicator-unix.sock
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
new file mode 100644
index 000000000..b2eebbe20
--- /dev/null
+++ b/src/transport/gnunet-communicator-unix.c
@@ -0,0 +1,1147 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/gnunet-communicator-unix.c
21 * @brief Transport plugin using unix domain sockets (!)
22 * Clearly, can only be used locally on Unix/Linux hosts...
23 * ONLY INTENDED FOR TESTING!!!
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet_transport_communication_service.h"
32
33/**
34 * How many messages do we keep at most in the queue to the
35 * transport service before we start to drop (default,
36 * can be changed via the configuration file).
37 * Should be _below_ the level of the communicator API, as
38 * otherwise we may read messages just to have them dropped
39 * by the communicator API.
40 */
41#define DEFAULT_MAX_QUEUE_LENGTH 8
42
43/**
44 * Address prefix used by the communicator.
45 */
46#define COMMUNICATOR_ADDRESS_PREFIX "unix"
47
48/**
49 * Configuration section used by the communicator.
50 */
51#define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
52
53
54GNUNET_NETWORK_STRUCT_BEGIN
55
56/**
57 * UNIX Message-Packet header.
58 */
59struct UNIXMessage
60{
61 /**
62 * Message header.
63 */
64 struct GNUNET_MessageHeader header;
65
66 /**
67 * What is the identity of the sender (GNUNET_hash of public key)
68 */
69 struct GNUNET_PeerIdentity sender;
70
71};
72
73GNUNET_NETWORK_STRUCT_END
74
75
76/**
77 * Handle for a queue.
78 */
79struct Queue
80{
81
82 /**
83 * Queues with pending messages (!) are kept in a DLL.
84 */
85 struct Queue *next;
86
87 /**
88 * Queues with pending messages (!) are kept in a DLL.
89 */
90 struct Queue *prev;
91
92 /**
93 * To whom are we talking to.
94 */
95 struct GNUNET_PeerIdentity target;
96
97 /**
98 * Address of the other peer.
99 */
100 struct sockaddr_un *address;
101
102 /**
103 * Length of the address.
104 */
105 socklen_t address_len;
106
107 /**
108 * Message currently scheduled for transmission, non-NULL if and only
109 * if this queue is in the #queue_head DLL.
110 */
111 const struct GNUNET_MessageHeader *msg;
112
113 /**
114 * Message queue we are providing for the #ch.
115 */
116 struct GNUNET_MQ_Handle *mq;
117
118 /**
119 * handle for this queue with the #ch.
120 */
121 struct GNUNET_TRANSPORT_QueueHandle *qh;
122
123 /**
124 * Number of bytes we currently have in our write queue.
125 */
126 unsigned long long bytes_in_queue;
127
128 /**
129 * Timeout for this queue.
130 */
131 struct GNUNET_TIME_Absolute timeout;
132
133 /**
134 * Queue timeout task.
135 */
136 struct GNUNET_SCHEDULER_Task *timeout_task;
137
138};
139
140
141/**
142 * ID of read task
143 */
144static struct GNUNET_SCHEDULER_Task *read_task;
145
146/**
147 * ID of write task
148 */
149static struct GNUNET_SCHEDULER_Task *write_task;
150
151/**
152 * Number of messages we currently have in our queues towards the transport service.
153 */
154static unsigned long long delivering_messages;
155
156/**
157 * Maximum queue length before we stop reading towards the transport service.
158 */
159static unsigned long long max_queue_length;
160
161/**
162 * For logging statistics.
163 */
164static struct GNUNET_STATISTICS_Handle *stats;
165
166/**
167 * Our environment.
168 */
169static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
170
171/**
172 * Queues (map from peer identity to `struct Queue`)
173 */
174static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
175
176/**
177 * Head of queue of messages to transmit.
178 */
179static struct Queue *queue_head;
180
181/**
182 * Tail of queue of messages to transmit.
183 */
184static struct Queue *queue_tail;
185
186/**
187 * socket that we transmit all data with
188 */
189static struct GNUNET_NETWORK_Handle *unix_sock;
190
191/**
192 * Handle to the operation that publishes our address.
193 */
194static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
195
196
197/**
198 * Functions with this signature are called whenever we need
199 * to close a queue due to a disconnect or failure to
200 * establish a connection.
201 *
202 * @param queue queue to close down
203 */
204static void
205queue_destroy (struct Queue *queue)
206{
207 struct GNUNET_MQ_Handle *mq;
208
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210 "Disconnecting queue for peer `%s'\n",
211 GNUNET_i2s (&queue->target));
212 if (0 != queue->bytes_in_queue)
213 {
214 GNUNET_CONTAINER_DLL_remove (queue_head,
215 queue_tail,
216 queue);
217 queue->bytes_in_queue = 0;
218 }
219 if (NULL != (mq = queue->mq))
220 {
221 queue->mq = NULL;
222 GNUNET_MQ_destroy (mq);
223 }
224 GNUNET_assert (GNUNET_YES ==
225 GNUNET_CONTAINER_multipeermap_remove (queue_map,
226 &queue->target,
227 queue));
228 GNUNET_STATISTICS_set (stats,
229 "# UNIX queues active",
230 GNUNET_CONTAINER_multipeermap_size (queue_map),
231 GNUNET_NO);
232 if (NULL != queue->timeout_task)
233 {
234 GNUNET_SCHEDULER_cancel (queue->timeout_task);
235 queue->timeout_task = NULL;
236 }
237 GNUNET_free (queue->address);
238 GNUNET_free (queue);
239}
240
241
242/**
243 * Queue was idle for too long, so disconnect it
244 *
245 * @param cls the `struct Queue *` to disconnect
246 */
247static void
248queue_timeout (void *cls)
249{
250 struct Queue *queue = cls;
251 struct GNUNET_TIME_Relative left;
252
253 queue->timeout_task = NULL;
254 left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
255 if (0 != left.rel_value_us)
256 {
257 /* not actually our turn yet, but let's at least update
258 the monitor, it may think we're about to die ... */
259 queue->timeout_task
260 = GNUNET_SCHEDULER_add_delayed (left,
261 &queue_timeout,
262 queue);
263 return;
264 }
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "Queue %p was idle for %s, disconnecting\n",
267 queue,
268 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
269 GNUNET_YES));
270 queue_destroy (queue);
271}
272
273
274/**
275 * Increment queue timeout due to activity. We do not immediately
276 * notify the monitor here as that might generate excessive
277 * signalling.
278 *
279 * @param queue queue for which the timeout should be rescheduled
280 */
281static void
282reschedule_queue_timeout (struct Queue *queue)
283{
284 GNUNET_assert (NULL != queue->timeout_task);
285 queue->timeout
286 = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
287}
288
289
290/**
291 * Convert unix path to a `struct sockaddr_un *`
292 *
293 * @param unixpath path to convert
294 * @param[out] sock_len set to the length of the address
295 * @param is_abstract is this an abstract @a unixpath
296 * @return converted unix path
297 */
298static struct sockaddr_un *
299unix_address_to_sockaddr (const char *unixpath,
300 socklen_t *sock_len)
301{
302 struct sockaddr_un *un;
303 size_t slen;
304
305 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
306 un = GNUNET_new (struct sockaddr_un);
307 un->sun_family = AF_UNIX;
308 slen = strlen (unixpath);
309 if (slen >= sizeof (un->sun_path))
310 slen = sizeof (un->sun_path) - 1;
311 GNUNET_memcpy (un->sun_path,
312 unixpath,
313 slen);
314 un->sun_path[slen] = '\0';
315 slen = sizeof (struct sockaddr_un);
316#if HAVE_SOCKADDR_UN_SUN_LEN
317 un->sun_len = (u_char) slen;
318#endif
319 (*sock_len) = slen;
320 if ('@' == un->sun_path[0])
321 un->sun_path[0] = '\0';
322 return un;
323}
324
325
326/**
327 * Closure to #lookup_queue_it().
328 */
329struct LookupCtx
330{
331 /**
332 * Location to store the queue, if found.
333 */
334 struct Queue *res;
335
336 /**
337 * Address we are looking for.
338 */
339 const struct sockaddr_un *un;
340
341 /**
342 * Number of bytes in @a un
343 */
344 socklen_t un_len;
345};
346
347
348/**
349 * Function called to find a queue by address.
350 *
351 * @param cls the `struct LookupCtx *`
352 * @param key peer we are looking for (unused)
353 * @param value a queue
354 * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
355 */
356static int
357lookup_queue_it (void *cls,
358 const struct GNUNET_PeerIdentity *key,
359 void *value)
360{
361 struct LookupCtx *lctx = cls;
362 struct Queue *queue = value;
363
364 if ( (queue->address_len = lctx->un_len) &&
365 (0 == memcmp (lctx->un,
366 queue->address,
367 queue->address_len)) )
368 {
369 lctx->res = queue;
370 return GNUNET_NO;
371 }
372 return GNUNET_YES;
373}
374
375
376/**
377 * Find an existing queue by address.
378 *
379 * @param plugin the plugin
380 * @param address the address to find
381 * @return NULL if queue was not found
382 */
383static struct Queue *
384lookup_queue (const struct GNUNET_PeerIdentity *peer,
385 const struct sockaddr_un *un,
386 socklen_t un_len)
387{
388 struct LookupCtx lctx;
389
390 lctx.un = un;
391 lctx.un_len = un_len;
392 GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
393 peer,
394 &lookup_queue_it,
395 &lctx);
396 return lctx.res;
397}
398
399
400/**
401 * We have been notified that our socket is ready to write.
402 * Then reschedule this function to be called again once more is available.
403 *
404 * @param cls NULL
405 */
406static void
407select_write_cb (void *cls)
408{
409 struct Queue *queue = queue_tail;
410 const struct GNUNET_MessageHeader *msg = queue->msg;
411 size_t msg_size = ntohs (msg->size);
412 ssize_t sent;
413
414 /* take queue of the ready list */
415 write_task = NULL;
416 GNUNET_CONTAINER_DLL_remove (queue_head,
417 queue_tail,
418 queue);
419 if (NULL != queue_head)
420 write_task =
421 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
422 unix_sock,
423 &select_write_cb,
424 NULL);
425
426 /* send 'msg' */
427 queue->msg = NULL;
428 GNUNET_MQ_impl_send_continue (queue->mq);
429 resend:
430 /* Send the data */
431 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
432 queue->msg,
433 msg_size,
434 (const struct sockaddr *) queue->address,
435 queue->address_len);
436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437 "UNIX transmitted message to %s (%d/%u: %s)\n",
438 GNUNET_i2s (&queue->target),
439 (int) sent,
440 (unsigned int) msg_size,
441 (sent < 0) ? STRERROR (errno) : "ok");
442 if (-1 != sent)
443 {
444 GNUNET_STATISTICS_update (stats,
445 "# bytes sent",
446 (long long) sent,
447 GNUNET_NO);
448 reschedule_queue_timeout (queue);
449 return; /* all good */
450 }
451 GNUNET_STATISTICS_update (stats,
452 "# network transmission failures",
453 1,
454 GNUNET_NO);
455 switch (errno)
456 {
457 case EAGAIN:
458 case ENOBUFS:
459 /* We should retry later... */
460 GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
461 "send");
462 return;
463 case EMSGSIZE:
464 {
465 socklen_t size = 0;
466 socklen_t len = sizeof (size);
467
468 GNUNET_NETWORK_socket_getsockopt (unix_sock,
469 SOL_SOCKET,
470 SO_SNDBUF,
471 &size,
472 &len);
473 if (size > ntohs (msg->size))
474 {
475 /* Buffer is bigger than message: error, no retry
476 * This should never happen!*/
477 GNUNET_break (0);
478 return;
479 }
480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481 "Trying to increase socket buffer size from %u to %u for message size %u\n",
482 (unsigned int) size,
483 (unsigned int) ((msg_size / 1000) + 2) * 1000,
484 (unsigned int) msg_size);
485 size = ((msg_size / 1000) + 2) * 1000;
486 if (GNUNET_OK ==
487 GNUNET_NETWORK_socket_setsockopt (unix_sock,
488 SOL_SOCKET,
489 SO_SNDBUF,
490 &size,
491 sizeof (size)))
492 goto resend; /* Increased buffer size, retry sending */
493 /* Ok, then just try very modest increase */
494 size = msg_size;
495 if (GNUNET_OK ==
496 GNUNET_NETWORK_socket_setsockopt (unix_sock,
497 SOL_SOCKET,
498 SO_SNDBUF,
499 &size,
500 sizeof (size)))
501 goto resend; /* Increased buffer size, retry sending */
502 /* Could not increase buffer size: error, no retry */
503 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
504 "setsockopt");
505 return;
506 }
507 default:
508 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
509 "send");
510 return;
511 }
512}
513
514
515/**
516 * Signature of functions implementing the sending functionality of a
517 * message queue.
518 *
519 * @param mq the message queue
520 * @param msg the message to send
521 * @param impl_state our `struct Queue`
522 */
523static void
524mq_send (struct GNUNET_MQ_Handle *mq,
525 const struct GNUNET_MessageHeader *msg,
526 void *impl_state)
527{
528 struct Queue *queue = impl_state;
529
530 GNUNET_assert (mq == queue->mq);
531 GNUNET_assert (NULL == queue->msg);
532 queue->msg = msg;
533 GNUNET_CONTAINER_DLL_insert (queue_head,
534 queue_tail,
535 queue);
536 GNUNET_assert (NULL != unix_sock);
537 if (NULL == write_task)
538 write_task =
539 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
540 unix_sock,
541 &select_write_cb,
542 NULL);
543}
544
545
546/**
547 * Signature of functions implementing the destruction of a message
548 * queue. Implementations must not free @a mq, but should take care
549 * of @a impl_state.
550 *
551 * @param mq the message queue to destroy
552 * @param impl_state our `struct Queue`
553 */
554static void
555mq_destroy (struct GNUNET_MQ_Handle *mq,
556 void *impl_state)
557{
558 struct Queue *queue = impl_state;
559
560 if (mq == queue->mq)
561 {
562 queue->mq = NULL;
563 queue_destroy (queue);
564 }
565}
566
567
568/**
569 * Implementation function that cancels the currently sent message.
570 *
571 * @param mq message queue
572 * @param impl_state our `struct Queue`
573 */
574static void
575mq_cancel (struct GNUNET_MQ_Handle *mq,
576 void *impl_state)
577{
578 struct Queue *queue = impl_state;
579
580 GNUNET_assert (NULL != queue->msg);
581 queue->msg = NULL;
582 GNUNET_CONTAINER_DLL_remove (queue_head,
583 queue_tail,
584 queue);
585 GNUNET_assert (NULL != write_task);
586 if (NULL == queue_head)
587 {
588 GNUNET_SCHEDULER_cancel (write_task);
589 write_task = NULL;
590 }
591}
592
593
594/**
595 * Generic error handler, called with the appropriate
596 * error code and the same closure specified at the creation of
597 * the message queue.
598 * Not every message queue implementation supports an error handler.
599 *
600 * @param cls our `struct Queue`
601 * @param error error code
602 */
603static void
604mq_error (void *cls,
605 enum GNUNET_MQ_Error error)
606{
607 struct Queue *queue = cls;
608
609 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
610 "UNIX MQ error in queue to %s: %d\n",
611 GNUNET_i2s (&queue->target),
612 (int) error);
613 queue_destroy (queue);
614}
615
616
617/**
618 * Creates a new outbound queue the transport service will use to send
619 * data to another peer.
620 *
621 * @param peer the target peer
622 * @param un the address
623 * @param un_len number of bytes in @a un
624 * @return the queue or NULL of max connections exceeded
625 */
626static struct Queue *
627setup_queue (const struct GNUNET_PeerIdentity *target,
628 const struct sockaddr_un *un,
629 socklen_t un_len)
630{
631 struct Queue *queue;
632
633 queue = GNUNET_new (struct Queue);
634 queue->target = *target;
635 queue->address = GNUNET_memdup (un,
636 un_len);
637 queue->address_len = un_len;
638 (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
639 &queue->target,
640 queue,
641 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
642 GNUNET_STATISTICS_set (stats,
643 "# queues active",
644 GNUNET_CONTAINER_multipeermap_size (queue_map),
645 GNUNET_NO);
646 queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
647 queue->timeout_task
648 = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
649 &queue_timeout,
650 queue);
651 queue->mq
652 = GNUNET_MQ_queue_for_callbacks (&mq_send,
653 &mq_destroy,
654 &mq_cancel,
655 queue,
656 NULL,
657 &mq_error,
658 queue);
659 {
660 char *foreign_addr;
661
662 if ('\0' == un->sun_path[0])
663 GNUNET_asprintf (&foreign_addr,
664 "%s-@%s",
665 COMMUNICATOR_ADDRESS_PREFIX,
666 &un->sun_path[1]);
667 else
668 GNUNET_asprintf (&foreign_addr,
669 "%s-%s",
670 COMMUNICATOR_ADDRESS_PREFIX,
671 un->sun_path);
672 queue->qh
673 = GNUNET_TRANSPORT_communicator_mq_add (ch,
674 &queue->target,
675 foreign_addr,
676 GNUNET_ATS_NET_LOOPBACK,
677 queue->mq);
678 GNUNET_free (foreign_addr);
679 }
680 return queue;
681}
682
683
684/**
685 * We have been notified that our socket has something to read. Do the
686 * read and reschedule this function to be called again once more is
687 * available.
688 *
689 * @param cls NULL
690 */
691static void
692select_read_cb (void *cls);
693
694
695/**
696 * Function called when message was successfully passed to
697 * transport service. Continue read activity.
698 *
699 * @param cls NULL
700 * @param success #GNUNET_OK on success
701 */
702static void
703receive_complete_cb (void *cls,
704 int success)
705{
706 delivering_messages--;
707 if (GNUNET_OK != success)
708 GNUNET_STATISTICS_update (stats,
709 "# transport transmission failures",
710 1,
711 GNUNET_NO);
712 GNUNET_assert (NULL != unix_sock);
713 if ( (NULL == read_task) &&
714 (delivering_messages < max_queue_length) )
715 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
716 unix_sock,
717 &select_read_cb,
718 NULL);
719}
720
721
722/**
723 * We have been notified that our socket has something to read. Do the
724 * read and reschedule this function to be called again once more is
725 * available.
726 *
727 * @param cls NULL
728 */
729static void
730select_read_cb (void *cls)
731{
732 char buf[65536] GNUNET_ALIGN;
733 struct Queue *queue;
734 const struct UNIXMessage *msg;
735 struct sockaddr_un un;
736 socklen_t addrlen;
737 ssize_t ret;
738 uint16_t msize;
739
740 GNUNET_assert (NULL != unix_sock);
741 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
742 unix_sock,
743 &select_read_cb,
744 NULL);
745 addrlen = sizeof (un);
746 memset (&un,
747 0,
748 sizeof (un));
749 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
750 buf,
751 sizeof (buf),
752 (struct sockaddr *) &un,
753 &addrlen);
754 if ( (-1 == ret) &&
755 ( (EAGAIN == errno) ||
756 (ENOBUFS == errno) ) )
757 return;
758 if (-1 == ret)
759 {
760 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
761 "recvfrom");
762 return;
763 }
764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765 "Read %d bytes from socket %s\n",
766 (int) ret,
767 un.sun_path);
768 GNUNET_assert (AF_UNIX == (un.sun_family));
769 msg = (struct UNIXMessage *) buf;
770 msize = ntohs (msg->header.size);
771 if ( (msize < sizeof (struct UNIXMessage)) ||
772 (msize > ret) )
773 {
774 GNUNET_break_op (0);
775 return;
776 }
777 queue = lookup_queue (&msg->sender,
778 &un,
779 addrlen);
780 if (NULL == queue)
781 queue = setup_queue (&msg->sender,
782 &un,
783 addrlen);
784 else
785 reschedule_queue_timeout (queue);
786 if (NULL == queue)
787 {
788 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
789 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
790 return;
791 }
792
793 {
794 uint16_t offset = 0;
795 uint16_t tsize = msize - sizeof (struct UNIXMessage);
796 const char *msgbuf = (const char *) &msg[1];
797
798 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
799 {
800 const struct GNUNET_MessageHeader *currhdr;
801 struct GNUNET_MessageHeader al_hdr;
802 uint16_t csize;
803
804 currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
805 /* ensure aligned access */
806 memcpy (&al_hdr,
807 currhdr,
808 sizeof (al_hdr));
809 csize = ntohs (al_hdr.size);
810 if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
811 (csize > tsize - offset))
812 {
813 GNUNET_break_op (0);
814 break;
815 }
816 ret = GNUNET_TRANSPORT_communicator_receive (ch,
817 &msg->sender,
818 currhdr,
819 &receive_complete_cb,
820 NULL);
821 if (GNUNET_SYSERR == ret)
822 return; /* transport not up */
823 if (GNUNET_NO == ret)
824 break;
825 delivering_messages++;
826 offset += csize;
827 }
828 }
829 if (delivering_messages >= max_queue_length)
830 {
831 /* we should try to apply 'back pressure' */
832 GNUNET_SCHEDULER_cancel (read_task);
833 read_task = NULL;
834 }
835}
836
837
838/**
839 * Function called by the transport service to initialize a
840 * message queue given address information about another peer.
841 * If and when the communication channel is established, the
842 * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
843 * to notify the service that the channel is now up. It is
844 * the responsibility of the communicator to manage sane
845 * retries and timeouts for any @a peer/@a address combination
846 * provided by the transport service. Timeouts and retries
847 * do not need to be signalled to the transport service.
848 *
849 * @param cls closure
850 * @param peer identity of the other peer
851 * @param address where to send the message, human-readable
852 * communicator-specific format, 0-terminated, UTF-8
853 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
854 */
855static int
856mq_init (void *cls,
857 const struct GNUNET_PeerIdentity *peer,
858 const char *address)
859{
860 struct Queue *queue;
861 const char *path;
862 struct sockaddr_un *un;
863 socklen_t un_len;
864
865 if (0 != strncmp (address,
866 COMMUNICATOR_ADDRESS_PREFIX "-",
867 strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
868 {
869 GNUNET_break_op (0);
870 return GNUNET_SYSERR;
871 }
872 path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
873 un = unix_address_to_sockaddr (path,
874 &un_len);
875 queue = lookup_queue (peer,
876 un,
877 un_len);
878 if (NULL != queue)
879 {
880 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
881 "Address `%s' for %s ignored, queue exists\n",
882 path,
883 GNUNET_i2s (peer));
884 GNUNET_free (un);
885 return GNUNET_OK;
886 }
887 queue = setup_queue (peer,
888 un,
889 un_len);
890 GNUNET_free (un);
891 if (NULL == queue)
892 {
893 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
894 "Failed to setup queue to %s at `%s'\n",
895 GNUNET_i2s (peer),
896 path);
897 return GNUNET_NO;
898 }
899 return GNUNET_OK;
900}
901
902
903/**
904 * Iterator over all message queues to clean up.
905 *
906 * @param cls NULL
907 * @param target unused
908 * @param value the queue to destroy
909 * @return #GNUNET_OK to continue to iterate
910 */
911static int
912get_queue_delete_it (void *cls,
913 const struct GNUNET_PeerIdentity *target,
914 void *value)
915{
916 struct Queue *queue = value;
917
918 (void) cls;
919 (void) target;
920 queue_destroy (queue);
921 return GNUNET_OK;
922}
923
924
925/**
926 * Shutdown the UNIX communicator.
927 *
928 * @param cls NULL (always)
929 */
930static void
931do_shutdown (void *cls)
932{
933 if (NULL != read_task)
934 {
935 GNUNET_SCHEDULER_cancel (read_task);
936 read_task = NULL;
937 }
938 if (NULL != write_task)
939 {
940 GNUNET_SCHEDULER_cancel (write_task);
941 write_task = NULL;
942 }
943 if (NULL != unix_sock)
944 {
945 GNUNET_break (GNUNET_OK ==
946 GNUNET_NETWORK_socket_close (unix_sock));
947 unix_sock = NULL;
948 }
949 GNUNET_CONTAINER_multipeermap_iterate (queue_map,
950 &get_queue_delete_it,
951 NULL);
952 GNUNET_CONTAINER_multipeermap_destroy (queue_map);
953 if (NULL != ai)
954 {
955 GNUNET_TRANSPORT_communicator_address_remove (ai);
956 ai = NULL;
957 }
958 if (NULL != ch)
959 {
960 GNUNET_TRANSPORT_communicator_disconnect (ch);
961 ch = NULL;
962 }
963 if (NULL != stats)
964 {
965 GNUNET_STATISTICS_destroy (stats,
966 GNUNET_NO);
967 stats = NULL;
968 }
969}
970
971
972/**
973 * Setup communicator and launch network interactions.
974 *
975 * @param cls NULL (always)
976 * @param args remaining command-line arguments
977 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
978 * @param cfg configuration
979 */
980static void
981run (void *cls,
982 char *const *args,
983 const char *cfgfile,
984 const struct GNUNET_CONFIGURATION_Handle *cfg)
985{
986 char *unix_socket_path;
987 struct sockaddr_un *un;
988 socklen_t un_len;
989 char *my_addr;
990 (void) cls;
991
992 if (GNUNET_OK !=
993 GNUNET_CONFIGURATION_get_value_filename (cfg,
994 COMMUNICATOR_CONFIG_SECTION,
995 "UNIXPATH",
996 &unix_socket_path))
997 {
998 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
999 COMMUNICATOR_CONFIG_SECTION,
1000 "UNIXPATH");
1001 return;
1002 }
1003 if (GNUNET_OK !=
1004 GNUNET_CONFIGURATION_get_value_number (cfg,
1005 COMMUNICATOR_CONFIG_SECTION,
1006 "MAX_QUEUE_LENGTH",
1007 &max_queue_length))
1008 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1009
1010 un = unix_address_to_sockaddr (unix_socket_path,
1011 &un_len);
1012 if (NULL == un)
1013 {
1014 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1015 "Failed to setup UNIX domain socket address with path `%s'\n",
1016 unix_socket_path);
1017 GNUNET_free (unix_socket_path);
1018 return;
1019 }
1020 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1021 SOCK_DGRAM,
1022 0);
1023 if (NULL == unix_sock)
1024 {
1025 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1026 "socket");
1027 GNUNET_free (un);
1028 GNUNET_free (unix_socket_path);
1029 return;
1030 }
1031 if ( ('\0' != un->sun_path[0]) &&
1032 (GNUNET_OK !=
1033 GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1034 {
1035 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1036 _("Cannot create path to `%s'\n"),
1037 un->sun_path);
1038 GNUNET_NETWORK_socket_close (unix_sock);
1039 unix_sock = NULL;
1040 GNUNET_free (un);
1041 GNUNET_free (unix_socket_path);
1042 return;
1043 }
1044 if (GNUNET_OK !=
1045 GNUNET_NETWORK_socket_bind (unix_sock,
1046 (const struct sockaddr *) un,
1047 un_len))
1048 {
1049 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
1050 "bind",
1051 un->sun_path);
1052 GNUNET_NETWORK_socket_close (unix_sock);
1053 unix_sock = NULL;
1054 GNUNET_free (un);
1055 GNUNET_free (unix_socket_path);
1056 return;
1057 }
1058 GNUNET_free (un);
1059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060 "Bound to `%s'\n",
1061 unix_socket_path);
1062 stats = GNUNET_STATISTICS_create ("C-UNIX",
1063 cfg);
1064 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1065 NULL);
1066 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1067 unix_sock,
1068 &select_read_cb,
1069 NULL);
1070 queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1071 GNUNET_NO);
1072 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1073 COMMUNICATOR_CONFIG_SECTION,
1074 COMMUNICATOR_ADDRESS_PREFIX,
1075 65535,
1076 &mq_init,
1077 NULL);
1078 if (NULL == ch)
1079 {
1080 GNUNET_break (0);
1081 GNUNET_SCHEDULER_shutdown ();
1082 GNUNET_free (unix_socket_path);
1083 return;
1084 }
1085 GNUNET_asprintf (&my_addr,
1086 "%s-%s",
1087 COMMUNICATOR_ADDRESS_PREFIX,
1088 unix_socket_path);
1089 GNUNET_free (unix_socket_path);
1090 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1091 my_addr,
1092 GNUNET_ATS_NET_LOOPBACK,
1093 GNUNET_TIME_UNIT_FOREVER_REL);
1094 GNUNET_free (my_addr);
1095}
1096
1097
1098/**
1099 * The main function for the UNIX communicator.
1100 *
1101 * @param argc number of arguments from the command line
1102 * @param argv command line arguments
1103 * @return 0 ok, 1 on error
1104 */
1105int
1106main (int argc,
1107 char *const *argv)
1108{
1109 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1110 GNUNET_GETOPT_OPTION_END
1111 };
1112 int ret;
1113
1114 if (GNUNET_OK !=
1115 GNUNET_STRINGS_get_utf8_args (argc, argv,
1116 &argc, &argv))
1117 return 2;
1118
1119 ret =
1120 (GNUNET_OK ==
1121 GNUNET_PROGRAM_run (argc, argv,
1122 "gnunet-communicator-unix",
1123 _("GNUnet UNIX domain socket communicator"),
1124 options,
1125 &run,
1126 NULL)) ? 0 : 1;
1127 GNUNET_free ((void*) argv);
1128 return ret;
1129}
1130
1131
1132#if defined(LINUX) && defined(__GLIBC__)
1133#include <malloc.h>
1134
1135/**
1136 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1137 */
1138void __attribute__ ((constructor))
1139GNUNET_ARM_memory_init ()
1140{
1141 mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1142 mallopt (M_TOP_PAD, 1 * 1024);
1143 malloc_trim (0);
1144}
1145#endif
1146
1147/* end of gnunet-communicator-unix.c */
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
new file mode 100644
index 000000000..c7bdfd77c
--- /dev/null
+++ b/src/transport/gnunet-service-tng.c
@@ -0,0 +1,1263 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18/**
19 * @file transport/gnunet-service-transport.c
20 * @brief main for gnunet-service-transport
21 * @author Christian Grothoff
22 *
23 * TODO:
24 * - make *our* collected addresses available somehow somewhere
25 * => Choices: in peerstore or revive/keep peerinfo?
26 * - MTU information is missing for queues!
27 * - start supporting monitor logic (add functions to signal monitors!)
28 * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
29 * - ask ATS about bandwidth allocation
30 * -
31 */
32#include "platform.h"
33#include "gnunet_util_lib.h"
34#include "gnunet_statistics_service.h"
35#include "gnunet_transport_service.h"
36#include "gnunet_peerinfo_service.h"
37#include "gnunet_ats_service.h"
38#include "gnunet-service-transport.h"
39#include "transport.h"
40
41
42/**
43 * How many messages can we have pending for a given client process
44 * before we start to drop incoming messages? We typically should
45 * have only one client and so this would be the primary buffer for
46 * messages, so the number should be chosen rather generously.
47 *
48 * The expectation here is that most of the time the queue is large
49 * enough so that a drop is virtually never required. Note that
50 * this value must be about as large as 'TOTAL_MSGS' in the
51 * 'test_transport_api_reliability.c', otherwise that testcase may
52 * fail.
53 */
54#define MAX_PENDING (128 * 1024)
55
56
57/**
58 * What type of client is the `struct TransportClient` about?
59 */
60enum ClientType
61{
62 /**
63 * We do not know yet (client is fresh).
64 */
65 CT_NONE = 0,
66
67 /**
68 * Is the CORE service, we need to forward traffic to it.
69 */
70 CT_CORE = 1,
71
72 /**
73 * It is a monitor, forward monitor data.
74 */
75 CT_MONITOR = 2,
76
77 /**
78 * It is a communicator, use for communication.
79 */
80 CT_COMMUNICATOR = 3
81};
82
83
84/**
85 * Client connected to the transport service.
86 */
87struct TransportClient;
88
89
90/**
91 * A neighbour that at least one communicator is connected to.
92 */
93struct Neighbour;
94
95
96/**
97 * List of available queues for a particular neighbour.
98 */
99struct Queue
100{
101 /**
102 * Kept in a MDLL.
103 */
104 struct Queue *next_neighbour;
105
106 /**
107 * Kept in a MDLL.
108 */
109 struct Queue *prev_neighbour;
110
111 /**
112 * Kept in a MDLL.
113 */
114 struct Queue *prev_client;
115
116 /**
117 * Kept in a MDLL.
118 */
119 struct Queue *next_client;
120
121 /**
122 * Which neighbour is this queue for?
123 */
124 struct Neighbour *neighbour;
125
126 /**
127 * Which communicator offers this queue?
128 */
129 struct TransportClient *tc;
130
131 /**
132 * Address served by the queue.
133 */
134 const char *address;
135
136 /**
137 * Unique identifier of this queue with the communicator.
138 */
139 uint32_t qid;
140
141 /**
142 * Network type offered by this queue.
143 */
144 enum GNUNET_ATS_Network_Type nt;
145
146 // FIXME: add ATS-specific fields here!
147};
148
149
150/**
151 * A neighbour that at least one communicator is connected to.
152 */
153struct Neighbour
154{
155
156 /**
157 * Which peer is this about?
158 */
159 struct GNUNET_PeerIdentity pid;
160
161 /**
162 * Head of list of messages pending for this neighbour.
163 */
164 struct PendingMessage *pending_msg_head;
165
166 /**
167 * Tail of list of messages pending for this neighbour.
168 */
169 struct PendingMessage *pending_msg_tail;
170
171 /**
172 * Head of DLL of queues to this peer.
173 */
174 struct Queue *queue_head;
175
176 /**
177 * Tail of DLL of queues to this peer.
178 */
179 struct Queue *queue_tail;
180
181 /**
182 * Quota at which CORE is allowed to transmit to this peer
183 * according to ATS.
184 *
185 * FIXME: not yet used, tricky to get right given multiple queues!
186 * (=> Idea: let ATS set a quota per queue and we add them up here?)
187 * FIXME: how do we set this value initially when we tell CORE?
188 * Options: start at a minimum value or at literally zero (before ATS?)
189 * (=> Current thought: clean would be zero!)
190 */
191 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
192
193};
194
195
196/**
197 * Transmission request from CORE that is awaiting delivery.
198 */
199struct PendingMessage
200{
201 /**
202 * Kept in a MDLL of messages for this @a target.
203 */
204 struct PendingMessage *next_neighbour;
205
206 /**
207 * Kept in a MDLL of messages for this @a target.
208 */
209 struct PendingMessage *prev_neighbour;
210
211 /**
212 * Kept in a MDLL of messages from this @a client.
213 */
214 struct PendingMessage *next_client;
215
216 /**
217 * Kept in a MDLL of messages from this @a client.
218 */
219 struct PendingMessage *prev_client;
220
221 /**
222 * Target of the request.
223 */
224 struct Neighbour *target;
225
226 /**
227 * Client that issued the transmission request.
228 */
229 struct TransportClient *client;
230
231 /**
232 * Size of the original message.
233 */
234 uint32_t bytes_msg;
235
236};
237
238
239/**
240 * One of the addresses of this peer.
241 */
242struct AddressListEntry
243{
244
245 /**
246 * Kept in a DLL.
247 */
248 struct AddressListEntry *next;
249
250 /**
251 * Kept in a DLL.
252 */
253 struct AddressListEntry *prev;
254
255 /**
256 * Which communicator provides this address?
257 */
258 struct TransportClient *tc;
259
260 /**
261 * The actual address.
262 */
263 const char *address;
264
265 /**
266 * What is a typical lifetime the communicator expects this
267 * address to have? (Always from now.)
268 */
269 struct GNUNET_TIME_Relative expiration;
270
271 /**
272 * Address identifier used by the communicator.
273 */
274 uint32_t aid;
275
276 /**
277 * Network type offered by this address.
278 */
279 enum GNUNET_ATS_Network_Type nt;
280
281};
282
283
284/**
285 * Client connected to the transport service.
286 */
287struct TransportClient
288{
289
290 /**
291 * Kept in a DLL.
292 */
293 struct TransportClient *next;
294
295 /**
296 * Kept in a DLL.
297 */
298 struct TransportClient *prev;
299
300 /**
301 * Handle to the client.
302 */
303 struct GNUNET_SERVICE_Client *client;
304
305 /**
306 * Message queue to the client.
307 */
308 struct GNUNET_MQ_Handle *mq;
309
310 /**
311 * What type of client is this?
312 */
313 enum ClientType type;
314
315 union
316 {
317
318 /**
319 * Information for @e type #CT_CORE.
320 */
321 struct {
322
323 /**
324 * Head of list of messages pending for this client.
325 */
326 struct PendingMessage *pending_msg_head;
327
328 /**
329 * Tail of list of messages pending for this client.
330 */
331 struct PendingMessage *pending_msg_tail;
332
333 } core;
334
335 /**
336 * Information for @e type #CT_MONITOR.
337 */
338 struct {
339
340 /**
341 * Peer identity to monitor the addresses of.
342 * Zero to monitor all neighbours. Valid if
343 * @e type is #CT_MONITOR.
344 */
345 struct GNUNET_PeerIdentity peer;
346
347 /**
348 * Is this a one-shot monitor?
349 */
350 int one_shot;
351
352 } monitor;
353
354
355 /**
356 * Information for @e type #CT_COMMUNICATOR.
357 */
358 struct {
359 /**
360 * If @e type is #CT_COMMUNICATOR, this communicator
361 * supports communicating using these addresses.
362 */
363 char *address_prefix;
364
365 /**
366 * Head of DLL of queues offered by this communicator.
367 */
368 struct Queue *queue_head;
369
370 /**
371 * Tail of DLL of queues offered by this communicator.
372 */
373 struct Queue *queue_tail;
374
375 /**
376 * Head of list of the addresses of this peer offered by this communicator.
377 */
378 struct AddressListEntry *addr_head;
379
380 /**
381 * Tail of list of the addresses of this peer offered by this communicator.
382 */
383 struct AddressListEntry *addr_tail;
384
385 } communicator;
386
387 } details;
388
389};
390
391
392/**
393 * Head of linked list of all clients to this service.
394 */
395static struct TransportClient *clients_head;
396
397/**
398 * Tail of linked list of all clients to this service.
399 */
400static struct TransportClient *clients_tail;
401
402/**
403 * Statistics handle.
404 */
405struct GNUNET_STATISTICS_Handle *GST_stats;
406
407/**
408 * Configuration handle.
409 */
410const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
411
412/**
413 * Our public key.
414 */
415struct GNUNET_PeerIdentity GST_my_identity;
416
417/**
418 * Our private key.
419 */
420struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
421
422/**
423 * Map from PIDs to `struct Neighbour` entries. A peer is
424 * a neighbour if we have an MQ to it from some communicator.
425 */
426static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
427
428
429/**
430 * Lookup neighbour record for peer @a pid.
431 *
432 * @param pid neighbour to look for
433 * @return NULL if we do not have this peer as a neighbour
434 */
435static struct Neighbour *
436lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
437{
438 return GNUNET_CONTAINER_multipeermap_get (neighbours,
439 pid);
440}
441
442
443/**
444 * Called whenever a client connects. Allocates our
445 * data structures associated with that client.
446 *
447 * @param cls closure, NULL
448 * @param client identification of the client
449 * @param mq message queue for the client
450 * @return our `struct TransportClient`
451 */
452static void *
453client_connect_cb (void *cls,
454 struct GNUNET_SERVICE_Client *client,
455 struct GNUNET_MQ_Handle *mq)
456{
457 struct TransportClient *tc;
458
459 tc = GNUNET_new (struct TransportClient);
460 tc->client = client;
461 tc->mq = mq;
462 GNUNET_CONTAINER_DLL_insert (clients_head,
463 clients_tail,
464 tc);
465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
466 "Client %p connected\n",
467 tc);
468 return tc;
469}
470
471
472/**
473 * Called whenever a client is disconnected. Frees our
474 * resources associated with that client.
475 *
476 * @param cls closure, NULL
477 * @param client identification of the client
478 * @param app_ctx our `struct TransportClient`
479 */
480static void
481client_disconnect_cb (void *cls,
482 struct GNUNET_SERVICE_Client *client,
483 void *app_ctx)
484{
485 struct TransportClient *tc = app_ctx;
486
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
488 "Client %p disconnected, cleaning up.\n",
489 tc);
490 GNUNET_CONTAINER_DLL_remove (clients_head,
491 clients_tail,
492 tc);
493 switch (tc->type)
494 {
495 case CT_NONE:
496 break;
497 case CT_CORE:
498 {
499 struct PendingMessage *pm;
500
501 while (NULL != (pm = tc->details.core.pending_msg_head))
502 {
503 GNUNET_CONTAINER_MDLL_remove (client,
504 tc->details.core.pending_msg_head,
505 tc->details.core.pending_msg_tail,
506 pm);
507 pm->client = NULL;
508 }
509 }
510 break;
511 case CT_MONITOR:
512 break;
513 case CT_COMMUNICATOR:
514 GNUNET_free (tc->details.communicator.address_prefix);
515 break;
516 }
517 GNUNET_free (tc);
518}
519
520
521/**
522 * Initialize a "CORE" client. We got a start message from this
523 * client, so add it to the list of clients for broadcasting of
524 * inbound messages.
525 *
526 * @param cls the client
527 * @param start the start message that was sent
528 */
529static void
530handle_client_start (void *cls,
531 const struct StartMessage *start)
532{
533 struct TransportClient *tc = cls;
534 uint32_t options;
535
536 options = ntohl (start->options);
537 if ( (0 != (1 & options)) &&
538 (0 !=
539 memcmp (&start->self,
540 &GST_my_identity,
541 sizeof (struct GNUNET_PeerIdentity)) ) )
542 {
543 /* client thinks this is a different peer, reject */
544 GNUNET_break (0);
545 GNUNET_SERVICE_client_drop (tc->client);
546 return;
547 }
548 if (CT_NONE != tc->type)
549 {
550 GNUNET_break (0);
551 GNUNET_SERVICE_client_drop (tc->client);
552 return;
553 }
554 tc->type = CT_CORE;
555 GNUNET_SERVICE_client_continue (tc->client);
556}
557
558
559/**
560 * Client asked for transmission to a peer. Process the request.
561 *
562 * @param cls the client
563 * @param obm the send message that was sent
564 */
565static int
566check_client_send (void *cls,
567 const struct OutboundMessage *obm)
568{
569 struct TransportClient *tc = cls;
570 uint16_t size;
571 const struct GNUNET_MessageHeader *obmm;
572
573 if (CT_CORE != tc->type)
574 {
575 GNUNET_break (0);
576 return GNUNET_SYSERR;
577 }
578 size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
579 if (size < sizeof (struct GNUNET_MessageHeader))
580 {
581 GNUNET_break (0);
582 return GNUNET_SYSERR;
583 }
584 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
585 if (size != ntohs (obmm->size))
586 {
587 GNUNET_break (0);
588 return GNUNET_SYSERR;
589 }
590 return GNUNET_OK;
591}
592
593
594/**
595 * Send a response to the @a pm that we have processed a
596 * "send" request with status @a success. We
597 * transmitted @a bytes_physical on the actual wire.
598 * Sends a confirmation to the "core" client responsible
599 * for the original request and free's @a pm.
600 *
601 * @param pm handle to the original pending message
602 * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
603 * for transmission failure
604 * @param bytes_physical amount of bandwidth consumed
605 */
606static void
607client_send_response (struct PendingMessage *pm,
608 int success,
609 uint32_t bytes_physical)
610{
611 struct TransportClient *tc = pm->client;
612 struct Neighbour *target = pm->target;
613 struct GNUNET_MQ_Envelope *env;
614 struct SendOkMessage *som;
615
616 if (NULL != tc)
617 {
618 env = GNUNET_MQ_msg (som,
619 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
620 som->success = htonl ((uint32_t) success);
621 som->bytes_msg = htonl (pm->bytes_msg);
622 som->bytes_physical = htonl (bytes_physical);
623 som->peer = target->pid;
624 GNUNET_MQ_send (tc->mq,
625 env);
626 GNUNET_CONTAINER_MDLL_remove (client,
627 tc->details.core.pending_msg_head,
628 tc->details.core.pending_msg_tail,
629 pm);
630 }
631 GNUNET_CONTAINER_MDLL_remove (neighbour,
632 target->pending_msg_head,
633 target->pending_msg_tail,
634 pm);
635 GNUNET_free (pm);
636}
637
638
639/**
640 * Client asked for transmission to a peer. Process the request.
641 *
642 * @param cls the client
643 * @param obm the send message that was sent
644 */
645static void
646handle_client_send (void *cls,
647 const struct OutboundMessage *obm)
648{
649 struct TransportClient *tc = cls;
650 struct PendingMessage *pm;
651 const struct GNUNET_MessageHeader *obmm;
652 struct Neighbour *target;
653 uint32_t bytes_msg;
654
655 GNUNET_assert (CT_CORE == tc->type);
656 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
657 bytes_msg = ntohs (obmm->size);
658 target = lookup_neighbour (&obm->peer);
659 if (NULL == target)
660 {
661 /* Failure: don't have this peer as a neighbour (anymore).
662 Might have gone down asynchronously, so this is NOT
663 a protocol violation by CORE. Still count the event,
664 as this should be rare. */
665 struct GNUNET_MQ_Envelope *env;
666 struct SendOkMessage *som;
667
668 env = GNUNET_MQ_msg (som,
669 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
670 som->success = htonl (GNUNET_SYSERR);
671 som->bytes_msg = htonl (bytes_msg);
672 som->bytes_physical = htonl (0);
673 som->peer = obm->peer;
674 GNUNET_MQ_send (tc->mq,
675 env);
676 GNUNET_SERVICE_client_continue (tc->client);
677 GNUNET_STATISTICS_update (GST_stats,
678 "# messages dropped (neighbour unknown)",
679 1,
680 GNUNET_NO);
681 return;
682 }
683 pm = GNUNET_new (struct PendingMessage);
684 pm->client = tc;
685 pm->target = target;
686 pm->bytes_msg = bytes_msg;
687 GNUNET_CONTAINER_MDLL_insert (neighbour,
688 target->pending_msg_head,
689 target->pending_msg_tail,
690 pm);
691 GNUNET_CONTAINER_MDLL_insert (client,
692 tc->details.core.pending_msg_head,
693 tc->details.core.pending_msg_tail,
694 pm);
695 // FIXME: do the work, continuation with:
696 client_send_response (pm,
697 GNUNET_NO,
698 0);
699}
700
701
702/**
703 * Communicator started. Test message is well-formed.
704 *
705 * @param cls the client
706 * @param cam the send message that was sent
707 */
708static int
709check_communicator_available (void *cls,
710 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
711{
712 struct TransportClient *tc = cls;
713 const char *addr;
714 uint16_t size;
715
716 if (CT_NONE != tc->type)
717 {
718 GNUNET_break (0);
719 return GNUNET_SYSERR;
720 }
721 tc->type = CT_COMMUNICATOR;
722 size = ntohs (cam->header.size) - sizeof (*cam);
723 if (0 == size)
724 return GNUNET_OK; /* receive-only communicator */
725 addr = (const char *) &cam[1];
726 if ('\0' != addr[size-1])
727 {
728 GNUNET_break (0);
729 return GNUNET_SYSERR;
730 }
731 return GNUNET_OK;
732}
733
734
735/**
736 * Communicator started. Process the request.
737 *
738 * @param cls the client
739 * @param cam the send message that was sent
740 */
741static void
742handle_communicator_available (void *cls,
743 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
744{
745 struct TransportClient *tc = cls;
746 uint16_t size;
747
748 size = ntohs (cam->header.size) - sizeof (*cam);
749 if (0 == size)
750 return; /* receive-only communicator */
751 tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) &cam[1]);
752 GNUNET_SERVICE_client_continue (tc->client);
753}
754
755
756/**
757 * Address of our peer added. Test message is well-formed.
758 *
759 * @param cls the client
760 * @param aam the send message that was sent
761 */
762static int
763check_add_address (void *cls,
764 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
765{
766 struct TransportClient *tc = cls;
767 const char *addr;
768 uint16_t size;
769
770 if (CT_COMMUNICATOR != tc->type)
771 {
772 GNUNET_break (0);
773 return GNUNET_SYSERR;
774 }
775 size = ntohs (aam->header.size) - sizeof (*aam);
776 if (0 == size)
777 {
778 GNUNET_break (0);
779 return GNUNET_SYSERR;
780 }
781 addr = (const char *) &aam[1];
782 if ('\0' != addr[size-1])
783 {
784 GNUNET_break (0);
785 return GNUNET_SYSERR;
786 }
787 return GNUNET_OK;
788}
789
790
791/**
792 * Address of our peer added. Process the request.
793 *
794 * @param cls the client
795 * @param aam the send message that was sent
796 */
797static void
798handle_add_address (void *cls,
799 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
800{
801 struct TransportClient *tc = cls;
802 struct AddressListEntry *ale;
803 size_t slen;
804
805 slen = ntohs (aam->header.size) - sizeof (*aam);
806 ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
807 ale->tc = tc;
808 ale->address = (const char *) &ale[1];
809 ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
810 ale->aid = aam->aid;
811 ale->nt = (enum GNUNET_ATS_Network_Type) ntohl (aam->nt);
812 memcpy (&ale[1],
813 &aam[1],
814 slen);
815 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
816 tc->details.communicator.addr_tail,
817 ale);
818 // FIXME: notify somebody?!
819 GNUNET_SERVICE_client_continue (tc->client);
820}
821
822
823/**
824 * Address of our peer deleted. Process the request.
825 *
826 * @param cls the client
827 * @param dam the send message that was sent
828 */
829static void
830handle_del_address (void *cls,
831 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
832{
833 struct TransportClient *tc = cls;
834
835 if (CT_COMMUNICATOR != tc->type)
836 {
837 GNUNET_break (0);
838 GNUNET_SERVICE_client_drop (tc->client);
839 return;
840 }
841 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
842 NULL != ale;
843 ale = ale->next)
844 {
845 if (dam->aid != ale->aid)
846 continue;
847 GNUNET_assert (ale->tc == tc);
848 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
849 tc->details.communicator.addr_tail,
850 ale);
851 // FIXME: notify somebody?
852 GNUNET_free (ale);
853 GNUNET_SERVICE_client_continue (tc->client);
854 }
855 GNUNET_break (0);
856 GNUNET_SERVICE_client_drop (tc->client);
857}
858
859
860/**
861 * Client notified us about transmission from a peer. Process the request.
862 *
863 * @param cls the client
864 * @param obm the send message that was sent
865 */
866static int
867check_incoming_msg (void *cls,
868 const struct GNUNET_TRANSPORT_IncomingMessage *im)
869{
870 struct TransportClient *tc = cls;
871 uint16_t size;
872 const struct GNUNET_MessageHeader *obmm;
873
874 if (CT_COMMUNICATOR != tc->type)
875 {
876 GNUNET_break (0);
877 return GNUNET_SYSERR;
878 }
879 size = ntohs (im->header.size) - sizeof (*im);
880 if (size < sizeof (struct GNUNET_MessageHeader))
881 {
882 GNUNET_break (0);
883 return GNUNET_SYSERR;
884 }
885 obmm = (const struct GNUNET_MessageHeader *) &im[1];
886 if (size != ntohs (obmm->size))
887 {
888 GNUNET_break (0);
889 return GNUNET_SYSERR;
890 }
891 return GNUNET_OK;
892}
893
894
895/**
896 * Incoming meessage. Process the request.
897 *
898 * @param cls the client
899 * @param im the send message that was received
900 */
901static void
902handle_incoming_msg (void *cls,
903 const struct GNUNET_TRANSPORT_IncomingMessage *im)
904{
905 struct TransportClient *tc = cls;
906
907 GNUNET_SERVICE_client_continue (tc->client);
908}
909
910
911/**
912 * New queue became available. Check message.
913 *
914 * @param cls the client
915 * @param aqm the send message that was sent
916 */
917static int
918check_add_queue_message (void *cls,
919 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
920{
921 struct TransportClient *tc = cls;
922 const char *addr;
923 uint16_t size;
924
925 if (CT_COMMUNICATOR != tc->type)
926 {
927 GNUNET_break (0);
928 return GNUNET_SYSERR;
929 }
930 size = ntohs (aqm->header.size) - sizeof (*aqm);
931 if (0 == size)
932 {
933 GNUNET_break (0);
934 return GNUNET_SYSERR;
935 }
936 addr = (const char *) &aqm[1];
937 if ('\0' != addr[size-1])
938 {
939 GNUNET_break (0);
940 return GNUNET_SYSERR;
941 }
942 return GNUNET_OK;
943}
944
945
946/**
947 * New queue became available. Process the request.
948 *
949 * @param cls the client
950 * @param aqm the send message that was sent
951 */
952static void
953handle_add_queue_message (void *cls,
954 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
955{
956 struct TransportClient *tc = cls;
957 struct Queue *queue;
958 struct Neighbour *neighbour;
959 const char *addr;
960 uint16_t addr_len;
961
962 neighbour = lookup_neighbour (&aqm->receiver);
963 if (NULL == neighbour)
964 {
965 neighbour = GNUNET_new (struct Neighbour);
966 neighbour->pid = aqm->receiver;
967 GNUNET_assert (GNUNET_OK ==
968 GNUNET_CONTAINER_multipeermap_put (neighbours,
969 &neighbour->pid,
970 neighbour,
971 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
972 // FIXME: notify ATS/COREs/monitors!
973 }
974 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
975 addr = (const char *) &aqm[1];
976
977 queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
978 queue->qid = aqm->qid;
979 queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt);
980 queue->tc = tc;
981 queue->neighbour = neighbour;
982 queue->address = (const char *) &queue[1];
983 memcpy (&queue[1],
984 addr,
985 addr_len);
986 GNUNET_CONTAINER_MDLL_insert (neighbour,
987 neighbour->queue_head,
988 neighbour->queue_tail,
989 queue);
990 GNUNET_CONTAINER_MDLL_insert (client,
991 tc->details.communicator.queue_head,
992 tc->details.communicator.queue_tail,
993 queue);
994 // FIXME: possibly transmit queued messages?
995 GNUNET_SERVICE_client_continue (tc->client);
996}
997
998
999/**
1000 * Release memory used by @a neighbour.
1001 *
1002 * @param neighbour neighbour entry to free
1003 */
1004static void
1005free_neighbour (struct Neighbour *neighbour)
1006{
1007 GNUNET_assert (NULL == neighbour->queue_head);
1008 GNUNET_assert (GNUNET_YES ==
1009 GNUNET_CONTAINER_multipeermap_remove (neighbours,
1010 &neighbour->pid,
1011 neighbour));
1012 GNUNET_free (neighbour);
1013}
1014
1015
1016/**
1017 * Queue to a peer went down. Process the request.
1018 *
1019 * @param cls the client
1020 * @param dqm the send message that was sent
1021 */
1022static void
1023handle_del_queue_message (void *cls,
1024 const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
1025{
1026 struct TransportClient *tc = cls;
1027
1028 if (CT_COMMUNICATOR != tc->type)
1029 {
1030 GNUNET_break (0);
1031 GNUNET_SERVICE_client_drop (tc->client);
1032 return;
1033 }
1034 for (struct Queue *queue = tc->details.communicator.queue_head;
1035 NULL != queue;
1036 queue = queue->next_client)
1037 {
1038 struct Neighbour *neighbour = queue->neighbour;
1039
1040 if ( (dqm->qid != queue->qid) ||
1041 (0 != memcmp (&dqm->receiver,
1042 &neighbour->pid,
1043 sizeof (struct GNUNET_PeerIdentity))) )
1044 continue;
1045 GNUNET_CONTAINER_MDLL_remove (neighbour,
1046 neighbour->queue_head,
1047 neighbour->queue_tail,
1048 queue);
1049 GNUNET_CONTAINER_MDLL_remove (client,
1050 tc->details.communicator.queue_head,
1051 tc->details.communicator.queue_tail,
1052 queue);
1053 GNUNET_free (queue);
1054 if (NULL == neighbour->queue_head)
1055 {
1056 // FIXME: notify cores/monitors!
1057 free_neighbour (neighbour);
1058 }
1059 GNUNET_SERVICE_client_continue (tc->client);
1060 return;
1061 }
1062 GNUNET_break (0);
1063 GNUNET_SERVICE_client_drop (tc->client);
1064}
1065
1066
1067/**
1068 * Message was transmitted. Process the request.
1069 *
1070 * @param cls the client
1071 * @param sma the send message that was sent
1072 */
1073static void
1074handle_send_message_ack (void *cls,
1075 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
1076{
1077 struct TransportClient *tc = cls;
1078
1079 if (CT_COMMUNICATOR != tc->type)
1080 {
1081 GNUNET_break (0);
1082 GNUNET_SERVICE_client_drop (tc->client);
1083 return;
1084 }
1085 GNUNET_SERVICE_client_continue (tc->client);
1086}
1087
1088
1089/**
1090 * Initialize a monitor client.
1091 *
1092 * @param cls the client
1093 * @param start the start message that was sent
1094 */
1095static void
1096handle_monitor_start (void *cls,
1097 const struct GNUNET_TRANSPORT_MonitorStart *start)
1098{
1099 struct TransportClient *tc = cls;
1100
1101 if (CT_NONE != tc->type)
1102 {
1103 GNUNET_break (0);
1104 GNUNET_SERVICE_client_drop (tc->client);
1105 return;
1106 }
1107 tc->type = CT_MONITOR;
1108 tc->details.monitor.peer = start->peer;
1109 tc->details.monitor.one_shot = ntohl (start->one_shot);
1110 // FIXME: do work!
1111 GNUNET_SERVICE_client_continue (tc->client);
1112}
1113
1114
1115/**
1116 * Free neighbour entry.
1117 *
1118 * @param cls NULL
1119 * @param pid unused
1120 * @param value a `struct Neighbour`
1121 * @return #GNUNET_OK (always)
1122 */
1123static int
1124free_neighbour_cb (void *cls,
1125 const struct GNUNET_PeerIdentity *pid,
1126 void *value)
1127{
1128 struct Neighbour *neighbour = value;
1129
1130 (void) cls;
1131 (void) pid;
1132 GNUNET_break (0); // should this ever happen?
1133 free_neighbour (neighbour);
1134
1135 return GNUNET_OK;
1136}
1137
1138
1139/**
1140 * Function called when the service shuts down. Unloads our plugins
1141 * and cancels pending validations.
1142 *
1143 * @param cls closure, unused
1144 */
1145static void
1146do_shutdown (void *cls)
1147{
1148 (void) cls;
1149
1150 if (NULL != GST_stats)
1151 {
1152 GNUNET_STATISTICS_destroy (GST_stats,
1153 GNUNET_NO);
1154 GST_stats = NULL;
1155 }
1156 if (NULL != GST_my_private_key)
1157 {
1158 GNUNET_free (GST_my_private_key);
1159 GST_my_private_key = NULL;
1160 }
1161 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1162 &free_neighbour_cb,
1163 NULL);
1164 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
1165}
1166
1167
1168/**
1169 * Initiate transport service.
1170 *
1171 * @param cls closure
1172 * @param c configuration to use
1173 * @param service the initialized service
1174 */
1175static void
1176run (void *cls,
1177 const struct GNUNET_CONFIGURATION_Handle *c,
1178 struct GNUNET_SERVICE_Handle *service)
1179{
1180 (void) cls;
1181 /* setup globals */
1182 GST_cfg = c;
1183 neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
1184 GNUNET_YES);
1185 GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
1186 if (NULL == GST_my_private_key)
1187 {
1188 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1189 _("Transport service is lacking key configuration settings. Exiting.\n"));
1190 GNUNET_SCHEDULER_shutdown ();
1191 return;
1192 }
1193 GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
1194 &GST_my_identity.public_key);
1195 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1196 "My identity is `%s'\n",
1197 GNUNET_i2s_full (&GST_my_identity));
1198
1199 GST_stats = GNUNET_STATISTICS_create ("transport",
1200 GST_cfg);
1201 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1202 NULL);
1203 /* start subsystems */
1204}
1205
1206
1207/**
1208 * Define "main" method using service macro.
1209 */
1210GNUNET_SERVICE_MAIN
1211("transport",
1212 GNUNET_SERVICE_OPTION_NONE,
1213 &run,
1214 &client_connect_cb,
1215 &client_disconnect_cb,
1216 NULL,
1217 /* communication with core */
1218 GNUNET_MQ_hd_fixed_size (client_start,
1219 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
1220 struct StartMessage,
1221 NULL),
1222 GNUNET_MQ_hd_var_size (client_send,
1223 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
1224 struct OutboundMessage,
1225 NULL),
1226 /* communication with communicators */
1227 GNUNET_MQ_hd_var_size (communicator_available,
1228 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
1229 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
1230 NULL),
1231 GNUNET_MQ_hd_var_size (add_address,
1232 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
1233 struct GNUNET_TRANSPORT_AddAddressMessage,
1234 NULL),
1235 GNUNET_MQ_hd_fixed_size (del_address,
1236 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
1237 struct GNUNET_TRANSPORT_DelAddressMessage,
1238 NULL),
1239 GNUNET_MQ_hd_var_size (incoming_msg,
1240 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
1241 struct GNUNET_TRANSPORT_IncomingMessage,
1242 NULL),
1243 GNUNET_MQ_hd_var_size (add_queue_message,
1244 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
1245 struct GNUNET_TRANSPORT_AddQueueMessage,
1246 NULL),
1247 GNUNET_MQ_hd_fixed_size (del_queue_message,
1248 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
1249 struct GNUNET_TRANSPORT_DelQueueMessage,
1250 NULL),
1251 GNUNET_MQ_hd_fixed_size (send_message_ack,
1252 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
1253 struct GNUNET_TRANSPORT_SendMessageToAck,
1254 NULL),
1255 /* communication with monitors */
1256 GNUNET_MQ_hd_fixed_size (monitor_start,
1257 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
1258 struct GNUNET_TRANSPORT_MonitorStart,
1259 NULL),
1260 GNUNET_MQ_handler_end ());
1261
1262
1263/* end of file gnunet-service-transport.c */
diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c
index 8c4f33fd0..2d9803651 100644
--- a/src/transport/gnunet-service-transport.c
+++ b/src/transport/gnunet-service-transport.c
@@ -602,7 +602,6 @@ notify_client_about_neighbour (void *cls,
602 cim.header.size = htons (sizeof (struct ConnectInfoMessage)); 602 cim.header.size = htons (sizeof (struct ConnectInfoMessage));
603 cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 603 cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
604 cim.id = *peer; 604 cim.id = *peer;
605 cim.quota_in = bandwidth_in;
606 cim.quota_out = bandwidth_out; 605 cim.quota_out = bandwidth_out;
607 unicast (tc, 606 unicast (tc,
608 &cim.header, 607 &cim.header,
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c
index 3965bc13e..68344bcf4 100644
--- a/src/transport/gnunet-service-transport_neighbours.c
+++ b/src/transport/gnunet-service-transport_neighbours.c
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/ 17*/
@@ -571,7 +571,6 @@ neighbours_connect_notification (struct NeighbourMapEntry *n)
571 connect_msg->header.size = htons (sizeof(buf)); 571 connect_msg->header.size = htons (sizeof(buf));
572 connect_msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 572 connect_msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
573 connect_msg->id = n->id; 573 connect_msg->id = n->id;
574 connect_msg->quota_in = n->primary_address.bandwidth_in;
575 connect_msg->quota_out = bandwidth_min; 574 connect_msg->quota_out = bandwidth_min;
576 GST_clients_broadcast (&connect_msg->header, 575 GST_clients_broadcast (&connect_msg->header,
577 GNUNET_NO); 576 GNUNET_NO);
diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c
index fed509fe1..6c589307b 100644
--- a/src/transport/gnunet-transport.c
+++ b/src/transport/gnunet-transport.c
@@ -489,7 +489,7 @@ operation_timeout (void *cls)
489 } 489 }
490 FPRINTF (stdout, 490 FPRINTF (stdout,
491 "%s", 491 "%s",
492 _("Failed to list connections, timeout occured\n")); 492 _("Failed to list connections, timeout occurred\n"));
493 GNUNET_SCHEDULER_shutdown (); 493 GNUNET_SCHEDULER_shutdown ();
494 ret = 1; 494 ret = 1;
495 return; 495 return;
diff --git a/src/transport/transport-testing.c b/src/transport/transport-testing.c
index e3d4b7a9b..4295446d2 100644
--- a/src/transport/transport-testing.c
+++ b/src/transport/transport-testing.c
@@ -374,8 +374,8 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth
374 const char *cfgname, 374 const char *cfgname,
375 int peer_id, 375 int peer_id,
376 const struct GNUNET_MQ_MessageHandler *handlers, 376 const struct GNUNET_MQ_MessageHandler *handlers,
377 GNUNET_TRANSPORT_NotifyConnecT nc, 377 GNUNET_TRANSPORT_NotifyConnect nc,
378 GNUNET_TRANSPORT_NotifyDisconnecT nd, 378 GNUNET_TRANSPORT_NotifyDisconnect nd,
379 void *cb_cls, 379 void *cb_cls,
380 GNUNET_SCHEDULER_TaskCallback start_cb, 380 GNUNET_SCHEDULER_TaskCallback start_cb,
381 void *start_cb_cls) 381 void *start_cb_cls)
diff --git a/src/transport/transport-testing.h b/src/transport/transport-testing.h
index a4cfd89f6..3a638580d 100644
--- a/src/transport/transport-testing.h
+++ b/src/transport/transport-testing.h
@@ -115,12 +115,12 @@ struct GNUNET_TRANSPORT_TESTING_PeerContext
115 /** 115 /**
116 * Notify connect callback 116 * Notify connect callback
117 */ 117 */
118 GNUNET_TRANSPORT_NotifyConnecT nc; 118 GNUNET_TRANSPORT_NotifyConnect nc;
119 119
120 /** 120 /**
121 * Notify disconnect callback 121 * Notify disconnect callback
122 */ 122 */
123 GNUNET_TRANSPORT_NotifyDisconnecT nd; 123 GNUNET_TRANSPORT_NotifyDisconnect nd;
124 124
125 /** 125 /**
126 * Startup completed callback 126 * Startup completed callback
@@ -291,8 +291,8 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth
291 const char *cfgname, 291 const char *cfgname,
292 int peer_id, 292 int peer_id,
293 const struct GNUNET_MQ_MessageHandler *handlers, 293 const struct GNUNET_MQ_MessageHandler *handlers,
294 GNUNET_TRANSPORT_NotifyConnecT nc, 294 GNUNET_TRANSPORT_NotifyConnect nc,
295 GNUNET_TRANSPORT_NotifyDisconnecT nd, 295 GNUNET_TRANSPORT_NotifyDisconnect nd,
296 void *cb_cls, 296 void *cb_cls,
297 GNUNET_SCHEDULER_TaskCallback start_cb, 297 GNUNET_SCHEDULER_TaskCallback start_cb,
298 void *start_cb_cls); 298 void *start_cb_cls);
diff --git a/src/transport/transport.h b/src/transport/transport.h
index e68536bcc..423d3cefa 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/ 17*/
@@ -94,7 +94,7 @@ struct StartMessage
94 94
95 /** 95 /**
96 * 0: no options 96 * 0: no options
97 * 1: The 'self' field should be checked 97 * 1: The @e self field should be checked
98 * 2: this client is interested in payload traffic 98 * 2: this client is interested in payload traffic
99 */ 99 */
100 uint32_t options; 100 uint32_t options;
@@ -121,19 +121,14 @@ struct ConnectInfoMessage
121 struct GNUNET_MessageHeader header; 121 struct GNUNET_MessageHeader header;
122 122
123 /** 123 /**
124 * Identity of the new neighbour. 124 * Current outbound quota for this peer
125 */
126 struct GNUNET_PeerIdentity id;
127
128 /**
129 * Current inbound quota for this peer
130 */ 125 */
131 struct GNUNET_BANDWIDTH_Value32NBO quota_in; 126 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
132 127
133 /** 128 /**
134 * Current outbound quota for this peer 129 * Identity of the new neighbour.
135 */ 130 */
136 struct GNUNET_BANDWIDTH_Value32NBO quota_out; 131 struct GNUNET_PeerIdentity id;
137}; 132};
138 133
139 134
@@ -404,6 +399,7 @@ struct ValidationIterateResponseMessage
404 struct GNUNET_TIME_AbsoluteNBO next_validation; 399 struct GNUNET_TIME_AbsoluteNBO next_validation;
405}; 400};
406 401
402
407/** 403/**
408 * Message from the library to the transport service 404 * Message from the library to the transport service
409 * asking for binary addresses known for a peer. 405 * asking for binary addresses known for a peer.
@@ -654,6 +650,22 @@ struct TransportPluginMonitorMessage
654/* *********************** TNG messages ***************** */ 650/* *********************** TNG messages ***************** */
655 651
656/** 652/**
653 * Communicator goes online. Note which addresses it can
654 * work with.
655 */
656struct GNUNET_TRANSPORT_CommunicatorAvailableMessage
657{
658
659 /**
660 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR.
661 */
662 struct GNUNET_MessageHeader header;
663
664 /* Followed by the address prefix of the communicator */
665};
666
667
668/**
657 * Add address to the list. 669 * Add address to the list.
658 */ 670 */
659struct GNUNET_TRANSPORT_AddAddressMessage 671struct GNUNET_TRANSPORT_AddAddressMessage
@@ -678,7 +690,7 @@ struct GNUNET_TRANSPORT_AddAddressMessage
678 * An `enum GNUNET_ATS_Network_Type` in NBO. 690 * An `enum GNUNET_ATS_Network_Type` in NBO.
679 */ 691 */
680 uint32_t nt; 692 uint32_t nt;
681 693
682 /* followed by UTF-8 encoded, 0-terminated human-readable address */ 694 /* followed by UTF-8 encoded, 0-terminated human-readable address */
683}; 695};
684 696
@@ -717,12 +729,12 @@ struct GNUNET_TRANSPORT_IncomingMessage
717 * Do we use flow control or not? 729 * Do we use flow control or not?
718 */ 730 */
719 uint32_t fc_on GNUNET_PACKED; 731 uint32_t fc_on GNUNET_PACKED;
720 732
721 /** 733 /**
722 * 64-bit number to identify the matching ACK. 734 * 64-bit number to identify the matching ACK.
723 */ 735 */
724 uint64_t fc_id GNUNET_PACKED; 736 uint64_t fc_id GNUNET_PACKED;
725 737
726 /** 738 /**
727 * Sender identifier. 739 * Sender identifier.
728 */ 740 */
@@ -748,12 +760,12 @@ struct GNUNET_TRANSPORT_IncomingMessageAck
748 * Reserved (0) 760 * Reserved (0)
749 */ 761 */
750 uint32_t reserved GNUNET_PACKED; 762 uint32_t reserved GNUNET_PACKED;
751 763
752 /** 764 /**
753 * Which message is being ACKed? 765 * Which message is being ACKed?
754 */ 766 */
755 uint64_t fc_id GNUNET_PACKED; 767 uint64_t fc_id GNUNET_PACKED;
756 768
757 /** 769 /**
758 * Sender identifier of the original message. 770 * Sender identifier of the original message.
759 */ 771 */
@@ -769,7 +781,7 @@ struct GNUNET_TRANSPORT_AddQueueMessage
769{ 781{
770 782
771 /** 783 /**
772 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE. 784 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP.
773 */ 785 */
774 struct GNUNET_MessageHeader header; 786 struct GNUNET_MessageHeader header;
775 787
@@ -787,7 +799,9 @@ struct GNUNET_TRANSPORT_AddQueueMessage
787 * An `enum GNUNET_ATS_Network_Type` in NBO. 799 * An `enum GNUNET_ATS_Network_Type` in NBO.
788 */ 800 */
789 uint32_t nt; 801 uint32_t nt;
790 802
803 // FIXME: add MTU?
804
791 /* followed by UTF-8 encoded, 0-terminated human-readable address */ 805 /* followed by UTF-8 encoded, 0-terminated human-readable address */
792}; 806};
793 807
@@ -799,7 +813,7 @@ struct GNUNET_TRANSPORT_DelQueueMessage
799{ 813{
800 814
801 /** 815 /**
802 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE. 816 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN.
803 */ 817 */
804 struct GNUNET_MessageHeader header; 818 struct GNUNET_MessageHeader header;
805 819
@@ -828,9 +842,9 @@ struct GNUNET_TRANSPORT_CreateQueue
828 struct GNUNET_MessageHeader header; 842 struct GNUNET_MessageHeader header;
829 843
830 /** 844 /**
831 * Always zero. 845 * Unique ID for the request.
832 */ 846 */
833 uint32_t reserved GNUNET_PACKED; 847 uint32_t request_id GNUNET_PACKED;
834 848
835 /** 849 /**
836 * Receiver that can be addressed via the queue. 850 * Receiver that can be addressed via the queue.
@@ -842,6 +856,24 @@ struct GNUNET_TRANSPORT_CreateQueue
842 856
843 857
844/** 858/**
859 * Transport tells communicator that it wants a new queue.
860 */
861struct GNUNET_TRANSPORT_CreateQueueResponse
862{
863
864 /**
865 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK or #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL.
866 */
867 struct GNUNET_MessageHeader header;
868
869 /**
870 * Unique ID for the request.
871 */
872 uint32_t request_id GNUNET_PACKED;
873};
874
875
876/**
845 * Inform communicator about transport's desire to send a message. 877 * Inform communicator about transport's desire to send a message.
846 */ 878 */
847struct GNUNET_TRANSPORT_SendMessageTo 879struct GNUNET_TRANSPORT_SendMessageTo
@@ -861,7 +893,7 @@ struct GNUNET_TRANSPORT_SendMessageTo
861 * Message ID, used for flow control. 893 * Message ID, used for flow control.
862 */ 894 */
863 uint64_t mid GNUNET_PACKED; 895 uint64_t mid GNUNET_PACKED;
864 896
865 /** 897 /**
866 * Receiver identifier. 898 * Receiver identifier.
867 */ 899 */
@@ -891,7 +923,7 @@ struct GNUNET_TRANSPORT_SendMessageToAck
891 * Message ID of the original message. 923 * Message ID of the original message.
892 */ 924 */
893 uint64_t mid GNUNET_PACKED; 925 uint64_t mid GNUNET_PACKED;
894 926
895 /** 927 /**
896 * Receiver identifier. 928 * Receiver identifier.
897 */ 929 */
@@ -901,6 +933,89 @@ struct GNUNET_TRANSPORT_SendMessageToAck
901 933
902 934
903 935
936
937/**
938 * Request to start monitoring.
939 */
940struct GNUNET_TRANSPORT_MonitorStart
941{
942
943 /**
944 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START.
945 */
946 struct GNUNET_MessageHeader header;
947
948 /**
949 * #GNUNET_YES for one-shot montoring, #GNUNET_NO for continuous monitoring.
950 */
951 uint32_t one_shot;
952
953 /**
954 * Target identifier to monitor, all zeros for "all peers".
955 */
956 struct GNUNET_PeerIdentity peer;
957
958};
959
960
961/**
962 * Monitoring data.
963 */
964struct GNUNET_TRANSPORT_MonitorData
965{
966
967 /**
968 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA.
969 */
970 struct GNUNET_MessageHeader header;
971
972 /**
973 * Network type (an `enum GNUNET_ATS_Network_Type` in NBO).
974 */
975 uint32_t nt GNUNET_PACKED;
976
977 /**
978 * Target identifier.
979 */
980 struct GNUNET_PeerIdentity peer;
981
982 /**
983 * @deprecated To be discussed if we keep these...
984 */
985 struct GNUNET_TIME_AbsoluteNBO last_validation;
986 struct GNUNET_TIME_AbsoluteNBO valid_until;
987 struct GNUNET_TIME_AbsoluteNBO next_validation;
988
989 /**
990 * Current round-trip time estimate.
991 */
992 struct GNUNET_TIME_RelativeNBO rtt;
993
994 /**
995 * Is inbound (in NBO).
996 */
997 uint32_t is_inbound GNUNET_PACKED;
998
999 /**
1000 * Messages pending (in NBO).
1001 */
1002 uint32_t num_msg_pending GNUNET_PACKED;
1003
1004 /**
1005 * Bytes pending (in NBO).
1006 */
1007 uint32_t num_bytes_pending GNUNET_PACKED;
1008
1009 /* Followed by 0-terminated address of the peer
1010 (TODO: do we allow no address? If so,
1011 adjust transport_api2_monitor!) */
1012
1013};
1014
1015
1016
1017
1018
904GNUNET_NETWORK_STRUCT_END 1019GNUNET_NETWORK_STRUCT_END
905 1020
906/* end of transport.h */ 1021/* end of transport.h */
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index d446516bd..3a68c6eba 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -90,6 +90,11 @@ struct AckPending
90 struct AckPending *prev; 90 struct AckPending *prev;
91 91
92 /** 92 /**
93 * Communicator this entry belongs to.
94 */
95 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
96
97 /**
93 * Which peer is this about? 98 * Which peer is this about?
94 */ 99 */
95 struct GNUNET_PeerIdentity receiver; 100 struct GNUNET_PeerIdentity receiver;
@@ -134,17 +139,17 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
134 /** 139 /**
135 * DLL of messages awaiting transmission confirmation (ack). 140 * DLL of messages awaiting transmission confirmation (ack).
136 */ 141 */
137 struct AckPending *ac_tail; 142 struct AckPending *ap_tail;
138 143
139 /** 144 /**
140 * DLL of queues we offer. 145 * DLL of queues we offer.
141 */ 146 */
142 struct QueueHandle *queue_head; 147 struct GNUNET_TRANSPORT_QueueHandle *queue_head;
143 148
144 /** 149 /**
145 * DLL of queues we offer. 150 * DLL of queues we offer.
146 */ 151 */
147 struct QueueHandle *queue_tail; 152 struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
148 153
149 /** 154 /**
150 * Our configuration. 155 * Our configuration.
@@ -152,9 +157,14 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
152 const struct GNUNET_CONFIGURATION_Handle *cfg; 157 const struct GNUNET_CONFIGURATION_Handle *cfg;
153 158
154 /** 159 /**
155 * Name of the communicator. 160 * Config section to use.
161 */
162 const char *config_section;
163
164 /**
165 * Address prefix to use.
156 */ 166 */
157 const char *name; 167 const char *addr_prefix;
158 168
159 /** 169 /**
160 * Function to call when the transport service wants us to initiate 170 * Function to call when the transport service wants us to initiate
@@ -168,6 +178,11 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
168 void *mq_init_cls; 178 void *mq_init_cls;
169 179
170 /** 180 /**
181 * Queue to talk to the transport service.
182 */
183 struct GNUNET_MQ_Handle *mq;
184
185 /**
171 * Maximum permissable queue length. 186 * Maximum permissable queue length.
172 */ 187 */
173 unsigned long long max_queue_length; 188 unsigned long long max_queue_length;
@@ -202,6 +217,17 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
202 */ 217 */
203struct GNUNET_TRANSPORT_QueueHandle 218struct GNUNET_TRANSPORT_QueueHandle
204{ 219{
220
221 /**
222 * Kept in a DLL.
223 */
224 struct GNUNET_TRANSPORT_QueueHandle *next;
225
226 /**
227 * Kept in a DLL.
228 */
229 struct GNUNET_TRANSPORT_QueueHandle *prev;
230
205 /** 231 /**
206 * Handle this queue belongs to. 232 * Handle this queue belongs to.
207 */ 233 */
@@ -308,7 +334,7 @@ send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
308 env = GNUNET_MQ_msg_extra (aam, 334 env = GNUNET_MQ_msg_extra (aam,
309 strlen (ai->address) + 1, 335 strlen (ai->address) + 1,
310 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); 336 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
311 aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration); 337 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
312 aam->nt = htonl ((uint32_t) ai->nt); 338 aam->nt = htonl ((uint32_t) ai->nt);
313 memcpy (&aam[1], 339 memcpy (&aam[1],
314 ai->address, 340 ai->address,
@@ -334,7 +360,7 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
334 return; 360 return;
335 env = GNUNET_MQ_msg (dam, 361 env = GNUNET_MQ_msg (dam,
336 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); 362 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
337 dam.aid = htonl (ai->aid); 363 dam->aid = htonl (ai->aid);
338 GNUNET_MQ_send (ai->ch->mq, 364 GNUNET_MQ_send (ai->ch->mq,
339 env); 365 env);
340} 366}
@@ -352,18 +378,18 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
352 struct GNUNET_MQ_Envelope *env; 378 struct GNUNET_MQ_Envelope *env;
353 struct GNUNET_TRANSPORT_AddQueueMessage *aqm; 379 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
354 380
355 if (NULL == ai->ch->mq) 381 if (NULL == qh->ch->mq)
356 return; 382 return;
357 env = GNUNET_MQ_msg_extra (aqm, 383 env = GNUNET_MQ_msg_extra (aqm,
358 strlen (ai->address) + 1, 384 strlen (qh->address) + 1,
359 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE); 385 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
360 aqm.receiver = qh->peer; 386 aqm->receiver = qh->peer;
361 aqm.nt = htonl ((uint32_t) qh->nt); 387 aqm->nt = htonl ((uint32_t) qh->nt);
362 aqm.qid = htonl (qh->qid); 388 aqm->qid = htonl (qh->queue_id);
363 memcpy (&aqm[1], 389 memcpy (&aqm[1],
364 ai->address, 390 qh->address,
365 strlen (ai->address) + 1); 391 strlen (qh->address) + 1);
366 GNUNET_MQ_send (ai->ch->mq, 392 GNUNET_MQ_send (qh->ch->mq,
367 env); 393 env);
368} 394}
369 395
@@ -380,13 +406,13 @@ send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
380 struct GNUNET_MQ_Envelope *env; 406 struct GNUNET_MQ_Envelope *env;
381 struct GNUNET_TRANSPORT_DelQueueMessage *dqm; 407 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
382 408
383 if (NULL == ai->ch->mq) 409 if (NULL == qh->ch->mq)
384 return; 410 return;
385 env = GNUNET_MQ_msg (dqm, 411 env = GNUNET_MQ_msg (dqm,
386 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE); 412 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
387 dqm.qid = htonl (qh->qid); 413 dqm->qid = htonl (qh->queue_id);
388 dqm.receiver = qh->peer; 414 dqm->receiver = qh->peer;
389 GNUNET_MQ_send (ai->ch->mq, 415 GNUNET_MQ_send (qh->ch->mq,
390 env); 416 env);
391} 417}
392 418
@@ -444,7 +470,8 @@ error_handler (void *cls,
444 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 470 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
445 471
446 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 472 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447 "MQ failure, reconnecting to transport service.\n"); 473 "MQ failure %d, reconnecting to transport service.\n",
474 error);
448 disconnect (ch); 475 disconnect (ch);
449 /* TODO: maybe do this with exponential backoff/delay */ 476 /* TODO: maybe do this with exponential backoff/delay */
450 reconnect (ch); 477 reconnect (ch);
@@ -460,7 +487,7 @@ error_handler (void *cls,
460 */ 487 */
461static void 488static void
462handle_incoming_ack (void *cls, 489handle_incoming_ack (void *cls,
463 struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) 490 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
464{ 491{
465 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 492 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
466 493
@@ -470,7 +497,7 @@ handle_incoming_ack (void *cls,
470 { 497 {
471 if ( (fc->id == incoming_ack->fc_id) && 498 if ( (fc->id == incoming_ack->fc_id) &&
472 (0 == memcmp (&fc->sender, 499 (0 == memcmp (&fc->sender,
473 incoming_ack->sender, 500 &incoming_ack->sender,
474 sizeof (struct GNUNET_PeerIdentity))) ) 501 sizeof (struct GNUNET_PeerIdentity))) )
475 { 502 {
476 GNUNET_CONTAINER_DLL_remove (ch->fc_head, 503 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
@@ -499,11 +526,12 @@ handle_incoming_ack (void *cls,
499 */ 526 */
500static int 527static int
501check_create_queue (void *cls, 528check_create_queue (void *cls,
502 struct GNUNET_TRANSPORT_CreateQueue *cq) 529 const struct GNUNET_TRANSPORT_CreateQueue *cq)
503{ 530{
504 uint16_t len = ntohs (cq->header.size) - sizeof (*cq); 531 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505 const char *addr = (const char *) &cq[1]; 532 const char *addr = (const char *) &cq[1];
506 533
534 (void) cls;
507 if ( (0 == len) || 535 if ( (0 == len) ||
508 ('\0' != addr[len-1]) ) 536 ('\0' != addr[len-1]) )
509 { 537 {
@@ -522,11 +550,13 @@ check_create_queue (void *cls,
522 */ 550 */
523static void 551static void
524handle_create_queue (void *cls, 552handle_create_queue (void *cls,
525 struct GNUNET_TRANSPORT_CreateQueue *cq) 553 const struct GNUNET_TRANSPORT_CreateQueue *cq)
526{ 554{
527 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 555 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528 const char *addr = (const char *) &cq[1]; 556 const char *addr = (const char *) &cq[1];
529 557 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
558 struct GNUNET_MQ_Envelope *env;
559
530 if (GNUNET_OK != 560 if (GNUNET_OK !=
531 ch->mq_init (ch->mq_init_cls, 561 ch->mq_init (ch->mq_init_cls,
532 &cq->receiver, 562 &cq->receiver,
@@ -535,8 +565,17 @@ handle_create_queue (void *cls,
535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 565 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536 "Address `%s' invalid for this communicator\n", 566 "Address `%s' invalid for this communicator\n",
537 addr); 567 addr);
538 // TODO: do we notify the transport!? 568 env = GNUNET_MQ_msg (cqr,
569 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
539 } 570 }
571 else
572 {
573 env = GNUNET_MQ_msg (cqr,
574 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
575 }
576 cqr->request_id = cq->request_id;
577 GNUNET_MQ_send (ch->mq,
578 env);
540} 579}
541 580
542 581
@@ -550,11 +589,12 @@ handle_create_queue (void *cls,
550 */ 589 */
551static int 590static int
552check_send_msg (void *cls, 591check_send_msg (void *cls,
553 struct GNUNET_TRANSPORT_SendMessageTo *smt) 592 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
554{ 593{
555 uint16_t len = ntohs (smt->header.size) - sizeof (*smt); 594 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; 595 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
557 596
597 (void) cls;
558 if (ntohs (mh->size) != len) 598 if (ntohs (mh->size) != len)
559 { 599 {
560 GNUNET_break (0); 600 GNUNET_break (0);
@@ -584,9 +624,9 @@ send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
584 624
585 env = GNUNET_MQ_msg (ack, 625 env = GNUNET_MQ_msg (ack,
586 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); 626 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587 ack->status = htonl (GNUNET_OK); 627 ack->status = htonl (status);
588 ack->mid = ap->mid; 628 ack->mid = mid;
589 ack->receiver = ap->receiver; 629 ack->receiver = *receiver;
590 GNUNET_MQ_send (ch->mq, 630 GNUNET_MQ_send (ch->mq,
591 env); 631 env);
592} 632}
@@ -623,18 +663,18 @@ send_ack_cb (void *cls)
623 */ 663 */
624static void 664static void
625handle_send_msg (void *cls, 665handle_send_msg (void *cls,
626 struct GNUNET_TRANSPORT_SendMessageTo *smt) 666 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
627{ 667{
628 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 668 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629 const struct GNUNET_MessageHeader *mh; 669 const struct GNUNET_MessageHeader *mh;
630 struct GNUNET_MQ_Envelope *env; 670 struct GNUNET_MQ_Envelope *env;
631 struct AckPending *ap; 671 struct AckPending *ap;
632 struct QueueHandle *qh; 672 struct GNUNET_TRANSPORT_QueueHandle *qh;
633 673
634 for (qh = ch->queue_head;NULL != qh; qh = qh->next) 674 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
635 if ( (qh->queue_id == smt->qid) && 675 if ( (qh->queue_id == smt->qid) &&
636 (0 == memcmp (&qh->peer, 676 (0 == memcmp (&qh->peer,
637 &smt->target, 677 &smt->receiver,
638 sizeof (struct GNUNET_PeerIdentity))) ) 678 sizeof (struct GNUNET_PeerIdentity))) )
639 break; 679 break;
640 if (NULL == qh) 680 if (NULL == qh)
@@ -653,7 +693,7 @@ handle_send_msg (void *cls,
653 ap->receiver = smt->receiver; 693 ap->receiver = smt->receiver;
654 ap->mid = smt->mid; 694 ap->mid = smt->mid;
655 GNUNET_CONTAINER_DLL_insert (ch->ap_head, 695 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
656 cp->ap_tail, 696 ch->ap_tail,
657 ap); 697 ap);
658 mh = (const struct GNUNET_MessageHeader *) &smt[1]; 698 mh = (const struct GNUNET_MessageHeader *) &smt[1];
659 env = GNUNET_MQ_msg_copy (mh); 699 env = GNUNET_MQ_msg_copy (mh);
@@ -679,7 +719,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
679 struct GNUNET_TRANSPORT_IncomingMessageAck, 719 struct GNUNET_TRANSPORT_IncomingMessageAck,
680 ch), 720 ch),
681 GNUNET_MQ_hd_var_size (create_queue, 721 GNUNET_MQ_hd_var_size (create_queue,
682 GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE, 722 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
683 struct GNUNET_TRANSPORT_CreateQueue, 723 struct GNUNET_TRANSPORT_CreateQueue,
684 ch), 724 ch),
685 GNUNET_MQ_hd_var_size (send_msg, 725 GNUNET_MQ_hd_var_size (send_msg,
@@ -688,12 +728,24 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
688 ch), 728 ch),
689 GNUNET_MQ_handler_end() 729 GNUNET_MQ_handler_end()
690 }; 730 };
731 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
732 struct GNUNET_MQ_Envelope *env;
691 733
692 ch->mq = GNUNET_CLIENT_connect (cfg, 734 ch->mq = GNUNET_CLIENT_connect (ch->cfg,
693 "transport", 735 "transport",
694 handlers, 736 handlers,
695 &error_handler, 737 &error_handler,
696 ch); 738 ch);
739 if (NULL == ch->mq)
740 return;
741 env = GNUNET_MQ_msg_extra (cam,
742 strlen (ch->addr_prefix) + 1,
743 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
744 memcpy (&cam[1],
745 ch->addr_prefix,
746 strlen (ch->addr_prefix) + 1);
747 GNUNET_MQ_send (ch->mq,
748 env);
697 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; 749 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
698 NULL != ai; 750 NULL != ai;
699 ai = ai->next) 751 ai = ai->next)
@@ -709,7 +761,9 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
709 * Connect to the transport service. 761 * Connect to the transport service.
710 * 762 *
711 * @param cfg configuration to use 763 * @param cfg configuration to use
712 * @param name name of the communicator that is connecting 764 * @param config_section section of the configuration to use for options
765 * @param addr_prefix address prefix for addresses supported by this
766 * communicator, could be NULL for incoming-only communicators
713 * @param mtu maximum message size supported by communicator, 0 if 767 * @param mtu maximum message size supported by communicator, 0 if
714 * sending is not supported, SIZE_MAX for no MTU 768 * sending is not supported, SIZE_MAX for no MTU
715 * @param mq_init function to call to initialize a message queue given 769 * @param mq_init function to call to initialize a message queue given
@@ -720,7 +774,8 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
720 */ 774 */
721struct GNUNET_TRANSPORT_CommunicatorHandle * 775struct GNUNET_TRANSPORT_CommunicatorHandle *
722GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 776GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
723 const char *name, 777 const char *config_section,
778 const char *addr_prefix,
724 size_t mtu, 779 size_t mtu,
725 GNUNET_TRANSPORT_CommunicatorMqInit mq_init, 780 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
726 void *mq_init_cls) 781 void *mq_init_cls)
@@ -729,14 +784,15 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
729 784
730 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); 785 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
731 ch->cfg = cfg; 786 ch->cfg = cfg;
732 ch->name = name; 787 ch->config_section = config_section;
788 ch->addr_prefix = addr_prefix;
733 ch->mtu = mtu; 789 ch->mtu = mtu;
734 ch->mq_init = mq_init; 790 ch->mq_init = mq_init;
735 ch->mq_init_cls = mq_init_cls; 791 ch->mq_init_cls = mq_init_cls;
736 reconnect (ch); 792 reconnect (ch);
737 if (GNUNET_OK != 793 if (GNUNET_OK !=
738 GNUNET_CONFIGURATION_get_value_number (cfg, 794 GNUNET_CONFIGURATION_get_value_number (cfg,
739 name, 795 config_section,
740 "MAX_QUEUE_LENGTH", 796 "MAX_QUEUE_LENGTH",
741 &ch->max_queue_length)) 797 &ch->max_queue_length))
742 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; 798 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
@@ -798,32 +854,15 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
798 struct GNUNET_TRANSPORT_IncomingMessage *im; 854 struct GNUNET_TRANSPORT_IncomingMessage *im;
799 uint16_t msize; 855 uint16_t msize;
800 856
801 if (NULL == ai->ch->mq) 857 if (NULL == ch->mq)
802 return GNUNET_SYSERR; 858 return GNUNET_SYSERR;
803 if (NULL != cb) 859 if ( (NULL == cb) &&
860 (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
804 { 861 {
805 struct FlowControl *fc; 862 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
806 863 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
807 im->fc_on = htonl (GNUNET_YES); 864 ch->max_queue_length);
808 im->fc_id = ai->ch->fc_gen++; 865 return GNUNET_NO;
809 fc = GNUNET_new (struct FlowControl);
810 fc->sender = *sender;
811 fc->id = im->fc_id;
812 fc->cb = cb;
813 fc->cb_cls = cb_cls;
814 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
815 ch->fc_tail,
816 fc);
817 }
818 else
819 {
820 if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
821 {
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Dropping message: transprot is too slow, queue length %u exceeded\n",
824 ch->max_queue_length);
825 return GNUNET_NO;
826 }
827 } 866 }
828 867
829 msize = ntohs (msg->size); 868 msize = ntohs (msg->size);
@@ -839,7 +878,22 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
839 memcpy (&im[1], 878 memcpy (&im[1],
840 msg, 879 msg,
841 msize); 880 msize);
842 GNUNET_MQ_send (ai->ch->mq, 881 if (NULL != cb)
882 {
883 struct FlowControl *fc;
884
885 im->fc_on = htonl (GNUNET_YES);
886 im->fc_id = ch->fc_gen++;
887 fc = GNUNET_new (struct FlowControl);
888 fc->sender = *sender;
889 fc->id = im->fc_id;
890 fc->cb = cb;
891 fc->cb_cls = cb_cls;
892 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
893 ch->fc_tail,
894 fc);
895 }
896 GNUNET_MQ_send (ch->mq,
843 env); 897 env);
844 return GNUNET_OK; 898 return GNUNET_OK;
845} 899}
@@ -927,9 +981,9 @@ GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorH
927 ai->address = GNUNET_strdup (address); 981 ai->address = GNUNET_strdup (address);
928 ai->nt = nt; 982 ai->nt = nt;
929 ai->expiration = expiration; 983 ai->expiration = expiration;
930 ai->aid = handle->aid_gen++; 984 ai->aid = ch->aid_gen++;
931 GNUNET_CONTAINER_DLL_insert (handle->ai_head, 985 GNUNET_CONTAINER_DLL_insert (ch->ai_head,
932 handle->ai_tail, 986 ch->ai_tail,
933 ai); 987 ai);
934 send_add_address (ai); 988 send_add_address (ai);
935 return ai; 989 return ai;
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c
new file mode 100644
index 000000000..78d8dcce0
--- /dev/null
+++ b/src/transport/transport_api2_core.c
@@ -0,0 +1,938 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2013, 2016, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/transport_api_core.c
21 * @brief library to access the transport service for message exchange
22 * @author Christian Grothoff
23 */
24#include "platform.h"
25#include "gnunet_util_lib.h"
26#include "gnunet_constants.h"
27#include "gnunet_arm_service.h"
28#include "gnunet_hello_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_transport_core_service.h"
31#include "transport.h"
32
33#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__)
34
35/**
36 * How large to start with for the hashmap of neighbours.
37 */
38#define STARTING_NEIGHBOURS_SIZE 16
39
40
41/**
42 * Entry in hash table of all of our current (connected) neighbours.
43 */
44struct Neighbour
45{
46
47 /**
48 * Identity of this neighbour.
49 */
50 struct GNUNET_PeerIdentity id;
51
52 /**
53 * Overall transport handle.
54 */
55 struct GNUNET_TRANSPORT_CoreHandle *h;
56
57 /**
58 * Active message queue for the peer.
59 */
60 struct GNUNET_MQ_Handle *mq;
61
62 /**
63 * Envelope with the message we are currently transmitting (or NULL).
64 */
65 struct GNUNET_MQ_Envelope *env;
66
67 /**
68 * Closure for @e mq handlers.
69 */
70 void *handlers_cls;
71
72 /**
73 * Entry in our readyness heap (which is sorted by @e next_ready
74 * value). NULL if there is no pending transmission request for
75 * this neighbour or if we're waiting for @e is_ready to become
76 * true AFTER the @e out_tracker suggested that this peer's quota
77 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
78 * we should immediately go back into the heap).
79 */
80 struct GNUNET_CONTAINER_HeapNode *hn;
81
82 /**
83 * Task to trigger MQ when we have enough bandwidth for the
84 * next transmission.
85 */
86 struct GNUNET_SCHEDULER_Task *timeout_task;
87
88 /**
89 * Outbound bandwidh tracker.
90 */
91 struct GNUNET_BANDWIDTH_Tracker out_tracker;
92
93 /**
94 * Sending consumed more bytes on wire than payload was announced
95 * This overhead is added to the delay of next sending operation
96 */
97 unsigned long long traffic_overhead;
98
99 /**
100 * Is this peer currently ready to receive a message?
101 */
102 int is_ready;
103
104 /**
105 * Size of the message in @e env.
106 */
107 uint16_t env_size;
108
109};
110
111
112
113/**
114 * Handle for the transport service (includes all of the
115 * state for the transport service).
116 */
117struct GNUNET_TRANSPORT_CoreHandle
118{
119
120 /**
121 * Closure for the callbacks.
122 */
123 void *cls;
124
125 /**
126 * Functions to call for received data (template for
127 * new message queues).
128 */
129 struct GNUNET_MQ_MessageHandler *handlers;
130
131 /**
132 * function to call on connect events
133 */
134 GNUNET_TRANSPORT_NotifyConnect nc_cb;
135
136 /**
137 * function to call on disconnect events
138 */
139 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
140
141 /**
142 * function to call on excess bandwidth events
143 */
144 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
145
146 /**
147 * My client connection to the transport service.
148 */
149 struct GNUNET_MQ_Handle *mq;
150
151 /**
152 * My configuration.
153 */
154 const struct GNUNET_CONFIGURATION_Handle *cfg;
155
156 /**
157 * Hash map of the current connected neighbours of this peer.
158 * Maps peer identities to `struct Neighbour` entries.
159 */
160 struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
161
162 /**
163 * Peer identity as assumed by this process, or all zeros.
164 */
165 struct GNUNET_PeerIdentity self;
166
167 /**
168 * ID of the task trying to reconnect to the service.
169 */
170 struct GNUNET_SCHEDULER_Task *reconnect_task;
171
172 /**
173 * Delay until we try to reconnect.
174 */
175 struct GNUNET_TIME_Relative reconnect_delay;
176
177 /**
178 * Should we check that @e self matches what the service thinks?
179 * (if #GNUNET_NO, then @e self is all zeros!).
180 */
181 int check_self;
182
183};
184
185
186/**
187 * Function that will schedule the job that will try
188 * to connect us again to the client.
189 *
190 * @param h transport service to reconnect
191 */
192static void
193disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
194
195
196/**
197 * Get the neighbour list entry for the given peer
198 *
199 * @param h our context
200 * @param peer peer to look up
201 * @return NULL if no such peer entry exists
202 */
203static struct Neighbour *
204neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
205 const struct GNUNET_PeerIdentity *peer)
206{
207 return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
208 peer);
209}
210
211
212/**
213 * Function called by the bandwidth tracker if we have excess
214 * bandwidth.
215 *
216 * @param cls the `struct Neighbour` that has excess bandwidth
217 */
218static void
219notify_excess_cb (void *cls)
220{
221 struct Neighbour *n = cls;
222 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
223
224 LOG (GNUNET_ERROR_TYPE_DEBUG,
225 "Notifying CORE that more bandwidth is available for %s\n",
226 GNUNET_i2s (&n->id));
227
228 if (NULL != h->neb_cb)
229 h->neb_cb (h->cls,
230 &n->id,
231 n->handlers_cls);
232}
233
234
235/**
236 * Iterator over hash map entries, for deleting state of a neighbour.
237 *
238 * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
239 * @param key peer identity
240 * @param value value in the hash map, the neighbour entry to delete
241 * @return #GNUNET_YES if we should continue to
242 * iterate,
243 * #GNUNET_NO if not.
244 */
245static int
246neighbour_delete (void *cls,
247 const struct GNUNET_PeerIdentity *key,
248 void *value)
249{
250 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
251 struct Neighbour *n = value;
252
253 LOG (GNUNET_ERROR_TYPE_DEBUG,
254 "Dropping entry for neighbour `%s'.\n",
255 GNUNET_i2s (key));
256 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
257 if (NULL != handle->nd_cb)
258 handle->nd_cb (handle->cls,
259 &n->id,
260 n->handlers_cls);
261 if (NULL != n->timeout_task)
262 {
263 GNUNET_SCHEDULER_cancel (n->timeout_task);
264 n->timeout_task = NULL;
265 }
266 if (NULL != n->env)
267 {
268 GNUNET_MQ_send_cancel (n->env);
269 n->env = NULL;
270 }
271 GNUNET_MQ_destroy (n->mq);
272 GNUNET_assert (NULL == n->mq);
273 GNUNET_assert (GNUNET_YES ==
274 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
275 key,
276 n));
277 GNUNET_free (n);
278 return GNUNET_YES;
279}
280
281
282/**
283 * Generic error handler, called with the appropriate
284 * error code and the same closure specified at the creation of
285 * the message queue.
286 * Not every message queue implementation supports an error handler.
287 *
288 * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
289 * @param error error code
290 */
291static void
292mq_error_handler (void *cls,
293 enum GNUNET_MQ_Error error)
294{
295 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
296
297 LOG (GNUNET_ERROR_TYPE_DEBUG,
298 "Error receiving from transport service, disconnecting temporarily.\n");
299 disconnect_and_schedule_reconnect (h);
300}
301
302
303/**
304 * A message from the handler's message queue to a neighbour was
305 * transmitted. Now trigger (possibly delayed) notification of the
306 * neighbour's message queue that we are done and thus ready for
307 * the next message.
308 *
309 * @param cls the `struct Neighbour` where the message was sent
310 */
311static void
312notify_send_done_fin (void *cls)
313{
314 struct Neighbour *n = cls;
315
316 n->timeout_task = NULL;
317 n->is_ready = GNUNET_YES;
318 GNUNET_MQ_impl_send_continue (n->mq);
319}
320
321
322/**
323 * A message from the handler's message queue to a neighbour was
324 * transmitted. Now trigger (possibly delayed) notification of the
325 * neighbour's message queue that we are done and thus ready for
326 * the next message.
327 *
328 * @param cls the `struct Neighbour` where the message was sent
329 */
330static void
331notify_send_done (void *cls)
332{
333 struct Neighbour *n = cls;
334 struct GNUNET_TIME_Relative delay;
335
336 n->timeout_task = NULL;
337 if (NULL != n->env)
338 {
339 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
340 n->env_size + n->traffic_overhead);
341 n->env = NULL;
342 n->traffic_overhead = 0;
343 }
344 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
345 128);
346 if (0 == delay.rel_value_us)
347 {
348 n->is_ready = GNUNET_YES;
349 GNUNET_MQ_impl_send_continue (n->mq);
350 return;
351 }
352 GNUNET_MQ_impl_send_in_flight (n->mq);
353 /* cannot send even a small message without violating
354 quota, wait a before allowing MQ to send next message */
355 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
356 &notify_send_done_fin,
357 n);
358}
359
360
361/**
362 * Implement sending functionality of a message queue.
363 * Called one message at a time. Should send the @a msg
364 * to the transport service and then notify the queue
365 * once we are ready for the next one.
366 *
367 * @param mq the message queue
368 * @param msg the message to send
369 * @param impl_state state of the implementation
370 */
371static void
372mq_send_impl (struct GNUNET_MQ_Handle *mq,
373 const struct GNUNET_MessageHeader *msg,
374 void *impl_state)
375{
376 struct Neighbour *n = impl_state;
377 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
378 struct OutboundMessage *obm;
379 uint16_t msize;
380
381 GNUNET_assert (GNUNET_YES == n->is_ready);
382 msize = ntohs (msg->size);
383 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
384 {
385 GNUNET_break (0);
386 GNUNET_MQ_impl_send_continue (mq);
387 return;
388 }
389 GNUNET_assert (NULL == n->env);
390 n->env = GNUNET_MQ_msg_nested_mh (obm,
391 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
392 msg);
393 obm->reserved = htonl (0);
394 obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
395 obm->peer = n->id;
396 GNUNET_assert (NULL == n->timeout_task);
397 n->is_ready = GNUNET_NO;
398 n->env_size = ntohs (msg->size);
399 GNUNET_MQ_notify_sent (n->env,
400 &notify_send_done,
401 n);
402 GNUNET_MQ_send (h->mq,
403 n->env);
404 LOG (GNUNET_ERROR_TYPE_DEBUG,
405 "Queued message of type %u for neighbour `%s'.\n",
406 ntohs (msg->type),
407 GNUNET_i2s (&n->id));
408}
409
410
411/**
412 * Handle destruction of a message queue. Implementations must not
413 * free @a mq, but should take care of @a impl_state.
414 *
415 * @param mq the message queue to destroy
416 * @param impl_state state of the implementation
417 */
418static void
419mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
420 void *impl_state)
421{
422 struct Neighbour *n = impl_state;
423
424 GNUNET_assert (mq == n->mq);
425 n->mq = NULL;
426}
427
428
429/**
430 * Implementation function that cancels the currently sent message.
431 * Should basically undo whatever #mq_send_impl() did.
432 *
433 * @param mq message queue
434 * @param impl_state state specific to the implementation
435 */
436static void
437mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
438 void *impl_state)
439{
440 struct Neighbour *n = impl_state;
441
442 GNUNET_assert (GNUNET_NO == n->is_ready);
443 if (NULL != n->env)
444 {
445 GNUNET_MQ_send_cancel (n->env);
446 n->env = NULL;
447 }
448
449 n->is_ready = GNUNET_YES;
450}
451
452
453/**
454 * We had an error processing a message we forwarded from a peer to
455 * the CORE service. We should just complain about it but otherwise
456 * continue processing.
457 *
458 * @param cls closure
459 * @param error error code
460 */
461static void
462peer_mq_error_handler (void *cls,
463 enum GNUNET_MQ_Error error)
464{
465 /* struct Neighbour *n = cls; */
466
467 GNUNET_break_op (0);
468}
469
470
471/**
472 * The outbound quota has changed in a way that may require
473 * us to reset the timeout. Update the timeout.
474 *
475 * @param cls the `struct Neighbour` for which the timeout changed
476 */
477static void
478outbound_bw_tracker_update (void *cls)
479{
480 struct Neighbour *n = cls;
481 struct GNUNET_TIME_Relative delay;
482
483 if (NULL == n->timeout_task)
484 return;
485 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
486 128);
487 GNUNET_SCHEDULER_cancel (n->timeout_task);
488 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
489 &notify_send_done,
490 n);
491}
492
493
494/**
495 * Function we use for handling incoming connect messages.
496 *
497 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
498 * @param cim message received
499 */
500static void
501handle_connect (void *cls,
502 const struct ConnectInfoMessage *cim)
503{
504 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
505 struct Neighbour *n;
506
507 LOG (GNUNET_ERROR_TYPE_DEBUG,
508 "Receiving CONNECT message for `%s' with quota %u\n",
509 GNUNET_i2s (&cim->id),
510 ntohl (cim->quota_out.value__));
511 n = neighbour_find (h,
512 &cim->id);
513 if (NULL != n)
514 {
515 GNUNET_break (0);
516 disconnect_and_schedule_reconnect (h);
517 return;
518 }
519 n = GNUNET_new (struct Neighbour);
520 n->id = cim->id;
521 n->h = h;
522 n->is_ready = GNUNET_YES;
523 n->traffic_overhead = 0;
524 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
525 &outbound_bw_tracker_update,
526 n,
527 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
528 MAX_BANDWIDTH_CARRY_S,
529 &notify_excess_cb,
530 n);
531 GNUNET_assert (GNUNET_OK ==
532 GNUNET_CONTAINER_multipeermap_put (h->neighbours,
533 &n->id,
534 n,
535 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
536
537 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
538 cim->quota_out);
539 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
540 &mq_destroy_impl,
541 &mq_cancel_impl,
542 n,
543 h->handlers,
544 &peer_mq_error_handler,
545 n);
546 if (NULL != h->nc_cb)
547 {
548 n->handlers_cls = h->nc_cb (h->cls,
549 &n->id,
550 n->mq);
551 GNUNET_MQ_set_handlers_closure (n->mq,
552 n->handlers_cls);
553 }
554}
555
556
557/**
558 * Function we use for handling incoming disconnect messages.
559 *
560 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
561 * @param dim message received
562 */
563static void
564handle_disconnect (void *cls,
565 const struct DisconnectInfoMessage *dim)
566{
567 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
568 struct Neighbour *n;
569
570 GNUNET_break (ntohl (dim->reserved) == 0);
571 LOG (GNUNET_ERROR_TYPE_DEBUG,
572 "Receiving DISCONNECT message for `%s'.\n",
573 GNUNET_i2s (&dim->peer));
574 n = neighbour_find (h,
575 &dim->peer);
576 if (NULL == n)
577 {
578 GNUNET_break (0);
579 disconnect_and_schedule_reconnect (h);
580 return;
581 }
582 GNUNET_assert (GNUNET_YES ==
583 neighbour_delete (h,
584 &dim->peer,
585 n));
586}
587
588
589/**
590 * Function we use for handling incoming send-ok messages.
591 *
592 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
593 * @param okm message received
594 */
595static void
596handle_send_ok (void *cls,
597 const struct SendOkMessage *okm)
598{
599 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
600 struct Neighbour *n;
601 uint32_t bytes_msg;
602 uint32_t bytes_physical;
603
604 bytes_msg = ntohl (okm->bytes_msg);
605 bytes_physical = ntohl (okm->bytes_physical);
606 LOG (GNUNET_ERROR_TYPE_DEBUG,
607 "Receiving SEND_OK message, transmission to %s %s.\n",
608 GNUNET_i2s (&okm->peer),
609 (GNUNET_OK == ntohl (okm->success))
610 ? "succeeded"
611 : "failed");
612 n = neighbour_find (h,
613 &okm->peer);
614 if (NULL == n)
615 {
616 /* We should never get a 'SEND_OK' for a peer that we are not
617 connected to */
618 GNUNET_break (0);
619 disconnect_and_schedule_reconnect (h);
620 return;
621 }
622 if (bytes_physical > bytes_msg)
623 {
624 LOG (GNUNET_ERROR_TYPE_DEBUG,
625 "Overhead for %u byte message was %u\n",
626 (unsigned int) bytes_msg,
627 (unsigned int) (bytes_physical - bytes_msg));
628 n->traffic_overhead += bytes_physical - bytes_msg;
629 }
630}
631
632
633/**
634 * Function we use for checking incoming "inbound" messages.
635 *
636 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
637 * @param im message received
638 */
639static int
640check_recv (void *cls,
641 const struct InboundMessage *im)
642{
643 const struct GNUNET_MessageHeader *imm;
644 uint16_t size;
645
646 size = ntohs (im->header.size) - sizeof (*im);
647 if (size < sizeof (struct GNUNET_MessageHeader))
648 {
649 GNUNET_break (0);
650 return GNUNET_SYSERR;
651 }
652 imm = (const struct GNUNET_MessageHeader *) &im[1];
653 if (ntohs (imm->size) != size)
654 {
655 GNUNET_break (0);
656 return GNUNET_SYSERR;
657 }
658 return GNUNET_OK;
659}
660
661
662/**
663 * Function we use for handling incoming messages.
664 *
665 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
666 * @param im message received
667 */
668static void
669handle_recv (void *cls,
670 const struct InboundMessage *im)
671{
672 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
673 const struct GNUNET_MessageHeader *imm
674 = (const struct GNUNET_MessageHeader *) &im[1];
675 struct Neighbour *n;
676
677 LOG (GNUNET_ERROR_TYPE_DEBUG,
678 "Received message of type %u with %u bytes from `%s'.\n",
679 (unsigned int) ntohs (imm->type),
680 (unsigned int) ntohs (imm->size),
681 GNUNET_i2s (&im->peer));
682 n = neighbour_find (h,
683 &im->peer);
684 if (NULL == n)
685 {
686 GNUNET_break (0);
687 disconnect_and_schedule_reconnect (h);
688 return;
689 }
690 GNUNET_MQ_inject_message (n->mq,
691 imm);
692}
693
694
695/**
696 * Function we use for handling incoming set quota messages.
697 *
698 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
699 * @param msg message received
700 */
701static void
702handle_set_quota (void *cls,
703 const struct QuotaSetMessage *qm)
704{
705 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
706 struct Neighbour *n;
707
708 n = neighbour_find (h,
709 &qm->peer);
710 if (NULL == n)
711 {
712 GNUNET_break (0);
713 disconnect_and_schedule_reconnect (h);
714 return;
715 }
716 LOG (GNUNET_ERROR_TYPE_DEBUG,
717 "Receiving SET_QUOTA message for `%s' with quota %u\n",
718 GNUNET_i2s (&qm->peer),
719 (unsigned int) ntohl (qm->quota.value__));
720 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
721 qm->quota);
722}
723
724
725/**
726 * Try again to connect to transport service.
727 *
728 * @param cls the handle to the transport service
729 */
730static void
731reconnect (void *cls)
732{
733 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
734 struct GNUNET_MQ_MessageHandler handlers[] = {
735 GNUNET_MQ_hd_fixed_size (connect,
736 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
737 struct ConnectInfoMessage,
738 h),
739 GNUNET_MQ_hd_fixed_size (disconnect,
740 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
741 struct DisconnectInfoMessage,
742 h),
743 GNUNET_MQ_hd_fixed_size (send_ok,
744 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
745 struct SendOkMessage,
746 h),
747 GNUNET_MQ_hd_var_size (recv,
748 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
749 struct InboundMessage,
750 h),
751 GNUNET_MQ_hd_fixed_size (set_quota,
752 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
753 struct QuotaSetMessage,
754 h),
755 GNUNET_MQ_handler_end ()
756 };
757 struct GNUNET_MQ_Envelope *env;
758 struct StartMessage *s;
759 uint32_t options;
760
761 h->reconnect_task = NULL;
762 LOG (GNUNET_ERROR_TYPE_DEBUG,
763 "Connecting to transport service.\n");
764 GNUNET_assert (NULL == h->mq);
765 h->mq = GNUNET_CLIENT_connect (h->cfg,
766 "transport",
767 handlers,
768 &mq_error_handler,
769 h);
770 if (NULL == h->mq)
771 return;
772 env = GNUNET_MQ_msg (s,
773 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
774 options = 0;
775 if (h->check_self)
776 options |= 1;
777 if (NULL != h->handlers)
778 options |= 2;
779 s->options = htonl (options);
780 s->self = h->self;
781 GNUNET_MQ_send (h->mq,
782 env);
783}
784
785
786/**
787 * Disconnect from the transport service.
788 *
789 * @param h transport service to reconnect
790 */
791static void
792disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
793{
794 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
795 &neighbour_delete,
796 h);
797 if (NULL != h->mq)
798 {
799 GNUNET_MQ_destroy (h->mq);
800 h->mq = NULL;
801 }
802}
803
804
805/**
806 * Function that will schedule the job that will try
807 * to connect us again to the client.
808 *
809 * @param h transport service to reconnect
810 */
811static void
812disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
813{
814 GNUNET_assert (NULL == h->reconnect_task);
815 disconnect (h);
816 LOG (GNUNET_ERROR_TYPE_DEBUG,
817 "Scheduling task to reconnect to transport service in %s.\n",
818 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
819 GNUNET_YES));
820 h->reconnect_task =
821 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
822 &reconnect,
823 h);
824 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
825}
826
827
828/**
829 * Checks if a given peer is connected to us and get the message queue.
830 *
831 * @param handle connection to transport service
832 * @param peer the peer to check
833 * @return NULL if disconnected, otherwise message queue for @a peer
834 */
835struct GNUNET_MQ_Handle *
836GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
837 const struct GNUNET_PeerIdentity *peer)
838{
839 struct Neighbour *n;
840
841 n = neighbour_find (handle,
842 peer);
843 if (NULL == n)
844 return NULL;
845 return n->mq;
846}
847
848
849/**
850 * Connect to the transport service. Note that the connection may
851 * complete (or fail) asynchronously.
852 *
853 * @param cfg configuration to use
854 * @param self our own identity (API should check that it matches
855 * the identity found by transport), or NULL (no check)
856 * @param cls closure for the callbacks
857 * @param rec receive function to call
858 * @param nc function to call on connect events
859 * @param nd function to call on disconnect events
860 * @param neb function to call if we have excess bandwidth to a peer
861 * @return NULL on error
862 */
863struct GNUNET_TRANSPORT_CoreHandle *
864GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
865 const struct GNUNET_PeerIdentity *self,
866 const struct GNUNET_MQ_MessageHandler *handlers,
867 void *cls,
868 GNUNET_TRANSPORT_NotifyConnect nc,
869 GNUNET_TRANSPORT_NotifyDisconnect nd,
870 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
871{
872 struct GNUNET_TRANSPORT_CoreHandle *h;
873 unsigned int i;
874
875 h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
876 if (NULL != self)
877 {
878 h->self = *self;
879 h->check_self = GNUNET_YES;
880 }
881 h->cfg = cfg;
882 h->cls = cls;
883 h->nc_cb = nc;
884 h->nd_cb = nd;
885 h->neb_cb = neb;
886 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
887 if (NULL != handlers)
888 {
889 for (i=0;NULL != handlers[i].cb; i++) ;
890 h->handlers = GNUNET_new_array (i + 1,
891 struct GNUNET_MQ_MessageHandler);
892 GNUNET_memcpy (h->handlers,
893 handlers,
894 i * sizeof (struct GNUNET_MQ_MessageHandler));
895 }
896 LOG (GNUNET_ERROR_TYPE_DEBUG,
897 "Connecting to transport service\n");
898 reconnect (h);
899 if (NULL == h->mq)
900 {
901 GNUNET_free_non_null (h->handlers);
902 GNUNET_free (h);
903 return NULL;
904 }
905 h->neighbours =
906 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
907 GNUNET_YES);
908 return h;
909}
910
911
912/**
913 * Disconnect from the transport service.
914 *
915 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect()
916 */
917void
918GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
919{
920 LOG (GNUNET_ERROR_TYPE_DEBUG,
921 "Transport disconnect called!\n");
922 /* this disconnects all neighbours... */
923 disconnect (handle);
924 /* and now we stop trying to connect again... */
925 if (NULL != handle->reconnect_task)
926 {
927 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
928 handle->reconnect_task = NULL;
929 }
930 GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
931 handle->neighbours = NULL;
932 GNUNET_free_non_null (handle->handlers);
933 handle->handlers = NULL;
934 GNUNET_free (handle);
935}
936
937
938/* end of transport_api_core.c */
diff --git a/src/transport/transport_api2_monitor.c b/src/transport/transport_api2_monitor.c
new file mode 100644
index 000000000..d7b13ec74
--- /dev/null
+++ b/src/transport/transport_api2_monitor.c
@@ -0,0 +1,313 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/transport_api2_monitor.c
21 * @brief implementation of the gnunet_transport_monitor_service.h API
22 * @author Christian Grothoff
23 */
24#include "platform.h"
25#include "gnunet_util_lib.h"
26#include "gnunet_protocols.h"
27#include "gnunet_transport_monitor_service.h"
28#include "transport.h"
29
30
31/**
32 * Opaque handle to the transport service for monitors.
33 */
34struct GNUNET_TRANSPORT_MonitorContext
35{
36 /**
37 * Our configuration.
38 */
39 const struct GNUNET_CONFIGURATION_Handle *cfg;
40
41 /**
42 * Queue to talk to the transport service.
43 */
44 struct GNUNET_MQ_Handle *mq;
45
46 /**
47 * Peer we monitor, all zeros for "all"
48 */
49 struct GNUNET_PeerIdentity peer;
50
51 /**
52 * #GNUNET_YES to return the current state and then end.
53 */
54 int one_shot;
55
56 /**
57 * Function to call with monitor data.
58 */
59 GNUNET_TRANSPORT_MonitorCallback cb;
60
61 /**
62 * Closure for @e cb.
63 */
64 void *cb_cls;
65
66};
67
68
69/**
70 * (re)connect our monitor to the transport service
71 *
72 * @param mc handle to reconnect
73 */
74static void
75reconnect (struct GNUNET_TRANSPORT_MonitorContext *mc);
76
77
78/**
79 * Send message to the transport service about our montoring
80 * desire.
81 *
82 * @param ai address to delete
83 */
84static void
85send_start_monitor (struct GNUNET_TRANSPORT_MonitorContext *mc)
86{
87 struct GNUNET_MQ_Envelope *env;
88 struct GNUNET_TRANSPORT_MonitorStart *smm;
89
90 if (NULL == mc->mq)
91 return;
92 env = GNUNET_MQ_msg (smm,
93 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START);
94 smm->one_shot = htonl ((uint32_t) mc->one_shot);
95 smm->peer = mc->peer;
96 GNUNET_MQ_send (mc->mq,
97 env);
98}
99
100
101/**
102 * Disconnect from the transport service.
103 *
104 * @param mc service to disconnect from
105 */
106static void
107disconnect (struct GNUNET_TRANSPORT_MonitorContext *mc)
108{
109 if (NULL == mc->mq)
110 return;
111 GNUNET_MQ_destroy (mc->mq);
112 mc->mq = NULL;
113}
114
115
116/**
117 * Function called on MQ errors. Reconnects to the service.
118 *
119 * @param cls our `struct GNUNET_TRANSPORT_MonitorContext *`
120 * @param error what error happened?
121 */
122static void
123error_handler (void *cls,
124 enum GNUNET_MQ_Error error)
125{
126 struct GNUNET_TRANSPORT_MonitorContext *mc = cls;
127
128 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
129 "MQ failure %d, reconnecting to transport service.\n",
130 error);
131 disconnect (mc);
132 /* TODO: maybe do this with exponential backoff/delay */
133 reconnect (mc);
134}
135
136
137/**
138 * Transport service sends us information about what is going on.
139 * Check if @a md is well-formed.
140 *
141 * @param cls our `struct GNUNET_TRANSPORT_MonitorContext *`
142 * @param md the monitor data we got
143 * @return #GNUNET_OK if @a smt is well-formed
144 */
145static int
146check_monitor_data (void *cls,
147 const struct GNUNET_TRANSPORT_MonitorData *md)
148{
149 uint16_t len = ntohs (md->header.size) - sizeof (*md);
150 const char *addr = (const char *) &md[1];
151
152 (void) cls;
153 if ( (0 == len) ||
154 ('\0' != addr[len-1]) )
155 {
156 GNUNET_break (0);
157 return GNUNET_SYSERR;
158 }
159 return GNUNET_OK;
160}
161
162
163/**
164 * Transport service sends us information about what is going on.
165 *
166 * @param cls our `struct GNUNET_TRANSPORT_MonitorContext *`
167 * @param md monitor data
168 */
169static void
170handle_monitor_data (void *cls,
171 const struct GNUNET_TRANSPORT_MonitorData *md)
172{
173 struct GNUNET_TRANSPORT_MonitorContext *mc = cls;
174 struct GNUNET_TRANSPORT_MonitorInformation mi;
175
176 mi.address = (const char *) &md[1];
177 mi.nt = (enum GNUNET_ATS_Network_Type) ntohl (md->nt);
178 mi.is_inbound = (int) ntohl (md->is_inbound);
179 mi.num_msg_pending = ntohl (md->num_msg_pending);
180 mi.num_bytes_pending = ntohl (md->num_bytes_pending);
181 mi.last_validation = GNUNET_TIME_absolute_ntoh (md->last_validation);
182 mi.valid_until = GNUNET_TIME_absolute_ntoh (md->valid_until);
183 mi.next_validation = GNUNET_TIME_absolute_ntoh (md->next_validation);
184 mi.rtt = GNUNET_TIME_relative_ntoh (md->rtt);
185 mc->cb (mc->cb_cls,
186 &md->peer,
187 &mi);
188}
189
190
191/**
192 * One shot was requested, and transport service is done.
193 *
194 * @param cls our `struct GNUNET_TRANSPORT_MonitorContext *`
195 * @param me end message
196 */
197static void
198handle_monitor_end (void *cls,
199 const struct GNUNET_MessageHeader *me)
200{
201 struct GNUNET_TRANSPORT_MonitorContext *mc = cls;
202
203 if (GNUNET_YES != mc->one_shot)
204 {
205 GNUNET_break (0);
206 disconnect (mc);
207 reconnect (mc);
208 return;
209 }
210 mc->cb (mc->cb_cls,
211 NULL,
212 NULL);
213 GNUNET_TRANSPORT_monitor_cancel (mc);
214}
215
216
217/**
218 * (re)connect our monitor to the transport service
219 *
220 * @param mc handle to reconnect
221 */
222static void
223reconnect (struct GNUNET_TRANSPORT_MonitorContext *mc)
224{
225 struct GNUNET_MQ_MessageHandler handlers[] = {
226 GNUNET_MQ_hd_var_size (monitor_data,
227 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA,
228 struct GNUNET_TRANSPORT_MonitorData,
229 mc),
230 GNUNET_MQ_hd_fixed_size (monitor_end,
231 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_END,
232 struct GNUNET_MessageHeader,
233 mc),
234 GNUNET_MQ_handler_end()
235 };
236
237 mc->mq = GNUNET_CLIENT_connect (mc->cfg,
238 "transport",
239 handlers,
240 &error_handler,
241 mc);
242 if (NULL == mc->mq)
243 return;
244 send_start_monitor (mc);
245}
246
247
248/**
249 * Return information about a specific peer or all peers currently known to
250 * transport service once or in monitoring mode. To obtain information about
251 * a specific peer, a peer identity can be passed. To obtain information about
252 * all peers currently known to transport service, NULL can be passed as peer
253 * identity.
254 *
255 * For each peer, the callback is called with information about the address used
256 * to communicate with this peer, the state this peer is currently in and the
257 * the current timeout for this state.
258 *
259 * Upon completion, the #GNUNET_TRANSPORT_PeerIterateCallback is called one
260 * more time with `NULL`. After this, the operation must no longer be
261 * explicitly canceled.
262 *
263 * The #GNUNET_TRANSPORT_monitor_peers_cancel call MUST not be called in the
264 * the peer_callback!
265 *
266 * @param cfg configuration to use
267 * @param peer a specific peer identity to obtain information for,
268 * NULL for all peers
269 * @param one_shot #GNUNET_YES to return the current state and then end (with NULL+NULL),
270 * #GNUNET_NO to monitor peers continuously
271 * @param cb function to call with the results
272 * @param cb_cls closure for @a mc
273 */
274struct GNUNET_TRANSPORT_MonitorContext *
275GNUNET_TRANSPORT_monitor (const struct GNUNET_CONFIGURATION_Handle *cfg,
276 const struct GNUNET_PeerIdentity *peer,
277 int one_shot,
278 GNUNET_TRANSPORT_MonitorCallback cb,
279 void *cb_cls)
280{
281 struct GNUNET_TRANSPORT_MonitorContext *mc;
282
283 mc = GNUNET_new (struct GNUNET_TRANSPORT_MonitorContext);
284 mc->cfg = cfg;
285 if (NULL != peer)
286 mc->peer = *peer;
287 mc->one_shot = one_shot;
288 mc->cb = cb;
289 mc->cb_cls = cb_cls;
290 reconnect (mc);
291 if (NULL == mc->mq)
292 {
293 GNUNET_free (mc);
294 return NULL;
295 }
296 return mc;
297}
298
299
300
301/**
302 * Cancel request to monitor peers
303 *
304 * @param pmc handle for the request to cancel
305 */
306void
307GNUNET_TRANSPORT_monitor_cancel (struct GNUNET_TRANSPORT_MonitorContext *mc)
308{
309 disconnect (mc);
310 GNUNET_free (mc);
311}
312
313/* end of transport_api2_monitor.c */
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index b7edc3cc1..2e897d94a 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/ 17*/
@@ -136,17 +136,17 @@ struct GNUNET_TRANSPORT_CoreHandle
136 /** 136 /**
137 * function to call on connect events 137 * function to call on connect events
138 */ 138 */
139 GNUNET_TRANSPORT_NotifyConnecT nc_cb; 139 GNUNET_TRANSPORT_NotifyConnect nc_cb;
140 140
141 /** 141 /**
142 * function to call on disconnect events 142 * function to call on disconnect events
143 */ 143 */
144 GNUNET_TRANSPORT_NotifyDisconnecT nd_cb; 144 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
145 145
146 /** 146 /**
147 * function to call on excess bandwidth events 147 * function to call on excess bandwidth events
148 */ 148 */
149 GNUNET_TRANSPORT_NotifyExcessBandwidtH neb_cb; 149 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
150 150
151 /** 151 /**
152 * My client connection to the transport service. 152 * My client connection to the transport service.
@@ -551,7 +551,8 @@ handle_connect (void *cls,
551 "Receiving CONNECT message for `%s' with quota %u\n", 551 "Receiving CONNECT message for `%s' with quota %u\n",
552 GNUNET_i2s (&cim->id), 552 GNUNET_i2s (&cim->id),
553 ntohl (cim->quota_out.value__)); 553 ntohl (cim->quota_out.value__));
554 n = neighbour_find (h, &cim->id); 554 n = neighbour_find (h,
555 &cim->id);
555 if (NULL != n) 556 if (NULL != n)
556 { 557 {
557 GNUNET_break (0); 558 GNUNET_break (0);
@@ -896,9 +897,9 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
896 const struct GNUNET_PeerIdentity *self, 897 const struct GNUNET_PeerIdentity *self,
897 const struct GNUNET_MQ_MessageHandler *handlers, 898 const struct GNUNET_MQ_MessageHandler *handlers,
898 void *cls, 899 void *cls,
899 GNUNET_TRANSPORT_NotifyConnecT nc, 900 GNUNET_TRANSPORT_NotifyConnect nc,
900 GNUNET_TRANSPORT_NotifyDisconnecT nd, 901 GNUNET_TRANSPORT_NotifyDisconnect nd,
901 GNUNET_TRANSPORT_NotifyExcessBandwidtH neb) 902 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
902{ 903{
903 struct GNUNET_TRANSPORT_CoreHandle *h; 904 struct GNUNET_TRANSPORT_CoreHandle *h;
904 unsigned int i; 905 unsigned int i;