diff options
author | Christian Grothoff <christian@grothoff.org> | 2018-11-01 15:29:45 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2018-11-01 15:29:51 +0100 |
commit | 43de1e4a084b7d9e773b05f173d516dc573de5c1 (patch) | |
tree | 656ad22a37f648791f7a003323842cce03fe6ee5 /src | |
parent | 11916b980c6f022ef4be5e34eea2a0abdce68b10 (diff) | |
download | gnunet-43de1e4a084b7d9e773b05f173d516dc573de5c1.tar.gz gnunet-43de1e4a084b7d9e773b05f173d516dc573de5c1.zip |
work on TNG
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_protocols.h | 51 | ||||
-rw-r--r-- | src/include/gnunet_transport_communication_service.h | 14 | ||||
-rw-r--r-- | src/transport/transport.h | 257 | ||||
-rw-r--r-- | src/transport/transport_api2_communication.c | 565 |
4 files changed, 860 insertions, 27 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 03b13fd48..4831c9215 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -3005,9 +3005,58 @@ extern "C" | |||
3005 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135 | 3005 | #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135 |
3006 | 3006 | ||
3007 | 3007 | ||
3008 | /******************************************************* | ||
3009 | NEW (TNG) Transport service | ||
3010 | ******************************************************* */ | ||
3008 | 3011 | ||
3009 | /** | 3012 | /** |
3010 | * Next available: 1200 | 3013 | * @brief inform transport to add an address of this peer |
3014 | */ | ||
3015 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS 1200 | ||
3016 | |||
3017 | /** | ||
3018 | * @brief inform transport to delete an address of this peer | ||
3019 | */ | ||
3020 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS 1201 | ||
3021 | |||
3022 | /** | ||
3023 | * @brief inform transport about an incoming message | ||
3024 | */ | ||
3025 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG 1202 | ||
3026 | |||
3027 | /** | ||
3028 | * @brief transport acknowledges processing an incoming message | ||
3029 | */ | ||
3030 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK 1203 | ||
3031 | |||
3032 | /** | ||
3033 | * @brief inform transport that a queue was setup to talk to some peer | ||
3034 | */ | ||
3035 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP 1204 | ||
3036 | |||
3037 | /** | ||
3038 | * @brief inform transport that a queue was torn down | ||
3039 | */ | ||
3040 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN 1205 | ||
3041 | |||
3042 | /** | ||
3043 | * @brief transport tells communicator it wants a queue | ||
3044 | */ | ||
3045 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE 1206 | ||
3046 | |||
3047 | /** | ||
3048 | * @brief transport tells communicator it wants to transmit | ||
3049 | */ | ||
3050 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG 1207 | ||
3051 | |||
3052 | /** | ||
3053 | * @brief communicator tells transports that message was sent | ||
3054 | */ | ||
3055 | #define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK 1208 | ||
3056 | |||
3057 | |||
3058 | /** | ||
3059 | * Next available: 1300 | ||
3011 | */ | 3060 | */ |
3012 | 3061 | ||
3013 | 3062 | ||
diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h index 94d15af22..d93d5134e 100644 --- a/src/include/gnunet_transport_communication_service.h +++ b/src/include/gnunet_transport_communication_service.h | |||
@@ -137,8 +137,8 @@ typedef void | |||
137 | * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was | 137 | * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was |
138 | * immediately dropped due to memory limitations (communicator | 138 | * immediately dropped due to memory limitations (communicator |
139 | * should try to apply back pressure), | 139 | * should try to apply back pressure), |
140 | * #GNUNET_SYSERR if the message is ill formed and communicator | 140 | * #GNUNET_SYSERR if the message could not be delivered because |
141 | * should try to reset stream | 141 | * the tranport service is not yet up |
142 | */ | 142 | */ |
143 | int | 143 | int |
144 | GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, | 144 | GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, |
@@ -162,7 +162,7 @@ struct GNUNET_TRANSPORT_QueueHandle; | |||
162 | * "inbound" connection or because the communicator discovered the | 162 | * "inbound" connection or because the communicator discovered the |
163 | * presence of another peer. | 163 | * presence of another peer. |
164 | * | 164 | * |
165 | * @param handle connection to transport service | 165 | * @param ch connection to transport service |
166 | * @param peer peer with which we can now communicate | 166 | * @param peer peer with which we can now communicate |
167 | * @param address address in human-readable format, 0-terminated, UTF-8 | 167 | * @param address address in human-readable format, 0-terminated, UTF-8 |
168 | * @param nt which network type does the @a address belong to? | 168 | * @param nt which network type does the @a address belong to? |
@@ -170,7 +170,7 @@ struct GNUNET_TRANSPORT_QueueHandle; | |||
170 | * @return API handle identifying the new MQ | 170 | * @return API handle identifying the new MQ |
171 | */ | 171 | */ |
172 | struct GNUNET_TRANSPORT_QueueHandle * | 172 | struct GNUNET_TRANSPORT_QueueHandle * |
173 | GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, | 173 | GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, |
174 | const struct GNUNET_PeerIdentity *peer, | 174 | const struct GNUNET_PeerIdentity *peer, |
175 | const char *address, | 175 | const char *address, |
176 | enum GNUNET_ATS_Network_Type nt, | 176 | enum GNUNET_ATS_Network_Type nt, |
@@ -198,16 +198,16 @@ struct GNUNET_TRANSPORT_AddressIdentifier; | |||
198 | * Notify transport service about an address that this communicator | 198 | * Notify transport service about an address that this communicator |
199 | * provides for this peer. | 199 | * provides for this peer. |
200 | * | 200 | * |
201 | * @param handle connection to transport service | 201 | * @param ch connection to transport service |
202 | * @param address our address in human-readable format, 0-terminated, UTF-8 | 202 | * @param address our address in human-readable format, 0-terminated, UTF-8 |
203 | * @param nt which network type does the address belong to? | 203 | * @param nt which network type does the address belong to? |
204 | * @param expiration when does the communicator forsee this address expiring? | 204 | * @param expiration when does the communicator forsee this address expiring? |
205 | */ | 205 | */ |
206 | struct GNUNET_TRANSPORT_AddressIdentifier * | 206 | struct GNUNET_TRANSPORT_AddressIdentifier * |
207 | GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, | 207 | GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, |
208 | const char *address, | 208 | const char *address, |
209 | enum GNUNET_ATS_Network_Type nt, | 209 | enum GNUNET_ATS_Network_Type nt, |
210 | struct GNUNET_TIME_Absolute expiration); | 210 | struct GNUNET_TIME_Relative expiration); |
211 | 211 | ||
212 | 212 | ||
213 | /** | 213 | /** |
diff --git a/src/transport/transport.h b/src/transport/transport.h index 75726e462..ec373286d 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h | |||
@@ -644,6 +644,263 @@ struct TransportPluginMonitorMessage | |||
644 | }; | 644 | }; |
645 | 645 | ||
646 | 646 | ||
647 | |||
648 | |||
649 | |||
650 | |||
651 | |||
652 | |||
653 | |||
654 | /* *********************** TNG messages ***************** */ | ||
655 | |||
656 | /** | ||
657 | * Add address to the list. | ||
658 | */ | ||
659 | struct GNUNET_TRANSPORT_AddAddressMessage | ||
660 | { | ||
661 | |||
662 | /** | ||
663 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS. | ||
664 | */ | ||
665 | struct GNUNET_MessageHeader header; | ||
666 | |||
667 | /** | ||
668 | * Address identifier (used during deletion). | ||
669 | */ | ||
670 | uint32_t aid GNUNET_PACKED; | ||
671 | |||
672 | /** | ||
673 | * When does the address expire? | ||
674 | */ | ||
675 | struct GNUNET_TIME_RelativeNBO expiration; | ||
676 | |||
677 | /** | ||
678 | * An `enum GNUNET_ATS_Network_Type` in NBO. | ||
679 | */ | ||
680 | uint32_t nt; | ||
681 | |||
682 | /* followed by UTF-8 encoded, 0-terminated human-readable address */ | ||
683 | }; | ||
684 | |||
685 | |||
686 | /** | ||
687 | * Remove address from the list. | ||
688 | */ | ||
689 | struct GNUNET_TRANSPORT_DelAddressMessage | ||
690 | { | ||
691 | |||
692 | /** | ||
693 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS. | ||
694 | */ | ||
695 | struct GNUNET_MessageHeader header; | ||
696 | |||
697 | /** | ||
698 | * Address identifier. | ||
699 | */ | ||
700 | uint32_t aid GNUNET_PACKED; | ||
701 | |||
702 | }; | ||
703 | |||
704 | |||
705 | /** | ||
706 | * Inform transport about an incoming message. | ||
707 | */ | ||
708 | struct GNUNET_TRANSPORT_IncomingMessage | ||
709 | { | ||
710 | |||
711 | /** | ||
712 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG. | ||
713 | */ | ||
714 | struct GNUNET_MessageHeader header; | ||
715 | |||
716 | /** | ||
717 | * Do we use flow control or not? | ||
718 | */ | ||
719 | uint32_t fc_on GNUNET_PACKED; | ||
720 | |||
721 | /** | ||
722 | * 64-bit number to identify the matching ACK. | ||
723 | */ | ||
724 | uint64_t fc_id GNUNET_PACKED; | ||
725 | |||
726 | /** | ||
727 | * Sender identifier. | ||
728 | */ | ||
729 | struct GNUNET_PeerIdentity sender GNUNET_PACKED; | ||
730 | |||
731 | /* followed by the message */ | ||
732 | }; | ||
733 | |||
734 | |||
735 | /** | ||
736 | * Transport informs us about being done with an incoming message. | ||
737 | * (only sent if fc_on was set). | ||
738 | */ | ||
739 | struct GNUNET_TRANSPORT_IncomingMessageAck | ||
740 | { | ||
741 | |||
742 | /** | ||
743 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK. | ||
744 | */ | ||
745 | struct GNUNET_MessageHeader header; | ||
746 | |||
747 | /** | ||
748 | * Reserved (0) | ||
749 | */ | ||
750 | uint32_t reserved GNUNET_PACKED; | ||
751 | |||
752 | /** | ||
753 | * Which message is being ACKed? | ||
754 | */ | ||
755 | uint64_t fc_id GNUNET_PACKED; | ||
756 | |||
757 | /** | ||
758 | * Sender identifier of the original message. | ||
759 | */ | ||
760 | struct GNUNET_PeerIdentity sender GNUNET_PACKED; | ||
761 | |||
762 | }; | ||
763 | |||
764 | |||
765 | /** | ||
766 | * Add queue to the transport | ||
767 | */ | ||
768 | struct GNUNET_TRANSPORT_AddQueueMessage | ||
769 | { | ||
770 | |||
771 | /** | ||
772 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE. | ||
773 | */ | ||
774 | struct GNUNET_MessageHeader header; | ||
775 | |||
776 | /** | ||
777 | * Queue identifier (used to identify the queue). | ||
778 | */ | ||
779 | uint32_t qid GNUNET_PACKED; | ||
780 | |||
781 | /** | ||
782 | * Receiver that can be addressed via the queue. | ||
783 | */ | ||
784 | struct GNUNET_PeerIdentity receiver GNUNET_PACKED; | ||
785 | |||
786 | /** | ||
787 | * An `enum GNUNET_ATS_Network_Type` in NBO. | ||
788 | */ | ||
789 | uint32_t nt; | ||
790 | |||
791 | /* followed by UTF-8 encoded, 0-terminated human-readable address */ | ||
792 | }; | ||
793 | |||
794 | |||
795 | /** | ||
796 | * Remove queue, it is no longer available. | ||
797 | */ | ||
798 | struct GNUNET_TRANSPORT_DelQueueMessage | ||
799 | { | ||
800 | |||
801 | /** | ||
802 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE. | ||
803 | */ | ||
804 | struct GNUNET_MessageHeader header; | ||
805 | |||
806 | /** | ||
807 | * Address identifier. | ||
808 | */ | ||
809 | uint32_t qid GNUNET_PACKED; | ||
810 | |||
811 | /** | ||
812 | * Receiver that can be addressed via the queue. | ||
813 | */ | ||
814 | struct GNUNET_PeerIdentity receiver GNUNET_PACKED; | ||
815 | |||
816 | }; | ||
817 | |||
818 | |||
819 | /** | ||
820 | * Transport tells communicator that it wants a new queue. | ||
821 | */ | ||
822 | struct GNUNET_TRANSPORT_CreateQueue | ||
823 | { | ||
824 | |||
825 | /** | ||
826 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE. | ||
827 | */ | ||
828 | struct GNUNET_MessageHeader header; | ||
829 | |||
830 | /** | ||
831 | * Always zero. | ||
832 | */ | ||
833 | uint32_t reserved GNUNET_PACKED; | ||
834 | |||
835 | /** | ||
836 | * Receiver that can be addressed via the queue. | ||
837 | */ | ||
838 | struct GNUNET_PeerIdentity receiver GNUNET_PACKED; | ||
839 | |||
840 | /* followed by UTF-8 encoded, 0-terminated human-readable address */ | ||
841 | }; | ||
842 | |||
843 | |||
844 | /** | ||
845 | * Inform communicator about transport's desire to send a message. | ||
846 | */ | ||
847 | struct GNUNET_TRANSPORT_SendMessageTo | ||
848 | { | ||
849 | |||
850 | /** | ||
851 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG. | ||
852 | */ | ||
853 | struct GNUNET_MessageHeader header; | ||
854 | |||
855 | /** | ||
856 | * Which queue should we use? | ||
857 | */ | ||
858 | uint32_t qid GNUNET_PACKED; | ||
859 | |||
860 | /** | ||
861 | * Message ID, used for flow control. | ||
862 | */ | ||
863 | uint64_t mid GNUNET_PACKED; | ||
864 | |||
865 | /** | ||
866 | * Receiver identifier. | ||
867 | */ | ||
868 | struct GNUNET_PeerIdentity receiver GNUNET_PACKED; | ||
869 | |||
870 | /* followed by the message */ | ||
871 | }; | ||
872 | |||
873 | |||
874 | /** | ||
875 | * Inform transport that message was sent. | ||
876 | */ | ||
877 | struct GNUNET_TRANSPORT_SendMessageToAck | ||
878 | { | ||
879 | |||
880 | /** | ||
881 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK. | ||
882 | */ | ||
883 | struct GNUNET_MessageHeader header; | ||
884 | |||
885 | /** | ||
886 | * Success (#GNUNET_OK), failure (#GNUNET_SYSERR). | ||
887 | */ | ||
888 | uint32_t status GNUNET_PACKED; | ||
889 | |||
890 | /** | ||
891 | * Message ID of the original message. | ||
892 | */ | ||
893 | uint64_t mid GNUNET_PACKED; | ||
894 | |||
895 | /** | ||
896 | * Receiver identifier. | ||
897 | */ | ||
898 | struct GNUNET_PeerIdentity receiver GNUNET_PACKED; | ||
899 | |||
900 | }; | ||
901 | |||
902 | |||
903 | |||
647 | GNUNET_NETWORK_STRUCT_END | 904 | GNUNET_NETWORK_STRUCT_END |
648 | 905 | ||
649 | /* end of transport.h */ | 906 | /* end of transport.h */ |
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index e33c5f444..d446516bd 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c | |||
@@ -29,6 +29,79 @@ | |||
29 | 29 | ||
30 | 30 | ||
31 | /** | 31 | /** |
32 | * How many messages do we keep at most in the queue to the | ||
33 | * transport service before we start to drop (default, | ||
34 | * can be changed via the configuration file). | ||
35 | */ | ||
36 | #define DEFAULT_MAX_QUEUE_LENGTH 16 | ||
37 | |||
38 | |||
39 | /** | ||
40 | * Information we track per packet to enable flow control. | ||
41 | */ | ||
42 | struct FlowControl | ||
43 | { | ||
44 | /** | ||
45 | * Kept in a DLL. | ||
46 | */ | ||
47 | struct FlowControl *next; | ||
48 | |||
49 | /** | ||
50 | * Kept in a DLL. | ||
51 | */ | ||
52 | struct FlowControl *prev; | ||
53 | |||
54 | /** | ||
55 | * Function to call once the message was processed. | ||
56 | */ | ||
57 | GNUNET_TRANSPORT_MessageCompletedCallback cb; | ||
58 | |||
59 | /** | ||
60 | * Closure for @e cb | ||
61 | */ | ||
62 | void *cb_cls; | ||
63 | |||
64 | /** | ||
65 | * Which peer is this about? | ||
66 | */ | ||
67 | struct GNUNET_PeerIdentity sender; | ||
68 | |||
69 | /** | ||
70 | * More-or-less unique ID for the message. | ||
71 | */ | ||
72 | uint64_t id; | ||
73 | }; | ||
74 | |||
75 | |||
76 | /** | ||
77 | * Information we track per message to tell the transport about | ||
78 | * success or failures. | ||
79 | */ | ||
80 | struct AckPending | ||
81 | { | ||
82 | /** | ||
83 | * Kept in a DLL. | ||
84 | */ | ||
85 | struct AckPending *next; | ||
86 | |||
87 | /** | ||
88 | * Kept in a DLL. | ||
89 | */ | ||
90 | struct AckPending *prev; | ||
91 | |||
92 | /** | ||
93 | * Which peer is this about? | ||
94 | */ | ||
95 | struct GNUNET_PeerIdentity receiver; | ||
96 | |||
97 | /** | ||
98 | * More-or-less unique ID for the message. | ||
99 | */ | ||
100 | uint64_t mid; | ||
101 | }; | ||
102 | |||
103 | |||
104 | /** | ||
32 | * Opaque handle to the transport service for communicators. | 105 | * Opaque handle to the transport service for communicators. |
33 | */ | 106 | */ |
34 | struct GNUNET_TRANSPORT_CommunicatorHandle | 107 | struct GNUNET_TRANSPORT_CommunicatorHandle |
@@ -44,6 +117,36 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
44 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; | 117 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; |
45 | 118 | ||
46 | /** | 119 | /** |
120 | * DLL of messages awaiting flow control confirmation (ack). | ||
121 | */ | ||
122 | struct FlowControl *fc_head; | ||
123 | |||
124 | /** | ||
125 | * DLL of messages awaiting flow control confirmation (ack). | ||
126 | */ | ||
127 | struct FlowControl *fc_tail; | ||
128 | |||
129 | /** | ||
130 | * DLL of messages awaiting transmission confirmation (ack). | ||
131 | */ | ||
132 | struct AckPending *ap_head; | ||
133 | |||
134 | /** | ||
135 | * DLL of messages awaiting transmission confirmation (ack). | ||
136 | */ | ||
137 | struct AckPending *ac_tail; | ||
138 | |||
139 | /** | ||
140 | * DLL of queues we offer. | ||
141 | */ | ||
142 | struct QueueHandle *queue_head; | ||
143 | |||
144 | /** | ||
145 | * DLL of queues we offer. | ||
146 | */ | ||
147 | struct QueueHandle *queue_tail; | ||
148 | |||
149 | /** | ||
47 | * Our configuration. | 150 | * Our configuration. |
48 | */ | 151 | */ |
49 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 152 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
@@ -65,6 +168,16 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
65 | void *mq_init_cls; | 168 | void *mq_init_cls; |
66 | 169 | ||
67 | /** | 170 | /** |
171 | * Maximum permissable queue length. | ||
172 | */ | ||
173 | unsigned long long max_queue_length; | ||
174 | |||
175 | /** | ||
176 | * Flow-control identifier generator. | ||
177 | */ | ||
178 | uint64_t fc_gen; | ||
179 | |||
180 | /** | ||
68 | * MTU of the communicator | 181 | * MTU of the communicator |
69 | */ | 182 | */ |
70 | size_t mtu; | 183 | size_t mtu; |
@@ -74,10 +187,53 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
74 | * transport service. | 187 | * transport service. |
75 | */ | 188 | */ |
76 | uint32_t aid_gen; | 189 | uint32_t aid_gen; |
190 | |||
191 | /** | ||
192 | * Queue identifier generator. | ||
193 | */ | ||
194 | uint32_t queue_gen; | ||
77 | 195 | ||
78 | }; | 196 | }; |
79 | 197 | ||
80 | 198 | ||
199 | /** | ||
200 | * Handle returned to identify the internal data structure the transport | ||
201 | * API has created to manage a message queue to a particular peer. | ||
202 | */ | ||
203 | struct GNUNET_TRANSPORT_QueueHandle | ||
204 | { | ||
205 | /** | ||
206 | * Handle this queue belongs to. | ||
207 | */ | ||
208 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
209 | |||
210 | /** | ||
211 | * Which peer we can communciate with. | ||
212 | */ | ||
213 | struct GNUNET_PeerIdentity peer; | ||
214 | |||
215 | /** | ||
216 | * Address used by the communication queue. | ||
217 | */ | ||
218 | char *address; | ||
219 | |||
220 | /** | ||
221 | * Network type of the communciation queue. | ||
222 | */ | ||
223 | enum GNUNET_ATS_Network_Type nt; | ||
224 | |||
225 | /** | ||
226 | * The queue itself. | ||
227 | */ | ||
228 | struct GNUNET_MQ_Handle *mq; | ||
229 | |||
230 | /** | ||
231 | * ID for this queue when talking to the transport service. | ||
232 | */ | ||
233 | uint32_t queue_id; | ||
234 | |||
235 | }; | ||
236 | |||
81 | 237 | ||
82 | /** | 238 | /** |
83 | * Internal representation of an address a communicator is | 239 | * Internal representation of an address a communicator is |
@@ -185,6 +341,100 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | |||
185 | 341 | ||
186 | 342 | ||
187 | /** | 343 | /** |
344 | * Send message to the transport service about queue @a qh | ||
345 | * being now available. | ||
346 | * | ||
347 | * @param qh queue to add | ||
348 | */ | ||
349 | static void | ||
350 | send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
351 | { | ||
352 | struct GNUNET_MQ_Envelope *env; | ||
353 | struct GNUNET_TRANSPORT_AddQueueMessage *aqm; | ||
354 | |||
355 | if (NULL == ai->ch->mq) | ||
356 | return; | ||
357 | env = GNUNET_MQ_msg_extra (aqm, | ||
358 | strlen (ai->address) + 1, | ||
359 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE); | ||
360 | aqm.receiver = qh->peer; | ||
361 | aqm.nt = htonl ((uint32_t) qh->nt); | ||
362 | aqm.qid = htonl (qh->qid); | ||
363 | memcpy (&aqm[1], | ||
364 | ai->address, | ||
365 | strlen (ai->address) + 1); | ||
366 | GNUNET_MQ_send (ai->ch->mq, | ||
367 | env); | ||
368 | } | ||
369 | |||
370 | |||
371 | /** | ||
372 | * Send message to the transport service about queue @a qh | ||
373 | * being no longer available. | ||
374 | * | ||
375 | * @param qh queue to delete | ||
376 | */ | ||
377 | static void | ||
378 | send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
379 | { | ||
380 | struct GNUNET_MQ_Envelope *env; | ||
381 | struct GNUNET_TRANSPORT_DelQueueMessage *dqm; | ||
382 | |||
383 | if (NULL == ai->ch->mq) | ||
384 | return; | ||
385 | env = GNUNET_MQ_msg (dqm, | ||
386 | GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE); | ||
387 | dqm.qid = htonl (qh->qid); | ||
388 | dqm.receiver = qh->peer; | ||
389 | GNUNET_MQ_send (ai->ch->mq, | ||
390 | env); | ||
391 | } | ||
392 | |||
393 | |||
394 | /** | ||
395 | * Disconnect from the transport service. Purges | ||
396 | * all flow control entries as we will no longer receive | ||
397 | * the ACKs. Purges the ack pending entries as the | ||
398 | * transport will no longer expect the confirmations. | ||
399 | * | ||
400 | * @param ch service to disconnect from | ||
401 | */ | ||
402 | static void | ||
403 | disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
404 | { | ||
405 | struct FlowControl *fcn; | ||
406 | struct AckPending *apn; | ||
407 | |||
408 | for (struct FlowControl *fc = ch->fc_head; | ||
409 | NULL != fc; | ||
410 | fc = fcn) | ||
411 | { | ||
412 | fcn = fc->next; | ||
413 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, | ||
414 | ch->fc_tail, | ||
415 | fc); | ||
416 | fc->cb (fc->cb_cls, | ||
417 | GNUNET_SYSERR); | ||
418 | GNUNET_free (fc); | ||
419 | } | ||
420 | for (struct AckPending *ap = ch->ap_head; | ||
421 | NULL != ap; | ||
422 | ap = apn) | ||
423 | { | ||
424 | apn = ap->next; | ||
425 | GNUNET_CONTAINER_DLL_remove (ch->ap_head, | ||
426 | ch->ap_tail, | ||
427 | ap); | ||
428 | GNUNET_free (ap); | ||
429 | } | ||
430 | if (NULL == ch->mq) | ||
431 | return; | ||
432 | GNUNET_MQ_destroy (ch->mq); | ||
433 | ch->mq = NULL; | ||
434 | } | ||
435 | |||
436 | |||
437 | /** | ||
188 | * Function called on MQ errors. | 438 | * Function called on MQ errors. |
189 | */ | 439 | */ |
190 | static void | 440 | static void |
@@ -192,15 +442,230 @@ error_handler (void *cls, | |||
192 | enum GNUNET_MQ_Error error) | 442 | enum GNUNET_MQ_Error error) |
193 | { | 443 | { |
194 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | 444 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; |
445 | |||
446 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
447 | "MQ failure, reconnecting to transport service.\n"); | ||
448 | disconnect (ch); | ||
449 | /* TODO: maybe do this with exponential backoff/delay */ | ||
450 | reconnect (ch); | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Transport service acknowledged a message we gave it | ||
456 | * (with flow control enabled). Tell the communicator. | ||
457 | * | ||
458 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
459 | * @param incoming_ack the ack | ||
460 | */ | ||
461 | static void | ||
462 | handle_incoming_ack (void *cls, | ||
463 | struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) | ||
464 | { | ||
465 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
195 | 466 | ||
196 | GNUNET_MQ_destroy (ch->mq); | 467 | for (struct FlowControl *fc = ch->fc_head; |
197 | ch->mq = NULL; | 468 | NULL != fc; |
469 | fc = fc->next) | ||
470 | { | ||
471 | if ( (fc->id == incoming_ack->fc_id) && | ||
472 | (0 == memcmp (&fc->sender, | ||
473 | incoming_ack->sender, | ||
474 | sizeof (struct GNUNET_PeerIdentity))) ) | ||
475 | { | ||
476 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, | ||
477 | ch->fc_tail, | ||
478 | fc); | ||
479 | fc->cb (fc->cb_cls, | ||
480 | GNUNET_OK); | ||
481 | GNUNET_free (fc); | ||
482 | return; | ||
483 | } | ||
484 | } | ||
485 | GNUNET_break (0); | ||
486 | disconnect (ch); | ||
198 | /* TODO: maybe do this with exponential backoff/delay */ | 487 | /* TODO: maybe do this with exponential backoff/delay */ |
199 | reconnect (ch); | 488 | reconnect (ch); |
200 | } | 489 | } |
201 | 490 | ||
202 | 491 | ||
203 | /** | 492 | /** |
493 | * Transport service wants us to create a queue. Check if @a cq | ||
494 | * is well-formed. | ||
495 | * | ||
496 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
497 | * @param cq the queue creation request | ||
498 | * @return #GNUNET_OK if @a smt is well-formed | ||
499 | */ | ||
500 | static int | ||
501 | check_create_queue (void *cls, | ||
502 | struct GNUNET_TRANSPORT_CreateQueue *cq) | ||
503 | { | ||
504 | uint16_t len = ntohs (cq->header.size) - sizeof (*cq); | ||
505 | const char *addr = (const char *) &cq[1]; | ||
506 | |||
507 | if ( (0 == len) || | ||
508 | ('\0' != addr[len-1]) ) | ||
509 | { | ||
510 | GNUNET_break (0); | ||
511 | return GNUNET_SYSERR; | ||
512 | } | ||
513 | return GNUNET_OK; | ||
514 | } | ||
515 | |||
516 | |||
517 | /** | ||
518 | * Transport service wants us to create a queue. Tell the communicator. | ||
519 | * | ||
520 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
521 | * @param cq the queue creation request | ||
522 | */ | ||
523 | static void | ||
524 | handle_create_queue (void *cls, | ||
525 | struct GNUNET_TRANSPORT_CreateQueue *cq) | ||
526 | { | ||
527 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
528 | const char *addr = (const char *) &cq[1]; | ||
529 | |||
530 | if (GNUNET_OK != | ||
531 | ch->mq_init (ch->mq_init_cls, | ||
532 | &cq->receiver, | ||
533 | addr)) | ||
534 | { | ||
535 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
536 | "Address `%s' invalid for this communicator\n", | ||
537 | addr); | ||
538 | // TODO: do we notify the transport!? | ||
539 | } | ||
540 | } | ||
541 | |||
542 | |||
543 | /** | ||
544 | * Transport service wants us to send a message. Check if @a smt | ||
545 | * is well-formed. | ||
546 | * | ||
547 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
548 | * @param smt the transmission request | ||
549 | * @return #GNUNET_OK if @a smt is well-formed | ||
550 | */ | ||
551 | static int | ||
552 | check_send_msg (void *cls, | ||
553 | struct GNUNET_TRANSPORT_SendMessageTo *smt) | ||
554 | { | ||
555 | uint16_t len = ntohs (smt->header.size) - sizeof (*smt); | ||
556 | const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; | ||
557 | |||
558 | if (ntohs (mh->size) != len) | ||
559 | { | ||
560 | GNUNET_break (0); | ||
561 | return GNUNET_SYSERR; | ||
562 | } | ||
563 | return GNUNET_OK; | ||
564 | } | ||
565 | |||
566 | |||
567 | /** | ||
568 | * Notify transport service about @a status of a message with | ||
569 | * @a mid sent to @a receiver. | ||
570 | * | ||
571 | * @param ch handle | ||
572 | * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure | ||
573 | * @param receiver which peer was the receiver | ||
574 | * @param mid message that the ack is about | ||
575 | */ | ||
576 | static void | ||
577 | send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
578 | int status, | ||
579 | const struct GNUNET_PeerIdentity *receiver, | ||
580 | uint64_t mid) | ||
581 | { | ||
582 | struct GNUNET_MQ_Envelope *env; | ||
583 | struct GNUNET_TRANSPORT_SendMessageToAck *ack; | ||
584 | |||
585 | env = GNUNET_MQ_msg (ack, | ||
586 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); | ||
587 | ack->status = htonl (GNUNET_OK); | ||
588 | ack->mid = ap->mid; | ||
589 | ack->receiver = ap->receiver; | ||
590 | GNUNET_MQ_send (ch->mq, | ||
591 | env); | ||
592 | } | ||
593 | |||
594 | |||
595 | /** | ||
596 | * Message queue transmission by communicator was successful, | ||
597 | * notify transport service. | ||
598 | * | ||
599 | * @param cls an `struct AckPending *` | ||
600 | */ | ||
601 | static void | ||
602 | send_ack_cb (void *cls) | ||
603 | { | ||
604 | struct AckPending *ap = cls; | ||
605 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; | ||
606 | |||
607 | GNUNET_CONTAINER_DLL_remove (ch->ap_head, | ||
608 | ch->ap_tail, | ||
609 | ap); | ||
610 | send_ack (ch, | ||
611 | GNUNET_OK, | ||
612 | &ap->receiver, | ||
613 | ap->mid); | ||
614 | GNUNET_free (ap); | ||
615 | } | ||
616 | |||
617 | |||
618 | /** | ||
619 | * Transport service wants us to send a message. Tell the communicator. | ||
620 | * | ||
621 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
622 | * @param smt the transmission request | ||
623 | */ | ||
624 | static void | ||
625 | handle_send_msg (void *cls, | ||
626 | struct GNUNET_TRANSPORT_SendMessageTo *smt) | ||
627 | { | ||
628 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
629 | const struct GNUNET_MessageHeader *mh; | ||
630 | struct GNUNET_MQ_Envelope *env; | ||
631 | struct AckPending *ap; | ||
632 | struct QueueHandle *qh; | ||
633 | |||
634 | for (qh = ch->queue_head;NULL != qh; qh = qh->next) | ||
635 | if ( (qh->queue_id == smt->qid) && | ||
636 | (0 == memcmp (&qh->peer, | ||
637 | &smt->target, | ||
638 | sizeof (struct GNUNET_PeerIdentity))) ) | ||
639 | break; | ||
640 | if (NULL == qh) | ||
641 | { | ||
642 | /* queue is already gone, tell transport this one failed */ | ||
643 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
644 | "Transmission failed, queue no longer exists.\n"); | ||
645 | send_ack (ch, | ||
646 | GNUNET_NO, | ||
647 | &smt->receiver, | ||
648 | smt->mid); | ||
649 | return; | ||
650 | } | ||
651 | ap = GNUNET_new (struct AckPending); | ||
652 | ap->ch = ch; | ||
653 | ap->receiver = smt->receiver; | ||
654 | ap->mid = smt->mid; | ||
655 | GNUNET_CONTAINER_DLL_insert (ch->ap_head, | ||
656 | cp->ap_tail, | ||
657 | ap); | ||
658 | mh = (const struct GNUNET_MessageHeader *) &smt[1]; | ||
659 | env = GNUNET_MQ_msg_copy (mh); | ||
660 | GNUNET_MQ_notify_sent (env, | ||
661 | &send_ack_cb, | ||
662 | ap); | ||
663 | GNUNET_MQ_send (qh->mq, | ||
664 | env); | ||
665 | } | ||
666 | |||
667 | |||
668 | /** | ||
204 | * (re)connect our communicator to the transport service | 669 | * (re)connect our communicator to the transport service |
205 | * | 670 | * |
206 | * @param ch handle to reconnect | 671 | * @param ch handle to reconnect |
@@ -209,6 +674,18 @@ static void | |||
209 | reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | 674 | reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) |
210 | { | 675 | { |
211 | struct GNUNET_MQ_MessageHandler handlers[] = { | 676 | struct GNUNET_MQ_MessageHandler handlers[] = { |
677 | GNUNET_MQ_hd_fixed_size (incoming_ack, | ||
678 | GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, | ||
679 | struct GNUNET_TRANSPORT_IncomingMessageAck, | ||
680 | ch), | ||
681 | GNUNET_MQ_hd_var_size (create_queue, | ||
682 | GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE, | ||
683 | struct GNUNET_TRANSPORT_CreateQueue, | ||
684 | ch), | ||
685 | GNUNET_MQ_hd_var_size (send_msg, | ||
686 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG, | ||
687 | struct GNUNET_TRANSPORT_SendMessageTo, | ||
688 | ch), | ||
212 | GNUNET_MQ_handler_end() | 689 | GNUNET_MQ_handler_end() |
213 | }; | 690 | }; |
214 | 691 | ||
@@ -217,10 +694,14 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | |||
217 | handlers, | 694 | handlers, |
218 | &error_handler, | 695 | &error_handler, |
219 | ch); | 696 | ch); |
220 | for (struct GNUNET_TRANSPORT_AddressIdentifier ai = ch->ai_head; | 697 | for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; |
221 | NULL != ai; | 698 | NULL != ai; |
222 | ai = ai->next) | 699 | ai = ai->next) |
223 | send_add_address (ai); | 700 | send_add_address (ai); |
701 | for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; | ||
702 | NULL != qh; | ||
703 | qh = qh->next) | ||
704 | send_add_queue (qh); | ||
224 | } | 705 | } |
225 | 706 | ||
226 | 707 | ||
@@ -253,6 +734,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle | |||
253 | ch->mq_init = mq_init; | 734 | ch->mq_init = mq_init; |
254 | ch->mq_init_cls = mq_init_cls; | 735 | ch->mq_init_cls = mq_init_cls; |
255 | reconnect (ch); | 736 | reconnect (ch); |
737 | if (GNUNET_OK != | ||
738 | GNUNET_CONFIGURATION_get_value_number (cfg, | ||
739 | name, | ||
740 | "MAX_QUEUE_LENGTH", | ||
741 | &ch->max_queue_length)) | ||
742 | ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; | ||
256 | if (NULL == ch->mq) | 743 | if (NULL == ch->mq) |
257 | { | 744 | { |
258 | GNUNET_free (ch); | 745 | GNUNET_free (ch); |
@@ -270,12 +757,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle | |||
270 | void | 757 | void |
271 | GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | 758 | GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) |
272 | { | 759 | { |
760 | disconnect (ch); | ||
273 | while (NULL != ch->ai_head) | 761 | while (NULL != ch->ai_head) |
274 | { | 762 | { |
275 | GNUNET_break (0); /* communicator forgot to remove address, warn! */ | 763 | GNUNET_break (0); /* communicator forgot to remove address, warn! */ |
276 | GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); | 764 | GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); |
277 | } | 765 | } |
278 | GNUNET_MQ_destroy (ch->mq); | ||
279 | GNUNET_free (ch); | 766 | GNUNET_free (ch); |
280 | } | 767 | } |
281 | 768 | ||
@@ -297,8 +784,8 @@ GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHa | |||
297 | * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was | 784 | * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was |
298 | * immediately dropped due to memory limitations (communicator | 785 | * immediately dropped due to memory limitations (communicator |
299 | * should try to apply back pressure), | 786 | * should try to apply back pressure), |
300 | * #GNUNET_SYSERR if the message is ill formed and communicator | 787 | * #GNUNET_SYSERR if the message could not be delivered because |
301 | * should try to reset stream | 788 | * the tranport service is not yet up |
302 | */ | 789 | */ |
303 | int | 790 | int |
304 | GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | 791 | GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, |
@@ -312,7 +799,33 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl | |||
312 | uint16_t msize; | 799 | uint16_t msize; |
313 | 800 | ||
314 | if (NULL == ai->ch->mq) | 801 | if (NULL == ai->ch->mq) |
315 | return; | 802 | return GNUNET_SYSERR; |
803 | if (NULL != cb) | ||
804 | { | ||
805 | struct FlowControl *fc; | ||
806 | |||
807 | im->fc_on = htonl (GNUNET_YES); | ||
808 | im->fc_id = ai->ch->fc_gen++; | ||
809 | fc = GNUNET_new (struct FlowControl); | ||
810 | fc->sender = *sender; | ||
811 | fc->id = im->fc_id; | ||
812 | fc->cb = cb; | ||
813 | fc->cb_cls = cb_cls; | ||
814 | GNUNET_CONTAINER_DLL_insert (ch->fc_head, | ||
815 | ch->fc_tail, | ||
816 | fc); | ||
817 | } | ||
818 | else | ||
819 | { | ||
820 | if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) | ||
821 | { | ||
822 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
823 | "Dropping message: transprot is too slow, queue length %u exceeded\n", | ||
824 | ch->max_queue_length); | ||
825 | return GNUNET_NO; | ||
826 | } | ||
827 | } | ||
828 | |||
316 | msize = ntohs (msg->size); | 829 | msize = ntohs (msg->size); |
317 | env = GNUNET_MQ_msg_extra (im, | 830 | env = GNUNET_MQ_msg_extra (im, |
318 | msize, | 831 | msize, |
@@ -320,7 +833,7 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl | |||
320 | if (NULL == env) | 833 | if (NULL == env) |
321 | { | 834 | { |
322 | GNUNET_break (0); | 835 | GNUNET_break (0); |
323 | return; | 836 | return GNUNET_SYSERR; |
324 | } | 837 | } |
325 | im->sender = *sender; | 838 | im->sender = *sender; |
326 | memcpy (&im[1], | 839 | memcpy (&im[1], |
@@ -328,19 +841,12 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl | |||
328 | msize); | 841 | msize); |
329 | GNUNET_MQ_send (ai->ch->mq, | 842 | GNUNET_MQ_send (ai->ch->mq, |
330 | env); | 843 | env); |
844 | return GNUNET_OK; | ||
331 | } | 845 | } |
332 | 846 | ||
333 | 847 | ||
334 | /* ************************* Discovery *************************** */ | 848 | /* ************************* Discovery *************************** */ |
335 | 849 | ||
336 | /** | ||
337 | * Handle returned to identify the internal data structure the transport | ||
338 | * API has created to manage a message queue to a particular peer. | ||
339 | */ | ||
340 | struct GNUNET_TRANSPORT_QueueHandle | ||
341 | { | ||
342 | }; | ||
343 | |||
344 | 850 | ||
345 | /** | 851 | /** |
346 | * Notify transport service that an MQ became available due to an | 852 | * Notify transport service that an MQ became available due to an |
@@ -361,6 +867,20 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle | |||
361 | enum GNUNET_ATS_Network_Type nt, | 867 | enum GNUNET_ATS_Network_Type nt, |
362 | struct GNUNET_MQ_Handle *mq) | 868 | struct GNUNET_MQ_Handle *mq) |
363 | { | 869 | { |
870 | struct GNUNET_TRANSPORT_QueueHandle *qh; | ||
871 | |||
872 | qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle); | ||
873 | qh->ch = ch; | ||
874 | qh->peer = *peer; | ||
875 | qh->address = GNUNET_strdup (address); | ||
876 | qh->nt = nt; | ||
877 | qh->mq = mq; | ||
878 | qh->queue_id = ch->queue_gen++; | ||
879 | GNUNET_CONTAINER_DLL_insert (ch->queue_head, | ||
880 | ch->queue_tail, | ||
881 | qh); | ||
882 | send_add_queue (qh); | ||
883 | return qh; | ||
364 | } | 884 | } |
365 | 885 | ||
366 | 886 | ||
@@ -373,11 +893,18 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle | |||
373 | void | 893 | void |
374 | GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) | 894 | GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) |
375 | { | 895 | { |
896 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; | ||
897 | |||
898 | send_del_queue (qh); | ||
899 | GNUNET_CONTAINER_DLL_remove (ch->queue_head, | ||
900 | ch->queue_tail, | ||
901 | qh); | ||
902 | GNUNET_MQ_destroy (qh->mq); | ||
903 | GNUNET_free (qh->address); | ||
904 | GNUNET_free (qh); | ||
376 | } | 905 | } |
377 | 906 | ||
378 | 907 | ||
379 | |||
380 | |||
381 | /** | 908 | /** |
382 | * Notify transport service about an address that this communicator | 909 | * Notify transport service about an address that this communicator |
383 | * provides for this peer. | 910 | * provides for this peer. |
@@ -421,10 +948,10 @@ GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIde | |||
421 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; | 948 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; |
422 | 949 | ||
423 | send_del_address (ai); | 950 | send_del_address (ai); |
424 | GNUNET_free (ai->address); | ||
425 | GNUNET_CONTAINER_DLL_remove (ch->ai_head, | 951 | GNUNET_CONTAINER_DLL_remove (ch->ai_head, |
426 | ch->ai_tail, | 952 | ch->ai_tail, |
427 | ai); | 953 | ai); |
954 | GNUNET_free (ai->address); | ||
428 | GNUNET_free (ai); | 955 | GNUNET_free (ai); |
429 | } | 956 | } |
430 | 957 | ||