aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-03-01 22:54:42 +0000
committerChristian Grothoff <christian@grothoff.org>2015-03-01 22:54:42 +0000
commitd9f888cdbb23b78f5936628d3adb7a77492b499c (patch)
tree8cae17769716b4030aaa872ac7a35e8888ef52e5
parent8ea46cc3c928bd5f74859dc74f305c94b687aad2 (diff)
downloadgnunet-d9f888cdbb23b78f5936628d3adb7a77492b499c.tar.gz
gnunet-d9f888cdbb23b78f5936628d3adb7a77492b499c.zip
major code cleanup in UDP plugin, seems to also fix bugs; specifically, I think I fixed a leak
-rw-r--r--src/transport/plugin_transport_udp.c2845
-rw-r--r--src/transport/test_plugin_transport.c35
2 files changed, 1471 insertions, 1409 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index f6b1ac251..a84f3e749 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -348,40 +348,19 @@ struct UDP_FragmentationContext
348 348
349 349
350/** 350/**
351 * Message types included in a `struct UDP_MessageWrapper` 351 * Function called when a message is removed from the
352 * transmission queue.
353 *
354 * @param cls closure
355 * @param udpw message wrapper finished
356 * @param result #GNUNET_OK on success (message was sent)
357 * #GNUNET_SYSERR if the target disconnected
358 * or we had a timeout or other trouble sending
352 */ 359 */
353enum UDP_MessageType 360typedef void
354{ 361(*QueueContinuation) (void *cls,
355 /** 362 struct UDP_MessageWrapper *udpw,
356 * Uninitialized (error) 363 int result);
357 */
358 UMT_UNDEFINED = 0,
359
360 /**
361 * This queue entry represents a fragment of a message.
362 */
363 UMT_MSG_FRAGMENTED = 1,
364
365 /**
366 * This queue entry does not include a message, but merely
367 * represents that we finished sending a fragmented message
368 * (all fragments confirmed, or timeout).
369 */
370 UMT_MSG_FRAGMENTED_COMPLETE = 2,
371
372 /**
373 * This queue entry represents a unfragmented message
374 * (was small enough to not require fragmentation).
375 */
376 UMT_MSG_UNFRAGMENTED = 3,
377
378 /**
379 * This queue entry represents the acknowledgement of us
380 * receiving a fragment.
381 */
382 UMT_MSG_ACK = 4
383
384};
385 364
386 365
387/** 366/**
@@ -410,7 +389,20 @@ struct UDP_MessageWrapper
410 char *msg_buf; 389 char *msg_buf;
411 390
412 /** 391 /**
413 * Function to call upon completion of the transmission, can be NULL. 392 * Function to call once the message wrapper is being removed
393 * from the queue (with success or failure).
394 */
395 QueueContinuation qc;
396
397 /**
398 * Closure for @e qc.
399 */
400 void *qc_cls;
401
402 /**
403 * External continuation to call upon completion of the
404 * transmission, NULL if this queue entry is not for a
405 * message from the application.
414 */ 406 */
415 GNUNET_TRANSPORT_TransmitContinuation cont; 407 GNUNET_TRANSPORT_TransmitContinuation cont;
416 408
@@ -441,14 +433,11 @@ struct UDP_MessageWrapper
441 */ 433 */
442 size_t payload_size; 434 size_t payload_size;
443 435
444 /**
445 * Message type (what does this entry in the queue represent).
446 */
447 enum UDP_MessageType msg_type;
448
449}; 436};
450 437
451 438
439GNUNET_NETWORK_STRUCT_BEGIN
440
452/** 441/**
453 * UDP ACK Message-Packet header. 442 * UDP ACK Message-Packet header.
454 */ 443 */
@@ -462,7 +451,7 @@ struct UDP_ACK_Message
462 /** 451 /**
463 * Desired delay for flow control, in us (in NBO). 452 * Desired delay for flow control, in us (in NBO).
464 */ 453 */
465 uint32_t delay; 454 uint32_t delay GNUNET_PACKED;
466 455
467 /** 456 /**
468 * What is the identity of the sender 457 * What is the identity of the sender
@@ -471,6 +460,11 @@ struct UDP_ACK_Message
471 460
472}; 461};
473 462
463GNUNET_NETWORK_STRUCT_END
464
465
466/* ************************* Monitoring *********** */
467
474 468
475/** 469/**
476 * If a session monitor is attached, notify it about the new 470 * If a session monitor is attached, notify it about the new
@@ -510,6 +504,126 @@ notify_session_monitor (struct Plugin *plugin,
510 504
511 505
512/** 506/**
507 * Return information about the given session to the monitor callback.
508 *
509 * @param cls the `struct Plugin` with the monitor callback (`sic`)
510 * @param peer peer we send information about
511 * @param value our `struct Session` to send information about
512 * @return #GNUNET_OK (continue to iterate)
513 */
514static int
515send_session_info_iter (void *cls,
516 const struct GNUNET_PeerIdentity *peer,
517 void *value)
518{
519 struct Plugin *plugin = cls;
520 struct Session *session = value;
521
522 notify_session_monitor (plugin,
523 session,
524 GNUNET_TRANSPORT_SS_INIT);
525 notify_session_monitor (plugin,
526 session,
527 GNUNET_TRANSPORT_SS_UP);
528 return GNUNET_OK;
529}
530
531
532/**
533 * Begin monitoring sessions of a plugin. There can only
534 * be one active monitor per plugin (i.e. if there are
535 * multiple monitors, the transport service needs to
536 * multiplex the generated events over all of them).
537 *
538 * @param cls closure of the plugin
539 * @param sic callback to invoke, NULL to disable monitor;
540 * plugin will being by iterating over all active
541 * sessions immediately and then enter monitor mode
542 * @param sic_cls closure for @a sic
543 */
544static void
545udp_plugin_setup_monitor (void *cls,
546 GNUNET_TRANSPORT_SessionInfoCallback sic,
547 void *sic_cls)
548{
549 struct Plugin *plugin = cls;
550
551 plugin->sic = sic;
552 plugin->sic_cls = sic_cls;
553 if (NULL != sic)
554 {
555 GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
556 &send_session_info_iter,
557 plugin);
558 /* signal end of first iteration */
559 sic (sic_cls,
560 NULL,
561 NULL);
562 }
563}
564
565
566/* ****************** Little Helpers ****************** */
567
568
569/**
570 * Function to free last resources associated with a session.
571 *
572 * @param s session to free
573 */
574static void
575free_session (struct Session *s)
576{
577 if (NULL != s->address)
578 {
579 GNUNET_HELLO_address_free (s->address);
580 s->address = NULL;
581 }
582 if (NULL != s->frag_ctx)
583 {
584 GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
585 NULL,
586 NULL);
587 GNUNET_free (s->frag_ctx);
588 s->frag_ctx = NULL;
589 }
590 GNUNET_free (s);
591}
592
593
594/**
595 * Function that is called to get the keepalive factor.
596 * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
597 * calculate the interval between keepalive packets.
598 *
599 * @param cls closure with the `struct Plugin`
600 * @return keepalive factor
601 */
602static unsigned int
603udp_query_keepalive_factor (void *cls)
604{
605 return 15;
606}
607
608
609/**
610 * Function obtain the network type for a session
611 *
612 * @param cls closure (`struct Plugin *`)
613 * @param session the session
614 * @return the network type
615 */
616static enum GNUNET_ATS_Network_Type
617udp_get_network (void *cls,
618 struct Session *session)
619{
620 return session->scope;
621}
622
623
624/* ******************* Event loop ******************** */
625
626/**
513 * We have been notified that our readset has something to read. We don't 627 * We have been notified that our readset has something to read. We don't
514 * know which socket needs to be read, so we have to check each one 628 * know which socket needs to be read, so we have to check each one
515 * Then reschedule this function to be called again once more is available. 629 * Then reschedule this function to be called again once more is available.
@@ -595,6 +709,9 @@ schedule_select_v6 (struct Plugin *plugin)
595} 709}
596 710
597 711
712/* ******************* Address to string and back ***************** */
713
714
598/** 715/**
599 * Function called for a quick conversion of the binary address to 716 * Function called for a quick conversion of the binary address to
600 * a numeric address. Note that the caller must not free the 717 * a numeric address. Note that the caller must not free the
@@ -622,31 +739,41 @@ udp_address_to_string (void *cls,
622 uint16_t port; 739 uint16_t port;
623 uint32_t options; 740 uint32_t options;
624 741
625 if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress))) 742 if (NULL == addr)
743 {
744 GNUNET_break_op (0);
745 return NULL;
746 }
747
748 if (addrlen == sizeof(struct IPv6UdpAddress))
626 { 749 {
627 t6 = addr; 750 t6 = addr;
628 af = AF_INET6; 751 af = AF_INET6;
629 options = ntohl (t6->options); 752 options = ntohl (t6->options);
630 port = ntohs (t6->u6_port); 753 port = ntohs (t6->u6_port);
631 memcpy (&a6, &t6->ipv6_addr, sizeof(a6)); 754 a6 = t6->ipv6_addr;
632 sb = &a6; 755 sb = &a6;
633 } 756 }
634 else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress))) 757 else if (addrlen == sizeof(struct IPv4UdpAddress))
635 { 758 {
636 t4 = addr; 759 t4 = addr;
637 af = AF_INET; 760 af = AF_INET;
638 options = ntohl (t4->options); 761 options = ntohl (t4->options);
639 port = ntohs (t4->u4_port); 762 port = ntohs (t4->u4_port);
640 memcpy (&a4, &t4->ipv4_addr, sizeof(a4)); 763 a4.s_addr = t4->ipv4_addr;
641 sb = &a4; 764 sb = &a4;
642 } 765 }
643 else 766 else
644 { 767 {
768 GNUNET_break_op (0);
645 return NULL; 769 return NULL;
646 } 770 }
647 inet_ntop (af, sb, buf, INET6_ADDRSTRLEN); 771 inet_ntop (af,
648 772 sb,
649 GNUNET_snprintf (rbuf, sizeof(rbuf), 773 buf,
774 INET6_ADDRSTRLEN);
775 GNUNET_snprintf (rbuf,
776 sizeof(rbuf),
650 (af == AF_INET6) 777 (af == AF_INET6)
651 ? "%s.%u.[%s]:%u" 778 ? "%s.%u.[%s]:%u"
652 : "%s.%u.%s:%u", 779 : "%s.%u.%s:%u",
@@ -659,8 +786,7 @@ udp_address_to_string (void *cls,
659 786
660 787
661/** 788/**
662 * Function called to convert a string address to 789 * Function called to convert a string address to a binary address.
663 * a binary address.
664 * 790 *
665 * @param cls closure (`struct Plugin *`) 791 * @param cls closure (`struct Plugin *`)
666 * @param addr string address 792 * @param addr string address
@@ -688,27 +814,27 @@ udp_string_to_address (void *cls,
688 plugin = NULL; 814 plugin = NULL;
689 optionstr = NULL; 815 optionstr = NULL;
690 816
691 if ((NULL == addr) || (addrlen == 0)) 817 if ((NULL == addr) || (0 == addrlen))
692 { 818 {
693 GNUNET_break(0); 819 GNUNET_break (0);
694 return GNUNET_SYSERR; 820 return GNUNET_SYSERR;
695 } 821 }
696 if ('\0' != addr[addrlen - 1]) 822 if ('\0' != addr[addrlen - 1])
697 { 823 {
698 GNUNET_break(0); 824 GNUNET_break (0);
699 return GNUNET_SYSERR; 825 return GNUNET_SYSERR;
700 } 826 }
701 if (strlen (addr) != addrlen - 1) 827 if (strlen (addr) != addrlen - 1)
702 { 828 {
703 GNUNET_break(0); 829 GNUNET_break (0);
704 return GNUNET_SYSERR; 830 return GNUNET_SYSERR;
705 } 831 }
706 plugin = GNUNET_strdup (addr); 832 plugin = GNUNET_strdup (addr);
707 optionstr = strchr (plugin, '.'); 833 optionstr = strchr (plugin, '.');
708 if (NULL == optionstr) 834 if (NULL == optionstr)
709 { 835 {
710 GNUNET_break(0); 836 GNUNET_break (0);
711 GNUNET_free(plugin); 837 GNUNET_free (plugin);
712 return GNUNET_SYSERR; 838 return GNUNET_SYSERR;
713 } 839 }
714 optionstr[0] = '\0'; 840 optionstr[0] = '\0';
@@ -717,52 +843,54 @@ udp_string_to_address (void *cls,
717 address = strchr (optionstr, '.'); 843 address = strchr (optionstr, '.');
718 if (NULL == address) 844 if (NULL == address)
719 { 845 {
720 GNUNET_break(0); 846 GNUNET_break (0);
721 GNUNET_free(plugin); 847 GNUNET_free (plugin);
722 return GNUNET_SYSERR; 848 return GNUNET_SYSERR;
723 } 849 }
724 address[0] = '\0'; 850 address[0] = '\0';
725 address++; 851 address++;
726 852
727 if (GNUNET_OK != 853 if (GNUNET_OK !=
728 GNUNET_STRINGS_to_address_ip (address, strlen (address), 854 GNUNET_STRINGS_to_address_ip (address,
855 strlen (address),
729 &socket_address)) 856 &socket_address))
730 { 857 {
731 GNUNET_break(0); 858 GNUNET_break (0);
732 GNUNET_free(plugin); 859 GNUNET_free (plugin);
733 return GNUNET_SYSERR; 860 return GNUNET_SYSERR;
734 } 861 }
735
736 GNUNET_free(plugin); 862 GNUNET_free(plugin);
737 863
738 switch (socket_address.ss_family) 864 switch (socket_address.ss_family)
739 { 865 {
740 case AF_INET: 866 case AF_INET:
741 { 867 {
742 struct IPv4UdpAddress *u4; 868 struct IPv4UdpAddress *u4;
743 struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address; 869 const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
744 u4 = GNUNET_new (struct IPv4UdpAddress); 870
745 u4->options = htonl (options); 871 u4 = GNUNET_new (struct IPv4UdpAddress);
746 u4->ipv4_addr = in4->sin_addr.s_addr; 872 u4->options = htonl (options);
747 u4->u4_port = in4->sin_port; 873 u4->ipv4_addr = in4->sin_addr.s_addr;
748 *buf = u4; 874 u4->u4_port = in4->sin_port;
749 *added = sizeof(struct IPv4UdpAddress); 875 *buf = u4;
750 return GNUNET_OK; 876 *added = sizeof (struct IPv4UdpAddress);
751 } 877 return GNUNET_OK;
878 }
752 case AF_INET6: 879 case AF_INET6:
753 { 880 {
754 struct IPv6UdpAddress *u6; 881 struct IPv6UdpAddress *u6;
755 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address; 882 const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
756 u6 = GNUNET_new (struct IPv6UdpAddress); 883
757 u6->options = htonl (options); 884 u6 = GNUNET_new (struct IPv6UdpAddress);
758 u6->ipv6_addr = in6->sin6_addr; 885 u6->options = htonl (options);
759 u6->u6_port = in6->sin6_port; 886 u6->ipv6_addr = in6->sin6_addr;
760 *buf = u6; 887 u6->u6_port = in6->sin6_port;
761 *added = sizeof(struct IPv6UdpAddress); 888 *buf = u6;
762 return GNUNET_OK; 889 *added = sizeof (struct IPv6UdpAddress);
763 } 890 return GNUNET_OK;
891 }
764 default: 892 default:
765 GNUNET_break(0); 893 GNUNET_break (0);
766 return GNUNET_SYSERR; 894 return GNUNET_SYSERR;
767 } 895 }
768} 896}
@@ -785,13 +913,13 @@ append_port (void *cls,
785 if (NULL == hostname) 913 if (NULL == hostname)
786 { 914 {
787 /* Final call, done */ 915 /* Final call, done */
788 ppc->asc (ppc->asc_cls,
789 NULL,
790 GNUNET_OK);
791 GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head, 916 GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
792 plugin->ppc_dll_tail, 917 plugin->ppc_dll_tail,
793 ppc); 918 ppc);
794 ppc->resolver_handle = NULL; 919 ppc->resolver_handle = NULL;
920 ppc->asc (ppc->asc_cls,
921 NULL,
922 GNUNET_OK);
795 GNUNET_free (ppc); 923 GNUNET_free (ppc);
796 return; 924 return;
797 } 925 }
@@ -817,8 +945,7 @@ append_port (void *cls,
817 945
818 946
819/** 947/**
820 * Convert the transports address to a nice, human-readable 948 * Convert the transports address to a nice, human-readable format.
821 * format.
822 * 949 *
823 * @param cls closure with the `struct Plugin *` 950 * @param cls closure with the `struct Plugin *`
824 * @param type name of the transport that generated the address 951 * @param type name of the transport that generated the address
@@ -843,7 +970,7 @@ udp_plugin_address_pretty_printer (void *cls,
843{ 970{
844 struct Plugin *plugin = cls; 971 struct Plugin *plugin = cls;
845 struct PrettyPrinterContext *ppc; 972 struct PrettyPrinterContext *ppc;
846 const void *sb; 973 const struct sockaddr *sb;
847 size_t sbs; 974 size_t sbs;
848 struct sockaddr_in a4; 975 struct sockaddr_in a4;
849 struct sockaddr_in6 a6; 976 struct sockaddr_in6 a6;
@@ -855,22 +982,26 @@ udp_plugin_address_pretty_printer (void *cls,
855 if (addrlen == sizeof(struct IPv6UdpAddress)) 982 if (addrlen == sizeof(struct IPv6UdpAddress))
856 { 983 {
857 u6 = addr; 984 u6 = addr;
858 memset (&a6, 0, sizeof(a6)); 985 memset (&a6,
986 0,
987 sizeof (a6));
859 a6.sin6_family = AF_INET6; 988 a6.sin6_family = AF_INET6;
860#if HAVE_SOCKADDR_IN_SIN_LEN 989#if HAVE_SOCKADDR_IN_SIN_LEN
861 a6.sin6_len = sizeof (a6); 990 a6.sin6_len = sizeof (a6);
862#endif 991#endif
863 a6.sin6_port = u6->u6_port; 992 a6.sin6_port = u6->u6_port;
864 memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr)); 993 a6.sin6_addr = u6->ipv6_addr;
865 port = ntohs (u6->u6_port); 994 port = ntohs (u6->u6_port);
866 options = ntohl (u6->options); 995 options = ntohl (u6->options);
867 sb = &a6; 996 sb = (const struct sockaddr *) &a6;
868 sbs = sizeof(a6); 997 sbs = sizeof (a6);
869 } 998 }
870 else if (addrlen == sizeof(struct IPv4UdpAddress)) 999 else if (addrlen == sizeof (struct IPv4UdpAddress))
871 { 1000 {
872 u4 = addr; 1001 u4 = addr;
873 memset (&a4, 0, sizeof(a4)); 1002 memset (&a4,
1003 0,
1004 sizeof(a4));
874 a4.sin_family = AF_INET; 1005 a4.sin_family = AF_INET;
875#if HAVE_SOCKADDR_IN_SIN_LEN 1006#if HAVE_SOCKADDR_IN_SIN_LEN
876 a4.sin_len = sizeof (a4); 1007 a4.sin_len = sizeof (a4);
@@ -879,15 +1010,19 @@ udp_plugin_address_pretty_printer (void *cls,
879 a4.sin_addr.s_addr = u4->ipv4_addr; 1010 a4.sin_addr.s_addr = u4->ipv4_addr;
880 port = ntohs (u4->u4_port); 1011 port = ntohs (u4->u4_port);
881 options = ntohl (u4->options); 1012 options = ntohl (u4->options);
882 sb = &a4; 1013 sb = (const struct sockaddr *) &a4;
883 sbs = sizeof(a4); 1014 sbs = sizeof(a4);
884 } 1015 }
885 else 1016 else
886 { 1017 {
887 /* invalid address */ 1018 /* invalid address */
888 GNUNET_break_op (0); 1019 GNUNET_break_op (0);
889 asc (asc_cls, NULL , GNUNET_SYSERR); 1020 asc (asc_cls,
890 asc (asc_cls, NULL, GNUNET_OK); 1021 NULL,
1022 GNUNET_SYSERR);
1023 asc (asc_cls,
1024 NULL,
1025 GNUNET_OK);
891 return; 1026 return;
892 } 1027 }
893 ppc = GNUNET_new (struct PrettyPrinterContext); 1028 ppc = GNUNET_new (struct PrettyPrinterContext);
@@ -896,7 +1031,7 @@ udp_plugin_address_pretty_printer (void *cls,
896 ppc->asc_cls = asc_cls; 1031 ppc->asc_cls = asc_cls;
897 ppc->port = port; 1032 ppc->port = port;
898 ppc->options = options; 1033 ppc->options = options;
899 if (addrlen == sizeof(struct IPv6UdpAddress)) 1034 if (addrlen == sizeof (struct IPv6UdpAddress))
900 ppc->ipv6 = GNUNET_YES; 1035 ppc->ipv6 = GNUNET_YES;
901 else 1036 else
902 ppc->ipv6 = GNUNET_NO; 1037 ppc->ipv6 = GNUNET_NO;
@@ -908,219 +1043,8 @@ udp_plugin_address_pretty_printer (void *cls,
908 sbs, 1043 sbs,
909 ! numeric, 1044 ! numeric,
910 timeout, 1045 timeout,
911 &append_port, ppc); 1046 &append_port,
912} 1047 ppc);
913
914
915/**
916 * FIXME.
917 */
918static void
919call_continuation (struct UDP_MessageWrapper *udpw,
920 int result)
921{
922 struct Session *session = udpw->session;
923 struct Plugin *plugin = session->plugin;
924 size_t overhead;
925
926 LOG (GNUNET_ERROR_TYPE_DEBUG,
927 "Calling continuation for %u byte message to `%s' with result %s\n",
928 udpw->payload_size,
929 GNUNET_i2s (&udpw->session->target),
930 (GNUNET_OK == result) ? "OK" : "SYSERR");
931
932 if (udpw->msg_size >= udpw->payload_size)
933 overhead = udpw->msg_size - udpw->payload_size;
934 else
935 overhead = udpw->msg_size;
936
937 switch (result)
938 {
939 case GNUNET_OK:
940 switch (udpw->msg_type)
941 {
942 case UMT_MSG_UNFRAGMENTED:
943 if (NULL != udpw->cont)
944 {
945 /* Transport continuation */
946 udpw->cont (udpw->cont_cls,
947 &udpw->session->target,
948 result,
949 udpw->payload_size,
950 udpw->msg_size);
951 }
952 GNUNET_STATISTICS_update (plugin->env->stats,
953 "# UDP, unfragmented msgs, messages, sent, success",
954 1,
955 GNUNET_NO);
956 GNUNET_STATISTICS_update (plugin->env->stats,
957 "# UDP, unfragmented msgs, bytes payload, sent, success",
958 udpw->payload_size,
959 GNUNET_NO);
960 GNUNET_STATISTICS_update (plugin->env->stats,
961 "# UDP, unfragmented msgs, bytes overhead, sent, success",
962 overhead,
963 GNUNET_NO);
964 GNUNET_STATISTICS_update (plugin->env->stats,
965 "# UDP, total, bytes overhead, sent",
966 overhead,
967 GNUNET_NO);
968 GNUNET_STATISTICS_update (plugin->env->stats,
969 "# UDP, total, bytes payload, sent",
970 udpw->payload_size,
971 GNUNET_NO);
972 break;
973 case UMT_MSG_FRAGMENTED_COMPLETE:
974 GNUNET_assert(NULL != udpw->frag_ctx);
975 if (udpw->frag_ctx->cont != NULL )
976 udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
977 &udpw->session->target,
978 GNUNET_OK,
979 udpw->frag_ctx->payload_size,
980 udpw->frag_ctx->on_wire_size);
981 GNUNET_STATISTICS_update (plugin->env->stats,
982 "# UDP, fragmented msgs, messages, sent, success",
983 1,
984 GNUNET_NO);
985 GNUNET_STATISTICS_update (plugin->env->stats,
986 "# UDP, fragmented msgs, bytes payload, sent, success",
987 udpw->payload_size,
988 GNUNET_NO);
989 GNUNET_STATISTICS_update (plugin->env->stats,
990 "# UDP, fragmented msgs, bytes overhead, sent, success",
991 overhead,
992 GNUNET_NO);
993 GNUNET_STATISTICS_update (plugin->env->stats,
994 "# UDP, total, bytes overhead, sent",
995 overhead,
996 GNUNET_NO);
997 GNUNET_STATISTICS_update (plugin->env->stats,
998 "# UDP, total, bytes payload, sent",
999 udpw->payload_size,
1000 GNUNET_NO);
1001 GNUNET_STATISTICS_update (plugin->env->stats,
1002 "# UDP, fragmented msgs, messages, pending",
1003 -1,
1004 GNUNET_NO);
1005 break;
1006 case UMT_MSG_FRAGMENTED:
1007 /* Fragmented message: enqueue next fragment */
1008 if (NULL != udpw->cont)
1009 udpw->cont (udpw->cont_cls,
1010 &udpw->session->target,
1011 result,
1012 udpw->payload_size,
1013 udpw->msg_size);
1014 GNUNET_STATISTICS_update (plugin->env->stats,
1015 "# UDP, fragmented msgs, fragments, sent, success",
1016 1,
1017 GNUNET_NO);
1018 GNUNET_STATISTICS_update (plugin->env->stats,
1019 "# UDP, fragmented msgs, fragments bytes, sent, success",
1020 udpw->msg_size,
1021 GNUNET_NO);
1022 break;
1023 case UMT_MSG_ACK:
1024 /* No continuation */
1025 GNUNET_STATISTICS_update (plugin->env->stats,
1026 "# UDP, ACK msgs, messages, sent, success",
1027 1,
1028 GNUNET_NO);
1029 GNUNET_STATISTICS_update (plugin->env->stats,
1030 "# UDP, ACK msgs, bytes overhead, sent, success",
1031 overhead,
1032 GNUNET_NO);
1033 GNUNET_STATISTICS_update (plugin->env->stats,
1034 "# UDP, total, bytes overhead, sent",
1035 overhead,
1036 GNUNET_NO);
1037 break;
1038 default:
1039 GNUNET_break(0);
1040 break;
1041 }
1042 break;
1043 case GNUNET_SYSERR:
1044 switch (udpw->msg_type)
1045 {
1046 case UMT_MSG_UNFRAGMENTED:
1047 /* Unfragmented message: failed to send */
1048 if (NULL != udpw->cont)
1049 udpw->cont (udpw->cont_cls,
1050 &udpw->session->target,
1051 result,
1052 udpw->payload_size,
1053 overhead);
1054 GNUNET_STATISTICS_update (plugin->env->stats,
1055 "# UDP, unfragmented msgs, messages, sent, failure",
1056 1,
1057 GNUNET_NO);
1058 GNUNET_STATISTICS_update (plugin->env->stats,
1059 "# UDP, unfragmented msgs, bytes payload, sent, failure",
1060 udpw->payload_size,
1061 GNUNET_NO);
1062 GNUNET_STATISTICS_update (plugin->env->stats,
1063 "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1064 overhead,
1065 GNUNET_NO);
1066 break;
1067 case UMT_MSG_FRAGMENTED_COMPLETE:
1068 GNUNET_assert (NULL != udpw->frag_ctx);
1069 if (udpw->frag_ctx->cont != NULL)
1070 udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
1071 &udpw->session->target,
1072 GNUNET_SYSERR,
1073 udpw->frag_ctx->payload_size,
1074 udpw->frag_ctx->on_wire_size);
1075 GNUNET_STATISTICS_update (plugin->env->stats,
1076 "# UDP, fragmented msgs, messages, sent, failure",
1077 1,
1078 GNUNET_NO);
1079 GNUNET_STATISTICS_update (plugin->env->stats,
1080 "# UDP, fragmented msgs, bytes payload, sent, failure",
1081 udpw->payload_size,
1082 GNUNET_NO);
1083 GNUNET_STATISTICS_update (plugin->env->stats,
1084 "# UDP, fragmented msgs, bytes payload, sent, failure",
1085 overhead,
1086 GNUNET_NO);
1087 GNUNET_STATISTICS_update (plugin->env->stats,
1088 "# UDP, fragmented msgs, bytes payload, sent, failure",
1089 overhead,
1090 GNUNET_NO);
1091 GNUNET_STATISTICS_update (plugin->env->stats,
1092 "# UDP, fragmented msgs, messages, pending",
1093 -1,
1094 GNUNET_NO);
1095 break;
1096 case UMT_MSG_FRAGMENTED:
1097 GNUNET_assert (NULL != udpw->frag_ctx);
1098 /* Fragmented message: failed to send */
1099 GNUNET_STATISTICS_update (plugin->env->stats,
1100 "# UDP, fragmented msgs, fragments, sent, failure",
1101 1,
1102 GNUNET_NO);
1103 GNUNET_STATISTICS_update (plugin->env->stats,
1104 "# UDP, fragmented msgs, fragments bytes, sent, failure",
1105 udpw->msg_size,
1106 GNUNET_NO);
1107 break;
1108 case UMT_MSG_ACK:
1109 /* ACK message: failed to send */
1110 GNUNET_STATISTICS_update (plugin->env->stats,
1111 "# UDP, ACK msgs, messages, sent, failure",
1112 1,
1113 GNUNET_NO);
1114 break;
1115 default:
1116 GNUNET_break(0);
1117 break;
1118 }
1119 break;
1120 default:
1121 GNUNET_break(0);
1122 break;
1123 }
1124} 1048}
1125 1049
1126 1050
@@ -1131,13 +1055,14 @@ call_continuation (struct UDP_MessageWrapper *udpw,
1131 * 1055 *
1132 * @param plugin global variables 1056 * @param plugin global variables
1133 * @param in_port port number to check 1057 * @param in_port port number to check
1134 * @return #GNUNET_OK if port is either open_port or adv_port 1058 * @return #GNUNET_OK if port is either our open or advertised port
1135 */ 1059 */
1136static int 1060static int
1137check_port (struct Plugin *plugin, 1061check_port (const struct Plugin *plugin,
1138 uint16_t in_port) 1062 uint16_t in_port)
1139{ 1063{
1140 if ((in_port == plugin->port) || (in_port == plugin->aport)) 1064 if ( (plugin->port == in_port) ||
1065 (plugin->aport == in_port) )
1141 return GNUNET_OK; 1066 return GNUNET_OK;
1142 return GNUNET_SYSERR; 1067 return GNUNET_SYSERR;
1143} 1068}
@@ -1164,19 +1089,14 @@ udp_plugin_check_address (void *cls,
1164 size_t addrlen) 1089 size_t addrlen)
1165{ 1090{
1166 struct Plugin *plugin = cls; 1091 struct Plugin *plugin = cls;
1167 struct IPv4UdpAddress *v4; 1092 const struct IPv4UdpAddress *v4;
1168 struct IPv6UdpAddress *v6; 1093 const struct IPv6UdpAddress *v6;
1169 1094
1170 if ( (addrlen != sizeof(struct IPv4UdpAddress)) && 1095 if (sizeof(struct IPv4UdpAddress) == addrlen)
1171 (addrlen != sizeof(struct IPv6UdpAddress)) )
1172 { 1096 {
1173 GNUNET_break_op(0); 1097 v4 = (const struct IPv4UdpAddress *) addr;
1174 return GNUNET_SYSERR; 1098 if (GNUNET_OK != check_port (plugin,
1175 } 1099 ntohs (v4->u4_port)))
1176 if (addrlen == sizeof(struct IPv4UdpAddress))
1177 {
1178 v4 = (struct IPv4UdpAddress *) addr;
1179 if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1180 return GNUNET_SYSERR; 1100 return GNUNET_SYSERR;
1181 if (GNUNET_OK != 1101 if (GNUNET_OK !=
1182 GNUNET_NAT_test_address (plugin->nat, 1102 GNUNET_NAT_test_address (plugin->nat,
@@ -1184,49 +1104,299 @@ udp_plugin_check_address (void *cls,
1184 sizeof (struct in_addr))) 1104 sizeof (struct in_addr)))
1185 return GNUNET_SYSERR; 1105 return GNUNET_SYSERR;
1186 } 1106 }
1187 else 1107 else if (sizeof(struct IPv6UdpAddress) == addrlen)
1188 { 1108 {
1189 v6 = (struct IPv6UdpAddress *) addr; 1109 v6 = (const struct IPv6UdpAddress *) addr;
1190 if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr)) 1110 if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1191 { 1111 {
1192 GNUNET_break_op(0); 1112 GNUNET_break_op (0);
1193 return GNUNET_SYSERR; 1113 return GNUNET_SYSERR;
1194 } 1114 }
1195 if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port))) 1115 if (GNUNET_OK != check_port (plugin,
1116 ntohs (v6->u6_port)))
1196 return GNUNET_SYSERR; 1117 return GNUNET_SYSERR;
1197 if (GNUNET_OK != 1118 if (GNUNET_OK !=
1198 GNUNET_NAT_test_address (plugin->nat, 1119 GNUNET_NAT_test_address (plugin->nat,
1199 &v6->ipv6_addr, 1120 &v6->ipv6_addr,
1200 sizeof(struct in6_addr))) 1121 sizeof (struct in6_addr)))
1201 return GNUNET_SYSERR; 1122 return GNUNET_SYSERR;
1202 } 1123 }
1124 else
1125 {
1126 GNUNET_break_op (0);
1127 return GNUNET_SYSERR;
1128 }
1203 return GNUNET_OK; 1129 return GNUNET_OK;
1204} 1130}
1205 1131
1206 1132
1207/** 1133/**
1208 * Function to free last resources associated with a session. 1134 * Our external IP address/port mapping has changed.
1209 * 1135 *
1210 * @param s session to free 1136 * @param cls closure, the `struct Plugin`
1137 * @param add_remove #GNUNET_YES to mean the new public IP address,
1138 * #GNUNET_NO to mean the previous (now invalid) one
1139 * @param addr either the previous or the new public IP address
1140 * @param addrlen actual length of the @a addr
1211 */ 1141 */
1212static void 1142static void
1213free_session (struct Session *s) 1143udp_nat_port_map_callback (void *cls,
1144 int add_remove,
1145 const struct sockaddr *addr,
1146 socklen_t addrlen)
1214{ 1147{
1215 if (NULL != s->frag_ctx) 1148 struct Plugin *plugin = cls;
1149 struct GNUNET_HELLO_Address *address;
1150 struct IPv4UdpAddress u4;
1151 struct IPv6UdpAddress u6;
1152 void *arg;
1153 size_t args;
1154
1155 LOG (GNUNET_ERROR_TYPE_DEBUG,
1156 (GNUNET_YES == add_remove)
1157 ? "NAT notification to add address `%s'\n"
1158 : "NAT notification to remove address `%s'\n",
1159 GNUNET_a2s (addr,
1160 addrlen));
1161 /* convert 'address' to our internal format */
1162 switch (addr->sa_family)
1216 { 1163 {
1217 GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL ); 1164 case AF_INET:
1218 GNUNET_free(s->frag_ctx); 1165 {
1219 s->frag_ctx = NULL; 1166 const struct sockaddr_in *i4;
1167
1168 GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1169 i4 = (const struct sockaddr_in *) addr;
1170 if (0 == ntohs (i4->sin_port))
1171 {
1172 GNUNET_break (0);
1173 return;
1174 }
1175 memset (&u4,
1176 0,
1177 sizeof(u4));
1178 u4.options = htonl (plugin->myoptions);
1179 u4.ipv4_addr = i4->sin_addr.s_addr;
1180 u4.u4_port = i4->sin_port;
1181 arg = &u4;
1182 args = sizeof (struct IPv4UdpAddress);
1183 break;
1184 }
1185 case AF_INET6:
1186 {
1187 const struct sockaddr_in6 *i6;
1188
1189 GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1190 i6 = (const struct sockaddr_in6 *) addr;
1191 if (0 == ntohs (i6->sin6_port))
1192 {
1193 GNUNET_break (0);
1194 return;
1195 }
1196 memset (&u6,
1197 0,
1198 sizeof(u6));
1199 u6.options = htonl (plugin->myoptions);
1200 u6.ipv6_addr = i6->sin6_addr;
1201 u6.u6_port = i6->sin6_port;
1202 arg = &u6;
1203 args = sizeof (struct IPv6UdpAddress);
1204 break;
1205 }
1206 default:
1207 GNUNET_break (0);
1208 return;
1209 }
1210 /* modify our published address list */
1211 address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1212 PLUGIN_NAME,
1213 arg,
1214 args,
1215 GNUNET_HELLO_ADDRESS_INFO_NONE);
1216 plugin->env->notify_address (plugin->env->cls,
1217 add_remove,
1218 address);
1219 GNUNET_HELLO_address_free (address);
1220}
1221
1222
1223/* ********************* Finding sessions ******************* */
1224
1225
1226/**
1227 * Closure for #session_cmp_it().
1228 */
1229struct SessionCompareContext
1230{
1231 /**
1232 * Set to session matching the address.
1233 */
1234 struct Session *res;
1235
1236 /**
1237 * Address we are looking for.
1238 */
1239 const struct GNUNET_HELLO_Address *address;
1240};
1241
1242
1243/**
1244 * Find a session with a matching address.
1245 *
1246 * @param cls the `struct SessionCompareContext *`
1247 * @param key peer identity (unused)
1248 * @param value the `struct Session *`
1249 * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1250 */
1251static int
1252session_cmp_it (void *cls,
1253 const struct GNUNET_PeerIdentity *key,
1254 void *value)
1255{
1256 struct SessionCompareContext *cctx = cls;
1257 struct Session *s = value;
1258
1259 if (0 == GNUNET_HELLO_address_cmp (s->address,
1260 cctx->address))
1261 {
1262 GNUNET_assert (GNUNET_NO == s->in_destroy);
1263 cctx->res = s;
1264 return GNUNET_NO;
1265 }
1266 return GNUNET_OK;
1267}
1268
1269
1270/**
1271 * Locate an existing session the transport service is using to
1272 * send data to another peer. Performs some basic sanity checks
1273 * on the address and then tries to locate a matching session.
1274 *
1275 * @param cls the plugin
1276 * @param address the address we should locate the session by
1277 * @return the session if it exists, or NULL if it is not found
1278 */
1279static struct Session *
1280udp_plugin_lookup_session (void *cls,
1281 const struct GNUNET_HELLO_Address *address)
1282{
1283 struct Plugin *plugin = cls;
1284 const struct IPv6UdpAddress *udp_a6;
1285 const struct IPv4UdpAddress *udp_a4;
1286 struct SessionCompareContext cctx;
1287
1288 if (NULL == address->address)
1289 {
1290 GNUNET_break (0);
1291 return NULL;
1292 }
1293 if (sizeof(struct IPv4UdpAddress) == address->address_length)
1294 {
1295 if (NULL == plugin->sockv4)
1296 return NULL;
1297 udp_a4 = (const struct IPv4UdpAddress *) address->address;
1298 if (0 == udp_a4->u4_port)
1299 {
1300 GNUNET_break (0);
1301 return NULL;
1302 }
1303 }
1304 else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1305 {
1306 if (NULL == plugin->sockv6)
1307 return NULL;
1308 udp_a6 = (const struct IPv6UdpAddress *) address->address;
1309 if (0 == udp_a6->u6_port)
1310 {
1311 GNUNET_break (0);
1312 return NULL;
1313 }
1314 }
1315 else
1316 {
1317 GNUNET_break (0);
1318 return NULL;
1319 }
1320
1321 /* check if session already exists */
1322 cctx.address = address;
1323 cctx.res = NULL;
1324 LOG (GNUNET_ERROR_TYPE_DEBUG,
1325 "Looking for existing session for peer `%s' with address `%s'\n",
1326 GNUNET_i2s (&address->peer),
1327 udp_address_to_string (plugin,
1328 address->address,
1329 address->address_length));
1330 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1331 &address->peer,
1332 &session_cmp_it,
1333 &cctx);
1334 if (NULL == cctx.res)
1335 return NULL;
1336 LOG (GNUNET_ERROR_TYPE_DEBUG,
1337 "Found existing session %p\n",
1338 cctx.res);
1339 return cctx.res;
1340}
1341
1342
1343/* ********************** Timeout ****************** */
1344
1345
1346/**
1347 * Increment session timeout due to activity.
1348 *
1349 * @param s session to reschedule timeout activity for
1350 */
1351static void
1352reschedule_session_timeout (struct Session *s)
1353{
1354 if (GNUNET_YES == s->in_destroy)
1355 return;
1356 GNUNET_assert (NULL != s->timeout_task);
1357 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1358}
1359
1360
1361
1362/**
1363 * Function that will be called whenever the transport service wants to
1364 * notify the plugin that a session is still active and in use and
1365 * therefore the session timeout for this session has to be updated
1366 *
1367 * @param cls closure with the `struct Plugin`
1368 * @param peer which peer was the session for
1369 * @param session which session is being updated
1370 */
1371static void
1372udp_plugin_update_session_timeout (void *cls,
1373 const struct GNUNET_PeerIdentity *peer,
1374 struct Session *session)
1375{
1376 struct Plugin *plugin = cls;
1377
1378 if (GNUNET_YES !=
1379 GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1380 peer,
1381 session))
1382 {
1383 GNUNET_break (0);
1384 return;
1220 } 1385 }
1221 GNUNET_free(s); 1386 /* Reschedule session timeout */
1387 reschedule_session_timeout (session);
1222} 1388}
1223 1389
1224 1390
1391/* ************************* Sending ************************ */
1392
1393
1225/** 1394/**
1226 * Remove a message from the transmission queue. 1395 * Remove the given message from the transmission queue and
1396 * update all applicable statistics.
1227 * 1397 *
1228 * @param plugin the UDP plugin 1398 * @param plugin the UDP plugin
1229 * @param udpw message wrapper to queue 1399 * @param udpw message wrapper to dequeue
1230 */ 1400 */
1231static void 1401static void
1232dequeue (struct Plugin *plugin, 1402dequeue (struct Plugin *plugin,
@@ -1241,22 +1411,27 @@ dequeue (struct Plugin *plugin,
1241 else 1411 else
1242 { 1412 {
1243 GNUNET_STATISTICS_update (plugin->env->stats, 1413 GNUNET_STATISTICS_update (plugin->env->stats,
1244 "# UDP, total, bytes in buffers", 1414 "# UDP, total bytes in send buffers",
1245 -(long long) udpw->msg_size, 1415 - (long long) udpw->msg_size,
1246 GNUNET_NO); 1416 GNUNET_NO);
1247 plugin->bytes_in_buffer -= udpw->msg_size; 1417 plugin->bytes_in_buffer -= udpw->msg_size;
1248 } 1418 }
1249 GNUNET_STATISTICS_update (plugin->env->stats, 1419 GNUNET_STATISTICS_update (plugin->env->stats,
1250 "# UDP, total, msgs in buffers", 1420 "# UDP, total messages in send buffers",
1251 -1, GNUNET_NO); 1421 -1,
1252 if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress)) 1422 GNUNET_NO);
1423 if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1424 {
1253 GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head, 1425 GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1254 plugin->ipv4_queue_tail, 1426 plugin->ipv4_queue_tail,
1255 udpw); 1427 udpw);
1256 else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress)) 1428 }
1429 else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1430 {
1257 GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head, 1431 GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1258 plugin->ipv6_queue_tail, 1432 plugin->ipv6_queue_tail,
1259 udpw); 1433 udpw);
1434 }
1260 else 1435 else
1261 { 1436 {
1262 GNUNET_break (0); 1437 GNUNET_break (0);
@@ -1270,41 +1445,146 @@ dequeue (struct Plugin *plugin,
1270 1445
1271 1446
1272/** 1447/**
1273 * We have completed our (attempt) to transmit a message 1448 * Enqueue a message for transmission and update statistics.
1274 * that had to be fragmented -- either because we got an 1449 *
1275 * ACK saying that all fragments were received, or because 1450 * @param plugin the UDP plugin
1276 * of timeout / disconnect. Clean up our state. 1451 * @param udpw message wrapper to queue
1452 */
1453static void
1454enqueue (struct Plugin *plugin,
1455 struct UDP_MessageWrapper *udpw)
1456{
1457 struct Session *session = udpw->session;
1458
1459 if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1460 {
1461 GNUNET_break (0);
1462 }
1463 else
1464 {
1465 GNUNET_STATISTICS_update (plugin->env->stats,
1466 "# UDP, total bytes in send buffers",
1467 udpw->msg_size,
1468 GNUNET_NO);
1469 plugin->bytes_in_buffer += udpw->msg_size;
1470 }
1471 GNUNET_STATISTICS_update (plugin->env->stats,
1472 "# UDP, total messages in send buffers",
1473 1,
1474 GNUNET_NO);
1475 if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1476 {
1477 GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1478 plugin->ipv4_queue_tail,
1479 udpw);
1480 }
1481 else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1482 {
1483 GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1484 plugin->ipv6_queue_tail,
1485 udpw);
1486 }
1487 else
1488 {
1489 GNUNET_break (0);
1490 udpw->cont (udpw->cont_cls,
1491 &session->target,
1492 GNUNET_SYSERR,
1493 udpw->msg_size,
1494 0);
1495 GNUNET_free (udpw);
1496 return;
1497 }
1498 session->msgs_in_queue++;
1499 session->bytes_in_queue += udpw->msg_size;
1500}
1501
1502
1503/**
1504 * We have completed our (attempt) to transmit a message that had to
1505 * be fragmented -- either because we got an ACK saying that all
1506 * fragments were received, or because of timeout / disconnect. Clean
1507 * up our state.
1277 * 1508 *
1278 * @param fc fragmentation context to clean up 1509 * @param frag_ctx fragmentation context to clean up
1279 * @param result #GNUNET_OK if we succeeded (got ACK), 1510 * @param result #GNUNET_OK if we succeeded (got ACK),
1280 * #GNUNET_SYSERR if the transmission failed 1511 * #GNUNET_SYSERR if the transmission failed
1281 */ 1512 */
1282static void 1513static void
1283fragmented_message_done (struct UDP_FragmentationContext *fc, 1514fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1284 int result) 1515 int result)
1285{ 1516{
1286 struct Plugin *plugin = fc->plugin; 1517 struct Plugin *plugin = frag_ctx->plugin;
1287 struct Session *s = fc->session; 1518 struct Session *s = frag_ctx->session;
1288 struct UDP_MessageWrapper *udpw; 1519 struct UDP_MessageWrapper *udpw;
1289 struct UDP_MessageWrapper *tmp; 1520 struct UDP_MessageWrapper *tmp;
1290 struct UDP_MessageWrapper dummy; 1521 size_t overhead;
1291 1522
1292 LOG (GNUNET_ERROR_TYPE_DEBUG, 1523 LOG (GNUNET_ERROR_TYPE_DEBUG,
1293 "%p : Fragmented message removed with result %s\n", 1524 "%p: Fragmented message removed with result %s\n",
1294 fc, 1525 frag_ctx,
1295 (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS"); 1526 (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1296
1297 /* Call continuation for fragmented message */ 1527 /* Call continuation for fragmented message */
1298 memset (&dummy, 0, sizeof(dummy)); 1528 if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1299 dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE; 1529 overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1300 dummy.msg_size = s->frag_ctx->on_wire_size; 1530 else
1301 dummy.payload_size = s->frag_ctx->payload_size; 1531 overhead = frag_ctx->on_wire_size;
1302 dummy.frag_ctx = s->frag_ctx; 1532 if (NULL != frag_ctx->cont)
1303 dummy.cont = NULL; 1533 frag_ctx->cont (frag_ctx->cont_cls,
1304 dummy.cont_cls = NULL; 1534 &s->target,
1305 dummy.session = s; 1535 result,
1306 call_continuation (&dummy, result); 1536 s->frag_ctx->payload_size,
1307 /* Remove leftover fragments from queue */ 1537 frag_ctx->on_wire_size);
1538 GNUNET_STATISTICS_update (plugin->env->stats,
1539 "# UDP, fragmented messages active",
1540 -1,
1541 GNUNET_NO);
1542
1543 if (GNUNET_OK == result)
1544 {
1545 GNUNET_STATISTICS_update (plugin->env->stats,
1546 "# UDP, fragmented msgs, messages, sent, success",
1547 1,
1548 GNUNET_NO);
1549 GNUNET_STATISTICS_update (plugin->env->stats,
1550 "# UDP, fragmented msgs, bytes payload, sent, success",
1551 s->frag_ctx->payload_size,
1552 GNUNET_NO);
1553 GNUNET_STATISTICS_update (plugin->env->stats,
1554 "# UDP, fragmented msgs, bytes overhead, sent, success",
1555 overhead,
1556 GNUNET_NO);
1557 GNUNET_STATISTICS_update (plugin->env->stats,
1558 "# UDP, total, bytes overhead, sent",
1559 overhead,
1560 GNUNET_NO);
1561 GNUNET_STATISTICS_update (plugin->env->stats,
1562 "# UDP, total, bytes payload, sent",
1563 s->frag_ctx->payload_size,
1564 GNUNET_NO);
1565 }
1566 else
1567 {
1568 GNUNET_STATISTICS_update (plugin->env->stats,
1569 "# UDP, fragmented msgs, messages, sent, failure",
1570 1,
1571 GNUNET_NO);
1572 GNUNET_STATISTICS_update (plugin->env->stats,
1573 "# UDP, fragmented msgs, bytes payload, sent, failure",
1574 s->frag_ctx->payload_size,
1575 GNUNET_NO);
1576 GNUNET_STATISTICS_update (plugin->env->stats,
1577 "# UDP, fragmented msgs, bytes payload, sent, failure",
1578 overhead,
1579 GNUNET_NO);
1580 GNUNET_STATISTICS_update (plugin->env->stats,
1581 "# UDP, fragmented msgs, bytes payload, sent, failure",
1582 overhead,
1583 GNUNET_NO);
1584 }
1585
1586 /* Remove remaining fragments from queue, no need to transmit those
1587 any longer. */
1308 if (s->address->address_length == sizeof(struct IPv6UdpAddress)) 1588 if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1309 { 1589 {
1310 udpw = plugin->ipv6_queue_head; 1590 udpw = plugin->ipv6_queue_head;
@@ -1312,10 +1592,10 @@ fragmented_message_done (struct UDP_FragmentationContext *fc,
1312 { 1592 {
1313 tmp = udpw->next; 1593 tmp = udpw->next;
1314 if ( (udpw->frag_ctx != NULL) && 1594 if ( (udpw->frag_ctx != NULL) &&
1315 (udpw->frag_ctx == s->frag_ctx) ) 1595 (udpw->frag_ctx == frag_ctx) )
1316 { 1596 {
1317 dequeue (plugin, udpw); 1597 dequeue (plugin,
1318 call_continuation (udpw, GNUNET_SYSERR); 1598 udpw);
1319 GNUNET_free (udpw); 1599 GNUNET_free (udpw);
1320 } 1600 }
1321 udpw = tmp; 1601 udpw = tmp;
@@ -1324,14 +1604,15 @@ fragmented_message_done (struct UDP_FragmentationContext *fc,
1324 if (s->address->address_length == sizeof(struct IPv4UdpAddress)) 1604 if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1325 { 1605 {
1326 udpw = plugin->ipv4_queue_head; 1606 udpw = plugin->ipv4_queue_head;
1327 while (udpw != NULL ) 1607 while (NULL != udpw)
1328 { 1608 {
1329 tmp = udpw->next; 1609 tmp = udpw->next;
1330 if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx)) 1610 if ( (NULL != udpw->frag_ctx) &&
1611 (udpw->frag_ctx == frag_ctx) )
1331 { 1612 {
1332 dequeue (plugin, udpw); 1613 dequeue (plugin,
1333 call_continuation (udpw, GNUNET_SYSERR); 1614 udpw);
1334 GNUNET_free(udpw); 1615 GNUNET_free (udpw);
1335 } 1616 }
1336 udpw = tmp; 1617 udpw = tmp;
1337 } 1618 }
@@ -1339,16 +1620,437 @@ fragmented_message_done (struct UDP_FragmentationContext *fc,
1339 notify_session_monitor (s->plugin, 1620 notify_session_monitor (s->plugin,
1340 s, 1621 s,
1341 GNUNET_TRANSPORT_SS_UPDATE); 1622 GNUNET_TRANSPORT_SS_UPDATE);
1342 /* Destroy fragmentation context */ 1623 GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1343 GNUNET_FRAGMENT_context_destroy (fc->frag,
1344 &s->last_expected_msg_delay, 1624 &s->last_expected_msg_delay,
1345 &s->last_expected_ack_delay); 1625 &s->last_expected_ack_delay);
1346 s->frag_ctx = NULL; 1626 s->frag_ctx = NULL;
1347 GNUNET_free (fc); 1627 GNUNET_free (frag_ctx);
1628}
1629
1630
1631/**
1632 * We are finished with a fragment in the message queue.
1633 * Notify the continuation and update statistics.
1634 *
1635 * @param cls the `struct Plugin *`
1636 * @param udpw the queue entry
1637 * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1638 */
1639static void
1640qc_fragment_sent (void *cls,
1641 struct UDP_MessageWrapper *udpw,
1642 int result)
1643{
1644 struct Plugin *plugin = cls;
1645
1646 GNUNET_assert (NULL != udpw->frag_ctx);
1647 if (GNUNET_OK == result)
1648 {
1649 GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1650 GNUNET_STATISTICS_update (plugin->env->stats,
1651 "# UDP, fragmented msgs, fragments, sent, success",
1652 1,
1653 GNUNET_NO);
1654 GNUNET_STATISTICS_update (plugin->env->stats,
1655 "# UDP, fragmented msgs, fragments bytes, sent, success",
1656 udpw->msg_size,
1657 GNUNET_NO);
1658 }
1659 else
1660 {
1661 fragmented_message_done (udpw->frag_ctx,
1662 GNUNET_SYSERR);
1663 GNUNET_STATISTICS_update (plugin->env->stats,
1664 "# UDP, fragmented msgs, fragments, sent, failure",
1665 1,
1666 GNUNET_NO);
1667 GNUNET_STATISTICS_update (plugin->env->stats,
1668 "# UDP, fragmented msgs, fragments bytes, sent, failure",
1669 udpw->msg_size,
1670 GNUNET_NO);
1671 }
1672}
1673
1674
1675/**
1676 * Function that is called with messages created by the fragmentation
1677 * module. In the case of the `proc` callback of the
1678 * #GNUNET_FRAGMENT_context_create() function, this function must
1679 * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1680 *
1681 * @param cls closure, the `struct UDP_FragmentationContext`
1682 * @param msg the message that was created
1683 */
1684static void
1685enqueue_fragment (void *cls,
1686 const struct GNUNET_MessageHeader *msg)
1687{
1688 struct UDP_FragmentationContext *frag_ctx = cls;
1689 struct Plugin *plugin = frag_ctx->plugin;
1690 struct UDP_MessageWrapper *udpw;
1691 struct Session *session = frag_ctx->session;
1692 size_t msg_len = ntohs (msg->size);
1693
1694 LOG (GNUNET_ERROR_TYPE_DEBUG,
1695 "Enqueuing fragment with %u bytes\n",
1696 msg_len);
1697 udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1698 udpw->session = session;
1699 udpw->msg_buf = (char *) &udpw[1];
1700 udpw->msg_size = msg_len;
1701 udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1702 udpw->timeout = frag_ctx->timeout;
1703 udpw->frag_ctx = frag_ctx;
1704 udpw->qc = &qc_fragment_sent;
1705 udpw->qc_cls = plugin;
1706 memcpy (udpw->msg_buf,
1707 msg,
1708 msg_len);
1709 enqueue (plugin,
1710 udpw);
1711 if (sizeof (struct IPv4UdpAddress) == session->address->address_length)
1712 schedule_select_v4 (plugin);
1713 else
1714 schedule_select_v6 (plugin);
1715}
1716
1717
1718/**
1719 * We are finished with a message from the message queue.
1720 * Notify the continuation and update statistics.
1721 *
1722 * @param cls the `struct Plugin *`
1723 * @param udpw the queue entry
1724 * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1725 */
1726static void
1727qc_message_sent (void *cls,
1728 struct UDP_MessageWrapper *udpw,
1729 int result)
1730{
1731 struct Plugin *plugin = cls;
1732 size_t overhead;
1733
1734 if (udpw->msg_size >= udpw->payload_size)
1735 overhead = udpw->msg_size - udpw->payload_size;
1736 else
1737 overhead = udpw->msg_size;
1738
1739 if (NULL != udpw->cont)
1740 udpw->cont (udpw->cont_cls,
1741 &udpw->session->target,
1742 result,
1743 udpw->payload_size,
1744 overhead);
1745 if (GNUNET_OK == result)
1746 {
1747 GNUNET_STATISTICS_update (plugin->env->stats,
1748 "# UDP, unfragmented msgs, messages, sent, success",
1749 1,
1750 GNUNET_NO);
1751 GNUNET_STATISTICS_update (plugin->env->stats,
1752 "# UDP, unfragmented msgs, bytes payload, sent, success",
1753 udpw->payload_size,
1754 GNUNET_NO);
1755 GNUNET_STATISTICS_update (plugin->env->stats,
1756 "# UDP, unfragmented msgs, bytes overhead, sent, success",
1757 overhead,
1758 GNUNET_NO);
1759 GNUNET_STATISTICS_update (plugin->env->stats,
1760 "# UDP, total, bytes overhead, sent",
1761 overhead,
1762 GNUNET_NO);
1763 GNUNET_STATISTICS_update (plugin->env->stats,
1764 "# UDP, total, bytes payload, sent",
1765 udpw->payload_size,
1766 GNUNET_NO);
1767 }
1768 else
1769 {
1770 GNUNET_STATISTICS_update (plugin->env->stats,
1771 "# UDP, unfragmented msgs, messages, sent, failure",
1772 1,
1773 GNUNET_NO);
1774 GNUNET_STATISTICS_update (plugin->env->stats,
1775 "# UDP, unfragmented msgs, bytes payload, sent, failure",
1776 udpw->payload_size,
1777 GNUNET_NO);
1778 GNUNET_STATISTICS_update (plugin->env->stats,
1779 "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1780 overhead,
1781 GNUNET_NO);
1782 }
1783}
1784
1785
1786/**
1787 * Function that can be used by the transport service to transmit a
1788 * message using the plugin. Note that in the case of a peer
1789 * disconnecting, the continuation MUST be called prior to the
1790 * disconnect notification itself. This function will be called with
1791 * this peer's HELLO message to initiate a fresh connection to another
1792 * peer.
1793 *
1794 * @param cls closure
1795 * @param s which session must be used
1796 * @param msgbuf the message to transmit
1797 * @param msgbuf_size number of bytes in @a msgbuf
1798 * @param priority how important is the message (most plugins will
1799 * ignore message priority and just FIFO)
1800 * @param to how long to wait at most for the transmission (does not
1801 * require plugins to discard the message after the timeout,
1802 * just advisory for the desired delay; most plugins will ignore
1803 * this as well)
1804 * @param cont continuation to call once the message has
1805 * been transmitted (or if the transport is ready
1806 * for the next transmission call; or if the
1807 * peer disconnected...); can be NULL
1808 * @param cont_cls closure for @a cont
1809 * @return number of bytes used (on the physical network, with overheads);
1810 * -1 on hard errors (i.e. address invalid); 0 is a legal value
1811 * and does NOT mean that the message was not transmitted (DV)
1812 */
1813static ssize_t
1814udp_plugin_send (void *cls,
1815 struct Session *s,
1816 const char *msgbuf,
1817 size_t msgbuf_size,
1818 unsigned int priority,
1819 struct GNUNET_TIME_Relative to,
1820 GNUNET_TRANSPORT_TransmitContinuation cont,
1821 void *cont_cls)
1822{
1823 struct Plugin *plugin = cls;
1824 size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1825 struct UDP_FragmentationContext *frag_ctx;
1826 struct UDP_MessageWrapper *udpw;
1827 struct UDPMessage *udp;
1828 char mbuf[udpmlen] GNUNET_ALIGN;
1829
1830 if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
1831 (NULL == plugin->sockv6) )
1832 return GNUNET_SYSERR;
1833 if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
1834 (NULL == plugin->sockv4) )
1835 return GNUNET_SYSERR;
1836 if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1837 {
1838 GNUNET_break (0);
1839 return GNUNET_SYSERR;
1840 }
1841 if (GNUNET_YES !=
1842 GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1843 &s->target,
1844 s))
1845 {
1846 GNUNET_break (0);
1847 return GNUNET_SYSERR;
1848 }
1849 LOG (GNUNET_ERROR_TYPE_DEBUG,
1850 "UDP transmits %u-byte message to `%s' using address `%s'\n",
1851 udpmlen,
1852 GNUNET_i2s (&s->target),
1853 udp_address_to_string (plugin,
1854 s->address->address,
1855 s->address->address_length));
1856
1857 udp = (struct UDPMessage *) mbuf;
1858 udp->header.size = htons (udpmlen);
1859 udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1860 udp->reserved = htonl (0);
1861 udp->sender = *plugin->env->my_identity;
1862
1863 /* We do not update the session time out here! Otherwise this
1864 * session will not timeout since we send keep alive before session
1865 * can timeout.
1866 *
1867 * For UDP we update session timeout only on receive, this will
1868 * cover keep alives, since remote peer will reply with keep alive
1869 * responses!
1870 */
1871 if (udpmlen <= UDP_MTU)
1872 {
1873 /* unfragmented message */
1874 udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1875 udpw->session = s;
1876 udpw->msg_buf = (char *) &udpw[1];
1877 udpw->msg_size = udpmlen; /* message size with UDP overhead */
1878 udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1879 udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
1880 udpw->cont = cont;
1881 udpw->cont_cls = cont_cls;
1882 udpw->frag_ctx = NULL;
1883 udpw->qc = &qc_message_sent;
1884 udpw->qc_cls = plugin;
1885 memcpy (udpw->msg_buf,
1886 udp,
1887 sizeof (struct UDPMessage));
1888 memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
1889 msgbuf,
1890 msgbuf_size);
1891 enqueue (plugin,
1892 udpw);
1893 GNUNET_STATISTICS_update (plugin->env->stats,
1894 "# UDP, unfragmented messages queued total",
1895 1,
1896 GNUNET_NO);
1897 GNUNET_STATISTICS_update (plugin->env->stats,
1898 "# UDP, unfragmented bytes payload queued total",
1899 udpw->payload_size,
1900 GNUNET_NO);
1901 }
1902 else
1903 {
1904 /* fragmented message */
1905 if (NULL != s->frag_ctx)
1906 return GNUNET_SYSERR;
1907 memcpy (&udp[1],
1908 msgbuf,
1909 msgbuf_size);
1910 frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
1911 frag_ctx->plugin = plugin;
1912 frag_ctx->session = s;
1913 frag_ctx->cont = cont;
1914 frag_ctx->cont_cls = cont_cls;
1915 frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
1916 frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1917 frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1918 frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1919 UDP_MTU,
1920 &plugin->tracker,
1921 s->last_expected_msg_delay,
1922 s->last_expected_ack_delay,
1923 &udp->header,
1924 &enqueue_fragment,
1925 frag_ctx);
1926 s->frag_ctx = frag_ctx;
1927 GNUNET_STATISTICS_update (plugin->env->stats,
1928 "# UDP, fragmented messages active",
1929 1,
1930 GNUNET_NO);
1931 GNUNET_STATISTICS_update (plugin->env->stats,
1932 "# UDP, fragmented messages, total",
1933 1,
1934 GNUNET_NO);
1935 GNUNET_STATISTICS_update (plugin->env->stats,
1936 "# UDP, fragmented bytes (payload)",
1937 frag_ctx->payload_size,
1938 GNUNET_NO);
1939 }
1940 notify_session_monitor (s->plugin,
1941 s,
1942 GNUNET_TRANSPORT_SS_UPDATE);
1943 if (s->address->address_length == sizeof (struct IPv4UdpAddress))
1944 schedule_select_v4 (plugin);
1945 else
1946 schedule_select_v6 (plugin);
1947 return udpmlen;
1348} 1948}
1349 1949
1350 1950
1351/** 1951/**
1952 * Handle an ACK message.
1953 *
1954 * @param plugin the UDP plugin
1955 * @param msg the (presumed) UDP ACK message
1956 * @param udp_addr sender address
1957 * @param udp_addr_len number of bytes in @a udp_addr
1958 */
1959static void
1960read_process_ack (struct Plugin *plugin,
1961 const struct GNUNET_MessageHeader *msg,
1962 const union UdpAddress *udp_addr,
1963 socklen_t udp_addr_len)
1964{
1965 const struct GNUNET_MessageHeader *ack;
1966 const struct UDP_ACK_Message *udp_ack;
1967 struct GNUNET_HELLO_Address *address;
1968 struct Session *s;
1969 struct GNUNET_TIME_Relative flow_delay;
1970
1971 if (ntohs (msg->size)
1972 < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
1973 {
1974 GNUNET_break_op (0);
1975 return;
1976 }
1977 udp_ack = (const struct UDP_ACK_Message *) msg;
1978 address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
1979 PLUGIN_NAME,
1980 udp_addr,
1981 udp_addr_len,
1982 GNUNET_HELLO_ADDRESS_INFO_NONE);
1983 s = udp_plugin_lookup_session (plugin,
1984 address);
1985 if (NULL == s)
1986 {
1987 LOG (GNUNET_ERROR_TYPE_WARNING,
1988 "UDP session of address %s for ACK not found\n",
1989 udp_address_to_string (plugin,
1990 address->address,
1991 address->address_length));
1992 GNUNET_HELLO_address_free (address);
1993 return;
1994 }
1995 if (NULL == s->frag_ctx)
1996 {
1997 LOG (GNUNET_ERROR_TYPE_WARNING,
1998 "Fragmentation context of address %s for ACK not found\n",
1999 udp_address_to_string (plugin,
2000 address->address,
2001 address->address_length));
2002 GNUNET_HELLO_address_free (address);
2003 return;
2004 }
2005 GNUNET_HELLO_address_free (address);
2006
2007 flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2008 LOG (GNUNET_ERROR_TYPE_DEBUG,
2009 "We received a sending delay of %s for %s\n",
2010 GNUNET_STRINGS_relative_time_to_string (flow_delay,
2011 GNUNET_YES),
2012 GNUNET_i2s (&udp_ack->sender));
2013 s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2014
2015 ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2016 if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2017 {
2018 GNUNET_break_op(0);
2019 return;
2020 }
2021
2022 if (GNUNET_OK !=
2023 GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2024 ack))
2025 {
2026 LOG (GNUNET_ERROR_TYPE_DEBUG,
2027 "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2028 (unsigned int) ntohs (msg->size),
2029 GNUNET_i2s (&udp_ack->sender),
2030 udp_address_to_string (plugin,
2031 udp_addr,
2032 udp_addr_len));
2033 /* Expect more ACKs to arrive */
2034 return;
2035 }
2036
2037 LOG (GNUNET_ERROR_TYPE_DEBUG,
2038 "Message from %s at %s full ACK'ed\n",
2039 GNUNET_i2s (&udp_ack->sender),
2040 udp_address_to_string (plugin,
2041 udp_addr,
2042 udp_addr_len));
2043
2044 /* Remove fragmented message after successful sending */
2045 fragmented_message_done (s->frag_ctx,
2046 GNUNET_OK);
2047}
2048
2049
2050/* ********************** Receiving ********************** */
2051
2052
2053/**
1352 * Closure for #find_receive_context(). 2054 * Closure for #find_receive_context().
1353 */ 2055 */
1354struct FindReceiveContext 2056struct FindReceiveContext
@@ -1408,9 +2110,37 @@ find_receive_context (void *cls,
1408 2110
1409 2111
1410/** 2112/**
1411 * Functions with this signature are called whenever we need 2113 * Message tokenizer has broken up an incomming message. Pass it on
1412 * to close a session due to a disconnect or failure to 2114 * to the service.
1413 * establish a connection. 2115 *
2116 * @param cls the `struct Plugin *`
2117 * @param client the `struct Session *`
2118 * @param hdr the actual message
2119 * @return #GNUNET_OK (always)
2120 */
2121static int
2122process_inbound_tokenized_messages (void *cls,
2123 void *client,
2124 const struct GNUNET_MessageHeader *hdr)
2125{
2126 struct Plugin *plugin = cls;
2127 struct Session *session = client;
2128
2129 if (GNUNET_YES == session->in_destroy)
2130 return GNUNET_OK;
2131 reschedule_session_timeout (session);
2132 session->flow_delay_for_other_peer
2133 = plugin->env->receive (plugin->env->cls,
2134 session->address,
2135 session,
2136 hdr);
2137 return GNUNET_OK;
2138}
2139
2140
2141/**
2142 * Functions with this signature are called whenever we need to close
2143 * a session due to a disconnect or failure to establish a connection.
1414 * 2144 *
1415 * @param cls closure with the `struct Plugin` 2145 * @param cls closure with the `struct Plugin`
1416 * @param s session to close down 2146 * @param s session to close down
@@ -1427,13 +2157,12 @@ udp_disconnect_session (void *cls,
1427 2157
1428 GNUNET_assert (GNUNET_YES != s->in_destroy); 2158 GNUNET_assert (GNUNET_YES != s->in_destroy);
1429 LOG (GNUNET_ERROR_TYPE_DEBUG, 2159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1430 "Session %p to peer `%s' address ended\n", 2160 "Session %p to peer `%s' at address %s ended\n",
1431 s, 2161 s,
1432 GNUNET_i2s (&s->target), 2162 GNUNET_i2s (&s->target),
1433 udp_address_to_string (plugin, 2163 udp_address_to_string (plugin,
1434 s->address->address, 2164 s->address->address,
1435 s->address->address_length)); 2165 s->address->address_length));
1436 /* stop timeout task */
1437 if (NULL != s->timeout_task) 2166 if (NULL != s->timeout_task)
1438 { 2167 {
1439 GNUNET_SCHEDULER_cancel (s->timeout_task); 2168 GNUNET_SCHEDULER_cancel (s->timeout_task);
@@ -1470,9 +2199,12 @@ udp_disconnect_session (void *cls,
1470 next = udpw->next; 2199 next = udpw->next;
1471 if (udpw->session == s) 2200 if (udpw->session == s)
1472 { 2201 {
1473 dequeue (plugin, udpw); 2202 dequeue (plugin,
1474 call_continuation (udpw, GNUNET_SYSERR); 2203 udpw);
1475 GNUNET_free(udpw); 2204 udpw->qc (udpw->qc_cls,
2205 udpw,
2206 GNUNET_SYSERR);
2207 GNUNET_free (udpw);
1476 } 2208 }
1477 } 2209 }
1478 next = plugin->ipv6_queue_head; 2210 next = plugin->ipv6_queue_head;
@@ -1481,9 +2213,12 @@ udp_disconnect_session (void *cls,
1481 next = udpw->next; 2213 next = udpw->next;
1482 if (udpw->session == s) 2214 if (udpw->session == s)
1483 { 2215 {
1484 dequeue (plugin, udpw); 2216 dequeue (plugin,
1485 call_continuation (udpw, GNUNET_SYSERR); 2217 udpw);
1486 GNUNET_free(udpw); 2218 udpw->qc (udpw->qc_cls,
2219 udpw,
2220 GNUNET_SYSERR);
2221 GNUNET_free (udpw);
1487 } 2222 }
1488 } 2223 }
1489 notify_session_monitor (s->plugin, 2224 notify_session_monitor (s->plugin,
@@ -1493,19 +2228,19 @@ udp_disconnect_session (void *cls,
1493 s->address, 2228 s->address,
1494 s); 2229 s);
1495 2230
1496 if (NULL != s->frag_ctx) 2231 if ( (NULL != s->frag_ctx) &&
2232 (NULL != s->frag_ctx->cont) )
1497 { 2233 {
1498 if (NULL != s->frag_ctx->cont) 2234 /* The 'frag_ctx' itself will be freed in #free_session() a bit
1499 { 2235 later, as it might be in use right now */
1500 s->frag_ctx->cont (s->frag_ctx->cont_cls, 2236 LOG (GNUNET_ERROR_TYPE_DEBUG,
1501 &s->target, 2237 "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1502 GNUNET_SYSERR, 2238 GNUNET_i2s (&s->target));
1503 s->frag_ctx->payload_size, 2239 s->frag_ctx->cont (s->frag_ctx->cont_cls,
1504 s->frag_ctx->on_wire_size); 2240 &s->target,
1505 LOG (GNUNET_ERROR_TYPE_DEBUG, 2241 GNUNET_SYSERR,
1506 "Calling continuation for fragemented message to `%s' with result SYSERR\n", 2242 s->frag_ctx->payload_size,
1507 GNUNET_i2s (&s->target)); 2243 s->frag_ctx->on_wire_size);
1508 }
1509 } 2244 }
1510 2245
1511 GNUNET_assert (GNUNET_YES == 2246 GNUNET_assert (GNUNET_YES ==
@@ -1522,7 +2257,6 @@ udp_disconnect_session (void *cls,
1522 } 2257 }
1523 else 2258 else
1524 { 2259 {
1525 GNUNET_HELLO_address_free (s->address);
1526 free_session (s); 2260 free_session (s);
1527 } 2261 }
1528 return GNUNET_OK; 2262 return GNUNET_OK;
@@ -1530,21 +2264,6 @@ udp_disconnect_session (void *cls,
1530 2264
1531 2265
1532/** 2266/**
1533 * Function that is called to get the keepalive factor.
1534 * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
1535 * calculate the interval between keepalive packets.
1536 *
1537 * @param cls closure with the `struct Plugin`
1538 * @return keepalive factor
1539 */
1540static unsigned int
1541udp_query_keepalive_factor (void *cls)
1542{
1543 return 15;
1544}
1545
1546
1547/**
1548 * Destroy a session, plugin is being unloaded. 2267 * Destroy a session, plugin is being unloaded.
1549 * 2268 *
1550 * @param cls the `struct Plugin` 2269 * @param cls the `struct Plugin`
@@ -1559,7 +2278,8 @@ disconnect_and_free_it (void *cls,
1559{ 2278{
1560 struct Plugin *plugin = cls; 2279 struct Plugin *plugin = cls;
1561 2280
1562 udp_disconnect_session (plugin, value); 2281 udp_disconnect_session (plugin,
2282 value);
1563 return GNUNET_OK; 2283 return GNUNET_OK;
1564} 2284}
1565 2285
@@ -1581,7 +2301,6 @@ udp_disconnect (void *cls,
1581 LOG (GNUNET_ERROR_TYPE_DEBUG, 2301 LOG (GNUNET_ERROR_TYPE_DEBUG,
1582 "Disconnecting from peer `%s'\n", 2302 "Disconnecting from peer `%s'\n",
1583 GNUNET_i2s (target)); 2303 GNUNET_i2s (target));
1584 /* Clean up sessions */
1585 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, 2304 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1586 target, 2305 target,
1587 &disconnect_and_free_it, 2306 &disconnect_and_free_it,
@@ -1590,7 +2309,7 @@ udp_disconnect (void *cls,
1590 2309
1591 2310
1592/** 2311/**
1593 * Session was idle, so disconnect it 2312 * Session was idle, so disconnect it.
1594 * 2313 *
1595 * @param cls the `struct Session` to time out 2314 * @param cls the `struct Session` to time out
1596 * @param tc scheduler context 2315 * @param tc scheduler context
@@ -1623,152 +2342,8 @@ session_timeout (void *cls,
1623 GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT, 2342 GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
1624 GNUNET_YES)); 2343 GNUNET_YES));
1625 /* call session destroy function */ 2344 /* call session destroy function */
1626 udp_disconnect_session (plugin, s); 2345 udp_disconnect_session (plugin,
1627} 2346 s);
1628
1629
1630/**
1631 * Increment session timeout due to activity
1632 *
1633 * @param s session to reschedule timeout activity for
1634 */
1635static void
1636reschedule_session_timeout (struct Session *s)
1637{
1638 if (GNUNET_YES == s->in_destroy)
1639 return;
1640 GNUNET_assert(NULL != s->timeout_task);
1641 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1642}
1643
1644
1645/**
1646 * Function obtain the network type for a session
1647 *
1648 * @param cls closure (`struct Plugin *`)
1649 * @param session the session
1650 * @return the network type
1651 */
1652static enum GNUNET_ATS_Network_Type
1653udp_get_network (void *cls,
1654 struct Session *session)
1655{
1656 return session->scope;
1657}
1658
1659
1660/**
1661 * Closure for #session_cmp_it().
1662 */
1663struct SessionCompareContext
1664{
1665 /**
1666 * Set to session matching the address.
1667 */
1668 struct Session *res;
1669
1670 /**
1671 * Address we are looking for.
1672 */
1673 const struct GNUNET_HELLO_Address *address;
1674};
1675
1676
1677/**
1678 * Find a session with a matching address.
1679 *
1680 * @param cls the `struct SessionCompareContext *`
1681 * @param key peer identity (unused)
1682 * @param value the `struct Session *`
1683 * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1684 */
1685static int
1686session_cmp_it (void *cls,
1687 const struct GNUNET_PeerIdentity *key,
1688 void *value)
1689{
1690 struct SessionCompareContext *cctx = cls;
1691 struct Session *s = value;
1692
1693 if (0 == GNUNET_HELLO_address_cmp (s->address,
1694 cctx->address))
1695 {
1696 cctx->res = s;
1697 return GNUNET_NO;
1698 }
1699 return GNUNET_YES;
1700}
1701
1702
1703/**
1704 * Locate an existing session the transport service is using to
1705 * send data to another peer. Performs some basic sanity checks
1706 * on the address and then tries to locate a matching session.
1707 *
1708 * @param cls the plugin
1709 * @param address the address we should locate the session by
1710 * @return the session if it exists, or NULL if it is not found
1711 */
1712static struct Session *
1713udp_plugin_lookup_session (void *cls,
1714 const struct GNUNET_HELLO_Address *address)
1715{
1716 struct Plugin *plugin = cls;
1717 const struct IPv6UdpAddress *udp_a6;
1718 const struct IPv4UdpAddress *udp_a4;
1719 struct SessionCompareContext cctx;
1720
1721 if ( (NULL == address->address) ||
1722 ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1723 (address->address_length != sizeof (struct IPv6UdpAddress))))
1724 {
1725 LOG (GNUNET_ERROR_TYPE_WARNING,
1726 "Trying to locate session for address of unexpected length %u (should be %u or %u)\n",
1727 address->address_length,
1728 sizeof (struct IPv4UdpAddress),
1729 sizeof (struct IPv6UdpAddress));
1730 return NULL;
1731 }
1732
1733 if (address->address_length == sizeof(struct IPv4UdpAddress))
1734 {
1735 if (NULL == plugin->sockv4)
1736 return NULL;
1737 udp_a4 = (const struct IPv4UdpAddress *) address->address;
1738 if (0 == udp_a4->u4_port)
1739 return NULL;
1740 }
1741
1742 if (address->address_length == sizeof(struct IPv6UdpAddress))
1743 {
1744 if (NULL == plugin->sockv6)
1745 return NULL;
1746 udp_a6 = (const struct IPv6UdpAddress *) address->address;
1747 if (0 == udp_a6->u6_port)
1748 return NULL;
1749 }
1750
1751 /* check if session already exists */
1752 cctx.address = address;
1753 cctx.res = NULL;
1754 LOG (GNUNET_ERROR_TYPE_DEBUG,
1755 "Looking for existing session for peer `%s' `%s' \n",
1756 GNUNET_i2s (&address->peer),
1757 udp_address_to_string (plugin,
1758 address->address,
1759 address->address_length));
1760 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1761 &address->peer,
1762 &session_cmp_it,
1763 &cctx);
1764 if (NULL != cctx.res)
1765 {
1766 LOG (GNUNET_ERROR_TYPE_DEBUG,
1767 "Found existing session %p\n",
1768 cctx.res);
1769 return cctx.res;
1770 }
1771 return NULL;
1772} 2347}
1773 2348
1774 2349
@@ -1802,7 +2377,8 @@ udp_plugin_create_session (void *cls,
1802 s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; 2377 s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1803 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); 2378 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1804 s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, 2379 s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1805 &session_timeout, s); 2380 &session_timeout,
2381 s);
1806 s->scope = network_type; 2382 s->scope = network_type;
1807 2383
1808 LOG (GNUNET_ERROR_TYPE_DEBUG, 2384 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1812,11 +2388,11 @@ udp_plugin_create_session (void *cls,
1812 udp_address_to_string (plugin, 2388 udp_address_to_string (plugin,
1813 address->address, 2389 address->address,
1814 address->address_length)); 2390 address->address_length));
1815 GNUNET_assert(GNUNET_OK == 2391 GNUNET_assert (GNUNET_OK ==
1816 GNUNET_CONTAINER_multipeermap_put (plugin->sessions, 2392 GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
1817 &s->target, 2393 &s->target,
1818 s, 2394 s,
1819 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 2395 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1820 GNUNET_STATISTICS_set (plugin->env->stats, 2396 GNUNET_STATISTICS_set (plugin->env->stats,
1821 "# UDP sessions active", 2397 "# UDP sessions active",
1822 GNUNET_CONTAINER_multipeermap_size (plugin->sessions), 2398 GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
@@ -1826,35 +2402,6 @@ udp_plugin_create_session (void *cls,
1826 2402
1827 2403
1828/** 2404/**
1829 * Function that will be called whenever the transport service wants to
1830 * notify the plugin that a session is still active and in use and
1831 * therefore the session timeout for this session has to be updated
1832 *
1833 * @param cls closure with the `struct Plugin`
1834 * @param peer which peer was the session for
1835 * @param session which session is being updated
1836 */
1837static void
1838udp_plugin_update_session_timeout (void *cls,
1839 const struct GNUNET_PeerIdentity *peer,
1840 struct Session *session)
1841{
1842 struct Plugin *plugin = cls;
1843
1844 if (GNUNET_YES !=
1845 GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1846 peer,
1847 session))
1848 {
1849 GNUNET_break(0);
1850 return;
1851 }
1852 /* Reschedule session timeout */
1853 reschedule_session_timeout (session);
1854}
1855
1856
1857/**
1858 * Creates a new outbound session the transport service will use to 2405 * Creates a new outbound session the transport service will use to
1859 * send data to the peer. 2406 * send data to the peer.
1860 * 2407 *
@@ -1869,8 +2416,8 @@ udp_plugin_get_session (void *cls,
1869 struct Plugin *plugin = cls; 2416 struct Plugin *plugin = cls;
1870 struct Session *s; 2417 struct Session *s;
1871 enum GNUNET_ATS_Network_Type network_type; 2418 enum GNUNET_ATS_Network_Type network_type;
1872 struct IPv4UdpAddress *udp_v4; 2419 const struct IPv4UdpAddress *udp_v4;
1873 struct IPv6UdpAddress *udp_v6; 2420 const struct IPv6UdpAddress *udp_v6;
1874 2421
1875 if (NULL == address) 2422 if (NULL == address)
1876 { 2423 {
@@ -1892,7 +2439,7 @@ udp_plugin_get_session (void *cls,
1892 { 2439 {
1893 struct sockaddr_in v4; 2440 struct sockaddr_in v4;
1894 2441
1895 udp_v4 = (struct IPv4UdpAddress *) address->address; 2442 udp_v4 = (const struct IPv4UdpAddress *) address->address;
1896 memset (&v4, '\0', sizeof (v4)); 2443 memset (&v4, '\0', sizeof (v4));
1897 v4.sin_family = AF_INET; 2444 v4.sin_family = AF_INET;
1898#if HAVE_SOCKADDR_IN_SIN_LEN 2445#if HAVE_SOCKADDR_IN_SIN_LEN
@@ -1908,7 +2455,7 @@ udp_plugin_get_session (void *cls,
1908 { 2455 {
1909 struct sockaddr_in6 v6; 2456 struct sockaddr_in6 v6;
1910 2457
1911 udp_v6 = (struct IPv6UdpAddress *) address->address; 2458 udp_v6 = (const struct IPv6UdpAddress *) address->address;
1912 memset (&v6, '\0', sizeof (v6)); 2459 memset (&v6, '\0', sizeof (v6));
1913 v6.sin6_family = AF_INET6; 2460 v6.sin6_family = AF_INET6;
1914#if HAVE_SOCKADDR_IN_SIN_LEN 2461#if HAVE_SOCKADDR_IN_SIN_LEN
@@ -1920,6 +2467,7 @@ udp_plugin_get_session (void *cls,
1920 (const struct sockaddr *) &v6, 2467 (const struct sockaddr *) &v6,
1921 sizeof (v6)); 2468 sizeof (v6));
1922 } 2469 }
2470 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
1923 return udp_plugin_create_session (cls, 2471 return udp_plugin_create_session (cls,
1924 address, 2472 address,
1925 network_type); 2473 network_type);
@@ -1927,370 +2475,6 @@ udp_plugin_get_session (void *cls,
1927 2475
1928 2476
1929/** 2477/**
1930 * Enqueue a message for transmission.
1931 *
1932 * @param plugin the UDP plugin
1933 * @param udpw message wrapper to queue
1934 */
1935static void
1936enqueue (struct Plugin *plugin,
1937 struct UDP_MessageWrapper *udpw)
1938{
1939 struct Session *session = udpw->session;
1940
1941 if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1942 {
1943 GNUNET_break (0);
1944 }
1945 else
1946 {
1947 GNUNET_STATISTICS_update (plugin->env->stats,
1948 "# UDP, total, bytes in buffers",
1949 udpw->msg_size,
1950 GNUNET_NO);
1951 plugin->bytes_in_buffer += udpw->msg_size;
1952 }
1953 GNUNET_STATISTICS_update (plugin->env->stats,
1954 "# UDP, total, msgs in buffers",
1955 1, GNUNET_NO);
1956 if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
1957 GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1958 plugin->ipv4_queue_tail,
1959 udpw);
1960 else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
1961 GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1962 plugin->ipv6_queue_tail,
1963 udpw);
1964 else
1965 {
1966 GNUNET_break (0);
1967 return;
1968 }
1969 session->msgs_in_queue++;
1970 session->bytes_in_queue += udpw->msg_size;
1971}
1972
1973
1974/**
1975 * Fragment message was transmitted via UDP, let fragmentation know
1976 * to send the next fragment now.
1977 *
1978 * @param cls the `struct UDPMessageWrapper *` of the fragment
1979 * @param target destination peer (ignored)
1980 * @param result #GNUNET_OK on success (ignored)
1981 * @param payload bytes payload sent
1982 * @param physical bytes physical sent
1983 */
1984static void
1985send_next_fragment (void *cls,
1986 const struct GNUNET_PeerIdentity *target,
1987 int result,
1988 size_t payload,
1989 size_t physical)
1990{
1991 struct UDP_MessageWrapper *udpw = cls;
1992
1993 GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1994}
1995
1996
1997/**
1998 * Function that is called with messages created by the fragmentation
1999 * module. In the case of the 'proc' callback of the
2000 * #GNUNET_FRAGMENT_context_create() function, this function must
2001 * eventually call #GNUNET_FRAGMENT_context_transmission_done().
2002 *
2003 * @param cls closure, the 'struct FragmentationContext'
2004 * @param msg the message that was created
2005 */
2006static void
2007enqueue_fragment (void *cls,
2008 const struct GNUNET_MessageHeader *msg)
2009{
2010 struct UDP_FragmentationContext *frag_ctx = cls;
2011 struct Plugin *plugin = frag_ctx->plugin;
2012 struct UDP_MessageWrapper *udpw;
2013 size_t msg_len = ntohs (msg->size);
2014
2015 LOG (GNUNET_ERROR_TYPE_DEBUG,
2016 "Enqueuing fragment with %u bytes\n",
2017 msg_len);
2018 udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
2019 udpw->session = frag_ctx->session;
2020 udpw->msg_buf = (char *) &udpw[1];
2021 udpw->msg_size = msg_len;
2022 udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
2023 udpw->cont = &send_next_fragment;
2024 udpw->cont_cls = udpw;
2025 udpw->timeout = frag_ctx->timeout;
2026 udpw->frag_ctx = frag_ctx;
2027 udpw->msg_type = UMT_MSG_FRAGMENTED;
2028 memcpy (udpw->msg_buf, msg, msg_len);
2029 enqueue (plugin,
2030 udpw);
2031 if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
2032 schedule_select_v4 (plugin);
2033 else
2034 schedule_select_v6 (plugin);
2035}
2036
2037
2038/**
2039 * Function that can be used by the transport service to transmit
2040 * a message using the plugin. Note that in the case of a
2041 * peer disconnecting, the continuation MUST be called
2042 * prior to the disconnect notification itself. This function
2043 * will be called with this peer's HELLO message to initiate
2044 * a fresh connection to another peer.
2045 *
2046 * @param cls closure
2047 * @param s which session must be used
2048 * @param msgbuf the message to transmit
2049 * @param msgbuf_size number of bytes in 'msgbuf'
2050 * @param priority how important is the message (most plugins will
2051 * ignore message priority and just FIFO)
2052 * @param to how long to wait at most for the transmission (does not
2053 * require plugins to discard the message after the timeout,
2054 * just advisory for the desired delay; most plugins will ignore
2055 * this as well)
2056 * @param cont continuation to call once the message has
2057 * been transmitted (or if the transport is ready
2058 * for the next transmission call; or if the
2059 * peer disconnected...); can be NULL
2060 * @param cont_cls closure for cont
2061 * @return number of bytes used (on the physical network, with overheads);
2062 * -1 on hard errors (i.e. address invalid); 0 is a legal value
2063 * and does NOT mean that the message was not transmitted (DV)
2064 */
2065static ssize_t
2066udp_plugin_send (void *cls,
2067 struct Session *s,
2068 const char *msgbuf,
2069 size_t msgbuf_size,
2070 unsigned int priority,
2071 struct GNUNET_TIME_Relative to,
2072 GNUNET_TRANSPORT_TransmitContinuation cont,
2073 void *cont_cls)
2074{
2075 struct Plugin *plugin = cls;
2076 size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2077 struct UDP_FragmentationContext * frag_ctx;
2078 struct UDP_MessageWrapper * udpw;
2079 struct UDPMessage *udp;
2080 char mbuf[udpmlen];
2081
2082 if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) &&
2083 (plugin->sockv6 == NULL) )
2084 return GNUNET_SYSERR;
2085 if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) &&
2086 (plugin->sockv4 == NULL) )
2087 return GNUNET_SYSERR;
2088 if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2089 {
2090 GNUNET_break(0);
2091 return GNUNET_SYSERR;
2092 }
2093 if (GNUNET_YES !=
2094 GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2095 &s->target,
2096 s))
2097 {
2098 GNUNET_break(0);
2099 return GNUNET_SYSERR;
2100 }
2101 LOG (GNUNET_ERROR_TYPE_DEBUG,
2102 "UDP transmits %u-byte message to `%s' using address `%s'\n",
2103 udpmlen,
2104 GNUNET_i2s (&s->target),
2105 udp_address_to_string (plugin,
2106 s->address->address,
2107 s->address->address_length));
2108
2109 /* Message */
2110 udp = (struct UDPMessage *) mbuf;
2111 udp->header.size = htons (udpmlen);
2112 udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2113 udp->reserved = htonl (0);
2114 udp->sender = *plugin->env->my_identity;
2115
2116 /* We do not update the session time out here!
2117 * Otherwise this session will not timeout since we send keep alive before
2118 * session can timeout
2119 *
2120 * For UDP we update session timeout only on receive, this will cover keep
2121 * alives, since remote peer will reply with keep alive response!
2122 */
2123 if (udpmlen <= UDP_MTU)
2124 {
2125 /* unfragmented message */
2126 udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2127 udpw->session = s;
2128 udpw->msg_buf = (char *) &udpw[1];
2129 udpw->msg_size = udpmlen; /* message size with UDP overhead */
2130 udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2131 udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
2132 udpw->cont = cont;
2133 udpw->cont_cls = cont_cls;
2134 udpw->frag_ctx = NULL;
2135 udpw->msg_type = UMT_MSG_UNFRAGMENTED;
2136 memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2137 memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
2138 enqueue (plugin, udpw);
2139
2140 GNUNET_STATISTICS_update (plugin->env->stats,
2141 "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
2142 GNUNET_STATISTICS_update (plugin->env->stats,
2143 "# UDP, unfragmented msgs, bytes payload, attempt",
2144 udpw->payload_size,
2145 GNUNET_NO);
2146 }
2147 else
2148 {
2149 /* fragmented message */
2150 if (s->frag_ctx != NULL)
2151 return GNUNET_SYSERR;
2152 memcpy (&udp[1], msgbuf, msgbuf_size);
2153 frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2154 frag_ctx->plugin = plugin;
2155 frag_ctx->session = s;
2156 frag_ctx->cont = cont;
2157 frag_ctx->cont_cls = cont_cls;
2158 frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
2159 to);
2160 frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2161 frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2162 frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2163 UDP_MTU,
2164 &plugin->tracker,
2165 s->last_expected_msg_delay,
2166 s->last_expected_ack_delay,
2167 &udp->header,
2168 &enqueue_fragment,
2169 frag_ctx);
2170 s->frag_ctx = frag_ctx;
2171 GNUNET_STATISTICS_update (plugin->env->stats,
2172 "# UDP, fragmented msgs, messages, pending",
2173 1,
2174 GNUNET_NO);
2175 GNUNET_STATISTICS_update (plugin->env->stats,
2176 "# UDP, fragmented msgs, messages, attempt",
2177 1,
2178 GNUNET_NO);
2179 GNUNET_STATISTICS_update (plugin->env->stats,
2180 "# UDP, fragmented msgs, bytes payload, attempt",
2181 frag_ctx->payload_size,
2182 GNUNET_NO);
2183 }
2184 notify_session_monitor (s->plugin,
2185 s,
2186 GNUNET_TRANSPORT_SS_UPDATE);
2187 if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2188 schedule_select_v4 (plugin);
2189 else
2190 schedule_select_v6 (plugin);
2191 return udpmlen;
2192}
2193
2194
2195/**
2196 * Our external IP address/port mapping has changed.
2197 *
2198 * @param cls closure, the `struct LocalAddrList`
2199 * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean
2200 * the previous (now invalid) one
2201 * @param addr either the previous or the new public IP address
2202 * @param addrlen actual lenght of the address
2203 */
2204static void
2205udp_nat_port_map_callback (void *cls,
2206 int add_remove,
2207 const struct sockaddr *addr,
2208 socklen_t addrlen)
2209{
2210 struct Plugin *plugin = cls;
2211 struct GNUNET_HELLO_Address *address;
2212 struct IPv4UdpAddress u4;
2213 struct IPv6UdpAddress u6;
2214 void *arg;
2215 size_t args;
2216
2217 LOG (GNUNET_ERROR_TYPE_INFO,
2218 "NAT notification to %s address `%s'\n",
2219 (GNUNET_YES == add_remove) ? "add" : "remove",
2220 GNUNET_a2s (addr, addrlen));
2221
2222 /* convert 'address' to our internal format */
2223 switch (addr->sa_family)
2224 {
2225 case AF_INET:
2226 GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
2227 memset (&u4, 0, sizeof(u4));
2228 u4.options = htonl (plugin->myoptions);
2229 u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
2230 u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
2231 if (0 == ((struct sockaddr_in *) addr)->sin_port)
2232 return;
2233 arg = &u4;
2234 args = sizeof(struct IPv4UdpAddress);
2235 break;
2236 case AF_INET6:
2237 GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
2238 memset (&u6, 0, sizeof(u6));
2239 u6.options = htonl (plugin->myoptions);
2240 if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
2241 return;
2242 memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
2243 sizeof(struct in6_addr));
2244 u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
2245 arg = &u6;
2246 args = sizeof(struct IPv6UdpAddress);
2247 break;
2248 default:
2249 GNUNET_break(0);
2250 return;
2251 }
2252 /* modify our published address list */
2253 address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
2254 PLUGIN_NAME,
2255 arg, args,
2256 GNUNET_HELLO_ADDRESS_INFO_NONE);
2257 plugin->env->notify_address (plugin->env->cls, add_remove, address);
2258 GNUNET_HELLO_address_free (address);
2259}
2260
2261
2262/**
2263 * Message tokenizer has broken up an incomming message. Pass it on
2264 * to the service.
2265 *
2266 * @param cls the `struct Plugin *`
2267 * @param client the `struct Session *`
2268 * @param hdr the actual message
2269 * @return #GNUNET_OK (always)
2270 */
2271static int
2272process_inbound_tokenized_messages (void *cls,
2273 void *client,
2274 const struct GNUNET_MessageHeader *hdr)
2275{
2276 struct Plugin *plugin = cls;
2277 struct Session *session = client;
2278 struct GNUNET_TIME_Relative delay;
2279
2280 if (GNUNET_YES == session->in_destroy)
2281 return GNUNET_OK;
2282 /* setup ATS */
2283 reschedule_session_timeout (session);
2284 delay = plugin->env->receive (plugin->env->cls,
2285 session->address,
2286 session,
2287 hdr);
2288 session->flow_delay_for_other_peer = delay;
2289 return GNUNET_OK;
2290}
2291
2292
2293/**
2294 * We've received a UDP Message. Process it (pass contents to main service). 2478 * We've received a UDP Message. Process it (pass contents to main service).
2295 * 2479 *
2296 * @param plugin plugin context 2480 * @param plugin plugin context
@@ -2328,7 +2512,8 @@ process_udp_message (struct Plugin *plugin,
2328 udp_addr_len, 2512 udp_addr_len,
2329 GNUNET_HELLO_ADDRESS_INFO_NONE); 2513 GNUNET_HELLO_ADDRESS_INFO_NONE);
2330 if (NULL == 2514 if (NULL ==
2331 (s = udp_plugin_lookup_session (plugin, address))) 2515 (s = udp_plugin_lookup_session (plugin,
2516 address)))
2332 { 2517 {
2333 s = udp_plugin_create_session (plugin, 2518 s = udp_plugin_create_session (plugin,
2334 address, 2519 address,
@@ -2346,7 +2531,6 @@ process_udp_message (struct Plugin *plugin,
2346 } 2531 }
2347 GNUNET_free (address); 2532 GNUNET_free (address);
2348 2533
2349 /* iterate over all embedded messages */
2350 s->rc++; 2534 s->rc++;
2351 GNUNET_SERVER_mst_receive (plugin->mst, 2535 GNUNET_SERVER_mst_receive (plugin->mst,
2352 s, 2536 s,
@@ -2355,7 +2539,8 @@ process_udp_message (struct Plugin *plugin,
2355 GNUNET_YES, 2539 GNUNET_YES,
2356 GNUNET_NO); 2540 GNUNET_NO);
2357 s->rc--; 2541 s->rc--;
2358 if ((0 == s->rc) && (GNUNET_YES == s->in_destroy)) 2542 if ( (0 == s->rc) &&
2543 (GNUNET_YES == s->in_destroy) )
2359 free_session (s); 2544 free_session (s);
2360} 2545}
2361 2546
@@ -2375,12 +2560,12 @@ fragment_msg_proc (void *cls,
2375 2560
2376 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE) 2561 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2377 { 2562 {
2378 GNUNET_break(0); 2563 GNUNET_break_op (0);
2379 return; 2564 return;
2380 } 2565 }
2381 if (ntohs (msg->size) < sizeof(struct UDPMessage)) 2566 if (ntohs (msg->size) < sizeof(struct UDPMessage))
2382 { 2567 {
2383 GNUNET_break(0); 2568 GNUNET_break_op (0);
2384 return; 2569 return;
2385 } 2570 }
2386 um = (const struct UDPMessage *) msg; 2571 um = (const struct UDPMessage *) msg;
@@ -2395,6 +2580,39 @@ fragment_msg_proc (void *cls,
2395 2580
2396 2581
2397/** 2582/**
2583 * We finished sending an acknowledgement. Update
2584 * statistics.
2585 *
2586 * @param cls the `struct Plugin`
2587 * @param udpw message queue entry of the ACK
2588 * @param result #GNUNET_OK if the transmission worked,
2589 * #GNUNET_SYSERR if we failed to send the ACK
2590 */
2591static void
2592ack_message_sent (void *cls,
2593 struct UDP_MessageWrapper *udpw,
2594 int result)
2595{
2596 struct Plugin *plugin = cls;
2597
2598 if (GNUNET_OK == result)
2599 {
2600 GNUNET_STATISTICS_update (plugin->env->stats,
2601 "# UDP, ACK messages sent",
2602 1,
2603 GNUNET_NO);
2604 }
2605 else
2606 {
2607 GNUNET_STATISTICS_update (plugin->env->stats,
2608 "# UDP, ACK transmissions failed",
2609 1,
2610 GNUNET_NO);
2611 }
2612}
2613
2614
2615/**
2398 * Transmit an acknowledgement. 2616 * Transmit an acknowledgement.
2399 * 2617 *
2400 * @param cls the `struct DefragContext *` 2618 * @param cls the `struct DefragContext *`
@@ -2407,6 +2625,7 @@ ack_proc (void *cls,
2407 const struct GNUNET_MessageHeader *msg) 2625 const struct GNUNET_MessageHeader *msg)
2408{ 2626{
2409 struct DefragContext *rc = cls; 2627 struct DefragContext *rc = cls;
2628 struct Plugin *plugin = rc->plugin;
2410 size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size); 2629 size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2411 struct UDP_ACK_Message *udp_ack; 2630 struct UDP_ACK_Message *udp_ack;
2412 uint32_t delay; 2631 uint32_t delay;
@@ -2425,19 +2644,23 @@ ack_proc (void *cls,
2425 rc->udp_addr, 2644 rc->udp_addr,
2426 rc->udp_addr_len, 2645 rc->udp_addr_len,
2427 GNUNET_HELLO_ADDRESS_INFO_NONE); 2646 GNUNET_HELLO_ADDRESS_INFO_NONE);
2428 s = udp_plugin_lookup_session (rc->plugin, 2647 s = udp_plugin_lookup_session (plugin,
2429 address); 2648 address);
2430 GNUNET_HELLO_address_free (address); 2649 GNUNET_HELLO_address_free (address);
2431 if (NULL == s) 2650 if (NULL == s)
2432 { 2651 {
2433 LOG (GNUNET_ERROR_TYPE_ERROR, 2652 LOG (GNUNET_ERROR_TYPE_ERROR,
2434 "Trying to transmit ACK to peer `%s' but no session found!\n", 2653 "Trying to transmit ACK to peer `%s' but no session found!\n",
2435 udp_address_to_string (rc->plugin, 2654 udp_address_to_string (plugin,
2436 rc->udp_addr, 2655 rc->udp_addr,
2437 rc->udp_addr_len)); 2656 rc->udp_addr_len));
2438 GNUNET_CONTAINER_heap_remove_node (rc->hnode); 2657 GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2439 GNUNET_DEFRAGMENT_context_destroy (rc->defrag); 2658 GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2440 GNUNET_free (rc); 2659 GNUNET_free (rc);
2660 GNUNET_STATISTICS_update (plugin->env->stats,
2661 "# UDP, ACK transmissions failed",
2662 1,
2663 GNUNET_NO);
2441 return; 2664 return;
2442 } 2665 }
2443 if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX) 2666 if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
@@ -2447,7 +2670,7 @@ ack_proc (void *cls,
2447 2670
2448 LOG (GNUNET_ERROR_TYPE_DEBUG, 2671 LOG (GNUNET_ERROR_TYPE_DEBUG,
2449 "Sending ACK to `%s' including delay of %s\n", 2672 "Sending ACK to `%s' including delay of %s\n",
2450 udp_address_to_string (rc->plugin, 2673 udp_address_to_string (plugin,
2451 rc->udp_addr, 2674 rc->udp_addr,
2452 rc->udp_addr_len), 2675 rc->udp_addr_len),
2453 GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer, 2676 GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
@@ -2458,120 +2681,25 @@ ack_proc (void *cls,
2458 udpw->session = s; 2681 udpw->session = s;
2459 udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; 2682 udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2460 udpw->msg_buf = (char *) &udpw[1]; 2683 udpw->msg_buf = (char *) &udpw[1];
2461 udpw->msg_type = UMT_MSG_ACK; 2684 udpw->qc = &ack_message_sent;
2685 udpw->qc_cls = plugin;
2462 udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf; 2686 udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2463 udp_ack->header.size = htons ((uint16_t) msize); 2687 udp_ack->header.size = htons ((uint16_t) msize);
2464 udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); 2688 udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2465 udp_ack->delay = htonl (delay); 2689 udp_ack->delay = htonl (delay);
2466 udp_ack->sender = *rc->plugin->env->my_identity; 2690 udp_ack->sender = *plugin->env->my_identity;
2467 memcpy (&udp_ack[1], msg, ntohs (msg->size)); 2691 memcpy (&udp_ack[1],
2468 enqueue (rc->plugin, udpw); 2692 msg,
2469 notify_session_monitor (s->plugin, 2693 ntohs (msg->size));
2694 enqueue (plugin,
2695 udpw);
2696 notify_session_monitor (plugin,
2470 s, 2697 s,
2471 GNUNET_TRANSPORT_SS_UPDATE); 2698 GNUNET_TRANSPORT_SS_UPDATE);
2472 if (s->address->address_length == sizeof (struct IPv4UdpAddress)) 2699 if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2473 schedule_select_v4 (rc->plugin); 2700 schedule_select_v4 (plugin);
2474 else 2701 else
2475 schedule_select_v6 (rc->plugin); 2702 schedule_select_v6 (plugin);
2476}
2477
2478
2479/**
2480 * Handle an ACK message.
2481 *
2482 * @param plugin the UDP plugin
2483 * @param msg the (presumed) UDP ACK message
2484 * @param udp_addr sender address
2485 * @param udp_addr_len number of bytes in @a udp_addr
2486 */
2487static void
2488read_process_ack (struct Plugin *plugin,
2489 const struct GNUNET_MessageHeader *msg,
2490 const union UdpAddress *udp_addr,
2491 socklen_t udp_addr_len)
2492{
2493 const struct GNUNET_MessageHeader *ack;
2494 const struct UDP_ACK_Message *udp_ack;
2495 struct GNUNET_HELLO_Address *address;
2496 struct Session *s;
2497 struct GNUNET_TIME_Relative flow_delay;
2498
2499 if (ntohs (msg->size)
2500 < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2501 {
2502 GNUNET_break_op(0);
2503 return;
2504 }
2505 udp_ack = (const struct UDP_ACK_Message *) msg;
2506 address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2507 PLUGIN_NAME,
2508 udp_addr,
2509 udp_addr_len,
2510 GNUNET_HELLO_ADDRESS_INFO_NONE);
2511 s = udp_plugin_lookup_session (plugin,
2512 address);
2513 if (NULL == s)
2514 {
2515 LOG (GNUNET_ERROR_TYPE_WARNING,
2516 "UDP session of address %s for ACK not found\n",
2517 udp_address_to_string (plugin,
2518 address->address,
2519 address->address_length));
2520 GNUNET_HELLO_address_free (address);
2521 return;
2522 }
2523 if (NULL == s->frag_ctx)
2524 {
2525 LOG (GNUNET_ERROR_TYPE_WARNING,
2526 "Fragmentation context of address %s for ACK not found\n",
2527 udp_address_to_string (plugin,
2528 address->address,
2529 address->address_length));
2530 GNUNET_HELLO_address_free (address);
2531 return;
2532 }
2533 GNUNET_HELLO_address_free (address);
2534
2535 flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2536 LOG (GNUNET_ERROR_TYPE_DEBUG,
2537 "We received a sending delay of %s for %s\n",
2538 GNUNET_STRINGS_relative_time_to_string (flow_delay,
2539 GNUNET_YES),
2540 GNUNET_i2s (&udp_ack->sender));
2541 s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2542
2543 ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2544 if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2545 {
2546 GNUNET_break_op(0);
2547 return;
2548 }
2549
2550 if (GNUNET_OK !=
2551 GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2552 ack))
2553 {
2554 LOG(GNUNET_ERROR_TYPE_DEBUG,
2555 "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2556 (unsigned int ) ntohs (msg->size),
2557 GNUNET_i2s (&udp_ack->sender),
2558 udp_address_to_string (plugin,
2559 udp_addr,
2560 udp_addr_len));
2561 /* Expect more ACKs to arrive */
2562 return;
2563 }
2564
2565 LOG (GNUNET_ERROR_TYPE_DEBUG,
2566 "Message from %s at %s full ACK'ed\n",
2567 GNUNET_i2s (&udp_ack->sender),
2568 udp_address_to_string (plugin,
2569 udp_addr,
2570 udp_addr_len));
2571
2572 /* Remove fragmented message after successful sending */
2573 fragmented_message_done (s->frag_ctx,
2574 GNUNET_OK);
2575} 2703}
2576 2704
2577 2705
@@ -2625,10 +2753,10 @@ read_process_fragment (struct Plugin *plugin,
2625 &ack_proc); 2753 &ack_proc);
2626 d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, 2754 d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2627 d_ctx, 2755 d_ctx,
2628 (GNUNET_CONTAINER_HeapCostType) now.abs_value_us); 2756 (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2629 LOG (GNUNET_ERROR_TYPE_DEBUG, 2757 LOG (GNUNET_ERROR_TYPE_DEBUG,
2630 "Created new defragmentation context for %u-byte fragment from `%s'\n", 2758 "Created new defragmentation context for %u-byte fragment from `%s'\n",
2631 (unsigned int ) ntohs (msg->size), 2759 (unsigned int) ntohs (msg->size),
2632 udp_address_to_string (plugin, 2760 udp_address_to_string (plugin,
2633 udp_addr, 2761 udp_addr,
2634 udp_addr_len)); 2762 udp_addr_len));
@@ -2637,19 +2765,20 @@ read_process_fragment (struct Plugin *plugin,
2637 { 2765 {
2638 LOG (GNUNET_ERROR_TYPE_DEBUG, 2766 LOG (GNUNET_ERROR_TYPE_DEBUG,
2639 "Found existing defragmentation context for %u-byte fragment from `%s'\n", 2767 "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2640 (unsigned int ) ntohs (msg->size), 2768 (unsigned int) ntohs (msg->size),
2641 udp_address_to_string (plugin, 2769 udp_address_to_string (plugin,
2642 udp_addr, 2770 udp_addr,
2643 udp_addr_len)); 2771 udp_addr_len));
2644 } 2772 }
2645 2773
2646 if (GNUNET_OK == 2774 if (GNUNET_OK ==
2647 GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg)) 2775 GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
2776 msg))
2648 { 2777 {
2649 /* keep this 'rc' from expiring */ 2778 /* keep this 'rc' from expiring */
2650 GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, 2779 GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2651 d_ctx->hnode, 2780 d_ctx->hnode,
2652 (GNUNET_CONTAINER_HeapCostType) now.abs_value_us); 2781 (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2653 } 2782 }
2654 if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) > 2783 if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2655 UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG) 2784 UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
@@ -2659,6 +2788,10 @@ read_process_fragment (struct Plugin *plugin,
2659 GNUNET_assert (NULL != d_ctx); 2788 GNUNET_assert (NULL != d_ctx);
2660 GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag); 2789 GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2661 GNUNET_free (d_ctx); 2790 GNUNET_free (d_ctx);
2791 GNUNET_STATISTICS_update (plugin->env->stats,
2792 "# UDP, Defragmentations aborted",
2793 1,
2794 GNUNET_NO);
2662 } 2795 }
2663} 2796}
2664 2797
@@ -2687,10 +2820,15 @@ udp_select_read (struct Plugin *plugin,
2687 size_t int_addr_len; 2820 size_t int_addr_len;
2688 enum GNUNET_ATS_Network_Type network_type; 2821 enum GNUNET_ATS_Network_Type network_type;
2689 2822
2690 fromlen = sizeof(addr); 2823 fromlen = sizeof (addr);
2691 memset (&addr, 0, sizeof(addr)); 2824 memset (&addr,
2692 size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf), 2825 0,
2693 (struct sockaddr *) &addr, &fromlen); 2826 sizeof(addr));
2827 size = GNUNET_NETWORK_socket_recvfrom (rsock,
2828 buf,
2829 sizeof(buf),
2830 (struct sockaddr *) &addr,
2831 &fromlen);
2694 sa = (const struct sockaddr *) &addr; 2832 sa = (const struct sockaddr *) &addr;
2695#if MINGW 2833#if MINGW
2696 /* On SOCK_DGRAM UDP sockets recvfrom might fail with a 2834 /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
@@ -2703,8 +2841,9 @@ udp_select_read (struct Plugin *plugin,
2703 * error indicates a previous send operation resulted in an ICMP Port 2841 * error indicates a previous send operation resulted in an ICMP Port
2704 * Unreachable message. 2842 * Unreachable message.
2705 */ 2843 */
2706 if ( (-1 == size) && (ECONNRESET == errno) ) 2844 if ( (-1 == size) &&
2707 return; 2845 (ECONNRESET == errno) )
2846 return;
2708#endif 2847#endif
2709 if (-1 == size) 2848 if (-1 == size)
2710 { 2849 {
@@ -2719,10 +2858,11 @@ udp_select_read (struct Plugin *plugin,
2719 LOG (GNUNET_ERROR_TYPE_WARNING, 2858 LOG (GNUNET_ERROR_TYPE_WARNING,
2720 "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n", 2859 "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2721 (unsigned int ) size, 2860 (unsigned int ) size,
2722 GNUNET_a2s (sa, fromlen)); 2861 GNUNET_a2s (sa,
2862 fromlen));
2723 /* _MAY_ be a connection failure (got partial message) */ 2863 /* _MAY_ be a connection failure (got partial message) */
2724 /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */ 2864 /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2725 GNUNET_break_op(0); 2865 GNUNET_break_op (0);
2726 return; 2866 return;
2727 } 2867 }
2728 msg = (const struct GNUNET_MessageHeader *) buf; 2868 msg = (const struct GNUNET_MessageHeader *) buf;
@@ -2743,7 +2883,7 @@ udp_select_read (struct Plugin *plugin,
2743 return; 2883 return;
2744 } 2884 }
2745 GNUNET_STATISTICS_update (plugin->env->stats, 2885 GNUNET_STATISTICS_update (plugin->env->stats,
2746 "# UDP, total, bytes, received", 2886 "# UDP, total bytes received",
2747 size, 2887 size,
2748 GNUNET_NO); 2888 GNUNET_NO);
2749 network_type = plugin->env->get_address_type (plugin->env->cls, 2889 network_type = plugin->env->get_address_type (plugin->env->cls,
@@ -2828,7 +2968,7 @@ static struct UDP_MessageWrapper *
2828remove_timeout_messages_and_select (struct Plugin *plugin, 2968remove_timeout_messages_and_select (struct Plugin *plugin,
2829 struct GNUNET_NETWORK_Handle *sock) 2969 struct GNUNET_NETWORK_Handle *sock)
2830{ 2970{
2831 struct UDP_MessageWrapper *udpw = NULL; 2971 struct UDP_MessageWrapper *udpw;
2832 struct GNUNET_TIME_Relative remaining; 2972 struct GNUNET_TIME_Relative remaining;
2833 struct Session *session; 2973 struct Session *session;
2834 int removed; 2974 int removed;
@@ -2845,95 +2985,26 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
2845 if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us) 2985 if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2846 { 2986 {
2847 /* Message timed out */ 2987 /* Message timed out */
2848 switch (udpw->msg_type) 2988 udpw->qc (udpw->qc_cls,
2849 { 2989 udpw,
2850 case UMT_MSG_UNFRAGMENTED: 2990 GNUNET_SYSERR);
2851 GNUNET_STATISTICS_update (plugin->env->stats, 2991 /* Remove message */
2852 "# UDP, total, bytes, sent, timeout", 2992 removed = GNUNET_YES;
2853 udpw->msg_size, 2993 dequeue (plugin,
2854 GNUNET_NO); 2994 udpw);
2855 GNUNET_STATISTICS_update (plugin->env->stats, 2995 GNUNET_free (udpw);
2856 "# UDP, total, messages, sent, timeout", 2996
2857 1,
2858 GNUNET_NO);
2859 GNUNET_STATISTICS_update (plugin->env->stats,
2860 "# UDP, unfragmented msgs, messages, sent, timeout",
2861 1,
2862 GNUNET_NO);
2863 GNUNET_STATISTICS_update (plugin->env->stats,
2864 "# UDP, unfragmented msgs, bytes, sent, timeout",
2865 udpw->payload_size,
2866 GNUNET_NO);
2867 /* Not fragmented message */
2868 LOG (GNUNET_ERROR_TYPE_DEBUG,
2869 "Message for peer `%s' with size %u timed out\n",
2870 GNUNET_i2s (&udpw->session->target),
2871 udpw->payload_size);
2872 call_continuation (udpw, GNUNET_SYSERR);
2873 /* Remove message */
2874 removed = GNUNET_YES;
2875 dequeue (plugin, udpw);
2876 GNUNET_free(udpw);
2877 break;
2878 case UMT_MSG_FRAGMENTED:
2879 /* Fragmented message */
2880 GNUNET_STATISTICS_update (plugin->env->stats,
2881 "# UDP, total, bytes, sent, timeout",
2882 udpw->frag_ctx->on_wire_size,
2883 GNUNET_NO);
2884 GNUNET_STATISTICS_update (plugin->env->stats,
2885 "# UDP, total, messages, sent, timeout",
2886 1,
2887 GNUNET_NO);
2888 call_continuation (udpw,
2889 GNUNET_SYSERR);
2890 LOG (GNUNET_ERROR_TYPE_DEBUG,
2891 "Fragment for message for peer `%s' with size %u timed out\n",
2892 GNUNET_i2s (&udpw->session->target),
2893 udpw->frag_ctx->payload_size);
2894
2895 GNUNET_STATISTICS_update (plugin->env->stats,
2896 "# UDP, fragmented msgs, messages, sent, timeout",
2897 1,
2898 GNUNET_NO);
2899 GNUNET_STATISTICS_update (plugin->env->stats,
2900 "# UDP, fragmented msgs, bytes, sent, timeout",
2901 udpw->frag_ctx->payload_size,
2902 GNUNET_NO);
2903 /* Remove fragmented message due to timeout */
2904 fragmented_message_done (udpw->frag_ctx,
2905 GNUNET_SYSERR);
2906 break;
2907 case UMT_MSG_ACK:
2908 GNUNET_STATISTICS_update (plugin->env->stats,
2909 "# UDP, total, bytes, sent, timeout",
2910 udpw->msg_size,
2911 GNUNET_NO);
2912 GNUNET_STATISTICS_update (plugin->env->stats,
2913 "# UDP, total, messages, sent, timeout",
2914 1,
2915 GNUNET_NO);
2916 LOG (GNUNET_ERROR_TYPE_DEBUG,
2917 "ACK Message for peer `%s' with size %u timed out\n",
2918 GNUNET_i2s (&udpw->session->target),
2919 udpw->payload_size);
2920 call_continuation (udpw,
2921 GNUNET_SYSERR);
2922 removed = GNUNET_YES;
2923 dequeue (plugin,
2924 udpw);
2925 GNUNET_free (udpw);
2926 break;
2927 default:
2928 break;
2929 }
2930 if (sock == plugin->sockv4) 2997 if (sock == plugin->sockv4)
2998 {
2931 udpw = plugin->ipv4_queue_head; 2999 udpw = plugin->ipv4_queue_head;
3000 }
2932 else if (sock == plugin->sockv6) 3001 else if (sock == plugin->sockv6)
3002 {
2933 udpw = plugin->ipv6_queue_head; 3003 udpw = plugin->ipv6_queue_head;
3004 }
2934 else 3005 else
2935 { 3006 {
2936 GNUNET_break(0); /* should never happen */ 3007 GNUNET_break (0); /* should never happen */
2937 udpw = NULL; 3008 udpw = NULL;
2938 } 3009 }
2939 GNUNET_STATISTICS_update (plugin->env->stats, 3010 GNUNET_STATISTICS_update (plugin->env->stats,
@@ -2945,7 +3016,7 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
2945 { 3016 {
2946 /* Message did not time out, check flow delay */ 3017 /* Message did not time out, check flow delay */
2947 remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer); 3018 remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2948 if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us) 3019 if (0 == remaining.rel_value_us)
2949 { 3020 {
2950 /* this message is not delayed */ 3021 /* this message is not delayed */
2951 LOG (GNUNET_ERROR_TYPE_DEBUG, 3022 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2976,7 +3047,13 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
2976 3047
2977 3048
2978/** 3049/**
2979 * FIXME. 3050 * We failed to transmit a message via UDP. Generate
3051 * a descriptive error message.
3052 *
3053 * @param plugin our plugin
3054 * @param sa target address we were trying to reach
3055 * @param slen number of bytes in @a sa
3056 * @param error the errno value returned from the sendto() call
2980 */ 3057 */
2981static void 3058static void
2982analyze_send_error (struct Plugin *plugin, 3059analyze_send_error (struct Plugin *plugin,
@@ -2986,10 +3063,13 @@ analyze_send_error (struct Plugin *plugin,
2986{ 3063{
2987 enum GNUNET_ATS_Network_Type type; 3064 enum GNUNET_ATS_Network_Type type;
2988 3065
2989 type = plugin->env->get_address_type (plugin->env->cls, sa, slen); 3066 type = plugin->env->get_address_type (plugin->env->cls,
2990 if (((GNUNET_ATS_NET_LAN == type) 3067 sa,
2991 || (GNUNET_ATS_NET_WAN == type)) 3068 slen);
2992 && ((ENETUNREACH == errno)|| (ENETDOWN == errno))) 3069 if ( ( (GNUNET_ATS_NET_LAN == type) ||
3070 (GNUNET_ATS_NET_WAN == type) ) &&
3071 ( (ENETUNREACH == errno) ||
3072 (ENETDOWN == errno) ) )
2993 { 3073 {
2994 if (slen == sizeof (struct sockaddr_in)) 3074 if (slen == sizeof (struct sockaddr_in))
2995 { 3075 {
@@ -3000,7 +3080,8 @@ analyze_send_error (struct Plugin *plugin,
3000 LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, 3080 LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3001 _("UDP could not transmit message to `%s': " 3081 _("UDP could not transmit message to `%s': "
3002 "Network seems down, please check your network configuration\n"), 3082 "Network seems down, please check your network configuration\n"),
3003 GNUNET_a2s (sa, slen)); 3083 GNUNET_a2s (sa,
3084 slen));
3004 } 3085 }
3005 if (slen == sizeof (struct sockaddr_in6)) 3086 if (slen == sizeof (struct sockaddr_in6))
3006 { 3087 {
@@ -3020,7 +3101,9 @@ analyze_send_error (struct Plugin *plugin,
3020 { 3101 {
3021 LOG (GNUNET_ERROR_TYPE_WARNING, 3102 LOG (GNUNET_ERROR_TYPE_WARNING,
3022 "UDP could not transmit message to `%s': `%s'\n", 3103 "UDP could not transmit message to `%s': `%s'\n",
3023 GNUNET_a2s (sa, slen), STRERROR (error)); 3104 GNUNET_a2s (sa,
3105 slen),
3106 STRERROR (error));
3024 } 3107 }
3025} 3108}
3026 3109
@@ -3038,7 +3121,7 @@ udp_select_send (struct Plugin *plugin,
3038{ 3121{
3039 ssize_t sent; 3122 ssize_t sent;
3040 socklen_t slen; 3123 socklen_t slen;
3041 struct sockaddr *a; 3124 const struct sockaddr *a;
3042 const struct IPv4UdpAddress *u4; 3125 const struct IPv4UdpAddress *u4;
3043 struct sockaddr_in a4; 3126 struct sockaddr_in a4;
3044 const struct IPv6UdpAddress *u6; 3127 const struct IPv6UdpAddress *u6;
@@ -3052,35 +3135,39 @@ udp_select_send (struct Plugin *plugin,
3052 if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length) 3135 if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3053 { 3136 {
3054 u4 = udpw->session->address->address; 3137 u4 = udpw->session->address->address;
3055 memset (&a4, 0, sizeof(a4)); 3138 memset (&a4,
3139 0,
3140 sizeof(a4));
3056 a4.sin_family = AF_INET; 3141 a4.sin_family = AF_INET;
3057#if HAVE_SOCKADDR_IN_SIN_LEN 3142#if HAVE_SOCKADDR_IN_SIN_LEN
3058 a4.sin_len = sizeof (a4); 3143 a4.sin_len = sizeof (a4);
3059#endif 3144#endif
3060 a4.sin_port = u4->u4_port; 3145 a4.sin_port = u4->u4_port;
3061 memcpy (&a4.sin_addr, 3146 a4.sin_addr.s_addr = u4->ipv4_addr;
3062 &u4->ipv4_addr, 3147 a = (const struct sockaddr *) &a4;
3063 sizeof(struct in_addr));
3064 a = (struct sockaddr *) &a4;
3065 slen = sizeof (a4); 3148 slen = sizeof (a4);
3066 } 3149 }
3067 else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length) 3150 else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3068 { 3151 {
3069 u6 = udpw->session->address->address; 3152 u6 = udpw->session->address->address;
3070 memset (&a6, 0, sizeof(a6)); 3153 memset (&a6,
3154 0,
3155 sizeof(a6));
3071 a6.sin6_family = AF_INET6; 3156 a6.sin6_family = AF_INET6;
3072#if HAVE_SOCKADDR_IN_SIN_LEN 3157#if HAVE_SOCKADDR_IN_SIN_LEN
3073 a6.sin6_len = sizeof (a6); 3158 a6.sin6_len = sizeof (a6);
3074#endif 3159#endif
3075 a6.sin6_port = u6->u6_port; 3160 a6.sin6_port = u6->u6_port;
3076 memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr)); 3161 a6.sin6_addr = u6->ipv6_addr;
3077 a = (struct sockaddr *) &a6; 3162 a = (const struct sockaddr *) &a6;
3078 slen = sizeof (a6); 3163 slen = sizeof (a6);
3079 } 3164 }
3080 else 3165 else
3081 { 3166 {
3082 call_continuation (udpw, 3167 GNUNET_break (0);
3083 GNUNET_OK); 3168 udpw->qc (udpw->qc_cls,
3169 udpw,
3170 GNUNET_SYSERR);
3084 dequeue (plugin, 3171 dequeue (plugin,
3085 udpw); 3172 udpw);
3086 notify_session_monitor (plugin, 3173 notify_session_monitor (plugin,
@@ -3101,8 +3188,9 @@ udp_select_send (struct Plugin *plugin,
3101 a, 3188 a,
3102 slen, 3189 slen,
3103 errno); 3190 errno);
3104 call_continuation (udpw, 3191 udpw->qc (udpw->qc_cls,
3105 GNUNET_SYSERR); 3192 udpw,
3193 GNUNET_SYSERR);
3106 GNUNET_STATISTICS_update (plugin->env->stats, 3194 GNUNET_STATISTICS_update (plugin->env->stats,
3107 "# UDP, total, bytes, sent, failure", 3195 "# UDP, total, bytes, sent, failure",
3108 sent, 3196 sent,
@@ -3119,7 +3207,8 @@ udp_select_send (struct Plugin *plugin,
3119 "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n", 3207 "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n",
3120 (unsigned int) (udpw->msg_size), 3208 (unsigned int) (udpw->msg_size),
3121 GNUNET_i2s (&udpw->session->target), 3209 GNUNET_i2s (&udpw->session->target),
3122 GNUNET_a2s (a, slen), 3210 GNUNET_a2s (a,
3211 slen),
3123 (int ) sent, 3212 (int ) sent,
3124 (sent < 0) ? STRERROR (errno) : "ok"); 3213 (sent < 0) ? STRERROR (errno) : "ok");
3125 GNUNET_STATISTICS_update (plugin->env->stats, 3214 GNUNET_STATISTICS_update (plugin->env->stats,
@@ -3132,7 +3221,9 @@ udp_select_send (struct Plugin *plugin,
3132 GNUNET_NO); 3221 GNUNET_NO);
3133 if (NULL != udpw->frag_ctx) 3222 if (NULL != udpw->frag_ctx)
3134 udpw->frag_ctx->on_wire_size += udpw->msg_size; 3223 udpw->frag_ctx->on_wire_size += udpw->msg_size;
3135 call_continuation (udpw, GNUNET_OK); 3224 udpw->qc (udpw->qc_cls,
3225 udpw,
3226 GNUNET_OK);
3136 } 3227 }
3137 dequeue (plugin, 3228 dequeue (plugin,
3138 udpw); 3229 udpw);
@@ -3144,6 +3235,9 @@ udp_select_send (struct Plugin *plugin,
3144} 3235}
3145 3236
3146 3237
3238/* ***************** Event loop (part 2) *************** */
3239
3240
3147/** 3241/**
3148 * We have been notified that our readset has something to read. We don't 3242 * We have been notified that our readset has something to read. We don't
3149 * know which socket needs to be read, so we have to check each one 3243 * know which socket needs to be read, so we have to check each one
@@ -3202,6 +3296,9 @@ udp_plugin_select_v6 (void *cls,
3202} 3296}
3203 3297
3204 3298
3299/* ******************* Initialization *************** */
3300
3301
3205/** 3302/**
3206 * Setup the UDP sockets (for IPv4 and IPv6) for the plugin. 3303 * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3207 * 3304 *
@@ -3219,8 +3316,8 @@ setup_sockets (struct Plugin *plugin,
3219 int sockets_created = 0; 3316 int sockets_created = 0;
3220 struct sockaddr_in6 server_addrv6; 3317 struct sockaddr_in6 server_addrv6;
3221 struct sockaddr_in server_addrv4; 3318 struct sockaddr_in server_addrv4;
3222 struct sockaddr *server_addr; 3319 const struct sockaddr *server_addr;
3223 struct sockaddr *addrs[2]; 3320 const struct sockaddr *addrs[2];
3224 socklen_t addrlens[2]; 3321 socklen_t addrlens[2];
3225 socklen_t addrlen; 3322 socklen_t addrlen;
3226 int eno; 3323 int eno;
@@ -3229,16 +3326,20 @@ setup_sockets (struct Plugin *plugin,
3229 eno = EINVAL; 3326 eno = EINVAL;
3230 if (GNUNET_YES == plugin->enable_ipv6) 3327 if (GNUNET_YES == plugin->enable_ipv6)
3231 { 3328 {
3232 plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0); 3329 plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3330 SOCK_DGRAM,
3331 0);
3233 if (NULL == plugin->sockv6) 3332 if (NULL == plugin->sockv6)
3234 { 3333 {
3235 LOG(GNUNET_ERROR_TYPE_WARNING, 3334 LOG (GNUNET_ERROR_TYPE_INFO,
3236 "Disabling IPv6 since it is not supported on this system!\n"); 3335 _("Disabling IPv6 since it is not supported on this system!\n"));
3237 plugin->enable_ipv6 = GNUNET_NO; 3336 plugin->enable_ipv6 = GNUNET_NO;
3238 } 3337 }
3239 else 3338 else
3240 { 3339 {
3241 memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6)); 3340 memset (&server_addrv6,
3341 0,
3342 sizeof(struct sockaddr_in6));
3242#if HAVE_SOCKADDR_IN_SIN_LEN 3343#if HAVE_SOCKADDR_IN_SIN_LEN
3243 server_addrv6.sin6_len = sizeof (struct sockaddr_in6); 3344 server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3244#endif 3345#endif
@@ -3250,19 +3351,21 @@ setup_sockets (struct Plugin *plugin,
3250 3351
3251 if (0 == plugin->port) /* autodetect */ 3352 if (0 == plugin->port) /* autodetect */
3252 server_addrv6.sin6_port 3353 server_addrv6.sin6_port
3253 = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) 3354 = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3355 33537)
3254 + 32000); 3356 + 32000);
3255 else 3357 else
3256 server_addrv6.sin6_port = htons (plugin->port); 3358 server_addrv6.sin6_port = htons (plugin->port);
3257 addrlen = sizeof(struct sockaddr_in6); 3359 addrlen = sizeof (struct sockaddr_in6);
3258 server_addr = (struct sockaddr *) &server_addrv6; 3360 server_addr = (const struct sockaddr *) &server_addrv6;
3259 3361
3260 tries = 0; 3362 tries = 0;
3261 while (tries < 10) 3363 while (tries < 10)
3262 { 3364 {
3263 LOG(GNUNET_ERROR_TYPE_DEBUG, 3365 LOG(GNUNET_ERROR_TYPE_DEBUG,
3264 "Binding to IPv6 `%s'\n", 3366 "Binding to IPv6 `%s'\n",
3265 GNUNET_a2s (server_addr, addrlen)); 3367 GNUNET_a2s (server_addr,
3368 addrlen));
3266 /* binding */ 3369 /* binding */
3267 if (GNUNET_OK == 3370 if (GNUNET_OK ==
3268 GNUNET_NETWORK_socket_bind (plugin->sockv6, 3371 GNUNET_NETWORK_socket_bind (plugin->sockv6,
@@ -3277,7 +3380,8 @@ setup_sockets (struct Plugin *plugin,
3277 } 3380 }
3278 /* autodetect */ 3381 /* autodetect */
3279 server_addrv6.sin6_port 3382 server_addrv6.sin6_port
3280 = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) 3383 = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3384 33537)
3281 + 32000); 3385 + 32000);
3282 tries++; 3386 tries++;
3283 } 3387 }
@@ -3294,17 +3398,19 @@ setup_sockets (struct Plugin *plugin,
3294 if (NULL != plugin->sockv6) 3398 if (NULL != plugin->sockv6)
3295 { 3399 {
3296 LOG (GNUNET_ERROR_TYPE_DEBUG, 3400 LOG (GNUNET_ERROR_TYPE_DEBUG,
3297 "IPv6 socket created on port %s\n", 3401 "IPv6 UDP socket created listinging at %s\n",
3298 GNUNET_a2s (server_addr, addrlen)); 3402 GNUNET_a2s (server_addr,
3299 addrs[sockets_created] = (struct sockaddr *) &server_addrv6; 3403 addrlen));
3300 addrlens[sockets_created] = sizeof(struct sockaddr_in6); 3404 addrs[sockets_created] = server_addr;
3405 addrlens[sockets_created] = addrlen;
3301 sockets_created++; 3406 sockets_created++;
3302 } 3407 }
3303 else 3408 else
3304 { 3409 {
3305 LOG (GNUNET_ERROR_TYPE_ERROR, 3410 LOG (GNUNET_ERROR_TYPE_WARNING,
3306 "Failed to bind UDP socket to %s: %s\n", 3411 _("Failed to bind UDP socket to %s: %s\n"),
3307 GNUNET_a2s (server_addr, addrlen), 3412 GNUNET_a2s (server_addr,
3413 addrlen),
3308 STRERROR (eno)); 3414 STRERROR (eno));
3309 } 3415 }
3310 } 3416 }
@@ -3312,18 +3418,22 @@ setup_sockets (struct Plugin *plugin,
3312 3418
3313 /* Create IPv4 socket */ 3419 /* Create IPv4 socket */
3314 eno = EINVAL; 3420 eno = EINVAL;
3315 plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0); 3421 plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3422 SOCK_DGRAM,
3423 0);
3316 if (NULL == plugin->sockv4) 3424 if (NULL == plugin->sockv4)
3317 { 3425 {
3318 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, 3426 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3319 "socket"); 3427 "socket");
3320 LOG (GNUNET_ERROR_TYPE_WARNING, 3428 LOG (GNUNET_ERROR_TYPE_INFO,
3321 "Disabling IPv4 since it is not supported on this system!\n"); 3429 _("Disabling IPv4 since it is not supported on this system!\n"));
3322 plugin->enable_ipv4 = GNUNET_NO; 3430 plugin->enable_ipv4 = GNUNET_NO;
3323 } 3431 }
3324 else 3432 else
3325 { 3433 {
3326 memset (&server_addrv4, '\0', sizeof(struct sockaddr_in)); 3434 memset (&server_addrv4,
3435 0,
3436 sizeof(struct sockaddr_in));
3327#if HAVE_SOCKADDR_IN_SIN_LEN 3437#if HAVE_SOCKADDR_IN_SIN_LEN
3328 server_addrv4.sin_len = sizeof (struct sockaddr_in); 3438 server_addrv4.sin_len = sizeof (struct sockaddr_in);
3329#endif 3439#endif
@@ -3342,15 +3452,16 @@ setup_sockets (struct Plugin *plugin,
3342 else 3452 else
3343 server_addrv4.sin_port = htons (plugin->port); 3453 server_addrv4.sin_port = htons (plugin->port);
3344 3454
3345 addrlen = sizeof(struct sockaddr_in); 3455 addrlen = sizeof (struct sockaddr_in);
3346 server_addr = (struct sockaddr *) &server_addrv4; 3456 server_addr = (const struct sockaddr *) &server_addrv4;
3347 3457
3348 tries = 0; 3458 tries = 0;
3349 while (tries < 10) 3459 while (tries < 10)
3350 { 3460 {
3351 LOG (GNUNET_ERROR_TYPE_DEBUG, 3461 LOG (GNUNET_ERROR_TYPE_DEBUG,
3352 "Binding to IPv4 `%s'\n", 3462 "Binding to IPv4 `%s'\n",
3353 GNUNET_a2s (server_addr, addrlen)); 3463 GNUNET_a2s (server_addr,
3464 addrlen));
3354 3465
3355 /* binding */ 3466 /* binding */
3356 if (GNUNET_OK == 3467 if (GNUNET_OK ==
@@ -3367,7 +3478,8 @@ setup_sockets (struct Plugin *plugin,
3367 3478
3368 /* autodetect */ 3479 /* autodetect */
3369 server_addrv4.sin_port 3480 server_addrv4.sin_port
3370 = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) 3481 = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3482 33537)
3371 + 32000); 3483 + 32000);
3372 tries++; 3484 tries++;
3373 } 3485 }
@@ -3386,16 +3498,18 @@ setup_sockets (struct Plugin *plugin,
3386 { 3498 {
3387 LOG (GNUNET_ERROR_TYPE_DEBUG, 3499 LOG (GNUNET_ERROR_TYPE_DEBUG,
3388 "IPv4 socket created on port %s\n", 3500 "IPv4 socket created on port %s\n",
3389 GNUNET_a2s (server_addr, addrlen)); 3501 GNUNET_a2s (server_addr,
3390 addrs[sockets_created] = (struct sockaddr *) &server_addrv4; 3502 addrlen));
3391 addrlens[sockets_created] = sizeof(struct sockaddr_in); 3503 addrs[sockets_created] = server_addr;
3504 addrlens[sockets_created] = addrlen;
3392 sockets_created++; 3505 sockets_created++;
3393 } 3506 }
3394 else 3507 else
3395 { 3508 {
3396 LOG (GNUNET_ERROR_TYPE_ERROR, 3509 LOG (GNUNET_ERROR_TYPE_ERROR,
3397 _("Failed to bind UDP socket to %s: %s\n"), 3510 _("Failed to bind UDP socket to %s: %s\n"),
3398 GNUNET_a2s (server_addr, addrlen), 3511 GNUNET_a2s (server_addr,
3512 addrlen),
3399 STRERROR (eno)); 3513 STRERROR (eno));
3400 } 3514 }
3401 } 3515 }
@@ -3412,76 +3526,16 @@ setup_sockets (struct Plugin *plugin,
3412 GNUNET_NO, 3526 GNUNET_NO,
3413 plugin->port, 3527 plugin->port,
3414 sockets_created, 3528 sockets_created,
3415 (const struct sockaddr **) addrs, 3529 addrs,
3416 addrlens, 3530 addrlens,
3417 &udp_nat_port_map_callback, 3531 &udp_nat_port_map_callback,
3418 NULL, 3532 NULL,
3419 plugin); 3533 plugin);
3420
3421 return sockets_created; 3534 return sockets_created;
3422} 3535}
3423 3536
3424 3537
3425/** 3538/**
3426 * Return information about the given session to the
3427 * monitor callback.
3428 *
3429 * @param cls the `struct Plugin` with the monitor callback (`sic`)
3430 * @param peer peer we send information about
3431 * @param value our `struct Session` to send information about
3432 * @return #GNUNET_OK (continue to iterate)
3433 */
3434static int
3435send_session_info_iter (void *cls,
3436 const struct GNUNET_PeerIdentity *peer,
3437 void *value)
3438{
3439 struct Plugin *plugin = cls;
3440 struct Session *session = value;
3441
3442 notify_session_monitor (plugin,
3443 session,
3444 GNUNET_TRANSPORT_SS_INIT);
3445 notify_session_monitor (plugin,
3446 session,
3447 GNUNET_TRANSPORT_SS_UP);
3448 return GNUNET_OK;
3449}
3450
3451
3452/**
3453 * Begin monitoring sessions of a plugin. There can only
3454 * be one active monitor per plugin (i.e. if there are
3455 * multiple monitors, the transport service needs to
3456 * multiplex the generated events over all of them).
3457 *
3458 * @param cls closure of the plugin
3459 * @param sic callback to invoke, NULL to disable monitor;
3460 * plugin will being by iterating over all active
3461 * sessions immediately and then enter monitor mode
3462 * @param sic_cls closure for @a sic
3463 */
3464static void
3465udp_plugin_setup_monitor (void *cls,
3466 GNUNET_TRANSPORT_SessionInfoCallback sic,
3467 void *sic_cls)
3468{
3469 struct Plugin *plugin = cls;
3470
3471 plugin->sic = sic;
3472 plugin->sic_cls = sic_cls;
3473 if (NULL != sic)
3474 {
3475 GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3476 &send_session_info_iter,
3477 plugin);
3478 /* signal end of first iteration */
3479 sic (sic_cls, NULL, NULL);
3480 }
3481}
3482
3483
3484/**
3485 * The exported method. Makes the core api available via a global and 3539 * The exported method. Makes the core api available via a global and
3486 * returns the udp transport API. 3540 * returns the udp transport API.
3487 * 3541 *
@@ -3502,7 +3556,6 @@ libgnunet_plugin_transport_udp_init (void *cls)
3502 unsigned long long enable_broadcasting_recv; 3556 unsigned long long enable_broadcasting_recv;
3503 char *bind4_address; 3557 char *bind4_address;
3504 char *bind6_address; 3558 char *bind6_address;
3505 char *fancy_interval;
3506 struct GNUNET_TIME_Relative interval; 3559 struct GNUNET_TIME_Relative interval;
3507 struct sockaddr_in server_addrv4; 3560 struct sockaddr_in server_addrv4;
3508 struct sockaddr_in6 server_addrv6; 3561 struct sockaddr_in6 server_addrv6;
@@ -3527,54 +3580,76 @@ libgnunet_plugin_transport_udp_init (void *cls)
3527 if (GNUNET_OK != 3580 if (GNUNET_OK !=
3528 GNUNET_CONFIGURATION_get_value_number (env->cfg, 3581 GNUNET_CONFIGURATION_get_value_number (env->cfg,
3529 "transport-udp", 3582 "transport-udp",
3530 "PORT", &port)) 3583 "PORT",
3584 &port))
3531 port = 2086; 3585 port = 2086;
3586 if (port > 65535)
3587 {
3588 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3589 "transport-udp",
3590 "PORT",
3591 _("must be in [0,65535]"));
3592 return NULL;
3593 }
3532 if (GNUNET_OK != 3594 if (GNUNET_OK !=
3533 GNUNET_CONFIGURATION_get_value_number (env->cfg, 3595 GNUNET_CONFIGURATION_get_value_number (env->cfg,
3534 "transport-udp", 3596 "transport-udp",
3535 "ADVERTISED_PORT", &aport)) 3597 "ADVERTISED_PORT",
3598 &aport))
3536 aport = port; 3599 aport = port;
3537 if (port > 65535) 3600 if (aport > 65535)
3538 { 3601 {
3539 LOG (GNUNET_ERROR_TYPE_WARNING, 3602 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3540 _("Given `%s' option is out of range: %llu > %u\n"), 3603 "transport-udp",
3541 "PORT", port, 3604 "ADVERTISED_PORT",
3542 65535); 3605 _("must be in [0,65535]"));
3543 return NULL; 3606 return NULL;
3544 } 3607 }
3545 3608
3546 /* Protocols */
3547 if (GNUNET_YES == 3609 if (GNUNET_YES ==
3548 GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6")) 3610 GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3611 "nat",
3612 "DISABLEV6"))
3549 enable_v6 = GNUNET_NO; 3613 enable_v6 = GNUNET_NO;
3550 else 3614 else
3551 enable_v6 = GNUNET_YES; 3615 enable_v6 = GNUNET_YES;
3552 3616
3553 /* Addresses */
3554 have_bind4 = GNUNET_NO; 3617 have_bind4 = GNUNET_NO;
3555 memset (&server_addrv4, 0, sizeof(server_addrv4)); 3618 memset (&server_addrv4,
3619 0,
3620 sizeof (server_addrv4));
3556 if (GNUNET_YES == 3621 if (GNUNET_YES ==
3557 GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", 3622 GNUNET_CONFIGURATION_get_value_string (env->cfg,
3558 "BINDTO", &bind4_address)) 3623 "transport-udp",
3624 "BINDTO",
3625 &bind4_address))
3559 { 3626 {
3560 LOG (GNUNET_ERROR_TYPE_DEBUG, 3627 LOG (GNUNET_ERROR_TYPE_DEBUG,
3561 "Binding udp plugin to specific address: `%s'\n", 3628 "Binding UDP plugin to specific address: `%s'\n",
3562 bind4_address); 3629 bind4_address);
3563 if (1 != inet_pton (AF_INET, 3630 if (1 != inet_pton (AF_INET,
3564 bind4_address, 3631 bind4_address,
3565 &server_addrv4.sin_addr)) 3632 &server_addrv4.sin_addr))
3566 { 3633 {
3634 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3635 "transport-udp",
3636 "BINDTO",
3637 _("must be valid IPv4 address"));
3567 GNUNET_free (bind4_address); 3638 GNUNET_free (bind4_address);
3568 return NULL; 3639 return NULL;
3569 } 3640 }
3570 have_bind4 = GNUNET_YES; 3641 have_bind4 = GNUNET_YES;
3571 } 3642 }
3572 GNUNET_free_non_null(bind4_address); 3643 GNUNET_free_non_null (bind4_address);
3573 have_bind6 = GNUNET_NO; 3644 have_bind6 = GNUNET_NO;
3574 memset (&server_addrv6, 0, sizeof(server_addrv6)); 3645 memset (&server_addrv6,
3646 0,
3647 sizeof (server_addrv6));
3575 if (GNUNET_YES == 3648 if (GNUNET_YES ==
3576 GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", 3649 GNUNET_CONFIGURATION_get_value_string (env->cfg,
3577 "BINDTO6", &bind6_address)) 3650 "transport-udp",
3651 "BINDTO6",
3652 &bind6_address))
3578 { 3653 {
3579 LOG (GNUNET_ERROR_TYPE_DEBUG, 3654 LOG (GNUNET_ERROR_TYPE_DEBUG,
3580 "Binding udp plugin to specific address: `%s'\n", 3655 "Binding udp plugin to specific address: `%s'\n",
@@ -3583,9 +3658,10 @@ libgnunet_plugin_transport_udp_init (void *cls)
3583 bind6_address, 3658 bind6_address,
3584 &server_addrv6.sin6_addr)) 3659 &server_addrv6.sin6_addr))
3585 { 3660 {
3586 LOG (GNUNET_ERROR_TYPE_ERROR, 3661 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3587 _("Invalid IPv6 address: `%s'\n"), 3662 "transport-udp",
3588 bind6_address); 3663 "BINDTO6",
3664 _("must be valid IPv6 address"));
3589 GNUNET_free (bind6_address); 3665 GNUNET_free (bind6_address);
3590 return NULL; 3666 return NULL;
3591 } 3667 }
@@ -3593,40 +3669,35 @@ libgnunet_plugin_transport_udp_init (void *cls)
3593 } 3669 }
3594 GNUNET_free_non_null (bind6_address); 3670 GNUNET_free_non_null (bind6_address);
3595 3671
3596 /* Enable neighbour discovery */
3597 enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, 3672 enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3598 "transport-udp", "BROADCAST"); 3673 "transport-udp",
3674 "BROADCAST");
3599 if (enable_broadcasting == GNUNET_SYSERR) 3675 if (enable_broadcasting == GNUNET_SYSERR)
3600 enable_broadcasting = GNUNET_NO; 3676 enable_broadcasting = GNUNET_NO;
3601 3677
3602 enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, 3678 enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3603 "transport-udp", "BROADCAST_RECEIVE"); 3679 "transport-udp",
3680 "BROADCAST_RECEIVE");
3604 if (enable_broadcasting_recv == GNUNET_SYSERR) 3681 if (enable_broadcasting_recv == GNUNET_SYSERR)
3605 enable_broadcasting_recv = GNUNET_YES; 3682 enable_broadcasting_recv = GNUNET_YES;
3606 3683
3607 if (GNUNET_SYSERR == 3684 if (GNUNET_SYSERR ==
3608 GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", 3685 GNUNET_CONFIGURATION_get_value_time (env->cfg,
3609 "BROADCAST_INTERVAL", 3686 "transport-udp",
3610 &fancy_interval)) 3687 "BROADCAST_INTERVAL",
3611 { 3688 &interval))
3612 interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3613 }
3614 else
3615 { 3689 {
3616 if (GNUNET_SYSERR == 3690 interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3617 GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval)) 3691 10);
3618 {
3619 interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
3620 }
3621 GNUNET_free(fancy_interval);
3622 } 3692 }
3623
3624 /* Maximum datarate */
3625 if (GNUNET_OK != 3693 if (GNUNET_OK !=
3626 GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", 3694 GNUNET_CONFIGURATION_get_value_number (env->cfg,
3627 "MAX_BPS", &udp_max_bps)) 3695 "transport-udp",
3696 "MAX_BPS",
3697 &udp_max_bps))
3628 { 3698 {
3629 udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */ 3699 /* 50 MB/s == infinity for practical purposes */
3700 udp_max_bps = 1024 * 1024 * 50;
3630 } 3701 }
3631 3702
3632 p = GNUNET_new (struct Plugin); 3703 p = GNUNET_new (struct Plugin);
@@ -3638,9 +3709,9 @@ libgnunet_plugin_transport_udp_init (void *cls)
3638 p->enable_broadcasting = enable_broadcasting; 3709 p->enable_broadcasting = enable_broadcasting;
3639 p->enable_broadcasting_receiving = enable_broadcasting_recv; 3710 p->enable_broadcasting_receiving = enable_broadcasting_recv;
3640 p->env = env; 3711 p->env = env;
3641 p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); 3712 p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
3642 p->defrag_ctxs = GNUNET_CONTAINER_heap_create ( 3713 GNUNET_NO);
3643 GNUNET_CONTAINER_HEAP_ORDER_MIN); 3714 p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3644 p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, 3715 p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3645 p); 3716 p);
3646 GNUNET_BANDWIDTH_tracker_init (&p->tracker, 3717 GNUNET_BANDWIDTH_tracker_init (&p->tracker,
@@ -3648,15 +3719,15 @@ libgnunet_plugin_transport_udp_init (void *cls)
3648 NULL, 3719 NULL,
3649 GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps), 3720 GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3650 30); 3721 30);
3651 LOG(GNUNET_ERROR_TYPE_DEBUG,
3652 "Setting up sockets\n");
3653 res = setup_sockets (p, 3722 res = setup_sockets (p,
3654 (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL, 3723 (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3655 (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL); 3724 (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3656 if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL))) 3725 if ( (0 == res) ||
3726 ( (NULL == p->sockv4) &&
3727 (NULL == p->sockv6) ) )
3657 { 3728 {
3658 LOG (GNUNET_ERROR_TYPE_ERROR, 3729 LOG (GNUNET_ERROR_TYPE_ERROR,
3659 _("Failed to create network sockets, plugin failed\n")); 3730 _("Failed to create UDP network sockets\n"));
3660 GNUNET_CONTAINER_multipeermap_destroy (p->sessions); 3731 GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3661 GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs); 3732 GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3662 GNUNET_SERVER_mst_destroy (p->mst); 3733 GNUNET_SERVER_mst_destroy (p->mst);
@@ -3665,11 +3736,12 @@ libgnunet_plugin_transport_udp_init (void *cls)
3665 } 3736 }
3666 3737
3667 /* Setup broadcasting and receiving beacons */ 3738 /* Setup broadcasting and receiving beacons */
3668 setup_broadcast (p, &server_addrv6, &server_addrv4); 3739 setup_broadcast (p,
3740 &server_addrv6,
3741 &server_addrv4);
3669 3742
3670 api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions); 3743 api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3671 api->cls = p; 3744 api->cls = p;
3672 api->send = NULL;
3673 api->disconnect_session = &udp_disconnect_session; 3745 api->disconnect_session = &udp_disconnect_session;
3674 api->query_keepalive_factor = &udp_query_keepalive_factor; 3746 api->query_keepalive_factor = &udp_query_keepalive_factor;
3675 api->disconnect_peer = &udp_disconnect; 3747 api->disconnect_peer = &udp_disconnect;
@@ -3724,12 +3796,11 @@ libgnunet_plugin_transport_udp_done (void *cls)
3724 struct GNUNET_TRANSPORT_PluginFunctions *api = cls; 3796 struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3725 struct Plugin *plugin = api->cls; 3797 struct Plugin *plugin = api->cls;
3726 struct PrettyPrinterContext *cur; 3798 struct PrettyPrinterContext *cur;
3727 struct PrettyPrinterContext *next;
3728 struct UDP_MessageWrapper *udpw; 3799 struct UDP_MessageWrapper *udpw;
3729 3800
3730 if (NULL == plugin) 3801 if (NULL == plugin)
3731 { 3802 {
3732 GNUNET_free(api); 3803 GNUNET_free (api);
3733 return NULL; 3804 return NULL;
3734 } 3805 }
3735 stop_broadcast (plugin); 3806 stop_broadcast (plugin);
@@ -3743,25 +3814,17 @@ libgnunet_plugin_transport_udp_done (void *cls)
3743 GNUNET_SCHEDULER_cancel (plugin->select_task_v6); 3814 GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3744 plugin->select_task_v6 = NULL; 3815 plugin->select_task_v6 = NULL;
3745 } 3816 }
3746 3817 if (NULL != plugin->sockv4)
3747 /* Closing sockets */
3748 if (GNUNET_YES == plugin->enable_ipv4)
3749 { 3818 {
3750 if (NULL != plugin->sockv4) 3819 GNUNET_break (GNUNET_OK ==
3751 { 3820 GNUNET_NETWORK_socket_close (plugin->sockv4));
3752 GNUNET_break (GNUNET_OK == 3821 plugin->sockv4 = NULL;
3753 GNUNET_NETWORK_socket_close (plugin->sockv4));
3754 plugin->sockv4 = NULL;
3755 }
3756 } 3822 }
3757 if (GNUNET_YES == plugin->enable_ipv6) 3823 if (NULL != plugin->sockv6)
3758 { 3824 {
3759 if (NULL != plugin->sockv6) 3825 GNUNET_break (GNUNET_OK ==
3760 { 3826 GNUNET_NETWORK_socket_close (plugin->sockv6));
3761 GNUNET_break (GNUNET_OK == 3827 plugin->sockv6 = NULL;
3762 GNUNET_NETWORK_socket_close (plugin->sockv6));
3763 plugin->sockv6 = NULL;
3764 }
3765 } 3828 }
3766 if (NULL != plugin->nat) 3829 if (NULL != plugin->nat)
3767 { 3830 {
@@ -3771,7 +3834,8 @@ libgnunet_plugin_transport_udp_done (void *cls)
3771 if (NULL != plugin->defrag_ctxs) 3834 if (NULL != plugin->defrag_ctxs)
3772 { 3835 {
3773 GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs, 3836 GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3774 &heap_cleanup_iterator, NULL); 3837 &heap_cleanup_iterator,
3838 NULL);
3775 GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs); 3839 GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3776 plugin->defrag_ctxs = NULL; 3840 plugin->defrag_ctxs = NULL;
3777 } 3841 }
@@ -3780,39 +3844,32 @@ libgnunet_plugin_transport_udp_done (void *cls)
3780 GNUNET_SERVER_mst_destroy (plugin->mst); 3844 GNUNET_SERVER_mst_destroy (plugin->mst);
3781 plugin->mst = NULL; 3845 plugin->mst = NULL;
3782 } 3846 }
3783 3847 while (NULL != (udpw = plugin->ipv4_queue_head))
3784 /* Clean up leftover messages */
3785 udpw = plugin->ipv4_queue_head;
3786 while (NULL != udpw)
3787 { 3848 {
3788 struct UDP_MessageWrapper *tmp = udpw->next; 3849 dequeue (plugin,
3789 dequeue (plugin, udpw); 3850 udpw);
3790 call_continuation (udpw, GNUNET_SYSERR); 3851 udpw->qc (udpw->qc_cls,
3791 GNUNET_free(udpw); 3852 udpw,
3792 udpw = tmp; 3853 GNUNET_SYSERR);
3854 GNUNET_free (udpw);
3793 } 3855 }
3794 udpw = plugin->ipv6_queue_head; 3856 while (NULL != (udpw = plugin->ipv6_queue_head))
3795 while (NULL != udpw)
3796 { 3857 {
3797 struct UDP_MessageWrapper *tmp = udpw->next; 3858 dequeue (plugin,
3798 dequeue (plugin, udpw); 3859 udpw);
3799 call_continuation (udpw, GNUNET_SYSERR); 3860 udpw->qc (udpw->qc_cls,
3800 GNUNET_free(udpw); 3861 udpw,
3801 udpw = tmp; 3862 GNUNET_SYSERR);
3863 GNUNET_free (udpw);
3802 } 3864 }
3803
3804 /* Clean up sessions */
3805 LOG (GNUNET_ERROR_TYPE_DEBUG,
3806 "Cleaning up sessions\n");
3807 GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions, 3865 GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3808 &disconnect_and_free_it, plugin); 3866 &disconnect_and_free_it,
3867 plugin);
3809 GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions); 3868 GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3810 3869
3811 next = plugin->ppc_dll_head; 3870 while (NULL != (cur = plugin->ppc_dll_head))
3812 for (cur = next; NULL != cur; cur = next)
3813 { 3871 {
3814 GNUNET_break(0); 3872 GNUNET_break (0);
3815 next = cur->next;
3816 GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head, 3873 GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3817 plugin->ppc_dll_tail, 3874 plugin->ppc_dll_tail,
3818 cur); 3875 cur);
diff --git a/src/transport/test_plugin_transport.c b/src/transport/test_plugin_transport.c
index df7c7b23f..84c9362e9 100644
--- a/src/transport/test_plugin_transport.c
+++ b/src/transport/test_plugin_transport.c
@@ -342,16 +342,16 @@ test_addr_string (void *cls,
342 if (NULL == w->addrstring) 342 if (NULL == w->addrstring)
343 { 343 {
344 GNUNET_break(0); 344 GNUNET_break(0);
345 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 345 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
346 "Plugin cannot convert address to string!\n"); 346 "Plugin cannot convert address to string!\n");
347 end_badly_now (); 347 end_badly_now ();
348 return; 348 return;
349 } 349 }
350 GNUNET_log(GNUNET_ERROR_TYPE_INFO, 350 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
351 "Plugin added address `%s'\n", 351 "Plugin added address `%s'\n",
352 w->addrstring); 352 w->addrstring);
353 GNUNET_log(GNUNET_ERROR_TYPE_INFO, 353 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
354 "Testing address_to_string: OK\n"); 354 "Testing address_to_string: OK\n");
355 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 355 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
356 "Testing: string_to_address \n"); 356 "Testing: string_to_address \n");
357 s2a = NULL; 357 s2a = NULL;
@@ -385,14 +385,16 @@ test_addr_string (void *cls,
385 } 385 }
386 else if (0 != memcmp (s2a, w->address->address, s2a_len)) 386 else if (0 != memcmp (s2a, w->address->address, s2a_len))
387 { 387 {
388 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 388 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
389 "Plugin creates different address length when converting back and forth %i!\n", 389 "Plugin creates different address length when converting back and forth %i!\n",
390 memcmp (s2a, w->address->address, s2a_len)); 390 memcmp (s2a,
391 w->address->address,
392 s2a_len));
391 } 393 }
392 else 394 else
393 { 395 {
394 GNUNET_log(GNUNET_ERROR_TYPE_INFO, 396 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
395 "Testing string_to_address: OK\n"); 397 "Testing string_to_address: OK\n");
396 } 398 }
397 GNUNET_free(s2a); 399 GNUNET_free(s2a);
398 400
@@ -406,10 +408,13 @@ test_addr_string (void *cls,
406 &address_pretty_printer_cb, w); 408 &address_pretty_printer_cb, w);
407 409
408 if (GNUNET_OK != 410 if (GNUNET_OK !=
409 api->check_address (api->cls, w->address->address, w->address->address_length)) 411 api->check_address (api->cls,
412 w->address->address,
413 w->address->address_length))
410 { 414 {
411 GNUNET_break(0); 415 GNUNET_break (0);
412 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Plugin refuses added address!\n"); 416 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
417 "Plugin refuses added address!\n");
413 end_badly_now (); 418 end_badly_now ();
414 return; 419 return;
415 } 420 }