diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-05 14:17:50 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-05 14:17:50 +0000 |
commit | 439fed4dda3de557a37de85c08be9ac29746e6b7 (patch) | |
tree | 0a7d0941cd3991fea4aeba26fe9e66d2f456fc8d /src/dv | |
parent | 1f3219c222bb206e02a793e6c7d48ccc3045d604 (diff) | |
download | gnunet-439fed4dda3de557a37de85c08be9ac29746e6b7.tar.gz gnunet-439fed4dda3de557a37de85c08be9ac29746e6b7.zip |
-reworking DV to use new MQ API
Diffstat (limited to 'src/dv')
-rw-r--r-- | src/dv/dv.h | 29 | ||||
-rw-r--r-- | src/dv/dv_api.c | 651 | ||||
-rw-r--r-- | src/dv/gnunet-service-dv.c | 50 | ||||
-rw-r--r-- | src/dv/plugin_transport_dv.c | 157 |
4 files changed, 207 insertions, 680 deletions
diff --git a/src/dv/dv.h b/src/dv/dv.h index e86fdfd15..844cfb5e1 100644 --- a/src/dv/dv.h +++ b/src/dv/dv.h | |||
@@ -126,34 +126,9 @@ struct GNUNET_DV_SendMessage | |||
126 | struct GNUNET_MessageHeader header; | 126 | struct GNUNET_MessageHeader header; |
127 | 127 | ||
128 | /** | 128 | /** |
129 | * Unique ID for this message, for confirm callback, must never be zero. | 129 | * Reserved for alignment. 0. |
130 | */ | 130 | */ |
131 | uint32_t uid GNUNET_PACKED; | 131 | uint32_t reserved GNUNET_PACKED; |
132 | |||
133 | /** | ||
134 | * The (actual) target of the message | ||
135 | */ | ||
136 | struct GNUNET_PeerIdentity target; | ||
137 | |||
138 | }; | ||
139 | |||
140 | |||
141 | /** | ||
142 | * Message from service to DV plugin, saying that a | ||
143 | * SEND request was handled. | ||
144 | */ | ||
145 | struct GNUNET_DV_AckMessage | ||
146 | { | ||
147 | /** | ||
148 | * Type: #GNUNET_MESSAGE_TYPE_DV_SEND_ACK or | ||
149 | * #GNUNET_MESSAGE_TYPE_DV_SEND_NACK. | ||
150 | */ | ||
151 | struct GNUNET_MessageHeader header; | ||
152 | |||
153 | /** | ||
154 | * Which message is being acknowledged? | ||
155 | */ | ||
156 | uint32_t uid GNUNET_PACKED; | ||
157 | 132 | ||
158 | /** | 133 | /** |
159 | * The (actual) target of the message | 134 | * The (actual) target of the message |
diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index d74453376..76a6ea484 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009--2013 GNUnet e.V. | 3 | Copyright (C) 2009--2013, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -37,60 +37,6 @@ | |||
37 | /** | 37 | /** |
38 | * Information we track for each peer. | 38 | * Information we track for each peer. |
39 | */ | 39 | */ |
40 | struct ConnectedPeer; | ||
41 | |||
42 | |||
43 | /** | ||
44 | * Handle for a send operation. | ||
45 | */ | ||
46 | struct GNUNET_DV_TransmitHandle | ||
47 | { | ||
48 | /** | ||
49 | * Kept in a DLL. | ||
50 | */ | ||
51 | struct GNUNET_DV_TransmitHandle *next; | ||
52 | |||
53 | /** | ||
54 | * Kept in a DLL. | ||
55 | */ | ||
56 | struct GNUNET_DV_TransmitHandle *prev; | ||
57 | |||
58 | /** | ||
59 | * Handle to the service. | ||
60 | */ | ||
61 | struct GNUNET_DV_ServiceHandle *sh; | ||
62 | |||
63 | /** | ||
64 | * Function to call upon completion. | ||
65 | */ | ||
66 | GNUNET_DV_MessageSentCallback cb; | ||
67 | |||
68 | /** | ||
69 | * Closure for @a cb. | ||
70 | */ | ||
71 | void *cb_cls; | ||
72 | |||
73 | /** | ||
74 | * The actual message (allocated at the end of this struct). | ||
75 | */ | ||
76 | const struct GNUNET_MessageHeader *msg; | ||
77 | |||
78 | /** | ||
79 | * Destination for the message. | ||
80 | */ | ||
81 | struct ConnectedPeer *target; | ||
82 | |||
83 | /** | ||
84 | * UID of our message, if any. | ||
85 | */ | ||
86 | uint32_t uid; | ||
87 | |||
88 | }; | ||
89 | |||
90 | |||
91 | /** | ||
92 | * Information we track for each peer. | ||
93 | */ | ||
94 | struct ConnectedPeer | 40 | struct ConnectedPeer |
95 | { | 41 | { |
96 | 42 | ||
@@ -99,22 +45,6 @@ struct ConnectedPeer | |||
99 | */ | 45 | */ |
100 | struct GNUNET_PeerIdentity pid; | 46 | struct GNUNET_PeerIdentity pid; |
101 | 47 | ||
102 | /** | ||
103 | * Head of DLL of transmission handles where we need | ||
104 | * to invoke a continuation when we are informed about | ||
105 | * successful transmission. The respective request | ||
106 | * has already been sent to the DV service. | ||
107 | */ | ||
108 | struct GNUNET_DV_TransmitHandle *head; | ||
109 | |||
110 | /** | ||
111 | * Tail of DLL of transmission handles where we need | ||
112 | * to invoke a continuation when we are informed about | ||
113 | * successful transmission. The respective request | ||
114 | * has already been sent to the DV service. | ||
115 | */ | ||
116 | struct GNUNET_DV_TransmitHandle *tail; | ||
117 | |||
118 | }; | 48 | }; |
119 | 49 | ||
120 | 50 | ||
@@ -127,12 +57,7 @@ struct GNUNET_DV_ServiceHandle | |||
127 | /** | 57 | /** |
128 | * Connection to DV service. | 58 | * Connection to DV service. |
129 | */ | 59 | */ |
130 | struct GNUNET_CLIENT_Connection *client; | 60 | struct GNUNET_MQ_Handle *mq; |
131 | |||
132 | /** | ||
133 | * Active request for transmission to DV service. | ||
134 | */ | ||
135 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
136 | 61 | ||
137 | /** | 62 | /** |
138 | * Our configuration. | 63 | * Our configuration. |
@@ -165,26 +90,11 @@ struct GNUNET_DV_ServiceHandle | |||
165 | GNUNET_DV_MessageReceivedCallback message_cb; | 90 | GNUNET_DV_MessageReceivedCallback message_cb; |
166 | 91 | ||
167 | /** | 92 | /** |
168 | * Head of messages to transmit. | ||
169 | */ | ||
170 | struct GNUNET_DV_TransmitHandle *th_head; | ||
171 | |||
172 | /** | ||
173 | * Tail of messages to transmit. | ||
174 | */ | ||
175 | struct GNUNET_DV_TransmitHandle *th_tail; | ||
176 | |||
177 | /** | ||
178 | * Information tracked per connected peer. Maps peer | 93 | * Information tracked per connected peer. Maps peer |
179 | * identities to `struct ConnectedPeer` entries. | 94 | * identities to `struct ConnectedPeer` entries. |
180 | */ | 95 | */ |
181 | struct GNUNET_CONTAINER_MultiPeerMap *peers; | 96 | struct GNUNET_CONTAINER_MultiPeerMap *peers; |
182 | 97 | ||
183 | /** | ||
184 | * Current unique ID | ||
185 | */ | ||
186 | uint32_t uid_gen; | ||
187 | |||
188 | }; | 98 | }; |
189 | 99 | ||
190 | 100 | ||
@@ -198,120 +108,160 @@ reconnect (struct GNUNET_DV_ServiceHandle *sh); | |||
198 | 108 | ||
199 | 109 | ||
200 | /** | 110 | /** |
201 | * Start sending messages from our queue to the service. | 111 | * We got disconnected from the service and thus all of the |
112 | * connections need to be torn down. | ||
202 | * | 113 | * |
203 | * @param sh service handle | 114 | * @param cls the `struct GNUNET_DV_ServiceHandle` |
115 | * @param key a peer identity | ||
116 | * @param value a `struct ConnectedPeer` to clean up | ||
117 | * @return #GNUNET_OK (continue to iterate) | ||
204 | */ | 118 | */ |
205 | static void | 119 | static int |
206 | start_transmit (struct GNUNET_DV_ServiceHandle *sh); | 120 | cleanup_send_cb (void *cls, |
121 | const struct GNUNET_PeerIdentity *key, | ||
122 | void *value) | ||
123 | { | ||
124 | struct GNUNET_DV_ServiceHandle *sh = cls; | ||
125 | struct ConnectedPeer *peer = value; | ||
126 | |||
127 | GNUNET_assert (GNUNET_YES == | ||
128 | GNUNET_CONTAINER_multipeermap_remove (sh->peers, | ||
129 | key, | ||
130 | peer)); | ||
131 | sh->disconnect_cb (sh->cls, | ||
132 | key); | ||
133 | GNUNET_free (peer); | ||
134 | return GNUNET_OK; | ||
135 | } | ||
207 | 136 | ||
208 | 137 | ||
209 | /** | 138 | /** |
210 | * Gives a message from our queue to the DV service. | 139 | * Handles a message sent from the DV service to us. |
140 | * Parse it out and give it to the plugin. | ||
211 | * | 141 | * |
212 | * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`) | 142 | * @param cls the handle to the DV API |
213 | * @param size how many bytes can we send | 143 | * @param cm the message that was received |
214 | * @param buf where to copy the message to send | ||
215 | * @return how many bytes we copied to @a buf | ||
216 | */ | 144 | */ |
217 | static size_t | 145 | static void |
218 | transmit_pending (void *cls, size_t size, void *buf) | 146 | handle_connect (void *cls, |
147 | const struct GNUNET_DV_ConnectMessage *cm) | ||
219 | { | 148 | { |
220 | struct GNUNET_DV_ServiceHandle *sh = cls; | 149 | struct GNUNET_DV_ServiceHandle *sh = cls; |
221 | char *cbuf = buf; | 150 | struct ConnectedPeer *peer; |
222 | struct GNUNET_DV_TransmitHandle *th; | ||
223 | size_t ret; | ||
224 | size_t tsize; | ||
225 | 151 | ||
226 | sh->th = NULL; | 152 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, |
227 | if (NULL == buf) | 153 | &cm->peer); |
154 | if (NULL != peer) | ||
228 | { | 155 | { |
156 | GNUNET_break (0); | ||
229 | reconnect (sh); | 157 | reconnect (sh); |
230 | return 0; | 158 | return; |
231 | } | 159 | } |
232 | ret = 0; | 160 | peer = GNUNET_new (struct ConnectedPeer); |
233 | while ( (NULL != (th = sh->th_head)) && | 161 | peer->pid = cm->peer; |
234 | (size - ret >= (tsize = ntohs (th->msg->size)) )) | 162 | GNUNET_assert (GNUNET_OK == |
163 | GNUNET_CONTAINER_multipeermap_put (sh->peers, | ||
164 | &peer->pid, | ||
165 | peer, | ||
166 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
167 | sh->connect_cb (sh->cls, | ||
168 | &cm->peer, | ||
169 | ntohl (cm->distance), | ||
170 | (enum GNUNET_ATS_Network_Type) ntohl (cm->network)); | ||
171 | } | ||
172 | |||
173 | |||
174 | /** | ||
175 | * Handles a message sent from the DV service to us. | ||
176 | * Parse it out and give it to the plugin. | ||
177 | * | ||
178 | * @param cls the handle to the DV API | ||
179 | * @param dm the message that was received | ||
180 | */ | ||
181 | static void | ||
182 | handle_disconnect (void *cls, | ||
183 | const struct GNUNET_DV_DisconnectMessage *dm) | ||
184 | { | ||
185 | struct GNUNET_DV_ServiceHandle *sh = cls; | ||
186 | struct ConnectedPeer *peer; | ||
187 | |||
188 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, | ||
189 | &dm->peer); | ||
190 | if (NULL == peer) | ||
235 | { | 191 | { |
236 | GNUNET_CONTAINER_DLL_remove (sh->th_head, | 192 | GNUNET_break (0); |
237 | sh->th_tail, | 193 | reconnect (sh); |
238 | th); | 194 | return; |
239 | memcpy (&cbuf[ret], th->msg, tsize); | ||
240 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
241 | "Passing %u bytes of type %u to DV service\n", | ||
242 | tsize, | ||
243 | ntohs (th->msg->type)); | ||
244 | th->msg = NULL; | ||
245 | ret += tsize; | ||
246 | if (NULL != th->cb) | ||
247 | { | ||
248 | GNUNET_CONTAINER_DLL_insert_tail (th->target->head, | ||
249 | th->target->tail, | ||
250 | th); | ||
251 | } | ||
252 | else | ||
253 | { | ||
254 | GNUNET_free (th); | ||
255 | } | ||
256 | } | 195 | } |
257 | if (NULL != sh->th_head) | 196 | cleanup_send_cb (sh, |
258 | start_transmit (sh); | 197 | &dm->peer, |
259 | return ret; | 198 | peer); |
260 | } | 199 | } |
261 | 200 | ||
262 | 201 | ||
263 | /** | 202 | /** |
264 | * Start sending messages from our queue to the service. | 203 | * Handles a message sent from the DV service to us. |
204 | * Parse it out and give it to the plugin. | ||
265 | * | 205 | * |
266 | * @param sh service handle | 206 | * @param cls the handle to the DV API |
207 | * @param msg the message that was received | ||
267 | */ | 208 | */ |
268 | static void | 209 | static void |
269 | start_transmit (struct GNUNET_DV_ServiceHandle *sh) | 210 | handle_distance_update (void *cls, |
211 | const struct GNUNET_DV_DistanceUpdateMessage *dum) | ||
270 | { | 212 | { |
271 | if (NULL != sh->th) | 213 | struct GNUNET_DV_ServiceHandle *sh = cls; |
272 | return; | 214 | struct ConnectedPeer *peer; |
273 | if (NULL == sh->th_head) | 215 | |
216 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, | ||
217 | &dum->peer); | ||
218 | if (NULL == peer) | ||
219 | { | ||
220 | GNUNET_break (0); | ||
221 | reconnect (sh); | ||
274 | return; | 222 | return; |
275 | sh->th = | 223 | } |
276 | GNUNET_CLIENT_notify_transmit_ready (sh->client, | 224 | sh->distance_cb (sh->cls, |
277 | ntohs (sh->th_head->msg->size), | 225 | &dum->peer, |
278 | GNUNET_TIME_UNIT_FOREVER_REL, | 226 | ntohl (dum->distance), |
279 | GNUNET_NO, | 227 | (enum GNUNET_ATS_Network_Type) ntohl (dum->network)); |
280 | &transmit_pending, sh); | ||
281 | } | 228 | } |
282 | 229 | ||
283 | 230 | ||
284 | /** | 231 | /** |
285 | * We got disconnected from the service and thus all of the | 232 | * Handles a message sent from the DV service to us. |
286 | * pending send callbacks will never be confirmed. Clean up. | 233 | * Parse it out and give it to the plugin. |
287 | * | 234 | * |
288 | * @param cls the 'struct GNUNET_DV_ServiceHandle' | 235 | * @param cls the handle to the DV API |
289 | * @param key a peer identity | 236 | * @param rm the message that was received |
290 | * @param value a `struct ConnectedPeer` to clean up | ||
291 | * @return #GNUNET_OK (continue to iterate) | ||
292 | */ | 237 | */ |
293 | static int | 238 | static int |
294 | cleanup_send_cb (void *cls, | 239 | check_received (void *cls, |
295 | const struct GNUNET_PeerIdentity *key, | 240 | const struct GNUNET_DV_ReceivedMessage *rm) |
296 | void *value) | ||
297 | { | 241 | { |
298 | struct GNUNET_DV_ServiceHandle *sh = cls; | 242 | struct GNUNET_DV_ServiceHandle *sh = cls; |
299 | struct ConnectedPeer *peer = value; | 243 | const struct GNUNET_MessageHeader *payload; |
300 | struct GNUNET_DV_TransmitHandle *th; | ||
301 | 244 | ||
302 | GNUNET_assert (GNUNET_YES == | 245 | if (NULL == |
303 | GNUNET_CONTAINER_multipeermap_remove (sh->peers, | 246 | GNUNET_CONTAINER_multipeermap_get (sh->peers, |
304 | key, | 247 | &rm->sender)) |
305 | peer)); | ||
306 | sh->disconnect_cb (sh->cls, | ||
307 | key); | ||
308 | while (NULL != (th = peer->head)) | ||
309 | { | 248 | { |
310 | GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th); | 249 | GNUNET_break (0); |
311 | th->cb (th->cb_cls); | 250 | return GNUNET_SYSERR; |
312 | GNUNET_free (th); | 251 | } |
252 | if (ntohs (rm->header.size) - sizeof (struct GNUNET_DV_ReceivedMessage) < | ||
253 | sizeof (*payload)) | ||
254 | { | ||
255 | GNUNET_break (0); | ||
256 | return GNUNET_SYSERR; | ||
257 | } | ||
258 | payload = (const struct GNUNET_MessageHeader *) &rm[1]; | ||
259 | if (ntohs (rm->header.size) != | ||
260 | sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size)) | ||
261 | { | ||
262 | GNUNET_break (0); | ||
263 | return GNUNET_SYSERR; | ||
313 | } | 264 | } |
314 | GNUNET_free (peer); | ||
315 | return GNUNET_OK; | 265 | return GNUNET_OK; |
316 | } | 266 | } |
317 | 267 | ||
@@ -321,211 +271,38 @@ cleanup_send_cb (void *cls, | |||
321 | * Parse it out and give it to the plugin. | 271 | * Parse it out and give it to the plugin. |
322 | * | 272 | * |
323 | * @param cls the handle to the DV API | 273 | * @param cls the handle to the DV API |
324 | * @param msg the message that was received | 274 | * @param rm the message that was received |
325 | */ | 275 | */ |
326 | static void | 276 | static void |
327 | handle_message_receipt (void *cls, | 277 | handle_received (void *cls, |
328 | const struct GNUNET_MessageHeader *msg) | 278 | const struct GNUNET_DV_ReceivedMessage *rm) |
329 | { | 279 | { |
330 | struct GNUNET_DV_ServiceHandle *sh = cls; | 280 | struct GNUNET_DV_ServiceHandle *sh = cls; |
331 | const struct GNUNET_DV_ConnectMessage *cm; | ||
332 | const struct GNUNET_DV_DistanceUpdateMessage *dum; | ||
333 | const struct GNUNET_DV_DisconnectMessage *dm; | ||
334 | const struct GNUNET_DV_ReceivedMessage *rm; | ||
335 | const struct GNUNET_MessageHeader *payload; | 281 | const struct GNUNET_MessageHeader *payload; |
336 | const struct GNUNET_DV_AckMessage *ack; | ||
337 | struct GNUNET_DV_TransmitHandle *th; | ||
338 | struct GNUNET_DV_TransmitHandle *tn; | ||
339 | struct ConnectedPeer *peer; | ||
340 | 282 | ||
341 | if (NULL == msg) | 283 | payload = (const struct GNUNET_MessageHeader *) &rm[1]; |
342 | { | 284 | sh->message_cb (sh->cls, |
343 | /* Connection closed */ | 285 | &rm->sender, |
344 | reconnect (sh); | 286 | ntohl (rm->distance), |
345 | return; | 287 | payload); |
346 | } | ||
347 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
348 | "Received message of type %u with %u bytes from DV service\n", | ||
349 | (unsigned int) ntohs (msg->type), | ||
350 | (unsigned int) ntohs (msg->size)); | ||
351 | switch (ntohs (msg->type)) | ||
352 | { | ||
353 | case GNUNET_MESSAGE_TYPE_DV_CONNECT: | ||
354 | if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage)) | ||
355 | { | ||
356 | GNUNET_break (0); | ||
357 | reconnect (sh); | ||
358 | return; | ||
359 | } | ||
360 | cm = (const struct GNUNET_DV_ConnectMessage *) msg; | ||
361 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, | ||
362 | &cm->peer); | ||
363 | if (NULL != peer) | ||
364 | { | ||
365 | GNUNET_break (0); | ||
366 | reconnect (sh); | ||
367 | return; | ||
368 | } | ||
369 | peer = GNUNET_new (struct ConnectedPeer); | ||
370 | peer->pid = cm->peer; | ||
371 | GNUNET_assert (GNUNET_OK == | ||
372 | GNUNET_CONTAINER_multipeermap_put (sh->peers, | ||
373 | &peer->pid, | ||
374 | peer, | ||
375 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
376 | sh->connect_cb (sh->cls, | ||
377 | &cm->peer, | ||
378 | ntohl (cm->distance), | ||
379 | (enum GNUNET_ATS_Network_Type) ntohl (cm->network)); | ||
380 | break; | ||
381 | case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED: | ||
382 | if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage)) | ||
383 | { | ||
384 | GNUNET_break (0); | ||
385 | reconnect (sh); | ||
386 | return; | ||
387 | } | ||
388 | dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg; | ||
389 | sh->distance_cb (sh->cls, | ||
390 | &dum->peer, | ||
391 | ntohl (dum->distance), | ||
392 | (enum GNUNET_ATS_Network_Type) ntohl (dum->network)); | ||
393 | break; | ||
394 | case GNUNET_MESSAGE_TYPE_DV_DISCONNECT: | ||
395 | if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage)) | ||
396 | { | ||
397 | GNUNET_break (0); | ||
398 | reconnect (sh); | ||
399 | return; | ||
400 | } | ||
401 | dm = (const struct GNUNET_DV_DisconnectMessage *) msg; | ||
402 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, | ||
403 | &dm->peer); | ||
404 | if (NULL == peer) | ||
405 | { | ||
406 | GNUNET_break (0); | ||
407 | reconnect (sh); | ||
408 | return; | ||
409 | } | ||
410 | tn = sh->th_head; | ||
411 | while (NULL != (th = tn)) | ||
412 | { | ||
413 | tn = th->next; | ||
414 | if (peer == th->target) | ||
415 | { | ||
416 | GNUNET_CONTAINER_DLL_remove (sh->th_head, | ||
417 | sh->th_tail, | ||
418 | th); | ||
419 | th->cb (th->cb_cls); | ||
420 | GNUNET_free (th); | ||
421 | } | ||
422 | } | ||
423 | cleanup_send_cb (sh, &dm->peer, peer); | ||
424 | break; | ||
425 | case GNUNET_MESSAGE_TYPE_DV_RECV: | ||
426 | if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader)) | ||
427 | { | ||
428 | GNUNET_break (0); | ||
429 | reconnect (sh); | ||
430 | return; | ||
431 | } | ||
432 | rm = (const struct GNUNET_DV_ReceivedMessage *) msg; | ||
433 | payload = (const struct GNUNET_MessageHeader *) &rm[1]; | ||
434 | if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size)) | ||
435 | { | ||
436 | GNUNET_break (0); | ||
437 | reconnect (sh); | ||
438 | return; | ||
439 | } | ||
440 | if (NULL == | ||
441 | GNUNET_CONTAINER_multipeermap_get (sh->peers, | ||
442 | &rm->sender)) | ||
443 | { | ||
444 | GNUNET_break (0); | ||
445 | reconnect (sh); | ||
446 | return; | ||
447 | } | ||
448 | sh->message_cb (sh->cls, | ||
449 | &rm->sender, | ||
450 | ntohl (rm->distance), | ||
451 | payload); | ||
452 | break; | ||
453 | case GNUNET_MESSAGE_TYPE_DV_SEND_ACK: | ||
454 | case GNUNET_MESSAGE_TYPE_DV_SEND_NACK: | ||
455 | if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage)) | ||
456 | { | ||
457 | GNUNET_break (0); | ||
458 | reconnect (sh); | ||
459 | return; | ||
460 | } | ||
461 | ack = (const struct GNUNET_DV_AckMessage *) msg; | ||
462 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, | ||
463 | &ack->target); | ||
464 | if (NULL == peer) | ||
465 | break; /* this happens, just ignore */ | ||
466 | for (th = peer->head; NULL != th; th = th->next) | ||
467 | { | ||
468 | if (th->uid != ntohl (ack->uid)) | ||
469 | continue; | ||
470 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
471 | "Matched ACK for message to peer %s\n", | ||
472 | GNUNET_i2s (&ack->target)); | ||
473 | GNUNET_CONTAINER_DLL_remove (peer->head, | ||
474 | peer->tail, | ||
475 | th); | ||
476 | th->cb (th->cb_cls); | ||
477 | GNUNET_free (th); | ||
478 | break; | ||
479 | } | ||
480 | break; | ||
481 | default: | ||
482 | reconnect (sh); | ||
483 | break; | ||
484 | } | ||
485 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
486 | "Received message, continuing receive loop for %p\n", | ||
487 | sh->client); | ||
488 | GNUNET_CLIENT_receive (sh->client, | ||
489 | &handle_message_receipt, sh, | ||
490 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
491 | } | 288 | } |
492 | 289 | ||
493 | 290 | ||
494 | /** | 291 | /** |
495 | * Transmit the start message to the DV service. | 292 | * Generic error handler, called with the appropriate error code and |
293 | * the same closure specified at the creation of the message queue. | ||
294 | * Not every message queue implementation supports an error handler. | ||
496 | * | 295 | * |
497 | * @param cls the `struct GNUNET_DV_ServiceHandle *` | 296 | * @param cls closure with the `struct GNUNET_DV_ServiceHandle *` |
498 | * @param size number of bytes available in buf | 297 | * @param error error code |
499 | * @param buf where to copy the message | ||
500 | * @return number of bytes written to buf | ||
501 | */ | 298 | */ |
502 | static size_t | 299 | static void |
503 | transmit_start (void *cls, | 300 | mq_error_handler (void *cls, |
504 | size_t size, | 301 | enum GNUNET_MQ_Error error) |
505 | void *buf) | ||
506 | { | 302 | { |
507 | struct GNUNET_DV_ServiceHandle *sh = cls; | 303 | struct GNUNET_DV_ServiceHandle *sh = cls; |
508 | struct GNUNET_MessageHeader start_message; | ||
509 | 304 | ||
510 | sh->th = NULL; | 305 | reconnect (sh); |
511 | if (NULL == buf) | ||
512 | { | ||
513 | GNUNET_break (0); | ||
514 | reconnect (sh); | ||
515 | return 0; | ||
516 | } | ||
517 | GNUNET_assert (size >= sizeof (start_message)); | ||
518 | start_message.size = htons (sizeof (struct GNUNET_MessageHeader)); | ||
519 | start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START); | ||
520 | memcpy (buf, &start_message, sizeof (start_message)); | ||
521 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
522 | "Transmitting START request, starting receive loop for %p\n", | ||
523 | sh->client); | ||
524 | GNUNET_CLIENT_receive (sh->client, | ||
525 | &handle_message_receipt, sh, | ||
526 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
527 | start_transmit (sh); | ||
528 | return sizeof (start_message); | ||
529 | } | 306 | } |
530 | 307 | ||
531 | 308 | ||
@@ -537,36 +314,52 @@ transmit_start (void *cls, | |||
537 | static void | 314 | static void |
538 | reconnect (struct GNUNET_DV_ServiceHandle *sh) | 315 | reconnect (struct GNUNET_DV_ServiceHandle *sh) |
539 | { | 316 | { |
540 | if (NULL != sh->th) | 317 | GNUNET_MQ_hd_fixed_size (connect, |
541 | { | 318 | GNUNET_MESSAGE_TYPE_DV_CONNECT, |
542 | GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); | 319 | struct GNUNET_DV_ConnectMessage); |
543 | sh->th = NULL; | 320 | GNUNET_MQ_hd_fixed_size (disconnect, |
544 | } | 321 | GNUNET_MESSAGE_TYPE_DV_DISCONNECT, |
545 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 322 | struct GNUNET_DV_DisconnectMessage); |
546 | "Disconnecting from DV service at %p\n", | 323 | GNUNET_MQ_hd_fixed_size (distance_update, |
547 | sh->client); | 324 | GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED, |
548 | if (NULL != sh->client) | 325 | struct GNUNET_DV_DistanceUpdateMessage); |
326 | GNUNET_MQ_hd_var_size (received, | ||
327 | GNUNET_MESSAGE_TYPE_DV_RECV, | ||
328 | struct GNUNET_DV_ReceivedMessage); | ||
329 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
330 | make_connect_handler (sh), | ||
331 | make_disconnect_handler (sh), | ||
332 | make_distance_update_handler (sh), | ||
333 | make_received_handler (sh), | ||
334 | GNUNET_MQ_handler_end () | ||
335 | }; | ||
336 | struct GNUNET_MessageHeader *sm; | ||
337 | struct GNUNET_MQ_Envelope *env; | ||
338 | |||
339 | if (NULL != sh->mq) | ||
549 | { | 340 | { |
550 | GNUNET_CLIENT_disconnect (sh->client); | 341 | GNUNET_MQ_destroy (sh->mq); |
551 | sh->client = NULL; | 342 | sh->mq = NULL; |
552 | } | 343 | } |
553 | GNUNET_CONTAINER_multipeermap_iterate (sh->peers, | 344 | GNUNET_CONTAINER_multipeermap_iterate (sh->peers, |
554 | &cleanup_send_cb, | 345 | &cleanup_send_cb, |
555 | sh); | 346 | sh); |
556 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 347 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
557 | "Connecting to DV service\n"); | 348 | "Connecting to DV service\n"); |
558 | sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg); | 349 | sh->mq = GNUNET_CLIENT_connecT (sh->cfg, |
559 | if (NULL == sh->client) | 350 | "dv", |
351 | handlers, | ||
352 | &mq_error_handler, | ||
353 | sh); | ||
354 | if (NULL == sh->mq) | ||
560 | { | 355 | { |
561 | GNUNET_break (0); | 356 | GNUNET_break (0); |
562 | return; | 357 | return; |
563 | } | 358 | } |
564 | sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client, | 359 | env = GNUNET_MQ_msg (sm, |
565 | sizeof (struct GNUNET_MessageHeader), | 360 | GNUNET_MESSAGE_TYPE_DV_START); |
566 | GNUNET_TIME_UNIT_FOREVER_REL, | 361 | GNUNET_MQ_send (sh->mq, |
567 | GNUNET_YES, | 362 | env); |
568 | &transmit_start, | ||
569 | sh); | ||
570 | } | 363 | } |
571 | 364 | ||
572 | 365 | ||
@@ -598,7 +391,8 @@ GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
598 | sh->distance_cb = distance_cb; | 391 | sh->distance_cb = distance_cb; |
599 | sh->disconnect_cb = disconnect_cb; | 392 | sh->disconnect_cb = disconnect_cb; |
600 | sh->message_cb = message_cb; | 393 | sh->message_cb = message_cb; |
601 | sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES); | 394 | sh->peers = GNUNET_CONTAINER_multipeermap_create (128, |
395 | GNUNET_YES); | ||
602 | reconnect (sh); | 396 | reconnect (sh); |
603 | return sh; | 397 | return sh; |
604 | } | 398 | } |
@@ -612,26 +406,12 @@ GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
612 | void | 406 | void |
613 | GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) | 407 | GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) |
614 | { | 408 | { |
615 | struct GNUNET_DV_TransmitHandle *pos; | ||
616 | |||
617 | if (NULL == sh) | 409 | if (NULL == sh) |
618 | return; | 410 | return; |
619 | if (NULL != sh->th) | 411 | if (NULL != sh->mq) |
620 | { | ||
621 | GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); | ||
622 | sh->th = NULL; | ||
623 | } | ||
624 | while (NULL != (pos = sh->th_head)) | ||
625 | { | ||
626 | GNUNET_CONTAINER_DLL_remove (sh->th_head, | ||
627 | sh->th_tail, | ||
628 | pos); | ||
629 | GNUNET_free (pos); | ||
630 | } | ||
631 | if (NULL != sh->client) | ||
632 | { | 412 | { |
633 | GNUNET_CLIENT_disconnect (sh->client); | 413 | GNUNET_MQ_destroy (sh->mq); |
634 | sh->client = NULL; | 414 | sh->mq = NULL; |
635 | } | 415 | } |
636 | GNUNET_CONTAINER_multipeermap_iterate (sh->peers, | 416 | GNUNET_CONTAINER_multipeermap_iterate (sh->peers, |
637 | &cleanup_send_cb, | 417 | &cleanup_send_cb, |
@@ -647,87 +427,40 @@ GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) | |||
647 | * @param sh service handle | 427 | * @param sh service handle |
648 | * @param target intended recpient | 428 | * @param target intended recpient |
649 | * @param msg message payload | 429 | * @param msg message payload |
650 | * @param cb function to invoke when done | ||
651 | * @param cb_cls closure for @a cb | ||
652 | * @return handle to cancel the operation | ||
653 | */ | 430 | */ |
654 | struct GNUNET_DV_TransmitHandle * | 431 | void |
655 | GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, | 432 | GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, |
656 | const struct GNUNET_PeerIdentity *target, | 433 | const struct GNUNET_PeerIdentity *target, |
657 | const struct GNUNET_MessageHeader *msg, | 434 | const struct GNUNET_MessageHeader *msg) |
658 | GNUNET_DV_MessageSentCallback cb, | ||
659 | void *cb_cls) | ||
660 | { | 435 | { |
661 | struct GNUNET_DV_TransmitHandle *th; | ||
662 | struct GNUNET_DV_SendMessage *sm; | 436 | struct GNUNET_DV_SendMessage *sm; |
663 | struct ConnectedPeer *peer; | 437 | struct ConnectedPeer *peer; |
438 | struct GNUNET_MQ_Envelope *env; | ||
664 | 439 | ||
665 | if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 440 | if (ntohs (msg->size) + sizeof (*sm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
666 | { | 441 | { |
667 | GNUNET_break (0); | 442 | GNUNET_break (0); |
668 | return NULL; | 443 | return; |
669 | } | 444 | } |
670 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 445 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
671 | "Asked to send %u bytes of type %u to %s via %p\n", | 446 | "Asked to send %u bytes of type %u to %s\n", |
672 | (unsigned int) ntohs (msg->size), | 447 | (unsigned int) ntohs (msg->size), |
673 | (unsigned int) ntohs (msg->type), | 448 | (unsigned int) ntohs (msg->type), |
674 | GNUNET_i2s (target), | 449 | GNUNET_i2s (target)); |
675 | sh->client); | ||
676 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, | 450 | peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, |
677 | target); | 451 | target); |
678 | if (NULL == peer) | 452 | if (NULL == peer) |
679 | { | 453 | { |
680 | GNUNET_break (0); | 454 | GNUNET_break (0); |
681 | return NULL; | 455 | return; |
682 | } | 456 | } |
683 | th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) + | 457 | GNUNET_assert (NULL != sh->mq); |
684 | sizeof (struct GNUNET_DV_SendMessage) + | 458 | env = GNUNET_MQ_msg_nested_mh (sm, |
685 | ntohs (msg->size)); | 459 | GNUNET_MESSAGE_TYPE_DV_SEND, |
686 | th->sh = sh; | 460 | msg); |
687 | th->target = peer; | 461 | sm->target = *target; |
688 | th->cb = cb; | 462 | GNUNET_MQ_send (sh->mq, |
689 | th->cb_cls = cb_cls; | 463 | env); |
690 | th->msg = (const struct GNUNET_MessageHeader *) &th[1]; | ||
691 | sm = (struct GNUNET_DV_SendMessage *) &th[1]; | ||
692 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); | ||
693 | sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + | ||
694 | ntohs (msg->size)); | ||
695 | if (0 == sh->uid_gen) | ||
696 | sh->uid_gen = 1; | ||
697 | th->uid = sh->uid_gen; | ||
698 | sm->uid = htonl (sh->uid_gen++); | ||
699 | /* use memcpy here as 'target' may not be sufficiently aligned */ | ||
700 | memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); | ||
701 | memcpy (&sm[1], msg, ntohs (msg->size)); | ||
702 | GNUNET_CONTAINER_DLL_insert_tail (sh->th_head, | ||
703 | sh->th_tail, | ||
704 | th); | ||
705 | start_transmit (sh); | ||
706 | return th; | ||
707 | } | ||
708 | |||
709 | |||
710 | /** | ||
711 | * Abort send operation (naturally, the message may have | ||
712 | * already been transmitted; this only stops the 'cb' | ||
713 | * from being called again). | ||
714 | * | ||
715 | * @param th send operation to cancel | ||
716 | */ | ||
717 | void | ||
718 | GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th) | ||
719 | { | ||
720 | struct GNUNET_DV_ServiceHandle *sh = th->sh; | ||
721 | |||
722 | if (NULL == th->msg) | ||
723 | GNUNET_CONTAINER_DLL_remove (th->target->head, | ||
724 | th->target->tail, | ||
725 | th); | ||
726 | else | ||
727 | GNUNET_CONTAINER_DLL_remove (sh->th_head, | ||
728 | sh->th_tail, | ||
729 | th); | ||
730 | GNUNET_free (th); | ||
731 | } | 464 | } |
732 | 465 | ||
733 | 466 | ||
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index af6ddb3d9..d103612a8 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c | |||
@@ -149,11 +149,6 @@ struct PendingMessage | |||
149 | */ | 149 | */ |
150 | struct GNUNET_PeerIdentity next_target; | 150 | struct GNUNET_PeerIdentity next_target; |
151 | 151 | ||
152 | /** | ||
153 | * Unique ID of the message. | ||
154 | */ | ||
155 | uint32_t uid; | ||
156 | |||
157 | }; | 152 | }; |
158 | 153 | ||
159 | 154 | ||
@@ -480,33 +475,6 @@ send_control_to_plugin (const struct GNUNET_MessageHeader *message) | |||
480 | 475 | ||
481 | 476 | ||
482 | /** | 477 | /** |
483 | * Give an (N)ACK message to the plugin, we transmitted a message for it. | ||
484 | * | ||
485 | * @param target peer that received the message | ||
486 | * @param uid plugin-chosen UID for the message | ||
487 | * @param nack #GNUNET_NO to send ACK, #GNUNET_YES to send NACK | ||
488 | */ | ||
489 | static void | ||
490 | send_ack_to_plugin (const struct GNUNET_PeerIdentity *target, | ||
491 | uint32_t uid, | ||
492 | int nack) | ||
493 | { | ||
494 | struct GNUNET_DV_AckMessage ack_msg; | ||
495 | |||
496 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
497 | "Delivering ACK for message to peer `%s'\n", | ||
498 | GNUNET_i2s (target)); | ||
499 | ack_msg.header.size = htons (sizeof (ack_msg)); | ||
500 | ack_msg.header.type = htons ((GNUNET_YES == nack) | ||
501 | ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK | ||
502 | : GNUNET_MESSAGE_TYPE_DV_SEND_ACK); | ||
503 | ack_msg.uid = htonl (uid); | ||
504 | ack_msg.target = *target; | ||
505 | send_control_to_plugin (&ack_msg.header); | ||
506 | } | ||
507 | |||
508 | |||
509 | /** | ||
510 | * Send a DISTANCE_CHANGED message to the plugin. | 478 | * Send a DISTANCE_CHANGED message to the plugin. |
511 | * | 479 | * |
512 | * @param peer peer with a changed distance | 480 | * @param peer peer with a changed distance |
@@ -613,16 +581,6 @@ core_transmit_notify (void *cls, size_t size, void *buf) | |||
613 | dn->pm_tail, | 581 | dn->pm_tail, |
614 | pending); | 582 | pending); |
615 | memcpy (&cbuf[off], pending->msg, msize); | 583 | memcpy (&cbuf[off], pending->msg, msize); |
616 | if (0 != pending->uid) | ||
617 | { | ||
618 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
619 | "Acking transmission of %u bytes to %s with plugin\n", | ||
620 | (unsigned int) msize, | ||
621 | GNUNET_i2s (&pending->next_target)); | ||
622 | send_ack_to_plugin (&pending->next_target, | ||
623 | pending->uid, | ||
624 | GNUNET_NO); | ||
625 | } | ||
626 | GNUNET_free (pending); | 584 | GNUNET_free (pending); |
627 | off += msize; | 585 | off += msize; |
628 | } | 586 | } |
@@ -649,7 +607,6 @@ core_transmit_notify (void *cls, size_t size, void *buf) | |||
649 | * | 607 | * |
650 | * @param target where to send the message | 608 | * @param target where to send the message |
651 | * @param distance distance to the @a sender | 609 | * @param distance distance to the @a sender |
652 | * @param uid unique ID for the message | ||
653 | * @param sender original sender of the message | 610 | * @param sender original sender of the message |
654 | * @param actual_target ultimate recipient for the message | 611 | * @param actual_target ultimate recipient for the message |
655 | * @param payload payload of the message | 612 | * @param payload payload of the message |
@@ -657,7 +614,6 @@ core_transmit_notify (void *cls, size_t size, void *buf) | |||
657 | static void | 614 | static void |
658 | forward_payload (struct DirectNeighbor *target, | 615 | forward_payload (struct DirectNeighbor *target, |
659 | uint32_t distance, | 616 | uint32_t distance, |
660 | uint32_t uid, | ||
661 | const struct GNUNET_PeerIdentity *sender, | 617 | const struct GNUNET_PeerIdentity *sender, |
662 | const struct GNUNET_PeerIdentity *actual_target, | 618 | const struct GNUNET_PeerIdentity *actual_target, |
663 | const struct GNUNET_MessageHeader *payload) | 619 | const struct GNUNET_MessageHeader *payload) |
@@ -667,7 +623,6 @@ forward_payload (struct DirectNeighbor *target, | |||
667 | size_t msize; | 623 | size_t msize; |
668 | 624 | ||
669 | if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) && | 625 | if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) && |
670 | (0 == uid) && | ||
671 | (0 != memcmp (sender, | 626 | (0 != memcmp (sender, |
672 | &my_identity, | 627 | &my_identity, |
673 | sizeof (struct GNUNET_PeerIdentity))) ) | 628 | sizeof (struct GNUNET_PeerIdentity))) ) |
@@ -686,7 +641,6 @@ forward_payload (struct DirectNeighbor *target, | |||
686 | } | 641 | } |
687 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 642 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); |
688 | pm->next_target = target->peer; | 643 | pm->next_target = target->peer; |
689 | pm->uid = uid; | ||
690 | pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; | 644 | pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; |
691 | rm = (struct RouteMessage *) &pm[1]; | 645 | rm = (struct RouteMessage *) &pm[1]; |
692 | rm->header.size = htons ((uint16_t) msize); | 646 | rm->header.size = htons ((uint16_t) msize); |
@@ -1888,7 +1842,6 @@ handle_dv_route_message (void *cls, | |||
1888 | GNUNET_i2s (&neighbor->peer)); | 1842 | GNUNET_i2s (&neighbor->peer)); |
1889 | forward_payload (neighbor, | 1843 | forward_payload (neighbor, |
1890 | distance + 1, | 1844 | distance + 1, |
1891 | 0, | ||
1892 | &rm->sender, | 1845 | &rm->sender, |
1893 | &rm->target, | 1846 | &rm->target, |
1894 | payload); | 1847 | payload); |
@@ -1920,7 +1873,6 @@ handle_dv_send_message (void *cls, | |||
1920 | return; | 1873 | return; |
1921 | } | 1874 | } |
1922 | msg = (const struct GNUNET_DV_SendMessage *) message; | 1875 | msg = (const struct GNUNET_DV_SendMessage *) message; |
1923 | GNUNET_break (0 != ntohl (msg->uid)); | ||
1924 | payload = (const struct GNUNET_MessageHeader *) &msg[1]; | 1876 | payload = (const struct GNUNET_MessageHeader *) &msg[1]; |
1925 | if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) | 1877 | if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) |
1926 | { | 1878 | { |
@@ -1940,7 +1892,6 @@ handle_dv_send_message (void *cls, | |||
1940 | GNUNET_STATISTICS_update (stats, | 1892 | GNUNET_STATISTICS_update (stats, |
1941 | "# local messages discarded (no route)", | 1893 | "# local messages discarded (no route)", |
1942 | 1, GNUNET_NO); | 1894 | 1, GNUNET_NO); |
1943 | send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES); | ||
1944 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1895 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1945 | return; | 1896 | return; |
1946 | } | 1897 | } |
@@ -1952,7 +1903,6 @@ handle_dv_send_message (void *cls, | |||
1952 | 1903 | ||
1953 | forward_payload (route->next_hop, | 1904 | forward_payload (route->next_hop, |
1954 | 0 /* first hop, distance is zero */, | 1905 | 0 /* first hop, distance is zero */, |
1955 | htonl (msg->uid), | ||
1956 | &my_identity, | 1906 | &my_identity, |
1957 | &msg->target, | 1907 | &msg->target, |
1958 | payload); | 1908 | payload); |
diff --git a/src/dv/plugin_transport_dv.c b/src/dv/plugin_transport_dv.c index 7293d9fb8..0c72cea3f 100644 --- a/src/dv/plugin_transport_dv.c +++ b/src/dv/plugin_transport_dv.c | |||
@@ -45,51 +45,6 @@ struct Plugin; | |||
45 | 45 | ||
46 | 46 | ||
47 | /** | 47 | /** |
48 | * An active request for transmission via DV. | ||
49 | */ | ||
50 | struct PendingRequest | ||
51 | { | ||
52 | |||
53 | /** | ||
54 | * This is a DLL. | ||
55 | */ | ||
56 | struct PendingRequest *next; | ||
57 | |||
58 | /** | ||
59 | * This is a DLL. | ||
60 | */ | ||
61 | struct PendingRequest *prev; | ||
62 | |||
63 | /** | ||
64 | * Continuation function to call once the transmission buffer | ||
65 | * has again space available. NULL if there is no | ||
66 | * continuation to call. | ||
67 | */ | ||
68 | GNUNET_TRANSPORT_TransmitContinuation transmit_cont; | ||
69 | |||
70 | /** | ||
71 | * Closure for @e transmit_cont. | ||
72 | */ | ||
73 | void *transmit_cont_cls; | ||
74 | |||
75 | /** | ||
76 | * Transmission handle from DV client library. | ||
77 | */ | ||
78 | struct GNUNET_DV_TransmitHandle *th; | ||
79 | |||
80 | /** | ||
81 | * Session of this request. | ||
82 | */ | ||
83 | struct GNUNET_ATS_Session *session; | ||
84 | |||
85 | /** | ||
86 | * Number of bytes to transmit. | ||
87 | */ | ||
88 | size_t size; | ||
89 | }; | ||
90 | |||
91 | |||
92 | /** | ||
93 | * Session handle for connections. | 48 | * Session handle for connections. |
94 | */ | 49 | */ |
95 | struct GNUNET_ATS_Session | 50 | struct GNUNET_ATS_Session |
@@ -100,16 +55,6 @@ struct GNUNET_ATS_Session | |||
100 | struct Plugin *plugin; | 55 | struct Plugin *plugin; |
101 | 56 | ||
102 | /** | 57 | /** |
103 | * Head of pending requests. | ||
104 | */ | ||
105 | struct PendingRequest *pr_head; | ||
106 | |||
107 | /** | ||
108 | * Tail of pending requests. | ||
109 | */ | ||
110 | struct PendingRequest *pr_tail; | ||
111 | |||
112 | /** | ||
113 | * Address we use for the other peer. | 58 | * Address we use for the other peer. |
114 | */ | 59 | */ |
115 | struct GNUNET_HELLO_Address *address; | 60 | struct GNUNET_HELLO_Address *address; |
@@ -449,7 +394,6 @@ static void | |||
449 | free_session (struct GNUNET_ATS_Session *session) | 394 | free_session (struct GNUNET_ATS_Session *session) |
450 | { | 395 | { |
451 | struct Plugin *plugin = session->plugin; | 396 | struct Plugin *plugin = session->plugin; |
452 | struct PendingRequest *pr; | ||
453 | 397 | ||
454 | GNUNET_assert (GNUNET_YES == | 398 | GNUNET_assert (GNUNET_YES == |
455 | GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, | 399 | GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, |
@@ -470,20 +414,6 @@ free_session (struct GNUNET_ATS_Session *session) | |||
470 | session); | 414 | session); |
471 | session->active = GNUNET_NO; | 415 | session->active = GNUNET_NO; |
472 | } | 416 | } |
473 | while (NULL != (pr = session->pr_head)) | ||
474 | { | ||
475 | GNUNET_CONTAINER_DLL_remove (session->pr_head, | ||
476 | session->pr_tail, | ||
477 | pr); | ||
478 | GNUNET_DV_send_cancel (pr->th); | ||
479 | pr->th = NULL; | ||
480 | if (NULL != pr->transmit_cont) | ||
481 | pr->transmit_cont (pr->transmit_cont_cls, | ||
482 | &session->sender, | ||
483 | GNUNET_SYSERR, | ||
484 | pr->size, 0); | ||
485 | GNUNET_free (pr); | ||
486 | } | ||
487 | GNUNET_HELLO_address_free (session->address); | 417 | GNUNET_HELLO_address_free (session->address); |
488 | GNUNET_free (session); | 418 | GNUNET_free (session); |
489 | } | 419 | } |
@@ -515,31 +445,6 @@ handle_dv_disconnect (void *cls, | |||
515 | 445 | ||
516 | 446 | ||
517 | /** | 447 | /** |
518 | * Function called once the delivery of a message has been successful. | ||
519 | * Clean up the pending request, and call continuations. | ||
520 | * | ||
521 | * @param cls closure | ||
522 | */ | ||
523 | static void | ||
524 | send_finished (void *cls) | ||
525 | { | ||
526 | struct PendingRequest *pr = cls; | ||
527 | struct GNUNET_ATS_Session *session = pr->session; | ||
528 | |||
529 | pr->th = NULL; | ||
530 | GNUNET_CONTAINER_DLL_remove (session->pr_head, | ||
531 | session->pr_tail, | ||
532 | pr); | ||
533 | if (NULL != pr->transmit_cont) | ||
534 | pr->transmit_cont (pr->transmit_cont_cls, | ||
535 | &session->sender, | ||
536 | GNUNET_OK, | ||
537 | pr->size, 0); | ||
538 | GNUNET_free (pr); | ||
539 | } | ||
540 | |||
541 | |||
542 | /** | ||
543 | * Function that can be used by the transport service to transmit | 448 | * Function that can be used by the transport service to transmit |
544 | * a message using the plugin. | 449 | * a message using the plugin. |
545 | * | 450 | * |
@@ -565,10 +470,10 @@ dv_plugin_send (void *cls, | |||
565 | size_t msgbuf_size, | 470 | size_t msgbuf_size, |
566 | unsigned int priority, | 471 | unsigned int priority, |
567 | struct GNUNET_TIME_Relative timeout, | 472 | struct GNUNET_TIME_Relative timeout, |
568 | GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) | 473 | GNUNET_TRANSPORT_TransmitContinuation cont, |
474 | void *cont_cls) | ||
569 | { | 475 | { |
570 | struct Plugin *plugin = cls; | 476 | struct Plugin *plugin = cls; |
571 | struct PendingRequest *pr; | ||
572 | const struct GNUNET_MessageHeader *msg; | 477 | const struct GNUNET_MessageHeader *msg; |
573 | struct GNUNET_MessageHeader *box; | 478 | struct GNUNET_MessageHeader *box; |
574 | 479 | ||
@@ -585,20 +490,13 @@ dv_plugin_send (void *cls, | |||
585 | memcpy (&box[1], msgbuf, msgbuf_size); | 490 | memcpy (&box[1], msgbuf, msgbuf_size); |
586 | msg = box; | 491 | msg = box; |
587 | } | 492 | } |
588 | pr = GNUNET_new (struct PendingRequest); | 493 | GNUNET_DV_send (plugin->dvh, |
589 | pr->transmit_cont = cont; | 494 | &session->sender, |
590 | pr->transmit_cont_cls = cont_cls; | 495 | msg); |
591 | pr->session = session; | 496 | cont (cont_cls, |
592 | pr->size = msgbuf_size; | 497 | &session->sender, |
593 | GNUNET_CONTAINER_DLL_insert_tail (session->pr_head, | 498 | GNUNET_OK, |
594 | session->pr_tail, | 499 | msgbuf_size, 0); |
595 | pr); | ||
596 | |||
597 | pr->th = GNUNET_DV_send (plugin->dvh, | ||
598 | &session->sender, | ||
599 | msg, | ||
600 | &send_finished, | ||
601 | pr); | ||
602 | GNUNET_free_non_null (box); | 500 | GNUNET_free_non_null (box); |
603 | return 0; /* DV */ | 501 | return 0; /* DV */ |
604 | } | 502 | } |
@@ -618,26 +516,11 @@ dv_plugin_disconnect_peer (void *cls, | |||
618 | { | 516 | { |
619 | struct Plugin *plugin = cls; | 517 | struct Plugin *plugin = cls; |
620 | struct GNUNET_ATS_Session *session; | 518 | struct GNUNET_ATS_Session *session; |
621 | struct PendingRequest *pr; | ||
622 | 519 | ||
623 | session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions, | 520 | session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions, |
624 | target); | 521 | target); |
625 | if (NULL == session) | 522 | if (NULL == session) |
626 | return; /* nothing to do */ | 523 | return; /* nothing to do */ |
627 | while (NULL != (pr = session->pr_head)) | ||
628 | { | ||
629 | GNUNET_CONTAINER_DLL_remove (session->pr_head, | ||
630 | session->pr_tail, | ||
631 | pr); | ||
632 | GNUNET_DV_send_cancel (pr->th); | ||
633 | pr->th = NULL; | ||
634 | if (NULL != pr->transmit_cont) | ||
635 | pr->transmit_cont (pr->transmit_cont_cls, | ||
636 | &session->sender, | ||
637 | GNUNET_SYSERR, | ||
638 | pr->size, 0); | ||
639 | GNUNET_free (pr); | ||
640 | } | ||
641 | session->active = GNUNET_NO; | 524 | session->active = GNUNET_NO; |
642 | } | 525 | } |
643 | 526 | ||
@@ -655,22 +538,6 @@ static int | |||
655 | dv_plugin_disconnect_session (void *cls, | 538 | dv_plugin_disconnect_session (void *cls, |
656 | struct GNUNET_ATS_Session *session) | 539 | struct GNUNET_ATS_Session *session) |
657 | { | 540 | { |
658 | struct PendingRequest *pr; | ||
659 | |||
660 | while (NULL != (pr = session->pr_head)) | ||
661 | { | ||
662 | GNUNET_CONTAINER_DLL_remove (session->pr_head, | ||
663 | session->pr_tail, | ||
664 | pr); | ||
665 | GNUNET_DV_send_cancel (pr->th); | ||
666 | pr->th = NULL; | ||
667 | if (NULL != pr->transmit_cont) | ||
668 | pr->transmit_cont (pr->transmit_cont_cls, | ||
669 | &session->sender, | ||
670 | GNUNET_SYSERR, | ||
671 | pr->size, 0); | ||
672 | GNUNET_free (pr); | ||
673 | } | ||
674 | session->active = GNUNET_NO; | 541 | session->active = GNUNET_NO; |
675 | return GNUNET_OK; | 542 | return GNUNET_OK; |
676 | } | 543 | } |
@@ -691,9 +558,11 @@ dv_plugin_disconnect_session (void *cls, | |||
691 | * @param asc_cls closure for @a asc | 558 | * @param asc_cls closure for @a asc |
692 | */ | 559 | */ |
693 | static void | 560 | static void |
694 | dv_plugin_address_pretty_printer (void *cls, const char *type, | 561 | dv_plugin_address_pretty_printer (void *cls, |
562 | const char *type, | ||
695 | const void *addr, | 563 | const void *addr, |
696 | size_t addrlen, int numeric, | 564 | size_t addrlen, |
565 | int numeric, | ||
697 | struct GNUNET_TIME_Relative timeout, | 566 | struct GNUNET_TIME_Relative timeout, |
698 | GNUNET_TRANSPORT_AddressStringCallback asc, | 567 | GNUNET_TRANSPORT_AddressStringCallback asc, |
699 | void *asc_cls) | 568 | void *asc_cls) |