aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_cadet_service.h4
-rw-r--r--src/include/gnunet_mq_lib.h12
-rw-r--r--src/pt/Makefile.am2
-rw-r--r--src/pt/gnunet-daemon-pt.c458
-rw-r--r--src/util/mq.c18
5 files changed, 220 insertions, 274 deletions
diff --git a/src/include/gnunet_cadet_service.h b/src/include/gnunet_cadet_service.h
index 4d13606ef..fd838df8d 100644
--- a/src/include/gnunet_cadet_service.h
+++ b/src/include/gnunet_cadet_service.h
@@ -740,9 +740,9 @@ typedef void
740 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. 740 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
741 * 741 *
742 * @param cls Channel closure. 742 * @param cls Channel closure.
743 * @param channel Connection to the other end (henceforth invalid). 743 * @param channel Connection to the other end --- FIXME: drop?
744 * @param window_size New window size. If the is more messages than buffer size 744 * @param window_size New window size. If the is more messages than buffer size
745 * this value will be negative.. 745 * this value will be negative. -- FIXME: make unsigned, we never call negative?
746 */ 746 */
747typedef void 747typedef void
748(*GNUNET_CADET_WindowSizeEventHandler) (void *cls, 748(*GNUNET_CADET_WindowSizeEventHandler) (void *cls,
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index f9ad6c913..b7f1eecaa 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -473,7 +473,6 @@ struct GNUNET_MQ_MessageHandler
473 */ 473 */
474struct GNUNET_MQ_Envelope * 474struct GNUNET_MQ_Envelope *
475GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, 475GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
476
477 uint16_t size, 476 uint16_t size,
478 uint16_t type); 477 uint16_t type);
479 478
@@ -511,6 +510,17 @@ GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq);
511 510
512 511
513/** 512/**
513 * Function to copy an envelope. The envelope must not yet
514 * be in any queue or have any options or callbacks set.
515 *
516 * @param env envelope to copy
517 * @return copy of @a env
518 */
519struct GNUNET_MQ_Envelope *
520GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env);
521
522
523/**
514 * Function to obtain the last envelope in the queue. 524 * Function to obtain the last envelope in the queue.
515 * 525 *
516 * @param mq message queue to interrogate 526 * @param mq message queue to interrogate
diff --git a/src/pt/Makefile.am b/src/pt/Makefile.am
index e36630ae4..7ea8257d5 100644
--- a/src/pt/Makefile.am
+++ b/src/pt/Makefile.am
@@ -25,7 +25,7 @@ gnunet_daemon_pt_SOURCES = \
25 gnunet-daemon-pt.c 25 gnunet-daemon-pt.c
26gnunet_daemon_pt_LDADD = \ 26gnunet_daemon_pt_LDADD = \
27 $(top_builddir)/src/vpn/libgnunetvpn.la \ 27 $(top_builddir)/src/vpn/libgnunetvpn.la \
28 $(top_builddir)/src/cadet/libgnunetcadet.la \ 28 $(top_builddir)/src/cadet/libgnunetcadetnew.la \
29 $(top_builddir)/src/dht/libgnunetdht.la \ 29 $(top_builddir)/src/dht/libgnunetdht.la \
30 $(top_builddir)/src/dns/libgnunetdns.la \ 30 $(top_builddir)/src/dns/libgnunetdns.la \
31 $(top_builddir)/src/dns/libgnunetdnsparser.la \ 31 $(top_builddir)/src/dns/libgnunetdnsparser.la \
diff --git a/src/pt/gnunet-daemon-pt.c b/src/pt/gnunet-daemon-pt.c
index 06ef88832..97ac8e961 100644
--- a/src/pt/gnunet-daemon-pt.c
+++ b/src/pt/gnunet-daemon-pt.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2010, 2012 Christian Grothoff 3 Copyright (C) 2010, 2012, 2017 Christian Grothoff
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -17,7 +17,6 @@
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA. 18 Boston, MA 02110-1301, USA.
19*/ 19*/
20
21/** 20/**
22 * @file pt/gnunet-daemon-pt.c 21 * @file pt/gnunet-daemon-pt.c
23 * @brief tool to manipulate DNS and VPN services to perform protocol translation (IPvX over GNUnet) 22 * @brief tool to manipulate DNS and VPN services to perform protocol translation (IPvX over GNUnet)
@@ -161,21 +160,6 @@ struct CadetExit
161 struct RequestContext *receive_queue_tail; 160 struct RequestContext *receive_queue_tail;
162 161
163 /** 162 /**
164 * Head of DLL of requests to be transmitted to a cadet_channel.
165 */
166 struct RequestContext *transmit_queue_head;
167
168 /**
169 * Tail of DLL of requests to be transmitted to a cadet_channel.
170 */
171 struct RequestContext *transmit_queue_tail;
172
173 /**
174 * Active transmission request for this channel (or NULL).
175 */
176 struct GNUNET_CADET_TransmitHandle *cadet_th;
177
178 /**
179 * Identity of the peer that is providing the exit for us. 163 * Identity of the peer that is providing the exit for us.
180 */ 164 */
181 struct GNUNET_PeerIdentity peer; 165 struct GNUNET_PeerIdentity peer;
@@ -190,6 +174,11 @@ struct CadetExit
190 */ 174 */
191 unsigned int num_answered; 175 unsigned int num_answered;
192 176
177 /**
178 * Size of the window, 0 if we are busy.
179 */
180 /* unsigned */ int idle;
181
193}; 182};
194 183
195 184
@@ -220,10 +209,9 @@ struct RequestContext
220 struct GNUNET_DNS_RequestHandle *rh; 209 struct GNUNET_DNS_RequestHandle *rh;
221 210
222 /** 211 /**
223 * Message we're sending out via CADET, allocated at the 212 * Envelope with the request we are transmitting.
224 * end of this struct.
225 */ 213 */
226 const struct GNUNET_MessageHeader *cadet_message; 214 struct GNUNET_MQ_Envelope *env;
227 215
228 /** 216 /**
229 * Task used to abort this operation with timeout. 217 * Task used to abort this operation with timeout.
@@ -240,12 +228,6 @@ struct RequestContext
240 */ 228 */
241 uint16_t dns_id; 229 uint16_t dns_id;
242 230
243 /**
244 * #GNUNET_NO if this request is still in the transmit_queue,
245 * #GNUNET_YES if we are in the receive_queue.
246 */
247 int16_t was_transmitted;
248
249}; 231};
250 232
251 233
@@ -328,59 +310,7 @@ static unsigned int dns_exit_available;
328 * We are short on cadet exits, try to open another one. 310 * We are short on cadet exits, try to open another one.
329 */ 311 */
330static void 312static void
331try_open_exit () 313try_open_exit (void);
332{
333 struct CadetExit *pos;
334 uint32_t candidate_count;
335 uint32_t candidate_selected;
336 struct GNUNET_HashCode port;
337
338 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_INTERNET_RESOLVER,
339 strlen (GNUNET_APPLICATION_PORT_INTERNET_RESOLVER),
340 &port);
341 candidate_count = 0;
342 for (pos = exit_head; NULL != pos; pos = pos->next)
343 if (NULL == pos->cadet_channel)
344 candidate_count++;
345 if (0 == candidate_count)
346 {
347 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
348 "No DNS exits available yet.\n");
349 return;
350 }
351 candidate_selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
352 candidate_count);
353 candidate_count = 0;
354 for (pos = exit_head; NULL != pos; pos = pos->next)
355 if (NULL == pos->cadet_channel)
356 {
357 candidate_count++;
358 if (candidate_selected < candidate_count)
359 {
360 /* move to the head of the DLL */
361 pos->cadet_channel
362 = GNUNET_CADET_channel_create (cadet_handle,
363 pos,
364 &pos->peer,
365 &port,
366 GNUNET_CADET_OPTION_DEFAULT);
367 if (NULL == pos->cadet_channel)
368 {
369 GNUNET_break (0);
370 continue;
371 }
372 GNUNET_CONTAINER_DLL_remove (exit_head,
373 exit_tail,
374 pos);
375 GNUNET_CONTAINER_DLL_insert (exit_head,
376 exit_tail,
377 pos);
378 dns_exit_available++;
379 return;
380 }
381 }
382 GNUNET_assert (NULL == exit_head);
383}
384 314
385 315
386/** 316/**
@@ -443,7 +373,7 @@ choose_exit ()
443 channel_weight = get_channel_weight (pos); 373 channel_weight = get_channel_weight (pos);
444 total_transmitted += channel_weight; 374 total_transmitted += channel_weight;
445 /* double weight for idle channels */ 375 /* double weight for idle channels */
446 if (NULL == pos->cadet_th) 376 if (0 != pos->idle)
447 total_transmitted += channel_weight; 377 total_transmitted += channel_weight;
448 } 378 }
449 if (0 == total_transmitted) 379 if (0 == total_transmitted)
@@ -461,7 +391,7 @@ choose_exit ()
461 channel_weight = get_channel_weight (pos); 391 channel_weight = get_channel_weight (pos);
462 total_transmitted += channel_weight; 392 total_transmitted += channel_weight;
463 /* double weight for idle channels */ 393 /* double weight for idle channels */
464 if (NULL == pos->cadet_th) 394 if (0 != pos->idle)
465 total_transmitted += channel_weight; 395 total_transmitted += channel_weight;
466 if (total_transmitted > selected_offset) 396 if (total_transmitted > selected_offset)
467 return pos; 397 return pos;
@@ -768,62 +698,6 @@ dns_post_request_handler (void *cls,
768 698
769 699
770/** 700/**
771 * Transmit a DNS request via CADET and move the request
772 * handle to the receive queue.
773 *
774 * @param cls the `struct CadetExit`
775 * @param size number of bytes available in buf
776 * @param buf where to copy the message
777 * @return number of bytes written to buf
778 */
779static size_t
780transmit_dns_request_to_cadet (void *cls,
781 size_t size,
782 void *buf)
783{
784 struct CadetExit *exit = cls;
785 struct RequestContext *rc;
786 size_t mlen;
787
788 exit->cadet_th = NULL;
789 if (NULL == (rc = exit->transmit_queue_head))
790 return 0;
791 mlen = rc->mlen;
792 if (mlen > size)
793 {
794 exit->cadet_th
795 = GNUNET_CADET_notify_transmit_ready (exit->cadet_channel,
796 GNUNET_NO,
797 TIMEOUT,
798 mlen,
799 &transmit_dns_request_to_cadet,
800 exit);
801 return 0;
802 }
803 GNUNET_assert (GNUNET_NO == rc->was_transmitted);
804 GNUNET_memcpy (buf,
805 rc->cadet_message,
806 mlen);
807 GNUNET_CONTAINER_DLL_remove (exit->transmit_queue_head,
808 exit->transmit_queue_tail,
809 rc);
810 rc->was_transmitted = GNUNET_YES;
811 GNUNET_CONTAINER_DLL_insert (exit->receive_queue_head,
812 exit->receive_queue_tail,
813 rc);
814 rc = exit->transmit_queue_head;
815 if (NULL != rc)
816 exit->cadet_th = GNUNET_CADET_notify_transmit_ready (exit->cadet_channel,
817 GNUNET_NO,
818 TIMEOUT,
819 rc->mlen,
820 &transmit_dns_request_to_cadet,
821 exit);
822 return mlen;
823}
824
825
826/**
827 * Task run if the time to answer a DNS request via CADET is over. 701 * Task run if the time to answer a DNS request via CADET is over.
828 * 702 *
829 * @param cls the `struct RequestContext` to abort 703 * @param cls the `struct RequestContext` to abort
@@ -834,19 +708,6 @@ timeout_request (void *cls)
834 struct RequestContext *rc = cls; 708 struct RequestContext *rc = cls;
835 struct CadetExit *exit = rc->exit; 709 struct CadetExit *exit = rc->exit;
836 710
837 if (rc->was_transmitted)
838 {
839 exit->num_transmitted++;
840 GNUNET_CONTAINER_DLL_remove (exit->receive_queue_head,
841 exit->receive_queue_tail,
842 rc);
843 }
844 else
845 {
846 GNUNET_CONTAINER_DLL_remove (exit->transmit_queue_head,
847 exit->transmit_queue_tail,
848 rc);
849 }
850 GNUNET_STATISTICS_update (stats, 711 GNUNET_STATISTICS_update (stats,
851 gettext_noop ("# DNS requests dropped (timeout)"), 712 gettext_noop ("# DNS requests dropped (timeout)"),
852 1, 713 1,
@@ -854,12 +715,10 @@ timeout_request (void *cls)
854 GNUNET_DNS_request_drop (rc->rh); 715 GNUNET_DNS_request_drop (rc->rh);
855 GNUNET_free (rc); 716 GNUNET_free (rc);
856 if ( (0 == get_channel_weight (exit)) && 717 if ( (0 == get_channel_weight (exit)) &&
857 (NULL == exit->receive_queue_head) && 718 (NULL == exit->receive_queue_head) )
858 (NULL == exit->transmit_queue_head) )
859 { 719 {
860 /* this straw broke the camel's back: this channel now has 720 /* this straw broke the camel's back: this channel now has
861 such a low score that it will not be used; close it! */ 721 such a low score that it will not be used; close it! */
862 GNUNET_assert (NULL == exit->cadet_th);
863 GNUNET_CADET_channel_destroy (exit->cadet_channel); 722 GNUNET_CADET_channel_destroy (exit->cadet_channel);
864 exit->cadet_channel = NULL; 723 exit->cadet_channel = NULL;
865 GNUNET_CONTAINER_DLL_remove (exit_head, 724 GNUNET_CONTAINER_DLL_remove (exit_head,
@@ -870,7 +729,7 @@ timeout_request (void *cls)
870 exit); 729 exit);
871 /* go back to semi-innocent: mark as not great, but 730 /* go back to semi-innocent: mark as not great, but
872 avoid a prohibitively negative score (see 731 avoid a prohibitively negative score (see
873 #get_channel_weight, which checks for a certain 732 #get_channel_weight(), which checks for a certain
874 minimum number of transmissions before making 733 minimum number of transmissions before making
875 up an opinion) */ 734 up an opinion) */
876 exit->num_transmitted = 5; 735 exit->num_transmitted = 5;
@@ -900,8 +759,8 @@ dns_pre_request_handler (void *cls,
900 const char *request) 759 const char *request)
901{ 760{
902 struct RequestContext *rc; 761 struct RequestContext *rc;
903 size_t mlen; 762 struct GNUNET_MQ_Envelope *env;
904 struct GNUNET_MessageHeader hdr; 763 struct GNUNET_MessageHeader *hdr;
905 struct GNUNET_TUN_DnsHeader dns; 764 struct GNUNET_TUN_DnsHeader dns;
906 struct CadetExit *exit; 765 struct CadetExit *exit;
907 766
@@ -924,93 +783,116 @@ dns_pre_request_handler (void *cls,
924 GNUNET_DNS_request_drop (rh); 783 GNUNET_DNS_request_drop (rh);
925 return; 784 return;
926 } 785 }
927 GNUNET_memcpy (&dns, request, sizeof (dns));
928 mlen = sizeof (struct GNUNET_MessageHeader) + request_length;
929 exit = choose_exit (); 786 exit = choose_exit ();
930 GNUNET_assert (NULL != exit); 787 GNUNET_assert (NULL != exit);
931 GNUNET_assert (NULL != exit->cadet_channel); 788 GNUNET_assert (NULL != exit->cadet_channel);
932 rc = GNUNET_malloc (sizeof (struct RequestContext) + mlen); 789
790 env = GNUNET_MQ_msg_extra (hdr,
791 request_length,
792 GNUNET_MESSAGE_TYPE_VPN_DNS_TO_INTERNET);
793 GNUNET_memcpy (&hdr[1],
794 request,
795 request_length);
796 rc = GNUNET_new (struct RequestContext);
933 rc->exit = exit; 797 rc->exit = exit;
934 rc->rh = rh; 798 rc->rh = rh;
935 rc->cadet_message = (const struct GNUNET_MessageHeader*) &rc[1];
936 rc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, 799 rc->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT,
937 &timeout_request, 800 &timeout_request,
938 rc); 801 rc);
802 GNUNET_memcpy (&dns,
803 request,
804 sizeof (dns));
939 rc->dns_id = dns.id; 805 rc->dns_id = dns.id;
940 rc->mlen = mlen; 806 rc->env = env;
941 hdr.type = htons (GNUNET_MESSAGE_TYPE_VPN_DNS_TO_INTERNET); 807 GNUNET_CONTAINER_DLL_remove (exit->receive_queue_head,
942 hdr.size = htons (mlen); 808 exit->receive_queue_tail,
943 GNUNET_memcpy (&rc[1], &hdr, sizeof (struct GNUNET_MessageHeader)); 809 rc);
944 GNUNET_memcpy (&(((char*)&rc[1])[sizeof (struct GNUNET_MessageHeader)]), 810 if (0 < exit->idle)
945 request, 811 exit->idle--;
946 request_length); 812 exit->num_transmitted++;
947 GNUNET_CONTAINER_DLL_insert_tail (exit->transmit_queue_head, 813 GNUNET_MQ_send (GNUNET_CADET_get_mq (exit->cadet_channel),
948 exit->transmit_queue_tail, 814 GNUNET_MQ_env_copy (env));
949 rc);
950 if (NULL == exit->cadet_th)
951 exit->cadet_th = GNUNET_CADET_notify_transmit_ready (exit->cadet_channel,
952 GNUNET_NO,
953 TIMEOUT,
954 mlen,
955 &transmit_dns_request_to_cadet,
956 exit);
957} 815}
958 816
959 817
818GNUNET_NETWORK_STRUCT_BEGIN
819
820/**
821 * Message with a DNS response.
822 */
823struct DnsResponseMessage
824{
825 /**
826 * GNUnet header, of type #GNUNET_MESSAGE_TYPE_VPN_DNS_FROM_INTERNET
827 */
828 struct GNUNET_MessageHeader header;
829
830 /**
831 * DNS header.
832 */
833 struct GNUNET_TUN_DnsHeader dns;
834
835 /* Followed by more DNS payload */
836};
837
838GNUNET_NETWORK_STRUCT_END
839
960/** 840/**
961 * Process a request via cadet to perform a DNS query. 841 * Process a request via cadet to perform a DNS query.
962 * 842 *
963 * @param cls NULL 843 * @param cls the `struct CadetExit` which got the message
964 * @param channel connection to the other end 844 * @param msg the actual message
965 * @param channel_ctx pointer to our `struct CadetExit`
966 * @param message the actual message
967 * @return #GNUNET_OK to keep the connection open, 845 * @return #GNUNET_OK to keep the connection open,
968 * #GNUNET_SYSERR to close it (signal serious error) 846 * #GNUNET_SYSERR to close it (signal serious error)
969 */ 847 */
970static int 848static int
971receive_dns_response (void *cls, 849check_dns_response (void *cls,
972 struct GNUNET_CADET_Channel *channel, 850 const struct DnsResponseMessage *msg)
973 void **channel_ctx,
974 const struct GNUNET_MessageHeader *message)
975{ 851{
976 struct CadetExit *exit = *channel_ctx; 852 return GNUNET_OK; /* all OK */
853}
854
855
856/**
857 * Process a request via cadet to perform a DNS query.
858 *
859 * @param cls the `struct CadetExit` which got the message
860 * @param msg the actual message
861 */
862static void
863handle_dns_response (void *cls,
864 const struct DnsResponseMessage *msg)
865{
866 struct CadetExit *exit = cls;
977 struct GNUNET_TUN_DnsHeader dns; 867 struct GNUNET_TUN_DnsHeader dns;
978 size_t mlen; 868 size_t mlen;
979 struct RequestContext *rc; 869 struct RequestContext *rc;
980 870
981 mlen = ntohs (message->size); 871 mlen = ntohs (msg->header.size) - sizeof (*msg);
982 mlen -= sizeof (struct GNUNET_MessageHeader);
983 if (mlen < sizeof (struct GNUNET_TUN_DnsHeader))
984 {
985 GNUNET_break_op (0);
986 return GNUNET_SYSERR;
987 }
988 GNUNET_memcpy (&dns, &message[1], sizeof (dns));
989 for (rc = exit->receive_queue_head; NULL != rc; rc = rc->next) 872 for (rc = exit->receive_queue_head; NULL != rc; rc = rc->next)
990 { 873 {
991 GNUNET_assert (GNUNET_YES == rc->was_transmitted);
992 if (dns.id == rc->dns_id) 874 if (dns.id == rc->dns_id)
993 { 875 {
994 GNUNET_STATISTICS_update (stats, 876 GNUNET_STATISTICS_update (stats,
995 gettext_noop ("# DNS replies received"), 877 gettext_noop ("# DNS replies received"),
996 1, GNUNET_NO); 878 1,
879 GNUNET_NO);
997 GNUNET_DNS_request_answer (rc->rh, 880 GNUNET_DNS_request_answer (rc->rh,
998 mlen, 881 mlen + sizeof (struct GNUNET_TUN_DnsHeader),
999 (const void*) &message[1]); 882 (const void*) &msg->dns);
1000 GNUNET_CONTAINER_DLL_remove (exit->receive_queue_head, 883 GNUNET_CONTAINER_DLL_remove (exit->receive_queue_head,
1001 exit->receive_queue_tail, 884 exit->receive_queue_tail,
1002 rc); 885 rc);
1003 GNUNET_SCHEDULER_cancel (rc->timeout_task); 886 GNUNET_SCHEDULER_cancel (rc->timeout_task);
887 GNUNET_MQ_discard (rc->env);
1004 GNUNET_free (rc); 888 GNUNET_free (rc);
1005 exit->num_answered++; 889 exit->num_answered++;
1006 exit->num_transmitted++; 890 return;
1007 return GNUNET_OK;
1008 } 891 }
1009 } 892 }
1010 GNUNET_STATISTICS_update (stats, 893 GNUNET_STATISTICS_update (stats,
1011 gettext_noop ("# DNS replies dropped (too late?)"), 894 gettext_noop ("# DNS replies dropped (too late?)"),
1012 1, GNUNET_NO); 895 1, GNUNET_NO);
1013 return GNUNET_OK;
1014} 896}
1015 897
1016 898
@@ -1031,15 +913,7 @@ abort_all_requests (struct CadetExit *exit)
1031 rc); 913 rc);
1032 GNUNET_DNS_request_drop (rc->rh); 914 GNUNET_DNS_request_drop (rc->rh);
1033 GNUNET_SCHEDULER_cancel (rc->timeout_task); 915 GNUNET_SCHEDULER_cancel (rc->timeout_task);
1034 GNUNET_free (rc); 916 GNUNET_MQ_discard (rc->env);
1035 }
1036 while (NULL != (rc = exit->transmit_queue_head))
1037 {
1038 GNUNET_CONTAINER_DLL_remove (exit->transmit_queue_head,
1039 exit->transmit_queue_tail,
1040 rc);
1041 GNUNET_DNS_request_drop (rc->rh);
1042 GNUNET_SCHEDULER_cancel (rc->timeout_task);
1043 GNUNET_free (rc); 917 GNUNET_free (rc);
1044 } 918 }
1045} 919}
@@ -1067,11 +941,6 @@ cleanup (void *cls)
1067 GNUNET_CONTAINER_DLL_remove (exit_head, 941 GNUNET_CONTAINER_DLL_remove (exit_head,
1068 exit_tail, 942 exit_tail,
1069 exit); 943 exit);
1070 if (NULL != exit->cadet_th)
1071 {
1072 GNUNET_CADET_notify_transmit_ready_cancel (exit->cadet_th);
1073 exit->cadet_th = NULL;
1074 }
1075 if (NULL != exit->cadet_channel) 944 if (NULL != exit->cadet_channel)
1076 { 945 {
1077 GNUNET_CADET_channel_destroy (exit->cadet_channel); 946 GNUNET_CADET_channel_destroy (exit->cadet_channel);
@@ -1126,63 +995,120 @@ cleanup (void *cls)
1126 */ 995 */
1127static void 996static void
1128cadet_channel_end_cb (void *cls, 997cadet_channel_end_cb (void *cls,
1129 const struct GNUNET_CADET_Channel *channel, 998 const struct GNUNET_CADET_Channel *channel)
1130 void *channel_ctx)
1131{ 999{
1132 struct CadetExit *exit = channel_ctx; 1000 struct CadetExit *exit = cls;
1133 struct CadetExit *alt; 1001 struct CadetExit *alt;
1134 struct RequestContext *rc; 1002 struct RequestContext *rc;
1135 1003
1136 if (NULL != exit->cadet_th)
1137 {
1138 GNUNET_CADET_notify_transmit_ready_cancel (exit->cadet_th);
1139 exit->cadet_th = NULL;
1140 }
1141 exit->cadet_channel = NULL; 1004 exit->cadet_channel = NULL;
1142 dns_exit_available--; 1005 dns_exit_available--;
1143 /* open alternative channels */ 1006 /* open alternative channels */
1144 try_open_exit (); 1007 /* our channel is now closed, move our requests to an alternative
1145 if (NULL == exit->cadet_channel) 1008 channel */
1009 alt = choose_exit ();
1010 while (NULL != (rc = exit->receive_queue_head))
1146 { 1011 {
1147 /* our channel is now closed, move our requests to an alternative 1012 GNUNET_CONTAINER_DLL_remove (exit->receive_queue_head,
1148 channel */ 1013 exit->receive_queue_tail,
1149 alt = choose_exit (); 1014 rc);
1150 while (NULL != (rc = exit->transmit_queue_head)) 1015 rc->exit = alt;
1151 { 1016 GNUNET_CONTAINER_DLL_insert (alt->receive_queue_head,
1152 GNUNET_CONTAINER_DLL_remove (exit->transmit_queue_head, 1017 alt->receive_queue_tail,
1153 exit->transmit_queue_tail, 1018 rc);
1154 rc); 1019 GNUNET_MQ_send (GNUNET_CADET_get_mq (exit->cadet_channel),
1155 rc->exit = alt; 1020 GNUNET_MQ_env_copy (rc->env));
1156 GNUNET_CONTAINER_DLL_insert (alt->transmit_queue_head,
1157 alt->transmit_queue_tail,
1158 rc);
1159 }
1160 while (NULL != (rc = exit->receive_queue_head))
1161 {
1162 GNUNET_CONTAINER_DLL_remove (exit->receive_queue_head,
1163 exit->receive_queue_tail,
1164 rc);
1165 rc->was_transmitted = GNUNET_NO;
1166 rc->exit = alt;
1167 GNUNET_CONTAINER_DLL_insert (alt->transmit_queue_head,
1168 alt->transmit_queue_tail,
1169 rc);
1170 }
1171 } 1021 }
1172 else 1022 try_open_exit ();
1023}
1024
1025
1026/**
1027 * Function called whenever a channel has excess capacity.
1028 *
1029 * @param cls the `struct CadetExit`
1030 * @param channel connection to the other end
1031 * @param window_size how much capacity do we have
1032 */
1033static void
1034channel_idle_notify_cb (void *cls,
1035 const struct GNUNET_CADET_Channel *channel,
1036 int window_size)
1037{
1038 struct CadetExit *pos = cls;
1039
1040 pos->idle = window_size;
1041}
1042
1043
1044/**
1045 * We are short on cadet exits, try to open another one.
1046 */
1047static void
1048try_open_exit ()
1049{
1050 struct CadetExit *pos;
1051 uint32_t candidate_count;
1052 uint32_t candidate_selected;
1053 struct GNUNET_HashCode port;
1054
1055 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_INTERNET_RESOLVER,
1056 strlen (GNUNET_APPLICATION_PORT_INTERNET_RESOLVER),
1057 &port);
1058 candidate_count = 0;
1059 for (pos = exit_head; NULL != pos; pos = pos->next)
1060 if (NULL == pos->cadet_channel)
1061 candidate_count++;
1062 if (0 == candidate_count)
1173 { 1063 {
1174 /* the same peer was chosen, just make sure the queue processing is restarted */ 1064 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1175 alt = exit; 1065 "No DNS exits available yet.\n");
1066 return;
1176 } 1067 }
1177 if ( (NULL == alt->cadet_th) && 1068 candidate_selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1178 (NULL != (rc = alt->transmit_queue_head)) ) 1069 candidate_count);
1179 alt->cadet_th 1070 candidate_count = 0;
1180 = GNUNET_CADET_notify_transmit_ready (alt->cadet_channel, 1071 for (pos = exit_head; NULL != pos; pos = pos->next)
1181 GNUNET_NO, 1072 if (NULL == pos->cadet_channel)
1182 TIMEOUT, 1073 {
1183 rc->mlen, 1074 candidate_count++;
1184 &transmit_dns_request_to_cadet, 1075 if (candidate_selected < candidate_count)
1185 alt); 1076 {
1077 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1078 GNUNET_MQ_hd_var_size (dns_response,
1079 GNUNET_MESSAGE_TYPE_VPN_DNS_FROM_INTERNET,
1080 struct DnsResponseMessage,
1081 pos),
1082 GNUNET_MQ_handler_end ()
1083 };
1084
1085
1086 /* move to the head of the DLL */
1087 pos->cadet_channel
1088 = GNUNET_CADET_channel_creatE (cadet_handle,
1089 pos,
1090 &pos->peer,
1091 &port,
1092 GNUNET_CADET_OPTION_DEFAULT,
1093 &channel_idle_notify_cb,
1094 &cadet_channel_end_cb,
1095 cadet_handlers);
1096 if (NULL == pos->cadet_channel)
1097 {
1098 GNUNET_break (0);
1099 continue;
1100 }
1101 GNUNET_CONTAINER_DLL_remove (exit_head,
1102 exit_tail,
1103 pos);
1104 GNUNET_CONTAINER_DLL_insert (exit_head,
1105 exit_tail,
1106 pos);
1107 dns_exit_available++;
1108 return;
1109 }
1110 }
1111 GNUNET_assert (NULL == exit_head);
1186} 1112}
1187 1113
1188 1114
@@ -1308,11 +1234,6 @@ run (void *cls, char *const *args GNUNET_UNUSED,
1308 } 1234 }
1309 if (dns_channel) 1235 if (dns_channel)
1310 { 1236 {
1311 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1312 {&receive_dns_response, GNUNET_MESSAGE_TYPE_VPN_DNS_FROM_INTERNET, 0},
1313 {NULL, 0, 0}
1314 };
1315
1316 dns_pre_handle 1237 dns_pre_handle
1317 = GNUNET_DNS_connect (cfg, 1238 = GNUNET_DNS_connect (cfg,
1318 GNUNET_DNS_FLAG_PRE_RESOLUTION, 1239 GNUNET_DNS_FLAG_PRE_RESOLUTION,
@@ -1326,10 +1247,7 @@ run (void *cls, char *const *args GNUNET_UNUSED,
1326 GNUNET_SCHEDULER_shutdown (); 1247 GNUNET_SCHEDULER_shutdown ();
1327 return; 1248 return;
1328 } 1249 }
1329 cadet_handle = GNUNET_CADET_connect (cfg, 1250 cadet_handle = GNUNET_CADET_connecT (cfg);
1330 NULL,
1331 &cadet_channel_end_cb,
1332 cadet_handlers);
1333 if (NULL == cadet_handle) 1251 if (NULL == cadet_handle)
1334 { 1252 {
1335 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1253 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
diff --git a/src/util/mq.c b/src/util/mq.c
index fe47f6ab4..e0d7c9f34 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -377,6 +377,24 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
377 377
378 378
379/** 379/**
380 * Function to copy an envelope. The envelope must not yet
381 * be in any queue or have any options or callbacks set.
382 *
383 * @param env envelope to copy
384 * @return copy of @a env
385 */
386struct GNUNET_MQ_Envelope *
387GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
388{
389 GNUNET_assert (NULL == env->next);
390 GNUNET_assert (NULL == env->parent_queue);
391 GNUNET_assert (NULL == env->sent_cb);
392 GNUNET_assert (GNUNET_NO == env->have_custom_options);
393 return GNUNET_MQ_msg_copy (env->mh);
394}
395
396
397/**
380 * Send a copy of a message with the given message queue. 398 * Send a copy of a message with the given message queue.
381 * Can be called repeatedly on the same envelope. 399 * Can be called repeatedly on the same envelope.
382 * 400 *