diff options
-rw-r--r-- | src/transport/plugin_transport_udp_new.c | 455 | ||||
-rw-r--r-- | src/transport/plugin_transport_udp_new.h | 17 |
2 files changed, 432 insertions, 40 deletions
diff --git a/src/transport/plugin_transport_udp_new.c b/src/transport/plugin_transport_udp_new.c index 02a2fac0e..bb894a8f9 100644 --- a/src/transport/plugin_transport_udp_new.c +++ b/src/transport/plugin_transport_udp_new.c | |||
@@ -44,6 +44,27 @@ | |||
44 | 44 | ||
45 | 45 | ||
46 | /** | 46 | /** |
47 | * Number of messages we can defragment in parallel. We only really | ||
48 | * defragment 1 message at a time, but if messages get re-ordered, we | ||
49 | * may want to keep knowledge about the previous message to avoid | ||
50 | * discarding the current message in favor of a single fragment of a | ||
51 | * previous message. 3 should be good since we don't expect massive | ||
52 | * message reorderings with UDP. | ||
53 | */ | ||
54 | #define UDP_MAX_MESSAGES_IN_DEFRAG 3 | ||
55 | |||
56 | /** | ||
57 | * We keep a defragmentation queue per sender address. How many | ||
58 | * sender addresses do we support at the same time? Memory consumption | ||
59 | * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this | ||
60 | * value. (So 128 corresponds to 12 MB and should suffice for | ||
61 | * connecting to roughly 128 peers via UDP). | ||
62 | */ | ||
63 | #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128 | ||
64 | |||
65 | |||
66 | |||
67 | /** | ||
47 | * Closure for 'append_port'. | 68 | * Closure for 'append_port'. |
48 | */ | 69 | */ |
49 | struct PrettyPrinterContext | 70 | struct PrettyPrinterContext |
@@ -82,6 +103,9 @@ struct Session | |||
82 | * Desired delay for next sending we received from other peer | 103 | * Desired delay for next sending we received from other peer |
83 | */ | 104 | */ |
84 | struct GNUNET_TIME_Absolute flow_delay_from_other_peer; | 105 | struct GNUNET_TIME_Absolute flow_delay_from_other_peer; |
106 | |||
107 | struct FragmentationContext * head; | ||
108 | struct FragmentationContext * tail; | ||
85 | }; | 109 | }; |
86 | 110 | ||
87 | 111 | ||
@@ -117,6 +141,142 @@ struct SourceInformation | |||
117 | 141 | ||
118 | 142 | ||
119 | /** | 143 | /** |
144 | * Closure for 'find_receive_context'. | ||
145 | */ | ||
146 | struct FindReceiveContext | ||
147 | { | ||
148 | /** | ||
149 | * Where to store the result. | ||
150 | */ | ||
151 | struct ReceiveContext *rc; | ||
152 | |||
153 | /** | ||
154 | * Address to find. | ||
155 | */ | ||
156 | const struct sockaddr *addr; | ||
157 | |||
158 | /** | ||
159 | * Number of bytes in 'addr'. | ||
160 | */ | ||
161 | socklen_t addr_len; | ||
162 | |||
163 | struct Session *session; | ||
164 | }; | ||
165 | |||
166 | |||
167 | |||
168 | /** | ||
169 | * Data structure to track defragmentation contexts based | ||
170 | * on the source of the UDP traffic. | ||
171 | */ | ||
172 | struct ReceiveContext | ||
173 | { | ||
174 | |||
175 | /** | ||
176 | * Defragmentation context. | ||
177 | */ | ||
178 | struct GNUNET_DEFRAGMENT_Context *defrag; | ||
179 | |||
180 | /** | ||
181 | * Source address this receive context is for (allocated at the | ||
182 | * end of the struct). | ||
183 | */ | ||
184 | const struct sockaddr *src_addr; | ||
185 | |||
186 | /** | ||
187 | * Reference to master plugin struct. | ||
188 | */ | ||
189 | struct Plugin *plugin; | ||
190 | |||
191 | /** | ||
192 | * Node in the defrag heap. | ||
193 | */ | ||
194 | struct GNUNET_CONTAINER_HeapNode *hnode; | ||
195 | |||
196 | /** | ||
197 | * Length of 'src_addr' | ||
198 | */ | ||
199 | size_t addr_len; | ||
200 | |||
201 | struct GNUNET_PeerIdentity id; | ||
202 | |||
203 | }; | ||
204 | |||
205 | |||
206 | |||
207 | /** | ||
208 | * Closure for 'process_inbound_tokenized_messages' | ||
209 | */ | ||
210 | struct FragmentationContext | ||
211 | { | ||
212 | struct FragmentationContext * next; | ||
213 | struct FragmentationContext * prev; | ||
214 | |||
215 | struct Plugin * plugin; | ||
216 | struct GNUNET_FRAGMENT_Context * frag; | ||
217 | struct Session * session; | ||
218 | |||
219 | /** | ||
220 | * Function to call upon completion of the transmission. | ||
221 | */ | ||
222 | GNUNET_TRANSPORT_TransmitContinuation cont; | ||
223 | |||
224 | /** | ||
225 | * Closure for 'cont'. | ||
226 | */ | ||
227 | void *cont_cls; | ||
228 | |||
229 | size_t bytes_to_send; | ||
230 | }; | ||
231 | |||
232 | |||
233 | struct UDPMessageWrapper | ||
234 | { | ||
235 | struct Session *session; | ||
236 | struct UDPMessageWrapper *prev; | ||
237 | struct UDPMessageWrapper *next; | ||
238 | struct UDPMessage *udp; | ||
239 | size_t msg_size; | ||
240 | /** | ||
241 | * Function to call upon completion of the transmission. | ||
242 | */ | ||
243 | GNUNET_TRANSPORT_TransmitContinuation cont; | ||
244 | |||
245 | /** | ||
246 | * Closure for 'cont'. | ||
247 | */ | ||
248 | void *cont_cls; | ||
249 | |||
250 | struct FragmentationContext *frag; | ||
251 | |||
252 | }; | ||
253 | |||
254 | |||
255 | /** | ||
256 | * UDP ACK Message-Packet header (after defragmentation). | ||
257 | */ | ||
258 | struct UDP_ACK_Message | ||
259 | { | ||
260 | /** | ||
261 | * Message header. | ||
262 | */ | ||
263 | struct GNUNET_MessageHeader header; | ||
264 | |||
265 | /** | ||
266 | * Desired delay for flow control | ||
267 | */ | ||
268 | uint32_t delay; | ||
269 | |||
270 | /** | ||
271 | * What is the identity of the sender | ||
272 | */ | ||
273 | struct GNUNET_PeerIdentity sender; | ||
274 | |||
275 | }; | ||
276 | |||
277 | |||
278 | |||
279 | /** | ||
120 | * Function called for a quick conversion of the binary address to | 280 | * Function called for a quick conversion of the binary address to |
121 | * a numeric address. Note that the caller must not free the | 281 | * a numeric address. Note that the caller must not free the |
122 | * address and that the next call to this function is allowed | 282 | * address and that the next call to this function is allowed |
@@ -364,6 +524,14 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) | |||
364 | GNUNET_i2s (&s->target), | 524 | GNUNET_i2s (&s->target), |
365 | GNUNET_a2s (s->sock_addr, s->addrlen)); | 525 | GNUNET_a2s (s->sock_addr, s->addrlen)); |
366 | #endif | 526 | #endif |
527 | struct FragmentationContext *fctx = s->head; | ||
528 | while (fctx != NULL) | ||
529 | { | ||
530 | GNUNET_FRAGMENT_context_destroy(fctx->frag); | ||
531 | GNUNET_CONTAINER_DLL_remove(s->head, s->tail, fctx); | ||
532 | GNUNET_free (fctx); | ||
533 | fctx = s->head; | ||
534 | } | ||
367 | 535 | ||
368 | plugin->env->session_end (plugin->env->cls, &s->target, s); | 536 | plugin->env->session_end (plugin->env->cls, &s->target, s); |
369 | 537 | ||
@@ -576,6 +744,43 @@ udp_plugin_get_session (void *cls, | |||
576 | return s; | 744 | return s; |
577 | } | 745 | } |
578 | 746 | ||
747 | /** | ||
748 | * Function that is called with messages created by the fragmentation | ||
749 | * module. In the case of the 'proc' callback of the | ||
750 | * GNUNET_FRAGMENT_context_create function, this function must | ||
751 | * eventually call 'GNUNET_FRAGMENT_context_transmission_done'. | ||
752 | * | ||
753 | * @param cls closure, the 'struct FragmentationContext' | ||
754 | * @param msg the message that was created | ||
755 | */ | ||
756 | static void | ||
757 | enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) | ||
758 | { | ||
759 | struct FragmentationContext *frag_ctx = cls; | ||
760 | struct Plugin *plugin = frag_ctx->plugin; | ||
761 | struct UDPMessageWrapper * udpw; | ||
762 | |||
763 | size_t msg_len = ntohs (msg->size); | ||
764 | |||
765 | #if DEBUG_UDP | ||
766 | #endif | ||
767 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Enqueueing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); | ||
768 | |||
769 | |||
770 | udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len); | ||
771 | udpw->session = frag_ctx->session; | ||
772 | udpw->udp = (struct UDPMessage *) &udpw[1]; | ||
773 | |||
774 | udpw->msg_size = msg_len; | ||
775 | udpw->cont = frag_ctx->cont; | ||
776 | udpw->cont_cls = frag_ctx->cont_cls; | ||
777 | udpw->frag = frag_ctx; | ||
778 | |||
779 | memcpy (udpw->udp, msg, msg_len); | ||
780 | |||
781 | GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw); | ||
782 | } | ||
783 | |||
579 | 784 | ||
580 | /** | 785 | /** |
581 | * Function that can be used by the transport service to transmit | 786 | * Function that can be used by the transport service to transmit |
@@ -613,12 +818,12 @@ udp_plugin_send (void *cls, | |||
613 | GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) | 818 | GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) |
614 | { | 819 | { |
615 | struct Plugin *plugin = cls; | 820 | struct Plugin *plugin = cls; |
616 | size_t mlen = msgbuf_size + sizeof (struct UDPMessage);; | 821 | size_t mlen = msgbuf_size + sizeof (struct UDPMessage); |
617 | 822 | ||
618 | struct GNUNET_TIME_Relative delta; | 823 | struct GNUNET_TIME_Relative delta; |
619 | struct UDPMessageWrapper * udpw; | 824 | struct UDPMessageWrapper * udpw; |
620 | struct UDPMessage *udp; | 825 | struct UDPMessage *udp; |
621 | 826 | char mbuf[mlen]; | |
622 | GNUNET_assert (plugin != NULL); | 827 | GNUNET_assert (plugin != NULL); |
623 | GNUNET_assert (s != NULL); | 828 | GNUNET_assert (s != NULL); |
624 | 829 | ||
@@ -628,33 +833,62 @@ udp_plugin_send (void *cls, | |||
628 | return GNUNET_SYSERR; | 833 | return GNUNET_SYSERR; |
629 | } | 834 | } |
630 | 835 | ||
631 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
632 | "UDP transmits %u-byte message to `%s' using address `%s'\n", | ||
633 | msgbuf_size, | ||
634 | GNUNET_i2s (&s->target), | ||
635 | GNUNET_a2s(s->sock_addr, s->addrlen)); | ||
636 | |||
637 | if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s)) | 836 | if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s)) |
638 | { | 837 | { |
639 | GNUNET_break (0); | 838 | GNUNET_break (0); |
640 | return GNUNET_SYSERR; | 839 | return GNUNET_SYSERR; |
641 | } | 840 | } |
642 | 841 | ||
643 | udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + sizeof (struct UDPMessage) + msgbuf_size); | 842 | LOG (GNUNET_ERROR_TYPE_ERROR, |
644 | udpw->session = s; | 843 | "UDP transmits %u-byte message to `%s' using address `%s'\n", |
645 | udp = (struct UDPMessage *) &udpw[1]; | 844 | msgbuf_size, |
646 | udpw->udp = udp; | 845 | GNUNET_i2s (&s->target), |
647 | udpw->msg_size = mlen; | 846 | GNUNET_a2s(s->sock_addr, s->addrlen)); |
648 | udpw->cont = cont; | ||
649 | udpw->cont_cls = cont_cls; | ||
650 | 847 | ||
848 | /* Message */ | ||
849 | udp = (struct UDPMessage *) mbuf; | ||
651 | udp->header.size = htons (mlen); | 850 | udp->header.size = htons (mlen); |
652 | udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE); | 851 | udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE); |
653 | udp->reserved = htonl (0); | 852 | udp->reserved = htonl (0); |
654 | udp->sender = *plugin->env->my_identity; | 853 | udp->sender = *plugin->env->my_identity; |
655 | memcpy (&udp[1], msgbuf, msgbuf_size); | ||
656 | 854 | ||
657 | GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw); | 855 | if (mlen <= UDP_MTU) |
856 | { | ||
857 | udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen); | ||
858 | udpw->session = s; | ||
859 | udpw->udp = (struct UDPMessage *) &udpw[1]; | ||
860 | udpw->msg_size = mlen; | ||
861 | udpw->cont = cont; | ||
862 | udpw->cont_cls = cont_cls; | ||
863 | udpw->frag = NULL; | ||
864 | memcpy (udpw->udp, udp, sizeof (struct UDPMessage)); | ||
865 | memcpy (&udpw->udp[1], msgbuf, msgbuf_size); | ||
866 | |||
867 | GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw); | ||
868 | } | ||
869 | else | ||
870 | { | ||
871 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
872 | "UDP has to fragment message \n"); | ||
873 | memcpy (&udp[1], msgbuf, msgbuf_size); | ||
874 | struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext)); | ||
875 | |||
876 | frag_ctx->plugin = plugin; | ||
877 | frag_ctx->session = s; | ||
878 | frag_ctx->cont = cont; | ||
879 | frag_ctx->cont_cls = cont_cls; | ||
880 | frag_ctx->bytes_to_send = mlen; | ||
881 | frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats, | ||
882 | UDP_MTU, | ||
883 | &plugin->tracker, | ||
884 | plugin->last_expected_delay, | ||
885 | &udp->header, | ||
886 | &enqueue_fragment, | ||
887 | frag_ctx); | ||
888 | |||
889 | GNUNET_CONTAINER_DLL_insert(s->head, s->tail, frag_ctx); | ||
890 | |||
891 | } | ||
658 | 892 | ||
659 | delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer); | 893 | delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer); |
660 | return mlen; | 894 | return mlen; |
@@ -847,6 +1081,108 @@ process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg, | |||
847 | 1081 | ||
848 | 1082 | ||
849 | /** | 1083 | /** |
1084 | * Scan the heap for a receive context with the given address. | ||
1085 | * | ||
1086 | * @param cls the 'struct FindReceiveContext' | ||
1087 | * @param node internal node of the heap | ||
1088 | * @param element value stored at the node (a 'struct ReceiveContext') | ||
1089 | * @param cost cost associated with the node | ||
1090 | * @return GNUNET_YES if we should continue to iterate, | ||
1091 | * GNUNET_NO if not. | ||
1092 | */ | ||
1093 | static int | ||
1094 | find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node, | ||
1095 | void *element, GNUNET_CONTAINER_HeapCostType cost) | ||
1096 | { | ||
1097 | struct FindReceiveContext *frc = cls; | ||
1098 | struct ReceiveContext *e = element; | ||
1099 | |||
1100 | if ((frc->addr_len == e->addr_len) && | ||
1101 | (0 == memcmp (frc->addr, e->src_addr, frc->addr_len))) | ||
1102 | { | ||
1103 | frc->rc = e; | ||
1104 | return GNUNET_NO; | ||
1105 | } | ||
1106 | return GNUNET_YES; | ||
1107 | } | ||
1108 | |||
1109 | |||
1110 | /** | ||
1111 | * Process a defragmented message. | ||
1112 | * | ||
1113 | * @param cls the 'struct ReceiveContext' | ||
1114 | * @param msg the message | ||
1115 | */ | ||
1116 | static void | ||
1117 | fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) | ||
1118 | { | ||
1119 | struct ReceiveContext *rc = cls; | ||
1120 | |||
1121 | if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE) | ||
1122 | { | ||
1123 | GNUNET_break (0); | ||
1124 | return; | ||
1125 | } | ||
1126 | if (ntohs (msg->size) < sizeof (struct UDPMessage)) | ||
1127 | { | ||
1128 | GNUNET_break (0); | ||
1129 | return; | ||
1130 | } | ||
1131 | process_udp_message (rc->plugin, (const struct UDPMessage *) msg, | ||
1132 | rc->src_addr, rc->addr_len); | ||
1133 | } | ||
1134 | |||
1135 | /** | ||
1136 | * Transmit an acknowledgement. | ||
1137 | * | ||
1138 | * @param cls the 'struct ReceiveContext' | ||
1139 | * @param id message ID (unused) | ||
1140 | * @param msg ack to transmit | ||
1141 | */ | ||
1142 | static void | ||
1143 | ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) | ||
1144 | { | ||
1145 | #if 0 | ||
1146 | struct ReceiveContext *rc = cls; | ||
1147 | |||
1148 | size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); | ||
1149 | char buf[msize]; | ||
1150 | struct UDP_ACK_Message *udp_ack; | ||
1151 | uint32_t delay = 0; | ||
1152 | |||
1153 | struct Session *s; | ||
1154 | |||
1155 | s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len); | ||
1156 | if (s != NULL) | ||
1157 | { | ||
1158 | if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX) | ||
1159 | delay = s->flow_delay_for_other_peer.rel_value; | ||
1160 | else | ||
1161 | delay = UINT32_MAX; | ||
1162 | } | ||
1163 | |||
1164 | |||
1165 | #if DEBUG_UDP | ||
1166 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1167 | "Sending ACK to `%s' including delay of %u ms\n", | ||
1168 | GNUNET_a2s (rc->src_addr, | ||
1169 | (rc->src_addr->sa_family == | ||
1170 | AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct | ||
1171 | sockaddr_in6)), | ||
1172 | delay); | ||
1173 | #endif | ||
1174 | udp_ack = (struct UDP_ACK_Message *) buf; | ||
1175 | udp_ack->header.size = htons ((uint16_t) msize); | ||
1176 | udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); | ||
1177 | udp_ack->delay = htonl (delay); | ||
1178 | udp_ack->sender = *rc->plugin->env->my_identity; | ||
1179 | memcpy (&udp_ack[1], msg, ntohs (msg->size)); | ||
1180 | (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header); | ||
1181 | #endif | ||
1182 | } | ||
1183 | |||
1184 | |||
1185 | /** | ||
850 | * Read and process a message from the given socket. | 1186 | * Read and process a message from the given socket. |
851 | * | 1187 | * |
852 | * @param plugin the overall plugin | 1188 | * @param plugin the overall plugin |
@@ -863,9 +1199,9 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) | |||
863 | //const struct GNUNET_MessageHeader *ack; | 1199 | //const struct GNUNET_MessageHeader *ack; |
864 | //struct Session *peer_session; | 1200 | //struct Session *peer_session; |
865 | //const struct UDP_ACK_Message *udp_ack; | 1201 | //const struct UDP_ACK_Message *udp_ack; |
866 | //struct ReceiveContext *rc; | 1202 | struct ReceiveContext *rc; |
867 | //struct GNUNET_TIME_Absolute now; | 1203 | struct GNUNET_TIME_Absolute now; |
868 | //struct FindReceiveContext frc; | 1204 | struct FindReceiveContext frc; |
869 | //struct Session *s = NULL; | 1205 | //struct Session *s = NULL; |
870 | //struct GNUNET_TIME_Relative flow_delay; | 1206 | //struct GNUNET_TIME_Relative flow_delay; |
871 | //struct GNUNET_ATS_Information ats; | 1207 | //struct GNUNET_ATS_Information ats; |
@@ -903,9 +1239,67 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) | |||
903 | GNUNET_break_op (0); | 1239 | GNUNET_break_op (0); |
904 | return; | 1240 | return; |
905 | } | 1241 | } |
906 | |||
907 | process_udp_message (plugin, (const struct UDPMessage *) msg, | 1242 | process_udp_message (plugin, (const struct UDPMessage *) msg, |
908 | (const struct sockaddr *) addr, fromlen); | 1243 | (const struct sockaddr *) addr, fromlen); |
1244 | return; | ||
1245 | case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: | ||
1246 | if (ntohs (msg->size) < | ||
1247 | sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader)) | ||
1248 | { | ||
1249 | GNUNET_break_op (0); | ||
1250 | return; | ||
1251 | } | ||
1252 | /* TODO */ | ||
1253 | GNUNET_break_op (0); | ||
1254 | return; | ||
1255 | case GNUNET_MESSAGE_TYPE_FRAGMENT: | ||
1256 | frc.rc = NULL; | ||
1257 | frc.addr = (const struct sockaddr *) addr; | ||
1258 | frc.addr_len = fromlen; | ||
1259 | GNUNET_CONTAINER_heap_iterate (plugin->defrags, | ||
1260 | &find_receive_context, | ||
1261 | &frc); | ||
1262 | now = GNUNET_TIME_absolute_get (); | ||
1263 | rc = frc.rc; | ||
1264 | if (rc == NULL) | ||
1265 | { | ||
1266 | /* need to create a new RC */ | ||
1267 | rc = GNUNET_malloc (sizeof (struct ReceiveContext) + fromlen); | ||
1268 | memcpy (&rc[1], addr, fromlen); | ||
1269 | rc->src_addr = (const struct sockaddr *) &rc[1]; | ||
1270 | rc->addr_len = fromlen; | ||
1271 | rc->plugin = plugin; | ||
1272 | rc->defrag = | ||
1273 | GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU, | ||
1274 | UDP_MAX_MESSAGES_IN_DEFRAG, rc, | ||
1275 | &fragment_msg_proc, &ack_proc); | ||
1276 | rc->hnode = | ||
1277 | GNUNET_CONTAINER_heap_insert (plugin->defrags, rc, | ||
1278 | (GNUNET_CONTAINER_HeapCostType) | ||
1279 | now.abs_value); | ||
1280 | } | ||
1281 | #if DEBUG_UDP | ||
1282 | LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n", | ||
1283 | (unsigned int) ntohs (msg->size), | ||
1284 | GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); | ||
1285 | #endif | ||
1286 | |||
1287 | if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (rc->defrag, msg)) | ||
1288 | { | ||
1289 | /* keep this 'rc' from expiring */ | ||
1290 | GNUNET_CONTAINER_heap_update_cost (plugin->defrags, rc->hnode, | ||
1291 | (GNUNET_CONTAINER_HeapCostType) | ||
1292 | now.abs_value); | ||
1293 | } | ||
1294 | if (GNUNET_CONTAINER_heap_get_size (plugin->defrags) > | ||
1295 | UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG) | ||
1296 | { | ||
1297 | /* remove 'rc' that was inactive the longest */ | ||
1298 | rc = GNUNET_CONTAINER_heap_remove_root (plugin->defrags); | ||
1299 | GNUNET_assert (NULL != rc); | ||
1300 | GNUNET_DEFRAGMENT_context_destroy (rc->defrag); | ||
1301 | GNUNET_free (rc); | ||
1302 | } | ||
909 | 1303 | ||
910 | return; | 1304 | return; |
911 | default: | 1305 | default: |
@@ -958,8 +1352,17 @@ udp_select_send (struct Plugin *plugin) | |||
958 | (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, | 1352 | (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, |
959 | (sent < 0) ? STRERROR (errno) : "ok"); | 1353 | (sent < 0) ? STRERROR (errno) : "ok"); |
960 | 1354 | ||
961 | if (udpw->cont != NULL) | 1355 | /* This was just a message fragment */ |
962 | udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK); | 1356 | if (udpw->frag != NULL) |
1357 | { | ||
1358 | GNUNET_FRAGMENT_context_transmission_done (udpw->frag->frag); | ||
1359 | } | ||
1360 | /* This was a complete message*/ | ||
1361 | else | ||
1362 | { | ||
1363 | if (udpw->cont != NULL) | ||
1364 | udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK); | ||
1365 | } | ||
963 | 1366 | ||
964 | GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw); | 1367 | GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw); |
965 | GNUNET_free (udpw); | 1368 | GNUNET_free (udpw); |
@@ -1273,6 +1676,7 @@ libgnunet_plugin_transport_udp_init (void *cls) | |||
1273 | 1676 | ||
1274 | 1677 | ||
1275 | plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); | 1678 | plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); |
1679 | plugin->defrags = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
1276 | plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); | 1680 | plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); |
1277 | plugin->port = port; | 1681 | plugin->port = port; |
1278 | plugin->aport = aport; | 1682 | plugin->aport = aport; |
@@ -1347,6 +1751,11 @@ libgnunet_plugin_transport_udp_done (void *cls) | |||
1347 | GNUNET_NETWORK_fdset_destroy (plugin->ws); | 1751 | GNUNET_NETWORK_fdset_destroy (plugin->ws); |
1348 | GNUNET_NAT_unregister (plugin->nat); | 1752 | GNUNET_NAT_unregister (plugin->nat); |
1349 | 1753 | ||
1754 | if (plugin->defrags != NULL) | ||
1755 | { | ||
1756 | GNUNET_CONTAINER_heap_destroy(plugin->defrags); | ||
1757 | plugin->defrags = NULL; | ||
1758 | } | ||
1350 | if (plugin->mst != NULL) | 1759 | if (plugin->mst != NULL) |
1351 | { | 1760 | { |
1352 | GNUNET_SERVER_mst_destroy(plugin->mst); | 1761 | GNUNET_SERVER_mst_destroy(plugin->mst); |
diff --git a/src/transport/plugin_transport_udp_new.h b/src/transport/plugin_transport_udp_new.h index b565f9cb6..757135120 100644 --- a/src/transport/plugin_transport_udp_new.h +++ b/src/transport/plugin_transport_udp_new.h | |||
@@ -110,23 +110,6 @@ struct UDPMessage | |||
110 | 110 | ||
111 | }; | 111 | }; |
112 | 112 | ||
113 | struct UDPMessageWrapper | ||
114 | { | ||
115 | struct Session *session; | ||
116 | struct UDPMessageWrapper *prev; | ||
117 | struct UDPMessageWrapper *next; | ||
118 | struct UDPMessage *udp; | ||
119 | size_t msg_size; | ||
120 | /** | ||
121 | * Function to call upon completion of the transmission. | ||
122 | */ | ||
123 | GNUNET_TRANSPORT_TransmitContinuation cont; | ||
124 | |||
125 | /** | ||
126 | * Closure for 'cont'. | ||
127 | */ | ||
128 | void *cont_cls; | ||
129 | }; | ||
130 | 113 | ||
131 | /** | 114 | /** |
132 | * Encapsulation of all of the state of the plugin. | 115 | * Encapsulation of all of the state of the plugin. |