aboutsummaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/plugin_transport_udp.c')
-rw-r--r--src/transport/plugin_transport_udp.c3897
1 files changed, 0 insertions, 3897 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
deleted file mode 100644
index 2db31caa3..000000000
--- a/src/transport/plugin_transport_udp.c
+++ /dev/null
@@ -1,3897 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2010-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/plugin_transport_udp.c
23 * @brief Implementation of the UDP transport protocol
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 * @author Matthias Wachs
27 */
28#include "platform.h"
29#include "plugin_transport_udp.h"
30#include "gnunet_hello_lib.h"
31#include "gnunet_util_lib.h"
32#include "gnunet_fragmentation_lib.h"
33#include "gnunet_nat_service.h"
34#include "gnunet_protocols.h"
35#include "gnunet_resolver_service.h"
36#include "gnunet_signatures.h"
37#include "gnunet_constants.h"
38#include "gnunet_statistics_service.h"
39#include "gnunet_transport_service.h"
40#include "gnunet_transport_plugin.h"
41#include "transport.h"
42
43#define LOG(kind, ...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45/**
46 * After how much inactivity should a UDP session time out?
47 */
48#define UDP_SESSION_TIME_OUT \
49 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
50
51/**
52 * Number of messages we can defragment in parallel. We only really
53 * defragment 1 message at a time, but if messages get re-ordered, we
54 * may want to keep knowledge about the previous message to avoid
55 * discarding the current message in favor of a single fragment of a
56 * previous message. 3 should be good since we don't expect massive
57 * message reorderings with UDP.
58 */
59#define UDP_MAX_MESSAGES_IN_DEFRAG 3
60
61/**
62 * We keep a defragmentation queue per sender address. How many
63 * sender addresses do we support at the same time? Memory consumption
64 * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
65 * value. (So 128 corresponds to 12 MB and should suffice for
66 * connecting to roughly 128 peers via UDP).
67 */
68#define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
69
70
71/**
72 * UDP Message-Packet header (after defragmentation).
73 */
74struct UDPMessage
75{
76 /**
77 * Message header.
78 */
79 struct GNUNET_MessageHeader header;
80
81 /**
82 * Always zero for now.
83 */
84 uint32_t reserved;
85
86 /**
87 * What is the identity of the sender
88 */
89 struct GNUNET_PeerIdentity sender;
90};
91
92
93/**
94 * Closure for #append_port().
95 */
96struct PrettyPrinterContext
97{
98 /**
99 * DLL
100 */
101 struct PrettyPrinterContext *next;
102
103 /**
104 * DLL
105 */
106 struct PrettyPrinterContext *prev;
107
108 /**
109 * Our plugin.
110 */
111 struct Plugin *plugin;
112
113 /**
114 * Resolver handle
115 */
116 struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118 /**
119 * Function to call with the result.
120 */
121 GNUNET_TRANSPORT_AddressStringCallback asc;
122
123 /**
124 * Clsoure for @e asc.
125 */
126 void *asc_cls;
127
128 /**
129 * Timeout task
130 */
131 struct GNUNET_SCHEDULER_Task *timeout_task;
132
133 /**
134 * Is this an IPv6 address?
135 */
136 int ipv6;
137
138 /**
139 * Options
140 */
141 uint32_t options;
142
143 /**
144 * Port to add after the IP address.
145 */
146 uint16_t port;
147};
148
149
150/**
151 * Session with another peer.
152 */
153struct GNUNET_ATS_Session
154{
155 /**
156 * Which peer is this session for?
157 */
158 struct GNUNET_PeerIdentity target;
159
160 /**
161 * Tokenizer for inbound messages.
162 */
163 struct GNUNET_MessageStreamTokenizer *mst;
164
165 /**
166 * Plugin this session belongs to.
167 */
168 struct Plugin *plugin;
169
170 /**
171 * Context for dealing with fragments.
172 */
173 struct UDP_FragmentationContext *frag_ctx;
174
175 /**
176 * Desired delay for next sending we send to other peer
177 */
178 struct GNUNET_TIME_Relative flow_delay_for_other_peer;
179
180 /**
181 * Desired delay for transmissions we received from other peer.
182 * This is for full messages, the value needs to be adjusted for
183 * fragmented messages.
184 */
185 struct GNUNET_TIME_Relative flow_delay_from_other_peer;
186
187 /**
188 * Session timeout task
189 */
190 struct GNUNET_SCHEDULER_Task *timeout_task;
191
192 /**
193 * When does this session time out?
194 */
195 struct GNUNET_TIME_Absolute timeout;
196
197 /**
198 * What time did we last transmit?
199 */
200 struct GNUNET_TIME_Absolute last_transmit_time;
201
202 /**
203 * expected delay for ACKs
204 */
205 struct GNUNET_TIME_Relative last_expected_ack_delay;
206
207 /**
208 * desired delay between UDP messages
209 */
210 struct GNUNET_TIME_Relative last_expected_msg_delay;
211
212 /**
213 * Our own address.
214 */
215 struct GNUNET_HELLO_Address *address;
216
217 /**
218 * Number of bytes waiting for transmission to this peer.
219 */
220 unsigned long long bytes_in_queue;
221
222 /**
223 * Number of messages waiting for transmission to this peer.
224 */
225 unsigned int msgs_in_queue;
226
227 /**
228 * Reference counter to indicate that this session is
229 * currently being used and must not be destroyed;
230 * setting @e in_destroy will destroy it as soon as
231 * possible.
232 */
233 unsigned int rc;
234
235 /**
236 * Network type of the address.
237 */
238 enum GNUNET_NetworkType scope;
239
240 /**
241 * Is this session about to be destroyed (sometimes we cannot
242 * destroy a session immediately as below us on the stack
243 * there might be code that still uses it; in this case,
244 * @e rc is non-zero).
245 */
246 int in_destroy;
247};
248
249
250/**
251 * Data structure to track defragmentation contexts based
252 * on the source of the UDP traffic.
253 */
254struct DefragContext
255{
256 /**
257 * Defragmentation context.
258 */
259 struct GNUNET_DEFRAGMENT_Context *defrag;
260
261 /**
262 * Reference to master plugin struct.
263 */
264 struct Plugin *plugin;
265
266 /**
267 * Node in the defrag heap.
268 */
269 struct GNUNET_CONTAINER_HeapNode *hnode;
270
271 /**
272 * Source address this receive context is for (allocated at the
273 * end of the struct).
274 */
275 const union UdpAddress *udp_addr;
276
277 /**
278 * Who's message(s) are we defragmenting here?
279 * Only initialized once we succeeded and
280 * @e have_sender is set.
281 */
282 struct GNUNET_PeerIdentity sender;
283
284 /**
285 * Length of @e udp_addr.
286 */
287 size_t udp_addr_len;
288
289 /**
290 * Network type the address belongs to.
291 */
292 enum GNUNET_NetworkType network_type;
293
294 /**
295 * Has the @e sender field been initialized yet?
296 */
297 int have_sender;
298};
299
300
301/**
302 * Context to send fragmented messages
303 */
304struct UDP_FragmentationContext
305{
306 /**
307 * Next in linked list
308 */
309 struct UDP_FragmentationContext *next;
310
311 /**
312 * Previous in linked list
313 */
314 struct UDP_FragmentationContext *prev;
315
316 /**
317 * The plugin
318 */
319 struct Plugin *plugin;
320
321 /**
322 * Handle for fragmentation.
323 */
324 struct GNUNET_FRAGMENT_Context *frag;
325
326 /**
327 * The session this fragmentation context belongs to
328 */
329 struct GNUNET_ATS_Session *session;
330
331 /**
332 * Function to call upon completion of the transmission.
333 */
334 GNUNET_TRANSPORT_TransmitContinuation cont;
335
336 /**
337 * Closure for @e cont.
338 */
339 void *cont_cls;
340
341 /**
342 * Start time.
343 */
344 struct GNUNET_TIME_Absolute start_time;
345
346 /**
347 * Transmission time for the next fragment. Incremented by
348 * the @e flow_delay_from_other_peer for each fragment when
349 * we setup the fragments.
350 */
351 struct GNUNET_TIME_Absolute next_frag_time;
352
353 /**
354 * Desired delay for transmissions we received from other peer.
355 * Adjusted to be per fragment (UDP_MTU), even though on the
356 * wire it was for "full messages".
357 */
358 struct GNUNET_TIME_Relative flow_delay_from_other_peer;
359
360 /**
361 * Message timeout
362 */
363 struct GNUNET_TIME_Absolute timeout;
364
365 /**
366 * Payload size of original unfragmented message
367 */
368 size_t payload_size;
369
370 /**
371 * Bytes used to send all fragments on wire including UDP overhead
372 */
373 size_t on_wire_size;
374};
375
376
377/**
378 * Function called when a message is removed from the
379 * transmission queue.
380 *
381 * @param cls closure
382 * @param udpw message wrapper finished
383 * @param result #GNUNET_OK on success (message was sent)
384 * #GNUNET_SYSERR if the target disconnected
385 * or we had a timeout or other trouble sending
386 */
387typedef void (*QueueContinuation) (void *cls,
388 struct UDP_MessageWrapper *udpw,
389 int result);
390
391
392/**
393 * Information we track for each message in the queue.
394 */
395struct UDP_MessageWrapper
396{
397 /**
398 * Session this message belongs to
399 */
400 struct GNUNET_ATS_Session *session;
401
402 /**
403 * DLL of messages, previous element
404 */
405 struct UDP_MessageWrapper *prev;
406
407 /**
408 * DLL of messages, next element
409 */
410 struct UDP_MessageWrapper *next;
411
412 /**
413 * Message with @e msg_size bytes including UDP-specific overhead.
414 */
415 char *msg_buf;
416
417 /**
418 * Function to call once the message wrapper is being removed
419 * from the queue (with success or failure).
420 */
421 QueueContinuation qc;
422
423 /**
424 * Closure for @e qc.
425 */
426 void *qc_cls;
427
428 /**
429 * External continuation to call upon completion of the
430 * transmission, NULL if this queue entry is not for a
431 * message from the application.
432 */
433 GNUNET_TRANSPORT_TransmitContinuation cont;
434
435 /**
436 * Closure for @e cont.
437 */
438 void *cont_cls;
439
440 /**
441 * Fragmentation context.
442 * frag_ctx == NULL if transport <= MTU
443 * frag_ctx != NULL if transport > MTU
444 */
445 struct UDP_FragmentationContext *frag_ctx;
446
447 /**
448 * Message enqueue time.
449 */
450 struct GNUNET_TIME_Absolute start_time;
451
452 /**
453 * Desired transmission time for this message, based on the
454 * flow limiting information we got from the other peer.
455 */
456 struct GNUNET_TIME_Absolute transmission_time;
457
458 /**
459 * Message timeout.
460 */
461 struct GNUNET_TIME_Absolute timeout;
462
463 /**
464 * Size of UDP message to send, including UDP-specific overhead.
465 */
466 size_t msg_size;
467
468 /**
469 * Payload size of original message.
470 */
471 size_t payload_size;
472};
473
474
475GNUNET_NETWORK_STRUCT_BEGIN
476
477/**
478 * UDP ACK Message-Packet header.
479 */
480struct UDP_ACK_Message
481{
482 /**
483 * Message header.
484 */
485 struct GNUNET_MessageHeader header;
486
487 /**
488 * Desired delay for flow control, in us (in NBO).
489 * A value of UINT32_MAX indicates that the other
490 * peer wants us to disconnect.
491 */
492 uint32_t delay GNUNET_PACKED;
493
494 /**
495 * What is the identity of the sender
496 */
497 struct GNUNET_PeerIdentity sender;
498};
499
500GNUNET_NETWORK_STRUCT_END
501
502
503/* ************************* Monitoring *********** */
504
505
506/**
507 * If a session monitor is attached, notify it about the new
508 * session state.
509 *
510 * @param plugin our plugin
511 * @param session session that changed state
512 * @param state new state of the session
513 */
514static void
515notify_session_monitor (struct Plugin *plugin,
516 struct GNUNET_ATS_Session *session,
517 enum GNUNET_TRANSPORT_SessionState state)
518{
519 struct GNUNET_TRANSPORT_SessionInfo info;
520
521 if (NULL == plugin->sic)
522 return;
523 if (GNUNET_YES == session->in_destroy)
524 return; /* already destroyed, just RC>0 left-over actions */
525 memset (&info, 0, sizeof(info));
526 info.state = state;
527 info.is_inbound = GNUNET_SYSERR; /* hard to say */
528 info.num_msg_pending = session->msgs_in_queue;
529 info.num_bytes_pending = session->bytes_in_queue;
530 /* info.receive_delay remains zero as this is not supported by UDP
531 (cannot selectively not receive from 'some' peer while continuing
532 to receive from others) */
533 info.session_timeout = session->timeout;
534 info.address = session->address;
535 plugin->sic (plugin->sic_cls, session, &info);
536}
537
538
539/**
540 * Return information about the given session to the monitor callback.
541 *
542 * @param cls the `struct Plugin` with the monitor callback (`sic`)
543 * @param peer peer we send information about
544 * @param value our `struct GNUNET_ATS_Session` to send information about
545 * @return #GNUNET_OK (continue to iterate)
546 */
547static int
548send_session_info_iter (void *cls,
549 const struct GNUNET_PeerIdentity *peer,
550 void *value)
551{
552 struct Plugin *plugin = cls;
553 struct GNUNET_ATS_Session *session = value;
554
555 notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_INIT);
556 notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP);
557 return GNUNET_OK;
558}
559
560
561/**
562 * Begin monitoring sessions of a plugin. There can only
563 * be one active monitor per plugin (i.e. if there are
564 * multiple monitors, the transport service needs to
565 * multiplex the generated events over all of them).
566 *
567 * @param cls closure of the plugin
568 * @param sic callback to invoke, NULL to disable monitor;
569 * plugin will being by iterating over all active
570 * sessions immediately and then enter monitor mode
571 * @param sic_cls closure for @a sic
572 */
573static void
574udp_plugin_setup_monitor (void *cls,
575 GNUNET_TRANSPORT_SessionInfoCallback sic,
576 void *sic_cls)
577{
578 struct Plugin *plugin = cls;
579
580 plugin->sic = sic;
581 plugin->sic_cls = sic_cls;
582 if (NULL != sic)
583 {
584 GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
585 &send_session_info_iter,
586 plugin);
587 /* signal end of first iteration */
588 sic (sic_cls, NULL, NULL);
589 }
590}
591
592
593/* ****************** Little Helpers ****************** */
594
595
596/**
597 * Function to free last resources associated with a session.
598 *
599 * @param s session to free
600 */
601static void
602free_session (struct GNUNET_ATS_Session *s)
603{
604 if (NULL != s->address)
605 {
606 GNUNET_HELLO_address_free (s->address);
607 s->address = NULL;
608 }
609 if (NULL != s->frag_ctx)
610 {
611 GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL);
612 GNUNET_free (s->frag_ctx);
613 s->frag_ctx = NULL;
614 }
615 if (NULL != s->mst)
616 {
617 GNUNET_MST_destroy (s->mst);
618 s->mst = NULL;
619 }
620 GNUNET_free (s);
621}
622
623
624/**
625 * Function that is called to get the keepalive factor.
626 * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
627 * calculate the interval between keepalive packets.
628 *
629 * @param cls closure with the `struct Plugin`
630 * @return keepalive factor
631 */
632static unsigned int
633udp_query_keepalive_factor (void *cls)
634{
635 return 15;
636}
637
638
639/**
640 * Function obtain the network type for a session
641 *
642 * @param cls closure (`struct Plugin *`)
643 * @param session the session
644 * @return the network type
645 */
646static enum GNUNET_NetworkType
647udp_plugin_get_network (void *cls, struct GNUNET_ATS_Session *session)
648{
649 return session->scope;
650}
651
652
653/**
654 * Function obtain the network type for an address.
655 *
656 * @param cls closure (`struct Plugin *`)
657 * @param address the address
658 * @return the network type
659 */
660static enum GNUNET_NetworkType
661udp_plugin_get_network_for_address (void *cls,
662 const struct GNUNET_HELLO_Address *address)
663{
664 struct Plugin *plugin = cls;
665 size_t addrlen;
666 struct sockaddr_in a4;
667 struct sockaddr_in6 a6;
668 const struct IPv4UdpAddress *u4;
669 const struct IPv6UdpAddress *u6;
670 const void *sb;
671 size_t sbs;
672
673 addrlen = address->address_length;
674 if (addrlen == sizeof(struct IPv6UdpAddress))
675 {
676 GNUNET_assert (NULL != address->address); /* make static analysis happy */
677 u6 = address->address;
678 memset (&a6, 0, sizeof(a6));
679#if HAVE_SOCKADDR_IN_SIN_LEN
680 a6.sin6_len = sizeof(a6);
681#endif
682 a6.sin6_family = AF_INET6;
683 a6.sin6_port = u6->u6_port;
684 GNUNET_memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
685 sb = &a6;
686 sbs = sizeof(a6);
687 }
688 else if (addrlen == sizeof(struct IPv4UdpAddress))
689 {
690 GNUNET_assert (NULL != address->address); /* make static analysis happy */
691 u4 = address->address;
692 memset (&a4, 0, sizeof(a4));
693#if HAVE_SOCKADDR_IN_SIN_LEN
694 a4.sin_len = sizeof(a4);
695#endif
696 a4.sin_family = AF_INET;
697 a4.sin_port = u4->u4_port;
698 a4.sin_addr.s_addr = u4->ipv4_addr;
699 sb = &a4;
700 sbs = sizeof(a4);
701 }
702 else
703 {
704 GNUNET_break (0);
705 return GNUNET_NT_UNSPECIFIED;
706 }
707 return plugin->env->get_address_type (plugin->env->cls, sb, sbs);
708}
709
710
711/* ******************* Event loop ******************** */
712
713/**
714 * We have been notified that our readset has something to read. We don't
715 * know which socket needs to be read, so we have to check each one
716 * Then reschedule this function to be called again once more is available.
717 *
718 * @param cls the plugin handle
719 */
720static void
721udp_plugin_select_v4 (void *cls);
722
723
724/**
725 * We have been notified that our readset has something to read. We don't
726 * know which socket needs to be read, so we have to check each one
727 * Then reschedule this function to be called again once more is available.
728 *
729 * @param cls the plugin handle
730 */
731static void
732udp_plugin_select_v6 (void *cls);
733
734
735/**
736 * (re)schedule IPv4-select tasks for this plugin.
737 *
738 * @param plugin plugin to reschedule
739 */
740static void
741schedule_select_v4 (struct Plugin *plugin)
742{
743 struct GNUNET_TIME_Relative min_delay;
744 struct GNUNET_TIME_Relative delay;
745 struct UDP_MessageWrapper *udpw;
746 struct UDP_MessageWrapper *min_udpw;
747
748 if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
749 {
750 /* Find a message ready to send:
751 * Flow delay from other peer is expired or not set (0) */
752 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
753 min_udpw = NULL;
754 for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
755 {
756 delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
757 if (delay.rel_value_us < min_delay.rel_value_us)
758 {
759 min_delay = delay;
760 min_udpw = udpw;
761 }
762 }
763 if (NULL != plugin->select_task_v4)
764 GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
765 if (NULL != min_udpw)
766 {
767 if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
768 {
769 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
770 "Calculated flow delay for UDPv4 at %s for %s\n",
771 GNUNET_STRINGS_relative_time_to_string (min_delay,
772 GNUNET_YES),
773 GNUNET_i2s (&min_udpw->session->target));
774 }
775 else
776 {
777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778 "Calculated flow delay for UDPv4 at %s for %s\n",
779 GNUNET_STRINGS_relative_time_to_string (min_delay,
780 GNUNET_YES),
781 GNUNET_i2s (&min_udpw->session->target));
782 }
783 }
784 plugin->select_task_v4 =
785 GNUNET_SCHEDULER_add_read_net (min_delay,
786 plugin->sockv4,
787 &udp_plugin_select_v4,
788 plugin);
789 }
790}
791
792
793/**
794 * (re)schedule IPv6-select tasks for this plugin.
795 *
796 * @param plugin plugin to reschedule
797 */
798static void
799schedule_select_v6 (struct Plugin *plugin)
800{
801 struct GNUNET_TIME_Relative min_delay;
802 struct GNUNET_TIME_Relative delay;
803 struct UDP_MessageWrapper *udpw;
804 struct UDP_MessageWrapper *min_udpw;
805
806 if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
807 {
808 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
809 min_udpw = NULL;
810 for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
811 {
812 delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
813 if (delay.rel_value_us < min_delay.rel_value_us)
814 {
815 min_delay = delay;
816 min_udpw = udpw;
817 }
818 }
819 if (NULL != plugin->select_task_v6)
820 GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
821 if (NULL != min_udpw)
822 {
823 if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
824 {
825 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
826 "Calculated flow delay for UDPv6 at %s for %s\n",
827 GNUNET_STRINGS_relative_time_to_string (min_delay,
828 GNUNET_YES),
829 GNUNET_i2s (&min_udpw->session->target));
830 }
831 else
832 {
833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834 "Calculated flow delay for UDPv6 at %s for %s\n",
835 GNUNET_STRINGS_relative_time_to_string (min_delay,
836 GNUNET_YES),
837 GNUNET_i2s (&min_udpw->session->target));
838 }
839 }
840 plugin->select_task_v6 =
841 GNUNET_SCHEDULER_add_read_net (min_delay,
842 plugin->sockv6,
843 &udp_plugin_select_v6,
844 plugin);
845 }
846}
847
848
849/* ******************* Address to string and back ***************** */
850
851
852/**
853 * Function called for a quick conversion of the binary address to
854 * a numeric address. Note that the caller must not free the
855 * address and that the next call to this function is allowed
856 * to override the address again.
857 *
858 * @param cls closure
859 * @param addr binary address (a `union UdpAddress`)
860 * @param addrlen length of the @a addr
861 * @return string representing the same address
862 */
863const char *
864udp_address_to_string (void *cls, const void *addr, size_t addrlen)
865{
866 static char rbuf[INET6_ADDRSTRLEN + 10];
867 char buf[INET6_ADDRSTRLEN];
868 const void *sb;
869 struct in_addr a4;
870 struct in6_addr a6;
871 const struct IPv4UdpAddress *t4;
872 const struct IPv6UdpAddress *t6;
873 int af;
874 uint16_t port;
875 uint32_t options;
876
877 if (NULL == addr)
878 {
879 GNUNET_break_op (0);
880 return NULL;
881 }
882
883 if (addrlen == sizeof(struct IPv6UdpAddress))
884 {
885 t6 = addr;
886 af = AF_INET6;
887 options = ntohl (t6->options);
888 port = ntohs (t6->u6_port);
889 a6 = t6->ipv6_addr;
890 sb = &a6;
891 }
892 else if (addrlen == sizeof(struct IPv4UdpAddress))
893 {
894 t4 = addr;
895 af = AF_INET;
896 options = ntohl (t4->options);
897 port = ntohs (t4->u4_port);
898 a4.s_addr = t4->ipv4_addr;
899 sb = &a4;
900 }
901 else
902 {
903 GNUNET_break_op (0);
904 return NULL;
905 }
906 inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
907 GNUNET_snprintf (rbuf,
908 sizeof(rbuf),
909 (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u",
910 PLUGIN_NAME,
911 options,
912 buf,
913 port);
914 return rbuf;
915}
916
917
918/**
919 * Function called to convert a string address to a binary address.
920 *
921 * @param cls closure (`struct Plugin *`)
922 * @param addr string address
923 * @param addrlen length of the address
924 * @param buf location to store the buffer
925 * @param added location to store the number of bytes in the buffer.
926 * If the function returns #GNUNET_SYSERR, its contents are undefined.
927 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
928 */
929static int
930udp_string_to_address (void *cls,
931 const char *addr,
932 uint16_t addrlen,
933 void **buf,
934 size_t *added)
935{
936 struct sockaddr_storage socket_address;
937 char *address;
938 char *plugin;
939 char *optionstr;
940 uint32_t options;
941
942 /* Format tcp.options.address:port */
943 address = NULL;
944 plugin = NULL;
945 optionstr = NULL;
946
947 if ((NULL == addr) || (0 == addrlen))
948 {
949 GNUNET_break (0);
950 return GNUNET_SYSERR;
951 }
952 if ('\0' != addr[addrlen - 1])
953 {
954 GNUNET_break (0);
955 return GNUNET_SYSERR;
956 }
957 if (strlen (addr) != addrlen - 1)
958 {
959 GNUNET_break (0);
960 return GNUNET_SYSERR;
961 }
962 plugin = GNUNET_strdup (addr);
963 optionstr = strchr (plugin, '.');
964 if (NULL == optionstr)
965 {
966 GNUNET_break (0);
967 GNUNET_free (plugin);
968 return GNUNET_SYSERR;
969 }
970 optionstr[0] = '\0';
971 optionstr++;
972 options = atol (optionstr);
973 address = strchr (optionstr, '.');
974 if (NULL == address)
975 {
976 GNUNET_break (0);
977 GNUNET_free (plugin);
978 return GNUNET_SYSERR;
979 }
980 address[0] = '\0';
981 address++;
982
983 if (GNUNET_OK !=
984 GNUNET_STRINGS_to_address_ip (address, strlen (address), &socket_address))
985 {
986 GNUNET_break (0);
987 GNUNET_free (plugin);
988 return GNUNET_SYSERR;
989 }
990 GNUNET_free (plugin);
991
992 switch (socket_address.ss_family)
993 {
994 case AF_INET: {
995 struct IPv4UdpAddress *u4;
996 const struct sockaddr_in *in4 =
997 (const struct sockaddr_in *) &socket_address;
998
999 u4 = GNUNET_new (struct IPv4UdpAddress);
1000 u4->options = htonl (options);
1001 u4->ipv4_addr = in4->sin_addr.s_addr;
1002 u4->u4_port = in4->sin_port;
1003 *buf = u4;
1004 *added = sizeof(struct IPv4UdpAddress);
1005 return GNUNET_OK;
1006 }
1007
1008 case AF_INET6: {
1009 struct IPv6UdpAddress *u6;
1010 const struct sockaddr_in6 *in6 =
1011 (const struct sockaddr_in6 *) &socket_address;
1012
1013 u6 = GNUNET_new (struct IPv6UdpAddress);
1014 u6->options = htonl (options);
1015 u6->ipv6_addr = in6->sin6_addr;
1016 u6->u6_port = in6->sin6_port;
1017 *buf = u6;
1018 *added = sizeof(struct IPv6UdpAddress);
1019 return GNUNET_OK;
1020 }
1021
1022 default:
1023 GNUNET_break (0);
1024 return GNUNET_SYSERR;
1025 }
1026}
1027
1028
1029/**
1030 * Append our port and forward the result.
1031 *
1032 * @param cls a `struct PrettyPrinterContext *`
1033 * @param hostname result from DNS resolver
1034 */
1035static void
1036append_port (void *cls, const char *hostname)
1037{
1038 struct PrettyPrinterContext *ppc = cls;
1039 struct Plugin *plugin = ppc->plugin;
1040 char *ret;
1041
1042 if (NULL == hostname)
1043 {
1044 /* Final call, done */
1045 GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
1046 plugin->ppc_dll_tail,
1047 ppc);
1048 ppc->resolver_handle = NULL;
1049 ppc->asc (ppc->asc_cls, NULL, GNUNET_OK);
1050 GNUNET_free (ppc);
1051 return;
1052 }
1053 if (GNUNET_YES == ppc->ipv6)
1054 GNUNET_asprintf (&ret,
1055 "%s.%u.[%s]:%d",
1056 PLUGIN_NAME,
1057 ppc->options,
1058 hostname,
1059 ppc->port);
1060 else
1061 GNUNET_asprintf (&ret,
1062 "%s.%u.%s:%d",
1063 PLUGIN_NAME,
1064 ppc->options,
1065 hostname,
1066 ppc->port);
1067 ppc->asc (ppc->asc_cls, ret, GNUNET_OK);
1068 GNUNET_free (ret);
1069}
1070
1071
1072/**
1073 * Convert the transports address to a nice, human-readable format.
1074 *
1075 * @param cls closure with the `struct Plugin *`
1076 * @param type name of the transport that generated the address
1077 * @param addr one of the addresses of the host, NULL for the last address
1078 * the specific address format depends on the transport;
1079 * a `union UdpAddress`
1080 * @param addrlen length of the address
1081 * @param numeric should (IP) addresses be displayed in numeric form?
1082 * @param timeout after how long should we give up?
1083 * @param asc function to call on each string
1084 * @param asc_cls closure for @a asc
1085 */
1086static void
1087udp_plugin_address_pretty_printer (void *cls,
1088 const char *type,
1089 const void *addr,
1090 size_t addrlen,
1091 int numeric,
1092 struct GNUNET_TIME_Relative timeout,
1093 GNUNET_TRANSPORT_AddressStringCallback asc,
1094 void *asc_cls)
1095{
1096 struct Plugin *plugin = cls;
1097 struct PrettyPrinterContext *ppc;
1098 const struct sockaddr *sb;
1099 size_t sbs;
1100 struct sockaddr_in a4;
1101 struct sockaddr_in6 a6;
1102 const struct IPv4UdpAddress *u4;
1103 const struct IPv6UdpAddress *u6;
1104 uint16_t port;
1105 uint32_t options;
1106
1107 if (addrlen == sizeof(struct IPv6UdpAddress))
1108 {
1109 u6 = addr;
1110 memset (&a6, 0, sizeof(a6));
1111 a6.sin6_family = AF_INET6;
1112#if HAVE_SOCKADDR_IN_SIN_LEN
1113 a6.sin6_len = sizeof(a6);
1114#endif
1115 a6.sin6_port = u6->u6_port;
1116 a6.sin6_addr = u6->ipv6_addr;
1117 port = ntohs (u6->u6_port);
1118 options = ntohl (u6->options);
1119 sb = (const struct sockaddr *) &a6;
1120 sbs = sizeof(a6);
1121 }
1122 else if (addrlen == sizeof(struct IPv4UdpAddress))
1123 {
1124 u4 = addr;
1125 memset (&a4, 0, sizeof(a4));
1126 a4.sin_family = AF_INET;
1127#if HAVE_SOCKADDR_IN_SIN_LEN
1128 a4.sin_len = sizeof(a4);
1129#endif
1130 a4.sin_port = u4->u4_port;
1131 a4.sin_addr.s_addr = u4->ipv4_addr;
1132 port = ntohs (u4->u4_port);
1133 options = ntohl (u4->options);
1134 sb = (const struct sockaddr *) &a4;
1135 sbs = sizeof(a4);
1136 }
1137 else
1138 {
1139 /* invalid address */
1140 GNUNET_break_op (0);
1141 asc (asc_cls, NULL, GNUNET_SYSERR);
1142 asc (asc_cls, NULL, GNUNET_OK);
1143 return;
1144 }
1145 ppc = GNUNET_new (struct PrettyPrinterContext);
1146 ppc->plugin = plugin;
1147 ppc->asc = asc;
1148 ppc->asc_cls = asc_cls;
1149 ppc->port = port;
1150 ppc->options = options;
1151 if (addrlen == sizeof(struct IPv6UdpAddress))
1152 ppc->ipv6 = GNUNET_YES;
1153 else
1154 ppc->ipv6 = GNUNET_NO;
1155 GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head, plugin->ppc_dll_tail, ppc);
1156 ppc->resolver_handle = GNUNET_RESOLVER_hostname_get (sb,
1157 sbs,
1158 ! numeric,
1159 timeout,
1160 &append_port,
1161 ppc);
1162}
1163
1164
1165/**
1166 * Check if the given port is plausible (must be either our listen
1167 * port or our advertised port). If it is neither, we return
1168 * #GNUNET_SYSERR.
1169 *
1170 * @param plugin global variables
1171 * @param in_port port number to check
1172 * @return #GNUNET_OK if port is either our open or advertised port
1173 */
1174static int
1175check_port (const struct Plugin *plugin, uint16_t in_port)
1176{
1177 if ((plugin->port == in_port) || (plugin->aport == in_port))
1178 return GNUNET_OK;
1179 return GNUNET_SYSERR;
1180}
1181
1182
1183/**
1184 * Function that will be called to check if a binary address for this
1185 * plugin is well-formed and corresponds to an address for THIS peer
1186 * (as per our configuration). Naturally, if absolutely necessary,
1187 * plugins can be a bit conservative in their answer, but in general
1188 * plugins should make sure that the address does not redirect
1189 * traffic to a 3rd party that might try to man-in-the-middle our
1190 * traffic.
1191 *
1192 * @param cls closure, should be our handle to the Plugin
1193 * @param addr pointer to a `union UdpAddress`
1194 * @param addrlen length of @a addr
1195 * @return #GNUNET_OK if this is a plausible address for this peer
1196 * and transport, #GNUNET_SYSERR if not
1197 */
1198static int
1199udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
1200{
1201 struct Plugin *plugin = cls;
1202 const struct IPv4UdpAddress *v4;
1203 const struct IPv6UdpAddress *v6;
1204
1205 if (sizeof(struct IPv4UdpAddress) == addrlen)
1206 {
1207 struct sockaddr_in s4;
1208
1209 v4 = (const struct IPv4UdpAddress *) addr;
1210 if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1211 return GNUNET_SYSERR;
1212 memset (&s4, 0, sizeof(s4));
1213 s4.sin_family = AF_INET;
1214#if HAVE_SOCKADDR_IN_SIN_LEN
1215 s4.sin_len = sizeof(s4);
1216#endif
1217 s4.sin_port = v4->u4_port;
1218 s4.sin_addr.s_addr = v4->ipv4_addr;
1219
1220 if (GNUNET_OK !=
1221 GNUNET_NAT_test_address (plugin->nat, &s4, sizeof(struct sockaddr_in)))
1222 return GNUNET_SYSERR;
1223 }
1224 else if (sizeof(struct IPv6UdpAddress) == addrlen)
1225 {
1226 struct sockaddr_in6 s6;
1227
1228 v6 = (const struct IPv6UdpAddress *) addr;
1229 if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1230 return GNUNET_OK; /* plausible, if unlikely... */
1231 memset (&s6, 0, sizeof(s6));
1232 s6.sin6_family = AF_INET6;
1233#if HAVE_SOCKADDR_IN_SIN_LEN
1234 s6.sin6_len = sizeof(s6);
1235#endif
1236 s6.sin6_port = v6->u6_port;
1237 s6.sin6_addr = v6->ipv6_addr;
1238
1239 if (GNUNET_OK != GNUNET_NAT_test_address (plugin->nat,
1240 &s6,
1241 sizeof(struct sockaddr_in6)))
1242 return GNUNET_SYSERR;
1243 }
1244 else
1245 {
1246 GNUNET_break_op (0);
1247 return GNUNET_SYSERR;
1248 }
1249 return GNUNET_OK;
1250}
1251
1252
1253/**
1254 * Our external IP address/port mapping has changed.
1255 *
1256 * @param cls closure, the `struct Plugin`
1257 * @param app_ctx[in,out] location where the app can store stuff
1258 * on add and retrieve it on remove
1259 * @param add_remove #GNUNET_YES to mean the new public IP address,
1260 * #GNUNET_NO to mean the previous (now invalid) one
1261 * @param ac address class the address belongs to
1262 * @param addr either the previous or the new public IP address
1263 * @param addrlen actual length of the @a addr
1264 */
1265static void
1266udp_nat_port_map_callback (void *cls,
1267 void **app_ctx,
1268 int add_remove,
1269 enum GNUNET_NAT_AddressClass ac,
1270 const struct sockaddr *addr,
1271 socklen_t addrlen)
1272{
1273 struct Plugin *plugin = cls;
1274 struct GNUNET_HELLO_Address *address;
1275 struct IPv4UdpAddress u4;
1276 struct IPv6UdpAddress u6;
1277 void *arg;
1278 size_t args;
1279
1280 (void) app_ctx;
1281 LOG (GNUNET_ERROR_TYPE_DEBUG,
1282 (GNUNET_YES == add_remove) ? "NAT notification to add address `%s'\n"
1283 : "NAT notification to remove address `%s'\n",
1284 GNUNET_a2s (addr, addrlen));
1285 /* convert 'address' to our internal format */
1286 switch (addr->sa_family)
1287 {
1288 case AF_INET: {
1289 const struct sockaddr_in *i4;
1290
1291 GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1292 i4 = (const struct sockaddr_in *) addr;
1293 if (0 == ntohs (i4->sin_port))
1294 return; /* Port = 0 means unmapped, ignore these for UDP. */
1295 memset (&u4, 0, sizeof(u4));
1296 u4.options = htonl (plugin->myoptions);
1297 u4.ipv4_addr = i4->sin_addr.s_addr;
1298 u4.u4_port = i4->sin_port;
1299 arg = &u4;
1300 args = sizeof(struct IPv4UdpAddress);
1301 break;
1302 }
1303
1304 case AF_INET6: {
1305 const struct sockaddr_in6 *i6;
1306
1307 GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1308 i6 = (const struct sockaddr_in6 *) addr;
1309 if (0 == ntohs (i6->sin6_port))
1310 return; /* Port = 0 means unmapped, ignore these for UDP. */
1311 memset (&u6, 0, sizeof(u6));
1312 u6.options = htonl (plugin->myoptions);
1313 u6.ipv6_addr = i6->sin6_addr;
1314 u6.u6_port = i6->sin6_port;
1315 arg = &u6;
1316 args = sizeof(struct IPv6UdpAddress);
1317 break;
1318 }
1319
1320 default:
1321 GNUNET_break (0);
1322 return;
1323 }
1324 /* modify our published address list */
1325 /* TODO: use 'ac' here in the future... */
1326 address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1327 PLUGIN_NAME,
1328 arg,
1329 args,
1330 GNUNET_HELLO_ADDRESS_INFO_NONE);
1331 plugin->env->notify_address (plugin->env->cls, add_remove, address);
1332 GNUNET_HELLO_address_free (address);
1333}
1334
1335
1336/* ********************* Finding sessions ******************* */
1337
1338
1339/**
1340 * Closure for #session_cmp_it().
1341 */
1342struct GNUNET_ATS_SessionCompareContext
1343{
1344 /**
1345 * Set to session matching the address.
1346 */
1347 struct GNUNET_ATS_Session *res;
1348
1349 /**
1350 * Address we are looking for.
1351 */
1352 const struct GNUNET_HELLO_Address *address;
1353};
1354
1355
1356/**
1357 * Find a session with a matching address.
1358 *
1359 * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
1360 * @param key peer identity (unused)
1361 * @param value the `struct GNUNET_ATS_Session *`
1362 * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1363 */
1364static int
1365session_cmp_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
1366{
1367 struct GNUNET_ATS_SessionCompareContext *cctx = cls;
1368 struct GNUNET_ATS_Session *s = value;
1369
1370 if (0 == GNUNET_HELLO_address_cmp (s->address, cctx->address))
1371 {
1372 GNUNET_assert (GNUNET_NO == s->in_destroy);
1373 cctx->res = s;
1374 return GNUNET_NO;
1375 }
1376 return GNUNET_OK;
1377}
1378
1379
1380/**
1381 * Locate an existing session the transport service is using to
1382 * send data to another peer. Performs some basic sanity checks
1383 * on the address and then tries to locate a matching session.
1384 *
1385 * @param cls the plugin
1386 * @param address the address we should locate the session by
1387 * @return the session if it exists, or NULL if it is not found
1388 */
1389static struct GNUNET_ATS_Session *
1390udp_plugin_lookup_session (void *cls,
1391 const struct GNUNET_HELLO_Address *address)
1392{
1393 struct Plugin *plugin = cls;
1394 const struct IPv6UdpAddress *udp_a6;
1395 const struct IPv4UdpAddress *udp_a4;
1396 struct GNUNET_ATS_SessionCompareContext cctx;
1397
1398 if (NULL == address->address)
1399 {
1400 GNUNET_break (0);
1401 return NULL;
1402 }
1403 if (sizeof(struct IPv4UdpAddress) == address->address_length)
1404 {
1405 if (NULL == plugin->sockv4)
1406 return NULL;
1407 udp_a4 = (const struct IPv4UdpAddress *) address->address;
1408 if (0 == udp_a4->u4_port)
1409 {
1410 GNUNET_break (0);
1411 return NULL;
1412 }
1413 }
1414 else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1415 {
1416 if (NULL == plugin->sockv6)
1417 return NULL;
1418 udp_a6 = (const struct IPv6UdpAddress *) address->address;
1419 if (0 == udp_a6->u6_port)
1420 {
1421 GNUNET_break (0);
1422 return NULL;
1423 }
1424 }
1425 else
1426 {
1427 GNUNET_break (0);
1428 return NULL;
1429 }
1430
1431 /* check if session already exists */
1432 cctx.address = address;
1433 cctx.res = NULL;
1434 LOG (GNUNET_ERROR_TYPE_DEBUG,
1435 "Looking for existing session for peer `%s' with address `%s'\n",
1436 GNUNET_i2s (&address->peer),
1437 udp_address_to_string (plugin,
1438 address->address,
1439 address->address_length));
1440 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1441 &address->peer,
1442 &session_cmp_it,
1443 &cctx);
1444 if (NULL == cctx.res)
1445 return NULL;
1446 LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1447 return cctx.res;
1448}
1449
1450
1451/* ********************** Timeout ****************** */
1452
1453
1454/**
1455 * Increment session timeout due to activity.
1456 *
1457 * @param s session to reschedule timeout activity for
1458 */
1459static void
1460reschedule_session_timeout (struct GNUNET_ATS_Session *s)
1461{
1462 if (GNUNET_YES == s->in_destroy)
1463 return;
1464 GNUNET_assert (NULL != s->timeout_task);
1465 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1466}
1467
1468
1469/**
1470 * Function that will be called whenever the transport service wants to
1471 * notify the plugin that a session is still active and in use and
1472 * therefore the session timeout for this session has to be updated
1473 *
1474 * @param cls closure with the `struct Plugin`
1475 * @param peer which peer was the session for
1476 * @param session which session is being updated
1477 */
1478static void
1479udp_plugin_update_session_timeout (void *cls,
1480 const struct GNUNET_PeerIdentity *peer,
1481 struct GNUNET_ATS_Session *session)
1482{
1483 struct Plugin *plugin = cls;
1484
1485 if (GNUNET_YES !=
1486 GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1487 peer,
1488 session))
1489 {
1490 GNUNET_break (0);
1491 return;
1492 }
1493 /* Reschedule session timeout */
1494 reschedule_session_timeout (session);
1495}
1496
1497
1498/* ************************* Sending ************************ */
1499
1500
1501/**
1502 * Remove the given message from the transmission queue and
1503 * update all applicable statistics.
1504 *
1505 * @param plugin the UDP plugin
1506 * @param udpw message wrapper to dequeue
1507 */
1508static void
1509dequeue (struct Plugin *plugin, struct UDP_MessageWrapper *udpw)
1510{
1511 struct GNUNET_ATS_Session *session = udpw->session;
1512
1513 if (plugin->bytes_in_buffer < udpw->msg_size)
1514 {
1515 GNUNET_break (0);
1516 }
1517 else
1518 {
1519 GNUNET_STATISTICS_update (plugin->env->stats,
1520 "# UDP, total bytes in send buffers",
1521 -(long long) udpw->msg_size,
1522 GNUNET_NO);
1523 plugin->bytes_in_buffer -= udpw->msg_size;
1524 }
1525 GNUNET_STATISTICS_update (plugin->env->stats,
1526 "# UDP, total messages in send buffers",
1527 -1,
1528 GNUNET_NO);
1529 if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1530 {
1531 GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1532 plugin->ipv4_queue_tail,
1533 udpw);
1534 }
1535 else if (sizeof(struct IPv6UdpAddress) ==
1536 udpw->session->address->address_length)
1537 {
1538 GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1539 plugin->ipv6_queue_tail,
1540 udpw);
1541 }
1542 else
1543 {
1544 GNUNET_break (0);
1545 return;
1546 }
1547 GNUNET_assert (session->msgs_in_queue > 0);
1548 session->msgs_in_queue--;
1549 GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1550 session->bytes_in_queue -= udpw->msg_size;
1551}
1552
1553
1554/**
1555 * Enqueue a message for transmission and update statistics.
1556 *
1557 * @param plugin the UDP plugin
1558 * @param udpw message wrapper to queue
1559 */
1560static void
1561enqueue (struct Plugin *plugin, struct UDP_MessageWrapper *udpw)
1562{
1563 struct GNUNET_ATS_Session *session = udpw->session;
1564
1565 if (GNUNET_YES == session->in_destroy)
1566 {
1567 GNUNET_break (0);
1568 GNUNET_free (udpw);
1569 return;
1570 }
1571 if (plugin->bytes_in_buffer > INT64_MAX - udpw->msg_size)
1572 {
1573 GNUNET_break (0);
1574 }
1575 else
1576 {
1577 GNUNET_STATISTICS_update (plugin->env->stats,
1578 "# UDP, total bytes in send buffers",
1579 udpw->msg_size,
1580 GNUNET_NO);
1581 plugin->bytes_in_buffer += udpw->msg_size;
1582 }
1583 GNUNET_STATISTICS_update (plugin->env->stats,
1584 "# UDP, total messages in send buffers",
1585 1,
1586 GNUNET_NO);
1587 if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1588 {
1589 GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1590 plugin->ipv4_queue_tail,
1591 udpw);
1592 }
1593 else if (sizeof(struct IPv6UdpAddress) ==
1594 udpw->session->address->address_length)
1595 {
1596 GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1597 plugin->ipv6_queue_tail,
1598 udpw);
1599 }
1600 else
1601 {
1602 GNUNET_break (0);
1603 udpw->cont (udpw->cont_cls,
1604 &session->target,
1605 GNUNET_SYSERR,
1606 udpw->msg_size,
1607 0);
1608 GNUNET_free (udpw);
1609 return;
1610 }
1611 session->msgs_in_queue++;
1612 session->bytes_in_queue += udpw->msg_size;
1613}
1614
1615
1616/**
1617 * We have completed our (attempt) to transmit a message that had to
1618 * be fragmented -- either because we got an ACK saying that all
1619 * fragments were received, or because of timeout / disconnect. Clean
1620 * up our state.
1621 *
1622 * @param frag_ctx fragmentation context to clean up
1623 * @param result #GNUNET_OK if we succeeded (got ACK),
1624 * #GNUNET_SYSERR if the transmission failed
1625 */
1626static void
1627fragmented_message_done (struct UDP_FragmentationContext *frag_ctx, int result)
1628{
1629 struct Plugin *plugin = frag_ctx->plugin;
1630 struct GNUNET_ATS_Session *s = frag_ctx->session;
1631 struct UDP_MessageWrapper *udpw;
1632 struct UDP_MessageWrapper *tmp;
1633 size_t overhead;
1634 struct GNUNET_TIME_Relative delay;
1635
1636 LOG (GNUNET_ERROR_TYPE_DEBUG,
1637 "%p: Fragmented message removed with result %s\n",
1638 frag_ctx,
1639 (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1640 /* Call continuation for fragmented message */
1641 if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1642 overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1643 else
1644 overhead = frag_ctx->on_wire_size;
1645 delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1646 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1647 {
1648 LOG (GNUNET_ERROR_TYPE_WARNING,
1649 "Fragmented message acknowledged after %s (expected at %s)\n",
1650 GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES),
1651 GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
1652 }
1653 else
1654 {
1655 LOG (GNUNET_ERROR_TYPE_DEBUG,
1656 "Fragmented message acknowledged after %s (expected at %s)\n",
1657 GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES),
1658 GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
1659 }
1660
1661 if (NULL != frag_ctx->cont)
1662 frag_ctx->cont (frag_ctx->cont_cls,
1663 &s->target,
1664 result,
1665 s->frag_ctx->payload_size,
1666 frag_ctx->on_wire_size);
1667 GNUNET_STATISTICS_update (plugin->env->stats,
1668 "# UDP, fragmented messages active",
1669 -1,
1670 GNUNET_NO);
1671
1672 if (GNUNET_OK == result)
1673 {
1674 GNUNET_STATISTICS_update (plugin->env->stats,
1675 "# UDP, fragmented msgs, messages, sent, success",
1676 1,
1677 GNUNET_NO);
1678 GNUNET_STATISTICS_update (plugin->env->stats,
1679 "# UDP, fragmented msgs, bytes payload, sent, success",
1680 s->frag_ctx->payload_size,
1681 GNUNET_NO);
1682 GNUNET_STATISTICS_update (
1683 plugin->env->stats,
1684 "# UDP, fragmented msgs, bytes overhead, sent, success",
1685 overhead,
1686 GNUNET_NO);
1687 GNUNET_STATISTICS_update (plugin->env->stats,
1688 "# UDP, total, bytes overhead, sent",
1689 overhead,
1690 GNUNET_NO);
1691 GNUNET_STATISTICS_update (plugin->env->stats,
1692 "# UDP, total, bytes payload, sent",
1693 s->frag_ctx->payload_size,
1694 GNUNET_NO);
1695 }
1696 else
1697 {
1698 GNUNET_STATISTICS_update (plugin->env->stats,
1699 "# UDP, fragmented msgs, messages, sent, failure",
1700 1,
1701 GNUNET_NO);
1702 GNUNET_STATISTICS_update (plugin->env->stats,
1703 "# UDP, fragmented msgs, bytes payload, sent, failure",
1704 s->frag_ctx->payload_size,
1705 GNUNET_NO);
1706 GNUNET_STATISTICS_update (plugin->env->stats,
1707 "# UDP, fragmented msgs, bytes payload, sent, failure",
1708 overhead,
1709 GNUNET_NO);
1710 GNUNET_STATISTICS_update (plugin->env->stats,
1711 "# UDP, fragmented msgs, bytes payload, sent, failure",
1712 overhead,
1713 GNUNET_NO);
1714 }
1715
1716 /* Remove remaining fragments from queue, no need to transmit those
1717 any longer. */
1718 if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1719 {
1720 udpw = plugin->ipv6_queue_head;
1721 while (NULL != udpw)
1722 {
1723 tmp = udpw->next;
1724 if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == frag_ctx))
1725 {
1726 dequeue (plugin, udpw);
1727 GNUNET_free (udpw);
1728 }
1729 udpw = tmp;
1730 }
1731 }
1732 if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1733 {
1734 udpw = plugin->ipv4_queue_head;
1735 while (NULL != udpw)
1736 {
1737 tmp = udpw->next;
1738 if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == frag_ctx))
1739 {
1740 dequeue (plugin, udpw);
1741 GNUNET_free (udpw);
1742 }
1743 udpw = tmp;
1744 }
1745 }
1746 notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
1747 GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1748 &s->last_expected_msg_delay,
1749 &s->last_expected_ack_delay);
1750 s->frag_ctx = NULL;
1751 GNUNET_free (frag_ctx);
1752}
1753
1754
1755/**
1756 * We are finished with a fragment in the message queue.
1757 * Notify the continuation and update statistics.
1758 *
1759 * @param cls the `struct Plugin *`
1760 * @param udpw the queue entry
1761 * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1762 */
1763static void
1764qc_fragment_sent (void *cls, struct UDP_MessageWrapper *udpw, int result)
1765{
1766 struct Plugin *plugin = cls;
1767
1768 GNUNET_assert (NULL != udpw->frag_ctx);
1769 if (GNUNET_OK == result)
1770 {
1771 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772 "Fragment of message with %u bytes transmitted to %s\n",
1773 (unsigned int) udpw->payload_size,
1774 GNUNET_i2s (&udpw->session->target));
1775 GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1776 GNUNET_STATISTICS_update (plugin->env->stats,
1777 "# UDP, fragmented msgs, fragments, sent, success",
1778 1,
1779 GNUNET_NO);
1780 GNUNET_STATISTICS_update (
1781 plugin->env->stats,
1782 "# UDP, fragmented msgs, fragments bytes, sent, success",
1783 udpw->msg_size,
1784 GNUNET_NO);
1785 }
1786 else
1787 {
1788 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789 "Failed to transmit fragment of message with %u bytes to %s\n",
1790 (unsigned int) udpw->payload_size,
1791 GNUNET_i2s (&udpw->session->target));
1792 fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
1793 GNUNET_STATISTICS_update (plugin->env->stats,
1794 "# UDP, fragmented msgs, fragments, sent, failure",
1795 1,
1796 GNUNET_NO);
1797 GNUNET_STATISTICS_update (
1798 plugin->env->stats,
1799 "# UDP, fragmented msgs, fragments bytes, sent, failure",
1800 udpw->msg_size,
1801 GNUNET_NO);
1802 }
1803}
1804
1805
1806/**
1807 * Function that is called with messages created by the fragmentation
1808 * module. In the case of the `proc` callback of the
1809 * #GNUNET_FRAGMENT_context_create() function, this function must
1810 * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1811 *
1812 * @param cls closure, the `struct UDP_FragmentationContext`
1813 * @param msg the message that was created
1814 */
1815static void
1816enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1817{
1818 struct UDP_FragmentationContext *frag_ctx = cls;
1819 struct Plugin *plugin = frag_ctx->plugin;
1820 struct UDP_MessageWrapper *udpw;
1821 struct GNUNET_ATS_Session *session = frag_ctx->session;
1822 size_t msg_len = ntohs (msg->size);
1823
1824 LOG (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %lu bytes\n",
1825 (unsigned long) msg_len);
1826 udpw = GNUNET_malloc (sizeof(struct UDP_MessageWrapper) + msg_len);
1827 udpw->session = session;
1828 udpw->msg_buf = (char *) &udpw[1];
1829 udpw->msg_size = msg_len;
1830 udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1831 udpw->timeout = frag_ctx->timeout;
1832 udpw->start_time = frag_ctx->start_time;
1833 udpw->transmission_time = frag_ctx->next_frag_time;
1834 frag_ctx->next_frag_time =
1835 GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1836 frag_ctx->flow_delay_from_other_peer);
1837 udpw->frag_ctx = frag_ctx;
1838 udpw->qc = &qc_fragment_sent;
1839 udpw->qc_cls = plugin;
1840 GNUNET_memcpy (udpw->msg_buf, msg, msg_len);
1841 enqueue (plugin, udpw);
1842 if (session->address->address_length == sizeof(struct IPv4UdpAddress))
1843 schedule_select_v4 (plugin);
1844 else
1845 schedule_select_v6 (plugin);
1846}
1847
1848
1849/**
1850 * We are finished with a message from the message queue.
1851 * Notify the continuation and update statistics.
1852 *
1853 * @param cls the `struct Plugin *`
1854 * @param udpw the queue entry
1855 * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1856 */
1857static void
1858qc_message_sent (void *cls, struct UDP_MessageWrapper *udpw, int result)
1859{
1860 struct Plugin *plugin = cls;
1861 size_t overhead;
1862 struct GNUNET_TIME_Relative delay;
1863
1864 if (udpw->msg_size >= udpw->payload_size)
1865 overhead = udpw->msg_size - udpw->payload_size;
1866 else
1867 overhead = udpw->msg_size;
1868
1869 if (NULL != udpw->cont)
1870 {
1871 delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1872 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1873 {
1874 LOG (GNUNET_ERROR_TYPE_WARNING,
1875 "Message sent via UDP with delay of %s\n",
1876 GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1877 }
1878 else
1879 {
1880 LOG (GNUNET_ERROR_TYPE_DEBUG,
1881 "Message sent via UDP with delay of %s\n",
1882 GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1883 }
1884 udpw->cont (udpw->cont_cls,
1885 &udpw->session->target,
1886 result,
1887 udpw->payload_size,
1888 overhead);
1889 }
1890 if (GNUNET_OK == result)
1891 {
1892 GNUNET_STATISTICS_update (plugin->env->stats,
1893 "# UDP, unfragmented msgs, messages, sent, success",
1894 1,
1895 GNUNET_NO);
1896 GNUNET_STATISTICS_update (
1897 plugin->env->stats,
1898 "# UDP, unfragmented msgs, bytes payload, sent, success",
1899 udpw->payload_size,
1900 GNUNET_NO);
1901 GNUNET_STATISTICS_update (
1902 plugin->env->stats,
1903 "# UDP, unfragmented msgs, bytes overhead, sent, success",
1904 overhead,
1905 GNUNET_NO);
1906 GNUNET_STATISTICS_update (plugin->env->stats,
1907 "# UDP, total, bytes overhead, sent",
1908 overhead,
1909 GNUNET_NO);
1910 GNUNET_STATISTICS_update (plugin->env->stats,
1911 "# UDP, total, bytes payload, sent",
1912 udpw->payload_size,
1913 GNUNET_NO);
1914 }
1915 else
1916 {
1917 GNUNET_STATISTICS_update (plugin->env->stats,
1918 "# UDP, unfragmented msgs, messages, sent, failure",
1919 1,
1920 GNUNET_NO);
1921 GNUNET_STATISTICS_update (
1922 plugin->env->stats,
1923 "# UDP, unfragmented msgs, bytes payload, sent, failure",
1924 udpw->payload_size,
1925 GNUNET_NO);
1926 GNUNET_STATISTICS_update (
1927 plugin->env->stats,
1928 "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1929 overhead,
1930 GNUNET_NO);
1931 }
1932}
1933
1934
1935/**
1936 * Function that can be used by the transport service to transmit a
1937 * message using the plugin. Note that in the case of a peer
1938 * disconnecting, the continuation MUST be called prior to the
1939 * disconnect notification itself. This function will be called with
1940 * this peer's HELLO message to initiate a fresh connection to another
1941 * peer.
1942 *
1943 * @param cls closure
1944 * @param s which session must be used
1945 * @param msgbuf the message to transmit
1946 * @param msgbuf_size number of bytes in @a msgbuf
1947 * @param priority how important is the message (most plugins will
1948 * ignore message priority and just FIFO)
1949 * @param to how long to wait at most for the transmission (does not
1950 * require plugins to discard the message after the timeout,
1951 * just advisory for the desired delay; most plugins will ignore
1952 * this as well)
1953 * @param cont continuation to call once the message has
1954 * been transmitted (or if the transport is ready
1955 * for the next transmission call; or if the
1956 * peer disconnected...); can be NULL
1957 * @param cont_cls closure for @a cont
1958 * @return number of bytes used (on the physical network, with overheads);
1959 * -1 on hard errors (i.e. address invalid); 0 is a legal value
1960 * and does NOT mean that the message was not transmitted (DV)
1961 */
1962static ssize_t
1963udp_plugin_send (void *cls,
1964 struct GNUNET_ATS_Session *s,
1965 const char *msgbuf,
1966 size_t msgbuf_size,
1967 unsigned int priority,
1968 struct GNUNET_TIME_Relative to,
1969 GNUNET_TRANSPORT_TransmitContinuation cont,
1970 void *cont_cls)
1971{
1972 struct Plugin *plugin = cls;
1973 size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1974 struct UDP_FragmentationContext *frag_ctx;
1975 struct UDP_MessageWrapper *udpw;
1976 struct UDPMessage *udp;
1977 char mbuf[udpmlen] GNUNET_ALIGN;
1978 struct GNUNET_TIME_Relative latency;
1979
1980 if ((sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
1981 (NULL == plugin->sockv6))
1982 return GNUNET_SYSERR;
1983 if ((sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
1984 (NULL == plugin->sockv4))
1985 return GNUNET_SYSERR;
1986 if (udpmlen >= GNUNET_MAX_MESSAGE_SIZE)
1987 {
1988 GNUNET_break (0);
1989 return GNUNET_SYSERR;
1990 }
1991 if (GNUNET_YES !=
1992 GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1993 &s->target,
1994 s))
1995 {
1996 GNUNET_break (0);
1997 return GNUNET_SYSERR;
1998 }
1999 LOG (GNUNET_ERROR_TYPE_DEBUG,
2000 "UDP transmits %lu-byte message to `%s' using address `%s'\n",
2001 (unsigned long) udpmlen,
2002 GNUNET_i2s (&s->target),
2003 udp_address_to_string (plugin,
2004 s->address->address,
2005 s->address->address_length));
2006
2007 udp = (struct UDPMessage *) mbuf;
2008 udp->header.size = htons (udpmlen);
2009 udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2010 udp->reserved = htonl (0);
2011 udp->sender = *plugin->env->my_identity;
2012
2013 /* We do not update the session time out here! Otherwise this
2014 * session will not timeout since we send keep alive before session
2015 * can timeout.
2016 *
2017 * For UDP we update session timeout only on receive, this will
2018 * cover keep alives, since remote peer will reply with keep alive
2019 * responses!
2020 */if (udpmlen <= UDP_MTU)
2021 {
2022 /* unfragmented message */
2023 udpw = GNUNET_malloc (sizeof(struct UDP_MessageWrapper) + udpmlen);
2024 udpw->session = s;
2025 udpw->msg_buf = (char *) &udpw[1];
2026 udpw->msg_size = udpmlen; /* message size with UDP overhead */
2027 udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2028 udpw->start_time = GNUNET_TIME_absolute_get ();
2029 udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2030 udpw->transmission_time = s->last_transmit_time;
2031 s->last_transmit_time =
2032 GNUNET_TIME_absolute_add (s->last_transmit_time,
2033 s->flow_delay_from_other_peer);
2034 udpw->cont = cont;
2035 udpw->cont_cls = cont_cls;
2036 udpw->frag_ctx = NULL;
2037 udpw->qc = &qc_message_sent;
2038 udpw->qc_cls = plugin;
2039 GNUNET_memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2040 GNUNET_memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
2041 msgbuf,
2042 msgbuf_size);
2043 enqueue (plugin, udpw);
2044 GNUNET_STATISTICS_update (plugin->env->stats,
2045 "# UDP, unfragmented messages queued total",
2046 1,
2047 GNUNET_NO);
2048 GNUNET_STATISTICS_update (plugin->env->stats,
2049 "# UDP, unfragmented bytes payload queued total",
2050 msgbuf_size,
2051 GNUNET_NO);
2052 if (s->address->address_length == sizeof(struct IPv4UdpAddress))
2053 schedule_select_v4 (plugin);
2054 else
2055 schedule_select_v6 (plugin);
2056 }
2057 else
2058 {
2059 /* fragmented message */
2060 if (NULL != s->frag_ctx)
2061 return GNUNET_SYSERR;
2062 GNUNET_memcpy (&udp[1], msgbuf, msgbuf_size);
2063 frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2064 frag_ctx->plugin = plugin;
2065 frag_ctx->session = s;
2066 frag_ctx->cont = cont;
2067 frag_ctx->cont_cls = cont_cls;
2068 frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2069 frag_ctx->next_frag_time = s->last_transmit_time;
2070 frag_ctx->flow_delay_from_other_peer =
2071 GNUNET_TIME_relative_divide (s->flow_delay_from_other_peer,
2072 1 + (msgbuf_size / UDP_MTU));
2073 frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
2074 frag_ctx->payload_size =
2075 msgbuf_size; /* unfragmented message size without UDP overhead */
2076 frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2077 frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2078 UDP_MTU,
2079 &plugin->tracker,
2080 s->last_expected_msg_delay,
2081 s->last_expected_ack_delay,
2082 &udp->header,
2083 &enqueue_fragment,
2084 frag_ctx);
2085 s->frag_ctx = frag_ctx;
2086 s->last_transmit_time = frag_ctx->next_frag_time;
2087 latency = GNUNET_TIME_absolute_get_remaining (s->last_transmit_time);
2088 if (latency.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2089 LOG (GNUNET_ERROR_TYPE_WARNING,
2090 "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
2091 GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_YES),
2092 GNUNET_i2s (&s->target),
2093 (unsigned int) s->msgs_in_queue);
2094 else
2095 LOG (GNUNET_ERROR_TYPE_DEBUG,
2096 "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
2097 GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_YES),
2098 GNUNET_i2s (&s->target),
2099 (unsigned int) s->msgs_in_queue);
2100
2101 GNUNET_STATISTICS_update (plugin->env->stats,
2102 "# UDP, fragmented messages active",
2103 1,
2104 GNUNET_NO);
2105 GNUNET_STATISTICS_update (plugin->env->stats,
2106 "# UDP, fragmented messages, total",
2107 1,
2108 GNUNET_NO);
2109 GNUNET_STATISTICS_update (plugin->env->stats,
2110 "# UDP, fragmented bytes (payload)",
2111 frag_ctx->payload_size,
2112 GNUNET_NO);
2113 }
2114 notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
2115 return udpmlen;
2116}
2117
2118
2119/* ********************** Receiving ********************** */
2120
2121
2122/**
2123 * Closure for #find_receive_context().
2124 */
2125struct FindReceiveContext
2126{
2127 /**
2128 * Where to store the result.
2129 */
2130 struct DefragContext *rc;
2131
2132 /**
2133 * Session associated with this context.
2134 */
2135 struct GNUNET_ATS_Session *session;
2136
2137 /**
2138 * Address to find.
2139 */
2140 const union UdpAddress *udp_addr;
2141
2142 /**
2143 * Number of bytes in @e udp_addr.
2144 */
2145 size_t udp_addr_len;
2146};
2147
2148
2149/**
2150 * Scan the heap for a receive context with the given address.
2151 *
2152 * @param cls the `struct FindReceiveContext`
2153 * @param node internal node of the heap
2154 * @param element value stored at the node (a `struct ReceiveContext`)
2155 * @param cost cost associated with the node
2156 * @return #GNUNET_YES if we should continue to iterate,
2157 * #GNUNET_NO if not.
2158 */
2159static int
2160find_receive_context (void *cls,
2161 struct GNUNET_CONTAINER_HeapNode *node,
2162 void *element,
2163 GNUNET_CONTAINER_HeapCostType cost)
2164{
2165 struct FindReceiveContext *frc = cls;
2166 struct DefragContext *e = element;
2167
2168 if ((frc->udp_addr_len == e->udp_addr_len) &&
2169 (0 == memcmp (frc->udp_addr, e->udp_addr, frc->udp_addr_len)))
2170 {
2171 frc->rc = e;
2172 return GNUNET_NO;
2173 }
2174 return GNUNET_YES;
2175}
2176
2177
2178/**
2179 * Functions with this signature are called whenever we need to close
2180 * a session due to a disconnect or failure to establish a connection.
2181 *
2182 * @param cls closure with the `struct Plugin`
2183 * @param s session to close down
2184 * @return #GNUNET_OK on success
2185 */
2186static int
2187udp_disconnect_session (void *cls, struct GNUNET_ATS_Session *s)
2188{
2189 struct Plugin *plugin = cls;
2190 struct UDP_MessageWrapper *udpw;
2191 struct UDP_MessageWrapper *next;
2192 struct FindReceiveContext frc;
2193
2194 GNUNET_assert (GNUNET_YES != s->in_destroy);
2195 LOG (GNUNET_ERROR_TYPE_DEBUG,
2196 "Session %p to peer `%s' at address %s ended\n",
2197 s,
2198 GNUNET_i2s (&s->target),
2199 udp_address_to_string (plugin,
2200 s->address->address,
2201 s->address->address_length));
2202 if (NULL != s->timeout_task)
2203 {
2204 GNUNET_SCHEDULER_cancel (s->timeout_task);
2205 s->timeout_task = NULL;
2206 }
2207 if (NULL != s->frag_ctx)
2208 {
2209 /* Remove fragmented message due to disconnect */
2210 fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
2211 }
2212 GNUNET_assert (
2213 GNUNET_YES ==
2214 GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, &s->target, s));
2215 frc.rc = NULL;
2216 frc.udp_addr = s->address->address;
2217 frc.udp_addr_len = s->address->address_length;
2218 /* Lookup existing receive context for this address */
2219 if (NULL != plugin->defrag_ctxs)
2220 {
2221 GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2222 &find_receive_context,
2223 &frc);
2224 if (NULL != frc.rc)
2225 {
2226 struct DefragContext *d_ctx = frc.rc;
2227
2228 GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2229 GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2230 GNUNET_free (d_ctx);
2231 }
2232 }
2233 s->in_destroy = GNUNET_YES;
2234 next = plugin->ipv4_queue_head;
2235 while (NULL != (udpw = next))
2236 {
2237 next = udpw->next;
2238 if (udpw->session == s)
2239 {
2240 dequeue (plugin, udpw);
2241 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
2242 GNUNET_free (udpw);
2243 }
2244 }
2245 next = plugin->ipv6_queue_head;
2246 while (NULL != (udpw = next))
2247 {
2248 next = udpw->next;
2249 if (udpw->session == s)
2250 {
2251 dequeue (plugin, udpw);
2252 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
2253 GNUNET_free (udpw);
2254 }
2255 }
2256 if ((NULL != s->frag_ctx) && (NULL != s->frag_ctx->cont))
2257 {
2258 /* The 'frag_ctx' itself will be freed in #free_session() a bit
2259 later, as it might be in use right now */
2260 LOG (GNUNET_ERROR_TYPE_DEBUG,
2261 "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2262 GNUNET_i2s (&s->target));
2263 s->frag_ctx->cont (s->frag_ctx->cont_cls,
2264 &s->target,
2265 GNUNET_SYSERR,
2266 s->frag_ctx->payload_size,
2267 s->frag_ctx->on_wire_size);
2268 }
2269 notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_DONE);
2270 plugin->env->session_end (plugin->env->cls, s->address, s);
2271 GNUNET_STATISTICS_set (plugin->env->stats,
2272 "# UDP sessions active",
2273 GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2274 GNUNET_NO);
2275 if (0 == s->rc)
2276 free_session (s);
2277 return GNUNET_OK;
2278}
2279
2280
2281/**
2282 * Handle a #GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK message.
2283 *
2284 * @param plugin the UDP plugin
2285 * @param msg the (presumed) UDP ACK message
2286 * @param udp_addr sender address
2287 * @param udp_addr_len number of bytes in @a udp_addr
2288 */
2289static void
2290read_process_ack (struct Plugin *plugin,
2291 const struct GNUNET_MessageHeader *msg,
2292 const union UdpAddress *udp_addr,
2293 socklen_t udp_addr_len)
2294{
2295 const struct GNUNET_MessageHeader *ack;
2296 const struct UDP_ACK_Message *udp_ack;
2297 struct GNUNET_HELLO_Address *address;
2298 struct GNUNET_ATS_Session *s;
2299 struct GNUNET_TIME_Relative flow_delay;
2300
2301 /* check message format */
2302 if (ntohs (msg->size) <
2303 sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2304 {
2305 GNUNET_break_op (0);
2306 return;
2307 }
2308 udp_ack = (const struct UDP_ACK_Message *) msg;
2309 ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2310 if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2311 {
2312 GNUNET_break_op (0);
2313 return;
2314 }
2315
2316 /* Locate session */
2317 address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2318 PLUGIN_NAME,
2319 udp_addr,
2320 udp_addr_len,
2321 GNUNET_HELLO_ADDRESS_INFO_NONE);
2322 s = udp_plugin_lookup_session (plugin, address);
2323 if (NULL == s)
2324 {
2325 LOG (GNUNET_ERROR_TYPE_WARNING,
2326 "UDP session of address %s for ACK not found\n",
2327 udp_address_to_string (plugin,
2328 address->address,
2329 address->address_length));
2330 GNUNET_HELLO_address_free (address);
2331 return;
2332 }
2333 if (NULL == s->frag_ctx)
2334 {
2335 LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2336 "Fragmentation context of address %s for ACK (%s) not found\n",
2337 udp_address_to_string (plugin,
2338 address->address,
2339 address->address_length),
2340 GNUNET_FRAGMENT_print_ack (ack));
2341 GNUNET_HELLO_address_free (address);
2342 return;
2343 }
2344 GNUNET_HELLO_address_free (address);
2345
2346 /* evaluate flow delay: how long should we wait between messages? */
2347 if (UINT32_MAX == ntohl (udp_ack->delay))
2348 {
2349 /* Other peer asked for us to terminate the session */
2350 LOG (GNUNET_ERROR_TYPE_INFO,
2351 "Asked to disconnect UDP session of %s\n",
2352 GNUNET_i2s (&udp_ack->sender));
2353 udp_disconnect_session (plugin, s);
2354 return;
2355 }
2356 flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2357 if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2358 LOG (GNUNET_ERROR_TYPE_WARNING,
2359 "We received a sending delay of %s for %s\n",
2360 GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES),
2361 GNUNET_i2s (&udp_ack->sender));
2362 else
2363 LOG (GNUNET_ERROR_TYPE_DEBUG,
2364 "We received a sending delay of %s for %s\n",
2365 GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES),
2366 GNUNET_i2s (&udp_ack->sender));
2367 /* Flow delay is for the reassembled packet, however, our delay
2368 is per packet, so we need to adjust: */
2369 s->flow_delay_from_other_peer = flow_delay;
2370
2371 /* Handle ACK */
2372 if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2373 {
2374 LOG (GNUNET_ERROR_TYPE_DEBUG,
2375 "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2376 (unsigned int) ntohs (msg->size),
2377 GNUNET_i2s (&udp_ack->sender),
2378 udp_address_to_string (plugin, udp_addr, udp_addr_len));
2379 /* Expect more ACKs to arrive */
2380 return;
2381 }
2382
2383 /* Remove fragmented message after successful sending */
2384 LOG (GNUNET_ERROR_TYPE_DEBUG,
2385 "Message from %s at %s full ACK'ed\n",
2386 GNUNET_i2s (&udp_ack->sender),
2387 udp_address_to_string (plugin, udp_addr, udp_addr_len));
2388 fragmented_message_done (s->frag_ctx, GNUNET_OK);
2389}
2390
2391
2392/**
2393 * Message tokenizer has broken up an incoming message. Pass it on
2394 * to the service.
2395 *
2396 * @param cls the `struct GNUNET_ATS_Session *`
2397 * @param hdr the actual message
2398 * @return #GNUNET_OK (always)
2399 */
2400static int
2401process_inbound_tokenized_messages (void *cls,
2402 const struct GNUNET_MessageHeader *hdr)
2403{
2404 struct GNUNET_ATS_Session *session = cls;
2405 struct Plugin *plugin = session->plugin;
2406
2407 if (GNUNET_YES == session->in_destroy)
2408 return GNUNET_OK;
2409 reschedule_session_timeout (session);
2410 session->flow_delay_for_other_peer =
2411 plugin->env->receive (plugin->env->cls, session->address, session, hdr);
2412 return GNUNET_OK;
2413}
2414
2415
2416/**
2417 * Destroy a session, plugin is being unloaded.
2418 *
2419 * @param cls the `struct Plugin`
2420 * @param key hash of public key of target peer
2421 * @param value a `struct PeerSession *` to clean up
2422 * @return #GNUNET_OK (continue to iterate)
2423 */
2424static int
2425disconnect_and_free_it (void *cls,
2426 const struct GNUNET_PeerIdentity *key,
2427 void *value)
2428{
2429 struct Plugin *plugin = cls;
2430
2431 udp_disconnect_session (plugin, value);
2432 return GNUNET_OK;
2433}
2434
2435
2436/**
2437 * Disconnect from a remote node. Clean up session if we have one for
2438 * this peer.
2439 *
2440 * @param cls closure for this call (should be handle to Plugin)
2441 * @param target the peeridentity of the peer to disconnect
2442 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2443 */
2444static void
2445udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
2446{
2447 struct Plugin *plugin = cls;
2448
2449 LOG (GNUNET_ERROR_TYPE_DEBUG,
2450 "Disconnecting from peer `%s'\n",
2451 GNUNET_i2s (target));
2452 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2453 target,
2454 &disconnect_and_free_it,
2455 plugin);
2456}
2457
2458
2459/**
2460 * Session was idle, so disconnect it.
2461 *
2462 * @param cls the `struct GNUNET_ATS_Session` to time out
2463 */
2464static void
2465session_timeout (void *cls)
2466{
2467 struct GNUNET_ATS_Session *s = cls;
2468 struct Plugin *plugin = s->plugin;
2469 struct GNUNET_TIME_Relative left;
2470
2471 s->timeout_task = NULL;
2472 left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2473 if (left.rel_value_us > 0)
2474 {
2475 /* not actually our turn yet, but let's at least update
2476 the monitor, it may think we're about to die ... */
2477 notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
2478 s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, &session_timeout, s);
2479 return;
2480 }
2481 LOG (GNUNET_ERROR_TYPE_DEBUG,
2482 "Session %p was idle for %s, disconnecting\n",
2483 s,
2484 GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2485 GNUNET_YES));
2486 /* call session destroy function */
2487 udp_disconnect_session (plugin, s);
2488}
2489
2490
2491/**
2492 * Allocate a new session for the given endpoint address.
2493 * Note that this function does not inform the service
2494 * of the new session, this is the responsibility of the
2495 * caller (if needed).
2496 *
2497 * @param cls the `struct Plugin`
2498 * @param address address of the other peer to use
2499 * @param network_type network type the address belongs to
2500 * @return NULL on error, otherwise session handle
2501 */
2502static struct GNUNET_ATS_Session *
2503udp_plugin_create_session (void *cls,
2504 const struct GNUNET_HELLO_Address *address,
2505 enum GNUNET_NetworkType network_type)
2506{
2507 struct Plugin *plugin = cls;
2508 struct GNUNET_ATS_Session *s;
2509
2510 s = GNUNET_new (struct GNUNET_ATS_Session);
2511 s->mst = GNUNET_MST_create (&process_inbound_tokenized_messages, s);
2512 s->plugin = plugin;
2513 s->address = GNUNET_HELLO_address_copy (address);
2514 s->target = address->peer;
2515 s->last_transmit_time = GNUNET_TIME_absolute_get ();
2516 s->last_expected_ack_delay =
2517 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
2518 s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2519 s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2520 s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2521 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2522 s->timeout_task =
2523 GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, &session_timeout, s);
2524 s->scope = network_type;
2525
2526 LOG (GNUNET_ERROR_TYPE_DEBUG,
2527 "Creating new session %p for peer `%s' address `%s'\n",
2528 s,
2529 GNUNET_i2s (&address->peer),
2530 udp_address_to_string (plugin,
2531 address->address,
2532 address->address_length));
2533 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (
2534 plugin->sessions,
2535 &s->target,
2536 s,
2537 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2538 GNUNET_STATISTICS_set (plugin->env->stats,
2539 "# UDP sessions active",
2540 GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2541 GNUNET_NO);
2542 notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_INIT);
2543 return s;
2544}
2545
2546
2547/**
2548 * Creates a new outbound session the transport service will use to
2549 * send data to the peer.
2550 *
2551 * @param cls the `struct Plugin *`
2552 * @param address the address
2553 * @return the session or NULL of max connections exceeded
2554 */
2555static struct GNUNET_ATS_Session *
2556udp_plugin_get_session (void *cls, const struct GNUNET_HELLO_Address *address)
2557{
2558 struct Plugin *plugin = cls;
2559 struct GNUNET_ATS_Session *s;
2560 enum GNUNET_NetworkType network_type = GNUNET_NT_UNSPECIFIED;
2561 const struct IPv4UdpAddress *udp_v4;
2562 const struct IPv6UdpAddress *udp_v6;
2563
2564 if (NULL == address)
2565 {
2566 GNUNET_break (0);
2567 return NULL;
2568 }
2569 if ((address->address_length != sizeof(struct IPv4UdpAddress)) &&
2570 (address->address_length != sizeof(struct IPv6UdpAddress)))
2571 {
2572 GNUNET_break_op (0);
2573 return NULL;
2574 }
2575 if (NULL != (s = udp_plugin_lookup_session (cls, address)))
2576 return s;
2577
2578 /* need to create new session */
2579 if (sizeof(struct IPv4UdpAddress) == address->address_length)
2580 {
2581 struct sockaddr_in v4;
2582
2583 udp_v4 = (const struct IPv4UdpAddress *) address->address;
2584 memset (&v4, '\0', sizeof(v4));
2585 v4.sin_family = AF_INET;
2586#if HAVE_SOCKADDR_IN_SIN_LEN
2587 v4.sin_len = sizeof(struct sockaddr_in);
2588#endif
2589 v4.sin_port = udp_v4->u4_port;
2590 v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2591 network_type = plugin->env->get_address_type (plugin->env->cls,
2592 (const struct sockaddr *) &v4,
2593 sizeof(v4));
2594 }
2595 if (sizeof(struct IPv6UdpAddress) == address->address_length)
2596 {
2597 struct sockaddr_in6 v6;
2598
2599 udp_v6 = (const struct IPv6UdpAddress *) address->address;
2600 memset (&v6, '\0', sizeof(v6));
2601 v6.sin6_family = AF_INET6;
2602#if HAVE_SOCKADDR_IN_SIN_LEN
2603 v6.sin6_len = sizeof(struct sockaddr_in6);
2604#endif
2605 v6.sin6_port = udp_v6->u6_port;
2606 v6.sin6_addr = udp_v6->ipv6_addr;
2607 network_type = plugin->env->get_address_type (plugin->env->cls,
2608 (const struct sockaddr *) &v6,
2609 sizeof(v6));
2610 }
2611 GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
2612 return udp_plugin_create_session (cls, address, network_type);
2613}
2614
2615
2616/**
2617 * We've received a UDP Message. Process it (pass contents to main service).
2618 *
2619 * @param plugin plugin context
2620 * @param msg the message
2621 * @param udp_addr sender address
2622 * @param udp_addr_len number of bytes in @a udp_addr
2623 * @param network_type network type the address belongs to
2624 */
2625static void
2626process_udp_message (struct Plugin *plugin,
2627 const struct UDPMessage *msg,
2628 const union UdpAddress *udp_addr,
2629 size_t udp_addr_len,
2630 enum GNUNET_NetworkType network_type)
2631{
2632 struct GNUNET_ATS_Session *s;
2633 struct GNUNET_HELLO_Address *address;
2634
2635 GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
2636 if (0 != ntohl (msg->reserved))
2637 {
2638 GNUNET_break_op (0);
2639 return;
2640 }
2641 if (ntohs (msg->header.size) <
2642 sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2643 {
2644 GNUNET_break_op (0);
2645 return;
2646 }
2647
2648 address = GNUNET_HELLO_address_allocate (&msg->sender,
2649 PLUGIN_NAME,
2650 udp_addr,
2651 udp_addr_len,
2652 GNUNET_HELLO_ADDRESS_INFO_NONE);
2653 if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
2654 {
2655 s = udp_plugin_create_session (plugin, address, network_type);
2656 plugin->env->session_start (plugin->env->cls, address, s, s->scope);
2657 notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_UP);
2658 }
2659 GNUNET_free (address);
2660
2661 s->rc++;
2662 GNUNET_MST_from_buffer (s->mst,
2663 (const char *) &msg[1],
2664 ntohs (msg->header.size) - sizeof(struct UDPMessage),
2665 GNUNET_YES,
2666 GNUNET_NO);
2667 s->rc--;
2668 if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
2669 free_session (s);
2670}
2671
2672
2673/**
2674 * Process a defragmented message.
2675 *
2676 * @param cls the `struct DefragContext *`
2677 * @param msg the message
2678 */
2679static void
2680fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
2681{
2682 struct DefragContext *dc = cls;
2683 const struct UDPMessage *um;
2684
2685 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2686 {
2687 GNUNET_break_op (0);
2688 return;
2689 }
2690 if (ntohs (msg->size) < sizeof(struct UDPMessage))
2691 {
2692 GNUNET_break_op (0);
2693 return;
2694 }
2695 um = (const struct UDPMessage *) msg;
2696 dc->sender = um->sender;
2697 dc->have_sender = GNUNET_YES;
2698 process_udp_message (dc->plugin,
2699 um,
2700 dc->udp_addr,
2701 dc->udp_addr_len,
2702 dc->network_type);
2703}
2704
2705
2706/**
2707 * We finished sending an acknowledgement. Update
2708 * statistics.
2709 *
2710 * @param cls the `struct Plugin`
2711 * @param udpw message queue entry of the ACK
2712 * @param result #GNUNET_OK if the transmission worked,
2713 * #GNUNET_SYSERR if we failed to send the ACK
2714 */
2715static void
2716ack_message_sent (void *cls, struct UDP_MessageWrapper *udpw, int result)
2717{
2718 struct Plugin *plugin = cls;
2719
2720 if (GNUNET_OK == result)
2721 {
2722 GNUNET_STATISTICS_update (plugin->env->stats,
2723 "# UDP, ACK messages sent",
2724 1,
2725 GNUNET_NO);
2726 }
2727 else
2728 {
2729 GNUNET_STATISTICS_update (plugin->env->stats,
2730 "# UDP, ACK transmissions failed",
2731 1,
2732 GNUNET_NO);
2733 }
2734}
2735
2736
2737/**
2738 * Transmit an acknowledgement.
2739 *
2740 * @param cls the `struct DefragContext *`
2741 * @param id message ID (unused)
2742 * @param msg ack to transmit
2743 */
2744static void
2745ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
2746{
2747 struct DefragContext *rc = cls;
2748 struct Plugin *plugin = rc->plugin;
2749 size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2750 struct UDP_ACK_Message *udp_ack;
2751 uint32_t delay;
2752 struct UDP_MessageWrapper *udpw;
2753 struct GNUNET_ATS_Session *s;
2754 struct GNUNET_HELLO_Address *address;
2755
2756 if (GNUNET_NO == rc->have_sender)
2757 {
2758 /* tried to defragment but never succeeded, hence will not ACK */
2759 /* This can happen if we just lost msgs */
2760 GNUNET_STATISTICS_update (plugin->env->stats,
2761 "# UDP, fragments discarded without ACK",
2762 1,
2763 GNUNET_NO);
2764 return;
2765 }
2766 address = GNUNET_HELLO_address_allocate (&rc->sender,
2767 PLUGIN_NAME,
2768 rc->udp_addr,
2769 rc->udp_addr_len,
2770 GNUNET_HELLO_ADDRESS_INFO_NONE);
2771 s = udp_plugin_lookup_session (plugin, address);
2772 GNUNET_HELLO_address_free (address);
2773 if (NULL == s)
2774 {
2775 LOG (GNUNET_ERROR_TYPE_ERROR,
2776 "Trying to transmit ACK to peer `%s' but no session found!\n",
2777 udp_address_to_string (plugin, rc->udp_addr, rc->udp_addr_len));
2778 GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2779 GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2780 GNUNET_free (rc);
2781 GNUNET_STATISTICS_update (plugin->env->stats,
2782 "# UDP, ACK transmissions failed",
2783 1,
2784 GNUNET_NO);
2785 return;
2786 }
2787 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
2788 s->flow_delay_for_other_peer.rel_value_us)
2789 delay = UINT32_MAX;
2790 else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
2791 delay = s->flow_delay_for_other_peer.rel_value_us;
2792 else
2793 delay = UINT32_MAX - 1; /* largest value we can communicate */
2794 LOG (GNUNET_ERROR_TYPE_DEBUG,
2795 "Sending ACK to `%s' including delay of %s\n",
2796 udp_address_to_string (plugin, rc->udp_addr, rc->udp_addr_len),
2797 GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2798 GNUNET_YES));
2799 udpw = GNUNET_malloc (sizeof(struct UDP_MessageWrapper) + msize);
2800 udpw->msg_size = msize;
2801 udpw->payload_size = 0;
2802 udpw->session = s;
2803 udpw->start_time = GNUNET_TIME_absolute_get ();
2804 udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2805 udpw->msg_buf = (char *) &udpw[1];
2806 udpw->qc = &ack_message_sent;
2807 udpw->qc_cls = plugin;
2808 udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2809 udp_ack->header.size = htons ((uint16_t) msize);
2810 udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2811 udp_ack->delay = htonl (delay);
2812 udp_ack->sender = *plugin->env->my_identity;
2813 GNUNET_memcpy (&udp_ack[1], msg, ntohs (msg->size));
2814 enqueue (plugin, udpw);
2815 notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
2816 if (s->address->address_length == sizeof(struct IPv4UdpAddress))
2817 schedule_select_v4 (plugin);
2818 else
2819 schedule_select_v6 (plugin);
2820}
2821
2822
2823/**
2824 * We received a fragment, process it.
2825 *
2826 * @param plugin our plugin
2827 * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2828 * @param udp_addr sender address
2829 * @param udp_addr_len number of bytes in @a udp_addr
2830 * @param network_type network type the address belongs to
2831 */
2832static void
2833read_process_fragment (struct Plugin *plugin,
2834 const struct GNUNET_MessageHeader *msg,
2835 const union UdpAddress *udp_addr,
2836 size_t udp_addr_len,
2837 enum GNUNET_NetworkType network_type)
2838{
2839 struct DefragContext *d_ctx;
2840 struct GNUNET_TIME_Absolute now;
2841 struct FindReceiveContext frc;
2842
2843 frc.rc = NULL;
2844 frc.udp_addr = udp_addr;
2845 frc.udp_addr_len = udp_addr_len;
2846
2847 /* Lookup existing receive context for this address */
2848 GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2849 &find_receive_context,
2850 &frc);
2851 now = GNUNET_TIME_absolute_get ();
2852 d_ctx = frc.rc;
2853
2854 if (NULL == d_ctx)
2855 {
2856 /* Create a new defragmentation context */
2857 d_ctx = GNUNET_malloc (sizeof(struct DefragContext) + udp_addr_len);
2858 GNUNET_memcpy (&d_ctx[1], udp_addr, udp_addr_len);
2859 d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2860 d_ctx->udp_addr_len = udp_addr_len;
2861 d_ctx->network_type = network_type;
2862 d_ctx->plugin = plugin;
2863 d_ctx->defrag =
2864 GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2865 UDP_MTU,
2866 UDP_MAX_MESSAGES_IN_DEFRAG,
2867 d_ctx,
2868 &fragment_msg_proc,
2869 &ack_proc);
2870 d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2871 d_ctx,
2872 (GNUNET_CONTAINER_HeapCostType)
2873 now.abs_value_us);
2874 LOG (GNUNET_ERROR_TYPE_DEBUG,
2875 "Created new defragmentation context for %u-byte fragment from `%s'\n",
2876 (unsigned int) ntohs (msg->size),
2877 udp_address_to_string (plugin, udp_addr, udp_addr_len));
2878 }
2879 else
2880 {
2881 LOG (GNUNET_ERROR_TYPE_DEBUG,
2882 "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2883 (unsigned int) ntohs (msg->size),
2884 udp_address_to_string (plugin, udp_addr, udp_addr_len));
2885 }
2886
2887 if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2888 {
2889 /* keep this 'rc' from expiring */
2890 GNUNET_CONTAINER_heap_update_cost (d_ctx->hnode,
2891 (GNUNET_CONTAINER_HeapCostType)
2892 now.abs_value_us);
2893 }
2894 if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2895 UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2896 {
2897 /* remove 'rc' that was inactive the longest */
2898 d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2899 GNUNET_assert (NULL != d_ctx);
2900 GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2901 GNUNET_free (d_ctx);
2902 GNUNET_STATISTICS_update (plugin->env->stats,
2903 "# UDP, Defragmentations aborted",
2904 1,
2905 GNUNET_NO);
2906 }
2907}
2908
2909
2910/**
2911 * Read and process a message from the given socket.
2912 *
2913 * @param plugin the overall plugin
2914 * @param rsock socket to read from
2915 */
2916static void
2917udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2918{
2919 socklen_t fromlen;
2920 struct sockaddr_storage addr;
2921 char buf[65536] GNUNET_ALIGN;
2922 ssize_t size;
2923 const struct GNUNET_MessageHeader *msg;
2924 struct IPv4UdpAddress v4;
2925 struct IPv6UdpAddress v6;
2926 const struct sockaddr *sa;
2927 const struct sockaddr_in *sa4;
2928 const struct sockaddr_in6 *sa6;
2929 const union UdpAddress *int_addr;
2930 size_t int_addr_len;
2931 enum GNUNET_NetworkType network_type;
2932
2933 fromlen = sizeof(addr);
2934 memset (&addr, 0, sizeof(addr));
2935 size = GNUNET_NETWORK_socket_recvfrom (rsock,
2936 buf,
2937 sizeof(buf),
2938 (struct sockaddr *) &addr,
2939 &fromlen);
2940 sa = (const struct sockaddr *) &addr;
2941
2942 if (-1 == size)
2943 {
2944 LOG (GNUNET_ERROR_TYPE_DEBUG,
2945 "UDP failed to receive data: %s\n",
2946 strerror (errno));
2947 /* Connection failure or something. Not a protocol violation. */
2948 return;
2949 }
2950
2951 /* Check if this is a STUN packet */
2952 if (GNUNET_NO !=
2953 GNUNET_NAT_stun_handle_packet (plugin->nat,
2954 (const struct sockaddr *) &addr,
2955 fromlen,
2956 buf,
2957 size))
2958 return; /* was STUN, do not process further */
2959
2960 if (size < sizeof(struct GNUNET_MessageHeader))
2961 {
2962 LOG (GNUNET_ERROR_TYPE_WARNING,
2963 "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2964 (unsigned int) size,
2965 GNUNET_a2s (sa, fromlen));
2966 /* _MAY_ be a connection failure (got partial message) */
2967 /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2968 GNUNET_break_op (0);
2969 return;
2970 }
2971
2972 msg = (const struct GNUNET_MessageHeader *) buf;
2973 LOG (GNUNET_ERROR_TYPE_DEBUG,
2974 "UDP received %u-byte message from `%s' type %u\n",
2975 (unsigned int) size,
2976 GNUNET_a2s (sa, fromlen),
2977 ntohs (msg->type));
2978 if (size != ntohs (msg->size))
2979 {
2980 LOG (GNUNET_ERROR_TYPE_WARNING,
2981 "UDP malformed message (size %u) header from %s\n",
2982 (unsigned int) size,
2983 GNUNET_a2s (sa, fromlen));
2984 GNUNET_break_op (0);
2985 return;
2986 }
2987 GNUNET_STATISTICS_update (plugin->env->stats,
2988 "# UDP, total bytes received",
2989 size,
2990 GNUNET_NO);
2991 network_type = plugin->env->get_address_type (plugin->env->cls, sa, fromlen);
2992 switch (sa->sa_family)
2993 {
2994 case AF_INET:
2995 sa4 = (const struct sockaddr_in *) &addr;
2996 v4.options = 0;
2997 v4.ipv4_addr = sa4->sin_addr.s_addr;
2998 v4.u4_port = sa4->sin_port;
2999 int_addr = (union UdpAddress *) &v4;
3000 int_addr_len = sizeof(v4);
3001 break;
3002
3003 case AF_INET6:
3004 sa6 = (const struct sockaddr_in6 *) &addr;
3005 v6.options = 0;
3006 v6.ipv6_addr = sa6->sin6_addr;
3007 v6.u6_port = sa6->sin6_port;
3008 int_addr = (union UdpAddress *) &v6;
3009 int_addr_len = sizeof(v6);
3010 break;
3011
3012 default:
3013 GNUNET_break (0);
3014 return;
3015 }
3016
3017 switch (ntohs (msg->type))
3018 {
3019 case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
3020 if (GNUNET_YES == plugin->enable_broadcasting_receiving)
3021 udp_broadcast_receive (plugin,
3022 buf,
3023 size,
3024 int_addr,
3025 int_addr_len,
3026 network_type);
3027 return;
3028
3029 case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
3030 if (ntohs (msg->size) < sizeof(struct UDPMessage))
3031 {
3032 GNUNET_break_op (0);
3033 return;
3034 }
3035 process_udp_message (plugin,
3036 (const struct UDPMessage *) msg,
3037 int_addr,
3038 int_addr_len,
3039 network_type);
3040 return;
3041
3042 case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
3043 read_process_ack (plugin, msg, int_addr, int_addr_len);
3044 return;
3045
3046 case GNUNET_MESSAGE_TYPE_FRAGMENT:
3047 read_process_fragment (plugin, msg, int_addr, int_addr_len, network_type);
3048 return;
3049
3050 default:
3051 GNUNET_break_op (0);
3052 return;
3053 }
3054}
3055
3056
3057/**
3058 * Removes messages from the transmission queue that have
3059 * timed out, and then selects a message that should be
3060 * transmitted next.
3061 *
3062 * @param plugin the UDP plugin
3063 * @param sock which socket should we process the queue for (v4 or v6)
3064 * @return message selected for transmission, or NULL for none
3065 */
3066static struct UDP_MessageWrapper *
3067remove_timeout_messages_and_select (struct Plugin *plugin,
3068 struct GNUNET_NETWORK_Handle *sock)
3069{
3070 struct UDP_MessageWrapper *udpw;
3071 struct GNUNET_TIME_Relative remaining;
3072 struct GNUNET_ATS_Session *session;
3073 int removed;
3074
3075 removed = GNUNET_NO;
3076 udpw = (sock == plugin->sockv4) ? plugin->ipv4_queue_head
3077 : plugin->ipv6_queue_head;
3078 while (NULL != udpw)
3079 {
3080 session = udpw->session;
3081 /* Find messages with timeout */
3082 remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
3083 if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
3084 {
3085 /* Message timed out */
3086 removed = GNUNET_YES;
3087 dequeue (plugin, udpw);
3088 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3089 GNUNET_free (udpw);
3090
3091 if (sock == plugin->sockv4)
3092 {
3093 udpw = plugin->ipv4_queue_head;
3094 }
3095 else if (sock == plugin->sockv6)
3096 {
3097 udpw = plugin->ipv6_queue_head;
3098 }
3099 else
3100 {
3101 GNUNET_break (0); /* should never happen */
3102 udpw = NULL;
3103 }
3104 GNUNET_STATISTICS_update (plugin->env->stats,
3105 "# messages discarded due to timeout",
3106 1,
3107 GNUNET_NO);
3108 }
3109 else
3110 {
3111 /* Message did not time out, check transmission time */
3112 remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3113 if (0 == remaining.rel_value_us)
3114 {
3115 /* this message is not delayed */
3116 LOG (GNUNET_ERROR_TYPE_DEBUG,
3117 "Message for peer `%s' (%lu bytes) is not delayed \n",
3118 GNUNET_i2s (&udpw->session->target),
3119 (unsigned long) udpw->payload_size);
3120 break; /* Found message to send, break */
3121 }
3122 else
3123 {
3124 /* Message is delayed, try next */
3125 LOG (GNUNET_ERROR_TYPE_DEBUG,
3126 "Message for peer `%s' (%lu bytes) is delayed for %s\n",
3127 GNUNET_i2s (&udpw->session->target),
3128 (unsigned long) udpw->payload_size,
3129 GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
3130 udpw = udpw->next;
3131 }
3132 }
3133 }
3134 if (GNUNET_YES == removed)
3135 notify_session_monitor (session->plugin,
3136 session,
3137 GNUNET_TRANSPORT_SS_UPDATE);
3138 return udpw;
3139}
3140
3141
3142/**
3143 * We failed to transmit a message via UDP. Generate
3144 * a descriptive error message.
3145 *
3146 * @param plugin our plugin
3147 * @param sa target address we were trying to reach
3148 * @param slen number of bytes in @a sa
3149 * @param error the errno value returned from the sendto() call
3150 */
3151static void
3152analyze_send_error (struct Plugin *plugin,
3153 const struct sockaddr *sa,
3154 socklen_t slen,
3155 int error)
3156{
3157 enum GNUNET_NetworkType type;
3158
3159 type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
3160 if (((GNUNET_NT_LAN == type) || (GNUNET_NT_WAN == type)) &&
3161 ((ENETUNREACH == errno) || (ENETDOWN == errno)))
3162 {
3163 if (slen == sizeof(struct sockaddr_in))
3164 {
3165 /* IPv4: "Network unreachable" or "Network down"
3166 *
3167 * This indicates we do not have connectivity
3168 */
3169 LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3170 _ ("UDP could not transmit message to `%s': "
3171 "Network seems down, please check your network configuration\n"),
3172 GNUNET_a2s (sa, slen));
3173 }
3174 if (slen == sizeof(struct sockaddr_in6))
3175 {
3176 /* IPv6: "Network unreachable" or "Network down"
3177 *
3178 * This indicates that this system is IPv6 enabled, but does not
3179 * have a valid global IPv6 address assigned or we do not have
3180 * connectivity
3181 */LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3182 _ (
3183 "UDP could not transmit IPv6 message! "
3184 "Please check your network configuration and disable IPv6 if your "
3185 "connection does not have a global IPv6 address\n"));
3186 }
3187 }
3188 else
3189 {
3190 LOG (GNUNET_ERROR_TYPE_WARNING,
3191 "UDP could not transmit message to `%s': `%s'\n",
3192 GNUNET_a2s (sa, slen),
3193 strerror (error));
3194 }
3195}
3196
3197
3198/**
3199 * It is time to try to transmit a UDP message. Select one
3200 * and send.
3201 *
3202 * @param plugin the plugin
3203 * @param sock which socket (v4/v6) to send on
3204 */
3205static void
3206udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
3207{
3208 ssize_t sent;
3209 socklen_t slen;
3210 const struct sockaddr *a;
3211 const struct IPv4UdpAddress *u4;
3212 struct sockaddr_in a4;
3213 const struct IPv6UdpAddress *u6;
3214 struct sockaddr_in6 a6;
3215 struct UDP_MessageWrapper *udpw;
3216
3217 /* Find message(s) to send */
3218 while (NULL != (udpw = remove_timeout_messages_and_select (plugin, sock)))
3219 {
3220 if (sizeof(struct IPv4UdpAddress) ==
3221 udpw->session->address->address_length)
3222 {
3223 u4 = udpw->session->address->address;
3224 memset (&a4, 0, sizeof(a4));
3225 a4.sin_family = AF_INET;
3226#if HAVE_SOCKADDR_IN_SIN_LEN
3227 a4.sin_len = sizeof(a4);
3228#endif
3229 a4.sin_port = u4->u4_port;
3230 a4.sin_addr.s_addr = u4->ipv4_addr;
3231 a = (const struct sockaddr *) &a4;
3232 slen = sizeof(a4);
3233 }
3234 else if (sizeof(struct IPv6UdpAddress) ==
3235 udpw->session->address->address_length)
3236 {
3237 u6 = udpw->session->address->address;
3238 memset (&a6, 0, sizeof(a6));
3239 a6.sin6_family = AF_INET6;
3240#if HAVE_SOCKADDR_IN_SIN_LEN
3241 a6.sin6_len = sizeof(a6);
3242#endif
3243 a6.sin6_port = u6->u6_port;
3244 a6.sin6_addr = u6->ipv6_addr;
3245 a = (const struct sockaddr *) &a6;
3246 slen = sizeof(a6);
3247 }
3248 else
3249 {
3250 GNUNET_break (0);
3251 dequeue (plugin, udpw);
3252 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3253 notify_session_monitor (plugin,
3254 udpw->session,
3255 GNUNET_TRANSPORT_SS_UPDATE);
3256 GNUNET_free (udpw);
3257 continue;
3258 }
3259 sent = GNUNET_NETWORK_socket_sendto (sock,
3260 udpw->msg_buf,
3261 udpw->msg_size,
3262 a,
3263 slen);
3264 udpw->session->last_transmit_time =
3265 GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3266 udpw->session->last_transmit_time);
3267 dequeue (plugin, udpw);
3268 if (GNUNET_SYSERR == sent)
3269 {
3270 /* Failure */
3271 analyze_send_error (plugin, a, slen, errno);
3272 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3273 GNUNET_STATISTICS_update (plugin->env->stats,
3274 "# UDP, total, bytes, sent, failure",
3275 sent,
3276 GNUNET_NO);
3277 GNUNET_STATISTICS_update (plugin->env->stats,
3278 "# UDP, total, messages, sent, failure",
3279 1,
3280 GNUNET_NO);
3281 }
3282 else
3283 {
3284 /* Success */
3285 LOG (GNUNET_ERROR_TYPE_DEBUG,
3286 "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n",
3287 (unsigned int) (udpw->msg_size),
3288 GNUNET_i2s (&udpw->session->target),
3289 GNUNET_a2s (a, slen),
3290 (int) sent,
3291 (sent < 0) ? strerror (errno) : "ok");
3292 GNUNET_STATISTICS_update (plugin->env->stats,
3293 "# UDP, total, bytes, sent, success",
3294 sent,
3295 GNUNET_NO);
3296 GNUNET_STATISTICS_update (plugin->env->stats,
3297 "# UDP, total, messages, sent, success",
3298 1,
3299 GNUNET_NO);
3300 if (NULL != udpw->frag_ctx)
3301 udpw->frag_ctx->on_wire_size += udpw->msg_size;
3302 udpw->qc (udpw->qc_cls, udpw, GNUNET_OK);
3303 }
3304 notify_session_monitor (plugin, udpw->session, GNUNET_TRANSPORT_SS_UPDATE);
3305 GNUNET_free (udpw);
3306 }
3307}
3308
3309
3310/* ***************** Event loop (part 2) *************** */
3311
3312
3313/**
3314 * We have been notified that our readset has something to read. We don't
3315 * know which socket needs to be read, so we have to check each one
3316 * Then reschedule this function to be called again once more is available.
3317 *
3318 * @param cls the plugin handle
3319 */
3320static void
3321udp_plugin_select_v4 (void *cls)
3322{
3323 struct Plugin *plugin = cls;
3324 const struct GNUNET_SCHEDULER_TaskContext *tc;
3325
3326 plugin->select_task_v4 = NULL;
3327 if (NULL == plugin->sockv4)
3328 return;
3329 tc = GNUNET_SCHEDULER_get_task_context ();
3330 if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3331 (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
3332 udp_select_read (plugin, plugin->sockv4);
3333 udp_select_send (plugin, plugin->sockv4);
3334 schedule_select_v4 (plugin);
3335}
3336
3337
3338/**
3339 * We have been notified that our readset has something to read. We don't
3340 * know which socket needs to be read, so we have to check each one
3341 * Then reschedule this function to be called again once more is available.
3342 *
3343 * @param cls the plugin handle
3344 */
3345static void
3346udp_plugin_select_v6 (void *cls)
3347{
3348 struct Plugin *plugin = cls;
3349 const struct GNUNET_SCHEDULER_TaskContext *tc;
3350
3351 plugin->select_task_v6 = NULL;
3352 if (NULL == plugin->sockv6)
3353 return;
3354 tc = GNUNET_SCHEDULER_get_task_context ();
3355 if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3356 (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
3357 udp_select_read (plugin, plugin->sockv6);
3358
3359 udp_select_send (plugin, plugin->sockv6);
3360 schedule_select_v6 (plugin);
3361}
3362
3363
3364/* ******************* Initialization *************** */
3365
3366
3367/**
3368 * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3369 *
3370 * @param plugin the plugin to initialize
3371 * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3372 * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3373 * @return number of sockets that were successfully bound
3374 */
3375static unsigned int
3376setup_sockets (struct Plugin *plugin,
3377 const struct sockaddr_in6 *bind_v6,
3378 const struct sockaddr_in *bind_v4)
3379{
3380 int tries;
3381 unsigned int sockets_created = 0;
3382 struct sockaddr_in6 server_addrv6;
3383 struct sockaddr_in server_addrv4;
3384 const struct sockaddr *server_addr;
3385 const struct sockaddr *addrs[2];
3386 socklen_t addrlens[2];
3387 socklen_t addrlen;
3388 int eno;
3389
3390 /* Create IPv6 socket */
3391 eno = EINVAL;
3392 if (GNUNET_YES == plugin->enable_ipv6)
3393 {
3394 plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
3395 if (NULL == plugin->sockv6)
3396 {
3397 LOG (GNUNET_ERROR_TYPE_INFO,
3398 _ ("Disabling IPv6 since it is not supported on this system!\n"));
3399 plugin->enable_ipv6 = GNUNET_NO;
3400 }
3401 else
3402 {
3403 memset (&server_addrv6, 0, sizeof(struct sockaddr_in6));
3404#if HAVE_SOCKADDR_IN_SIN_LEN
3405 server_addrv6.sin6_len = sizeof(struct sockaddr_in6);
3406#endif
3407 server_addrv6.sin6_family = AF_INET6;
3408 if (NULL != bind_v6)
3409 server_addrv6.sin6_addr = bind_v6->sin6_addr;
3410 else
3411 server_addrv6.sin6_addr = in6addr_any;
3412
3413 if (0 == plugin->port) /* autodetect */
3414 server_addrv6.sin6_port = htons (
3415 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3416 + 32000);
3417 else
3418 server_addrv6.sin6_port = htons (plugin->port);
3419 addrlen = sizeof(struct sockaddr_in6);
3420 server_addr = (const struct sockaddr *) &server_addrv6;
3421
3422 tries = 0;
3423 while (tries < 10)
3424 {
3425 LOG (GNUNET_ERROR_TYPE_DEBUG,
3426 "Binding to IPv6 `%s'\n",
3427 GNUNET_a2s (server_addr, addrlen));
3428 /* binding */
3429 if (GNUNET_OK ==
3430 GNUNET_NETWORK_socket_bind (plugin->sockv6, server_addr, addrlen))
3431 break;
3432 eno = errno;
3433 if (0 != plugin->port)
3434 {
3435 tries = 10; /* fail immediately */
3436 break; /* bind failed on specific port */
3437 }
3438 /* autodetect */
3439 server_addrv6.sin6_port = htons (
3440 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3441 + 32000);
3442 tries++;
3443 }
3444 if (tries >= 10)
3445 {
3446 GNUNET_NETWORK_socket_close (plugin->sockv6);
3447 plugin->enable_ipv6 = GNUNET_NO;
3448 plugin->sockv6 = NULL;
3449 }
3450 else
3451 {
3452 plugin->port = ntohs (server_addrv6.sin6_port);
3453 }
3454 if (NULL != plugin->sockv6)
3455 {
3456 LOG (GNUNET_ERROR_TYPE_DEBUG,
3457 "IPv6 UDP socket created listinging at %s\n",
3458 GNUNET_a2s (server_addr, addrlen));
3459 addrs[sockets_created] = server_addr;
3460 addrlens[sockets_created] = addrlen;
3461 sockets_created++;
3462 }
3463 else
3464 {
3465 LOG (GNUNET_ERROR_TYPE_WARNING,
3466 _ ("Failed to bind UDP socket to %s: %s\n"),
3467 GNUNET_a2s (server_addr, addrlen),
3468 strerror (eno));
3469 }
3470 }
3471 }
3472
3473 /* Create IPv4 socket */
3474 eno = EINVAL;
3475 plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
3476 if (NULL == plugin->sockv4)
3477 {
3478 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
3479 LOG (GNUNET_ERROR_TYPE_INFO,
3480 _ ("Disabling IPv4 since it is not supported on this system!\n"));
3481 plugin->enable_ipv4 = GNUNET_NO;
3482 }
3483 else
3484 {
3485 memset (&server_addrv4, 0, sizeof(struct sockaddr_in));
3486#if HAVE_SOCKADDR_IN_SIN_LEN
3487 server_addrv4.sin_len = sizeof(struct sockaddr_in);
3488#endif
3489 server_addrv4.sin_family = AF_INET;
3490 if (NULL != bind_v4)
3491 server_addrv4.sin_addr = bind_v4->sin_addr;
3492 else
3493 server_addrv4.sin_addr.s_addr = INADDR_ANY;
3494
3495 if (0 == plugin->port)
3496 /* autodetect */
3497 server_addrv4.sin_port = htons (
3498 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
3499 else
3500 server_addrv4.sin_port = htons (plugin->port);
3501
3502 addrlen = sizeof(struct sockaddr_in);
3503 server_addr = (const struct sockaddr *) &server_addrv4;
3504
3505 tries = 0;
3506 while (tries < 10)
3507 {
3508 LOG (GNUNET_ERROR_TYPE_DEBUG,
3509 "Binding to IPv4 `%s'\n",
3510 GNUNET_a2s (server_addr, addrlen));
3511
3512 /* binding */
3513 if (GNUNET_OK ==
3514 GNUNET_NETWORK_socket_bind (plugin->sockv4, server_addr, addrlen))
3515 break;
3516 eno = errno;
3517 if (0 != plugin->port)
3518 {
3519 tries = 10; /* fail */
3520 break; /* bind failed on specific port */
3521 }
3522
3523 /* autodetect */
3524 server_addrv4.sin_port = htons (
3525 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
3526 tries++;
3527 }
3528 if (tries >= 10)
3529 {
3530 GNUNET_NETWORK_socket_close (plugin->sockv4);
3531 plugin->enable_ipv4 = GNUNET_NO;
3532 plugin->sockv4 = NULL;
3533 }
3534 else
3535 {
3536 plugin->port = ntohs (server_addrv4.sin_port);
3537 }
3538
3539 if (NULL != plugin->sockv4)
3540 {
3541 LOG (GNUNET_ERROR_TYPE_DEBUG,
3542 "IPv4 socket created on port %s\n",
3543 GNUNET_a2s (server_addr, addrlen));
3544 addrs[sockets_created] = server_addr;
3545 addrlens[sockets_created] = addrlen;
3546 sockets_created++;
3547 }
3548 else
3549 {
3550 LOG (GNUNET_ERROR_TYPE_ERROR,
3551 _ ("Failed to bind UDP socket to %s: %s\n"),
3552 GNUNET_a2s (server_addr, addrlen),
3553 strerror (eno));
3554 }
3555 }
3556
3557 if (0 == sockets_created)
3558 {
3559 LOG (GNUNET_ERROR_TYPE_WARNING, _ ("Failed to open UDP sockets\n"));
3560 return 0; /* No sockets created, return */
3561 }
3562 schedule_select_v4 (plugin);
3563 schedule_select_v6 (plugin);
3564 plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3565 "transport-udp",
3566 IPPROTO_UDP,
3567 sockets_created,
3568 addrs,
3569 addrlens,
3570 &udp_nat_port_map_callback,
3571 NULL,
3572 plugin);
3573 return sockets_created;
3574}
3575
3576
3577/**
3578 * The exported method. Makes the core api available via a global and
3579 * returns the udp transport API.
3580 *
3581 * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3582 * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3583 */
3584void *
3585libgnunet_plugin_transport_udp_init (void *cls)
3586{
3587 struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3588 struct GNUNET_TRANSPORT_PluginFunctions *api;
3589 struct Plugin *p;
3590 unsigned long long port;
3591 unsigned long long aport;
3592 unsigned long long udp_max_bps;
3593 int enable_v6;
3594 int enable_broadcasting;
3595 int enable_broadcasting_recv;
3596 char *bind4_address;
3597 char *bind6_address;
3598 struct GNUNET_TIME_Relative interval;
3599 struct sockaddr_in server_addrv4;
3600 struct sockaddr_in6 server_addrv6;
3601 unsigned int res;
3602 int have_bind4;
3603 int have_bind6;
3604
3605 if (NULL == env->receive)
3606 {
3607 /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3608 initialize the plugin or the API */
3609 api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3610 api->cls = NULL;
3611 api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3612 api->address_to_string = &udp_address_to_string;
3613 api->string_to_address = &udp_string_to_address;
3614 return api;
3615 }
3616
3617 /* Get port number: port == 0 : autodetect a port,
3618 * > 0 : use this port, not given : 2086 default */
3619 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg,
3620 "transport-udp",
3621 "PORT",
3622 &port))
3623 port = 2086;
3624 if (port > 65535)
3625 {
3626 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3627 "transport-udp",
3628 "PORT",
3629 _ ("must be in [0,65535]"));
3630 return NULL;
3631 }
3632 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg,
3633 "transport-udp",
3634 "ADVERTISED_PORT",
3635 &aport))
3636 aport = port;
3637 if (aport > 65535)
3638 {
3639 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3640 "transport-udp",
3641 "ADVERTISED_PORT",
3642 _ ("must be in [0,65535]"));
3643 return NULL;
3644 }
3645
3646 if (GNUNET_YES ==
3647 GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
3648 enable_v6 = GNUNET_NO;
3649 else
3650 enable_v6 = GNUNET_YES;
3651
3652 have_bind4 = GNUNET_NO;
3653 memset (&server_addrv4, 0, sizeof(server_addrv4));
3654 if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_string (env->cfg,
3655 "transport-udp",
3656 "BINDTO",
3657 &bind4_address))
3658 {
3659 LOG (GNUNET_ERROR_TYPE_DEBUG,
3660 "Binding UDP plugin to specific address: `%s'\n",
3661 bind4_address);
3662 if (1 != inet_pton (AF_INET, bind4_address, &server_addrv4.sin_addr))
3663 {
3664 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3665 "transport-udp",
3666 "BINDTO",
3667 _ ("must be valid IPv4 address"));
3668 GNUNET_free (bind4_address);
3669 return NULL;
3670 }
3671 have_bind4 = GNUNET_YES;
3672 }
3673 GNUNET_free (bind4_address);
3674 have_bind6 = GNUNET_NO;
3675 memset (&server_addrv6, 0, sizeof(server_addrv6));
3676 if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_string (env->cfg,
3677 "transport-udp",
3678 "BINDTO6",
3679 &bind6_address))
3680 {
3681 LOG (GNUNET_ERROR_TYPE_DEBUG,
3682 "Binding udp plugin to specific address: `%s'\n",
3683 bind6_address);
3684 if (1 != inet_pton (AF_INET6, bind6_address, &server_addrv6.sin6_addr))
3685 {
3686 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3687 "transport-udp",
3688 "BINDTO6",
3689 _ ("must be valid IPv6 address"));
3690 GNUNET_free (bind6_address);
3691 return NULL;
3692 }
3693 have_bind6 = GNUNET_YES;
3694 }
3695 GNUNET_free (bind6_address);
3696
3697 enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3698 "transport-udp",
3699 "BROADCAST");
3700 if (enable_broadcasting == GNUNET_SYSERR)
3701 enable_broadcasting = GNUNET_NO;
3702
3703 enable_broadcasting_recv =
3704 GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3705 "transport-udp",
3706 "BROADCAST_RECEIVE");
3707 if (enable_broadcasting_recv == GNUNET_SYSERR)
3708 enable_broadcasting_recv = GNUNET_YES;
3709
3710 if (GNUNET_SYSERR ==
3711 GNUNET_CONFIGURATION_get_value_time (env->cfg,
3712 "transport-udp",
3713 "BROADCAST_INTERVAL",
3714 &interval))
3715 {
3716 interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3717 }
3718 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg,
3719 "transport-udp",
3720 "MAX_BPS",
3721 &udp_max_bps))
3722 {
3723 /* 50 MB/s == infinity for practical purposes */
3724 udp_max_bps = 1024 * 1024 * 50;
3725 }
3726
3727 p = GNUNET_new (struct Plugin);
3728 p->port = port;
3729 p->aport = aport;
3730 p->broadcast_interval = interval;
3731 p->enable_ipv6 = enable_v6;
3732 p->enable_ipv4 = GNUNET_YES; /* default */
3733 p->enable_broadcasting = enable_broadcasting;
3734 p->enable_broadcasting_receiving = enable_broadcasting_recv;
3735 p->env = env;
3736 p->sessions = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_NO);
3737 p->defrag_ctxs =
3738 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3739 GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3740 NULL,
3741 NULL,
3742 GNUNET_BANDWIDTH_value_init (
3743 (uint32_t) udp_max_bps),
3744 30);
3745 res = setup_sockets (p,
3746 (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3747 (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3748 if ((0 == res) || ((NULL == p->sockv4) && (NULL == p->sockv6)))
3749 {
3750 LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to create UDP network sockets\n"));
3751 GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3752 GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3753 if (NULL != p->nat)
3754 GNUNET_NAT_unregister (p->nat);
3755 GNUNET_free (p);
3756 return NULL;
3757 }
3758
3759 /* Setup broadcasting and receiving beacons */
3760 setup_broadcast (p, &server_addrv6, &server_addrv4);
3761
3762 api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3763 api->cls = p;
3764 api->disconnect_session = &udp_disconnect_session;
3765 api->query_keepalive_factor = &udp_query_keepalive_factor;
3766 api->disconnect_peer = &udp_disconnect;
3767 api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3768 api->address_to_string = &udp_address_to_string;
3769 api->string_to_address = &udp_string_to_address;
3770 api->check_address = &udp_plugin_check_address;
3771 api->get_session = &udp_plugin_get_session;
3772 api->send = &udp_plugin_send;
3773 api->get_network = &udp_plugin_get_network;
3774 api->get_network_for_address = &udp_plugin_get_network_for_address;
3775 api->update_session_timeout = &udp_plugin_update_session_timeout;
3776 api->setup_monitor = &udp_plugin_setup_monitor;
3777 return api;
3778}
3779
3780
3781/**
3782 * Function called on each entry in the defragmentation heap to
3783 * clean it up.
3784 *
3785 * @param cls NULL
3786 * @param node node in the heap (to be removed)
3787 * @param element a `struct DefragContext` to be cleaned up
3788 * @param cost unused
3789 * @return #GNUNET_YES
3790 */
3791static int
3792heap_cleanup_iterator (void *cls,
3793 struct GNUNET_CONTAINER_HeapNode *node,
3794 void *element,
3795 GNUNET_CONTAINER_HeapCostType cost)
3796{
3797 struct DefragContext *d_ctx = element;
3798
3799 GNUNET_CONTAINER_heap_remove_node (node);
3800 GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3801 GNUNET_free (d_ctx);
3802 return GNUNET_YES;
3803}
3804
3805
3806/**
3807 * The exported method. Makes the core api available via a global and
3808 * returns the udp transport API.
3809 *
3810 * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3811 * @return NULL
3812 */
3813void *
3814libgnunet_plugin_transport_udp_done (void *cls)
3815{
3816 struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3817 struct Plugin *plugin = api->cls;
3818 struct PrettyPrinterContext *cur;
3819 struct UDP_MessageWrapper *udpw;
3820
3821 if (NULL == plugin)
3822 {
3823 GNUNET_free (api);
3824 return NULL;
3825 }
3826 stop_broadcast (plugin);
3827 if (NULL != plugin->select_task_v4)
3828 {
3829 GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
3830 plugin->select_task_v4 = NULL;
3831 }
3832 if (NULL != plugin->select_task_v6)
3833 {
3834 GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3835 plugin->select_task_v6 = NULL;
3836 }
3837 if (NULL != plugin->sockv4)
3838 {
3839 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
3840 plugin->sockv4 = NULL;
3841 }
3842 if (NULL != plugin->sockv6)
3843 {
3844 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
3845 plugin->sockv6 = NULL;
3846 }
3847 if (NULL != plugin->nat)
3848 {
3849 GNUNET_NAT_unregister (plugin->nat);
3850 plugin->nat = NULL;
3851 }
3852 if (NULL != plugin->defrag_ctxs)
3853 {
3854 GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3855 &heap_cleanup_iterator,
3856 NULL);
3857 GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3858 plugin->defrag_ctxs = NULL;
3859 }
3860 while (NULL != (udpw = plugin->ipv4_queue_head))
3861 {
3862 dequeue (plugin, udpw);
3863 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3864 GNUNET_free (udpw);
3865 }
3866 while (NULL != (udpw = plugin->ipv6_queue_head))
3867 {
3868 dequeue (plugin, udpw);
3869 udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3870 GNUNET_free (udpw);
3871 }
3872 GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3873 &disconnect_and_free_it,
3874 plugin);
3875 GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3876
3877 while (NULL != (cur = plugin->ppc_dll_head))
3878 {
3879 GNUNET_break (0);
3880 GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3881 plugin->ppc_dll_tail,
3882 cur);
3883 GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3884 if (NULL != cur->timeout_task)
3885 {
3886 GNUNET_SCHEDULER_cancel (cur->timeout_task);
3887 cur->timeout_task = NULL;
3888 }
3889 GNUNET_free (cur);
3890 }
3891 GNUNET_free (plugin);
3892 GNUNET_free (api);
3893 return NULL;
3894}
3895
3896
3897/* end of plugin_transport_udp.c */