aboutsummaryrefslogtreecommitdiff
path: root/src/dv
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-05 14:17:50 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-05 14:17:50 +0000
commit439fed4dda3de557a37de85c08be9ac29746e6b7 (patch)
tree0a7d0941cd3991fea4aeba26fe9e66d2f456fc8d /src/dv
parent1f3219c222bb206e02a793e6c7d48ccc3045d604 (diff)
downloadgnunet-439fed4dda3de557a37de85c08be9ac29746e6b7.tar.gz
gnunet-439fed4dda3de557a37de85c08be9ac29746e6b7.zip
-reworking DV to use new MQ API
Diffstat (limited to 'src/dv')
-rw-r--r--src/dv/dv.h29
-rw-r--r--src/dv/dv_api.c651
-rw-r--r--src/dv/gnunet-service-dv.c50
-rw-r--r--src/dv/plugin_transport_dv.c157
4 files changed, 207 insertions, 680 deletions
diff --git a/src/dv/dv.h b/src/dv/dv.h
index e86fdfd15..844cfb5e1 100644
--- a/src/dv/dv.h
+++ b/src/dv/dv.h
@@ -126,34 +126,9 @@ struct GNUNET_DV_SendMessage
126 struct GNUNET_MessageHeader header; 126 struct GNUNET_MessageHeader header;
127 127
128 /** 128 /**
129 * Unique ID for this message, for confirm callback, must never be zero. 129 * Reserved for alignment. 0.
130 */ 130 */
131 uint32_t uid GNUNET_PACKED; 131 uint32_t reserved GNUNET_PACKED;
132
133 /**
134 * The (actual) target of the message
135 */
136 struct GNUNET_PeerIdentity target;
137
138};
139
140
141/**
142 * Message from service to DV plugin, saying that a
143 * SEND request was handled.
144 */
145struct GNUNET_DV_AckMessage
146{
147 /**
148 * Type: #GNUNET_MESSAGE_TYPE_DV_SEND_ACK or
149 * #GNUNET_MESSAGE_TYPE_DV_SEND_NACK.
150 */
151 struct GNUNET_MessageHeader header;
152
153 /**
154 * Which message is being acknowledged?
155 */
156 uint32_t uid GNUNET_PACKED;
157 132
158 /** 133 /**
159 * The (actual) target of the message 134 * The (actual) target of the message
diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c
index d74453376..76a6ea484 100644
--- a/src/dv/dv_api.c
+++ b/src/dv/dv_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009--2013 GNUnet e.V. 3 Copyright (C) 2009--2013, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -37,60 +37,6 @@
37/** 37/**
38 * Information we track for each peer. 38 * Information we track for each peer.
39 */ 39 */
40struct ConnectedPeer;
41
42
43/**
44 * Handle for a send operation.
45 */
46struct GNUNET_DV_TransmitHandle
47{
48 /**
49 * Kept in a DLL.
50 */
51 struct GNUNET_DV_TransmitHandle *next;
52
53 /**
54 * Kept in a DLL.
55 */
56 struct GNUNET_DV_TransmitHandle *prev;
57
58 /**
59 * Handle to the service.
60 */
61 struct GNUNET_DV_ServiceHandle *sh;
62
63 /**
64 * Function to call upon completion.
65 */
66 GNUNET_DV_MessageSentCallback cb;
67
68 /**
69 * Closure for @a cb.
70 */
71 void *cb_cls;
72
73 /**
74 * The actual message (allocated at the end of this struct).
75 */
76 const struct GNUNET_MessageHeader *msg;
77
78 /**
79 * Destination for the message.
80 */
81 struct ConnectedPeer *target;
82
83 /**
84 * UID of our message, if any.
85 */
86 uint32_t uid;
87
88};
89
90
91/**
92 * Information we track for each peer.
93 */
94struct ConnectedPeer 40struct ConnectedPeer
95{ 41{
96 42
@@ -99,22 +45,6 @@ struct ConnectedPeer
99 */ 45 */
100 struct GNUNET_PeerIdentity pid; 46 struct GNUNET_PeerIdentity pid;
101 47
102 /**
103 * Head of DLL of transmission handles where we need
104 * to invoke a continuation when we are informed about
105 * successful transmission. The respective request
106 * has already been sent to the DV service.
107 */
108 struct GNUNET_DV_TransmitHandle *head;
109
110 /**
111 * Tail of DLL of transmission handles where we need
112 * to invoke a continuation when we are informed about
113 * successful transmission. The respective request
114 * has already been sent to the DV service.
115 */
116 struct GNUNET_DV_TransmitHandle *tail;
117
118}; 48};
119 49
120 50
@@ -127,12 +57,7 @@ struct GNUNET_DV_ServiceHandle
127 /** 57 /**
128 * Connection to DV service. 58 * Connection to DV service.
129 */ 59 */
130 struct GNUNET_CLIENT_Connection *client; 60 struct GNUNET_MQ_Handle *mq;
131
132 /**
133 * Active request for transmission to DV service.
134 */
135 struct GNUNET_CLIENT_TransmitHandle *th;
136 61
137 /** 62 /**
138 * Our configuration. 63 * Our configuration.
@@ -165,26 +90,11 @@ struct GNUNET_DV_ServiceHandle
165 GNUNET_DV_MessageReceivedCallback message_cb; 90 GNUNET_DV_MessageReceivedCallback message_cb;
166 91
167 /** 92 /**
168 * Head of messages to transmit.
169 */
170 struct GNUNET_DV_TransmitHandle *th_head;
171
172 /**
173 * Tail of messages to transmit.
174 */
175 struct GNUNET_DV_TransmitHandle *th_tail;
176
177 /**
178 * Information tracked per connected peer. Maps peer 93 * Information tracked per connected peer. Maps peer
179 * identities to `struct ConnectedPeer` entries. 94 * identities to `struct ConnectedPeer` entries.
180 */ 95 */
181 struct GNUNET_CONTAINER_MultiPeerMap *peers; 96 struct GNUNET_CONTAINER_MultiPeerMap *peers;
182 97
183 /**
184 * Current unique ID
185 */
186 uint32_t uid_gen;
187
188}; 98};
189 99
190 100
@@ -198,120 +108,160 @@ reconnect (struct GNUNET_DV_ServiceHandle *sh);
198 108
199 109
200/** 110/**
201 * Start sending messages from our queue to the service. 111 * We got disconnected from the service and thus all of the
112 * connections need to be torn down.
202 * 113 *
203 * @param sh service handle 114 * @param cls the `struct GNUNET_DV_ServiceHandle`
115 * @param key a peer identity
116 * @param value a `struct ConnectedPeer` to clean up
117 * @return #GNUNET_OK (continue to iterate)
204 */ 118 */
205static void 119static int
206start_transmit (struct GNUNET_DV_ServiceHandle *sh); 120cleanup_send_cb (void *cls,
121 const struct GNUNET_PeerIdentity *key,
122 void *value)
123{
124 struct GNUNET_DV_ServiceHandle *sh = cls;
125 struct ConnectedPeer *peer = value;
126
127 GNUNET_assert (GNUNET_YES ==
128 GNUNET_CONTAINER_multipeermap_remove (sh->peers,
129 key,
130 peer));
131 sh->disconnect_cb (sh->cls,
132 key);
133 GNUNET_free (peer);
134 return GNUNET_OK;
135}
207 136
208 137
209/** 138/**
210 * Gives a message from our queue to the DV service. 139 * Handles a message sent from the DV service to us.
140 * Parse it out and give it to the plugin.
211 * 141 *
212 * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`) 142 * @param cls the handle to the DV API
213 * @param size how many bytes can we send 143 * @param cm the message that was received
214 * @param buf where to copy the message to send
215 * @return how many bytes we copied to @a buf
216 */ 144 */
217static size_t 145static void
218transmit_pending (void *cls, size_t size, void *buf) 146handle_connect (void *cls,
147 const struct GNUNET_DV_ConnectMessage *cm)
219{ 148{
220 struct GNUNET_DV_ServiceHandle *sh = cls; 149 struct GNUNET_DV_ServiceHandle *sh = cls;
221 char *cbuf = buf; 150 struct ConnectedPeer *peer;
222 struct GNUNET_DV_TransmitHandle *th;
223 size_t ret;
224 size_t tsize;
225 151
226 sh->th = NULL; 152 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
227 if (NULL == buf) 153 &cm->peer);
154 if (NULL != peer)
228 { 155 {
156 GNUNET_break (0);
229 reconnect (sh); 157 reconnect (sh);
230 return 0; 158 return;
231 } 159 }
232 ret = 0; 160 peer = GNUNET_new (struct ConnectedPeer);
233 while ( (NULL != (th = sh->th_head)) && 161 peer->pid = cm->peer;
234 (size - ret >= (tsize = ntohs (th->msg->size)) )) 162 GNUNET_assert (GNUNET_OK ==
163 GNUNET_CONTAINER_multipeermap_put (sh->peers,
164 &peer->pid,
165 peer,
166 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
167 sh->connect_cb (sh->cls,
168 &cm->peer,
169 ntohl (cm->distance),
170 (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
171}
172
173
174/**
175 * Handles a message sent from the DV service to us.
176 * Parse it out and give it to the plugin.
177 *
178 * @param cls the handle to the DV API
179 * @param dm the message that was received
180 */
181static void
182handle_disconnect (void *cls,
183 const struct GNUNET_DV_DisconnectMessage *dm)
184{
185 struct GNUNET_DV_ServiceHandle *sh = cls;
186 struct ConnectedPeer *peer;
187
188 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
189 &dm->peer);
190 if (NULL == peer)
235 { 191 {
236 GNUNET_CONTAINER_DLL_remove (sh->th_head, 192 GNUNET_break (0);
237 sh->th_tail, 193 reconnect (sh);
238 th); 194 return;
239 memcpy (&cbuf[ret], th->msg, tsize);
240 LOG (GNUNET_ERROR_TYPE_DEBUG,
241 "Passing %u bytes of type %u to DV service\n",
242 tsize,
243 ntohs (th->msg->type));
244 th->msg = NULL;
245 ret += tsize;
246 if (NULL != th->cb)
247 {
248 GNUNET_CONTAINER_DLL_insert_tail (th->target->head,
249 th->target->tail,
250 th);
251 }
252 else
253 {
254 GNUNET_free (th);
255 }
256 } 195 }
257 if (NULL != sh->th_head) 196 cleanup_send_cb (sh,
258 start_transmit (sh); 197 &dm->peer,
259 return ret; 198 peer);
260} 199}
261 200
262 201
263/** 202/**
264 * Start sending messages from our queue to the service. 203 * Handles a message sent from the DV service to us.
204 * Parse it out and give it to the plugin.
265 * 205 *
266 * @param sh service handle 206 * @param cls the handle to the DV API
207 * @param msg the message that was received
267 */ 208 */
268static void 209static void
269start_transmit (struct GNUNET_DV_ServiceHandle *sh) 210handle_distance_update (void *cls,
211 const struct GNUNET_DV_DistanceUpdateMessage *dum)
270{ 212{
271 if (NULL != sh->th) 213 struct GNUNET_DV_ServiceHandle *sh = cls;
272 return; 214 struct ConnectedPeer *peer;
273 if (NULL == sh->th_head) 215
216 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
217 &dum->peer);
218 if (NULL == peer)
219 {
220 GNUNET_break (0);
221 reconnect (sh);
274 return; 222 return;
275 sh->th = 223 }
276 GNUNET_CLIENT_notify_transmit_ready (sh->client, 224 sh->distance_cb (sh->cls,
277 ntohs (sh->th_head->msg->size), 225 &dum->peer,
278 GNUNET_TIME_UNIT_FOREVER_REL, 226 ntohl (dum->distance),
279 GNUNET_NO, 227 (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
280 &transmit_pending, sh);
281} 228}
282 229
283 230
284/** 231/**
285 * We got disconnected from the service and thus all of the 232 * Handles a message sent from the DV service to us.
286 * pending send callbacks will never be confirmed. Clean up. 233 * Parse it out and give it to the plugin.
287 * 234 *
288 * @param cls the 'struct GNUNET_DV_ServiceHandle' 235 * @param cls the handle to the DV API
289 * @param key a peer identity 236 * @param rm the message that was received
290 * @param value a `struct ConnectedPeer` to clean up
291 * @return #GNUNET_OK (continue to iterate)
292 */ 237 */
293static int 238static int
294cleanup_send_cb (void *cls, 239check_received (void *cls,
295 const struct GNUNET_PeerIdentity *key, 240 const struct GNUNET_DV_ReceivedMessage *rm)
296 void *value)
297{ 241{
298 struct GNUNET_DV_ServiceHandle *sh = cls; 242 struct GNUNET_DV_ServiceHandle *sh = cls;
299 struct ConnectedPeer *peer = value; 243 const struct GNUNET_MessageHeader *payload;
300 struct GNUNET_DV_TransmitHandle *th;
301 244
302 GNUNET_assert (GNUNET_YES == 245 if (NULL ==
303 GNUNET_CONTAINER_multipeermap_remove (sh->peers, 246 GNUNET_CONTAINER_multipeermap_get (sh->peers,
304 key, 247 &rm->sender))
305 peer));
306 sh->disconnect_cb (sh->cls,
307 key);
308 while (NULL != (th = peer->head))
309 { 248 {
310 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th); 249 GNUNET_break (0);
311 th->cb (th->cb_cls); 250 return GNUNET_SYSERR;
312 GNUNET_free (th); 251 }
252 if (ntohs (rm->header.size) - sizeof (struct GNUNET_DV_ReceivedMessage) <
253 sizeof (*payload))
254 {
255 GNUNET_break (0);
256 return GNUNET_SYSERR;
257 }
258 payload = (const struct GNUNET_MessageHeader *) &rm[1];
259 if (ntohs (rm->header.size) !=
260 sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
261 {
262 GNUNET_break (0);
263 return GNUNET_SYSERR;
313 } 264 }
314 GNUNET_free (peer);
315 return GNUNET_OK; 265 return GNUNET_OK;
316} 266}
317 267
@@ -321,211 +271,38 @@ cleanup_send_cb (void *cls,
321 * Parse it out and give it to the plugin. 271 * Parse it out and give it to the plugin.
322 * 272 *
323 * @param cls the handle to the DV API 273 * @param cls the handle to the DV API
324 * @param msg the message that was received 274 * @param rm the message that was received
325 */ 275 */
326static void 276static void
327handle_message_receipt (void *cls, 277handle_received (void *cls,
328 const struct GNUNET_MessageHeader *msg) 278 const struct GNUNET_DV_ReceivedMessage *rm)
329{ 279{
330 struct GNUNET_DV_ServiceHandle *sh = cls; 280 struct GNUNET_DV_ServiceHandle *sh = cls;
331 const struct GNUNET_DV_ConnectMessage *cm;
332 const struct GNUNET_DV_DistanceUpdateMessage *dum;
333 const struct GNUNET_DV_DisconnectMessage *dm;
334 const struct GNUNET_DV_ReceivedMessage *rm;
335 const struct GNUNET_MessageHeader *payload; 281 const struct GNUNET_MessageHeader *payload;
336 const struct GNUNET_DV_AckMessage *ack;
337 struct GNUNET_DV_TransmitHandle *th;
338 struct GNUNET_DV_TransmitHandle *tn;
339 struct ConnectedPeer *peer;
340 282
341 if (NULL == msg) 283 payload = (const struct GNUNET_MessageHeader *) &rm[1];
342 { 284 sh->message_cb (sh->cls,
343 /* Connection closed */ 285 &rm->sender,
344 reconnect (sh); 286 ntohl (rm->distance),
345 return; 287 payload);
346 }
347 LOG (GNUNET_ERROR_TYPE_DEBUG,
348 "Received message of type %u with %u bytes from DV service\n",
349 (unsigned int) ntohs (msg->type),
350 (unsigned int) ntohs (msg->size));
351 switch (ntohs (msg->type))
352 {
353 case GNUNET_MESSAGE_TYPE_DV_CONNECT:
354 if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage))
355 {
356 GNUNET_break (0);
357 reconnect (sh);
358 return;
359 }
360 cm = (const struct GNUNET_DV_ConnectMessage *) msg;
361 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
362 &cm->peer);
363 if (NULL != peer)
364 {
365 GNUNET_break (0);
366 reconnect (sh);
367 return;
368 }
369 peer = GNUNET_new (struct ConnectedPeer);
370 peer->pid = cm->peer;
371 GNUNET_assert (GNUNET_OK ==
372 GNUNET_CONTAINER_multipeermap_put (sh->peers,
373 &peer->pid,
374 peer,
375 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
376 sh->connect_cb (sh->cls,
377 &cm->peer,
378 ntohl (cm->distance),
379 (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
380 break;
381 case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
382 if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
383 {
384 GNUNET_break (0);
385 reconnect (sh);
386 return;
387 }
388 dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
389 sh->distance_cb (sh->cls,
390 &dum->peer,
391 ntohl (dum->distance),
392 (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
393 break;
394 case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
395 if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
396 {
397 GNUNET_break (0);
398 reconnect (sh);
399 return;
400 }
401 dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
402 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
403 &dm->peer);
404 if (NULL == peer)
405 {
406 GNUNET_break (0);
407 reconnect (sh);
408 return;
409 }
410 tn = sh->th_head;
411 while (NULL != (th = tn))
412 {
413 tn = th->next;
414 if (peer == th->target)
415 {
416 GNUNET_CONTAINER_DLL_remove (sh->th_head,
417 sh->th_tail,
418 th);
419 th->cb (th->cb_cls);
420 GNUNET_free (th);
421 }
422 }
423 cleanup_send_cb (sh, &dm->peer, peer);
424 break;
425 case GNUNET_MESSAGE_TYPE_DV_RECV:
426 if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader))
427 {
428 GNUNET_break (0);
429 reconnect (sh);
430 return;
431 }
432 rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
433 payload = (const struct GNUNET_MessageHeader *) &rm[1];
434 if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
435 {
436 GNUNET_break (0);
437 reconnect (sh);
438 return;
439 }
440 if (NULL ==
441 GNUNET_CONTAINER_multipeermap_get (sh->peers,
442 &rm->sender))
443 {
444 GNUNET_break (0);
445 reconnect (sh);
446 return;
447 }
448 sh->message_cb (sh->cls,
449 &rm->sender,
450 ntohl (rm->distance),
451 payload);
452 break;
453 case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
454 case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
455 if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
456 {
457 GNUNET_break (0);
458 reconnect (sh);
459 return;
460 }
461 ack = (const struct GNUNET_DV_AckMessage *) msg;
462 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
463 &ack->target);
464 if (NULL == peer)
465 break; /* this happens, just ignore */
466 for (th = peer->head; NULL != th; th = th->next)
467 {
468 if (th->uid != ntohl (ack->uid))
469 continue;
470 LOG (GNUNET_ERROR_TYPE_DEBUG,
471 "Matched ACK for message to peer %s\n",
472 GNUNET_i2s (&ack->target));
473 GNUNET_CONTAINER_DLL_remove (peer->head,
474 peer->tail,
475 th);
476 th->cb (th->cb_cls);
477 GNUNET_free (th);
478 break;
479 }
480 break;
481 default:
482 reconnect (sh);
483 break;
484 }
485 LOG (GNUNET_ERROR_TYPE_DEBUG,
486 "Received message, continuing receive loop for %p\n",
487 sh->client);
488 GNUNET_CLIENT_receive (sh->client,
489 &handle_message_receipt, sh,
490 GNUNET_TIME_UNIT_FOREVER_REL);
491} 288}
492 289
493 290
494/** 291/**
495 * Transmit the start message to the DV service. 292 * Generic error handler, called with the appropriate error code and
293 * the same closure specified at the creation of the message queue.
294 * Not every message queue implementation supports an error handler.
496 * 295 *
497 * @param cls the `struct GNUNET_DV_ServiceHandle *` 296 * @param cls closure with the `struct GNUNET_DV_ServiceHandle *`
498 * @param size number of bytes available in buf 297 * @param error error code
499 * @param buf where to copy the message
500 * @return number of bytes written to buf
501 */ 298 */
502static size_t 299static void
503transmit_start (void *cls, 300mq_error_handler (void *cls,
504 size_t size, 301 enum GNUNET_MQ_Error error)
505 void *buf)
506{ 302{
507 struct GNUNET_DV_ServiceHandle *sh = cls; 303 struct GNUNET_DV_ServiceHandle *sh = cls;
508 struct GNUNET_MessageHeader start_message;
509 304
510 sh->th = NULL; 305 reconnect (sh);
511 if (NULL == buf)
512 {
513 GNUNET_break (0);
514 reconnect (sh);
515 return 0;
516 }
517 GNUNET_assert (size >= sizeof (start_message));
518 start_message.size = htons (sizeof (struct GNUNET_MessageHeader));
519 start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START);
520 memcpy (buf, &start_message, sizeof (start_message));
521 LOG (GNUNET_ERROR_TYPE_DEBUG,
522 "Transmitting START request, starting receive loop for %p\n",
523 sh->client);
524 GNUNET_CLIENT_receive (sh->client,
525 &handle_message_receipt, sh,
526 GNUNET_TIME_UNIT_FOREVER_REL);
527 start_transmit (sh);
528 return sizeof (start_message);
529} 306}
530 307
531 308
@@ -537,36 +314,52 @@ transmit_start (void *cls,
537static void 314static void
538reconnect (struct GNUNET_DV_ServiceHandle *sh) 315reconnect (struct GNUNET_DV_ServiceHandle *sh)
539{ 316{
540 if (NULL != sh->th) 317 GNUNET_MQ_hd_fixed_size (connect,
541 { 318 GNUNET_MESSAGE_TYPE_DV_CONNECT,
542 GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); 319 struct GNUNET_DV_ConnectMessage);
543 sh->th = NULL; 320 GNUNET_MQ_hd_fixed_size (disconnect,
544 } 321 GNUNET_MESSAGE_TYPE_DV_DISCONNECT,
545 LOG (GNUNET_ERROR_TYPE_DEBUG, 322 struct GNUNET_DV_DisconnectMessage);
546 "Disconnecting from DV service at %p\n", 323 GNUNET_MQ_hd_fixed_size (distance_update,
547 sh->client); 324 GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED,
548 if (NULL != sh->client) 325 struct GNUNET_DV_DistanceUpdateMessage);
326 GNUNET_MQ_hd_var_size (received,
327 GNUNET_MESSAGE_TYPE_DV_RECV,
328 struct GNUNET_DV_ReceivedMessage);
329 struct GNUNET_MQ_MessageHandler handlers[] = {
330 make_connect_handler (sh),
331 make_disconnect_handler (sh),
332 make_distance_update_handler (sh),
333 make_received_handler (sh),
334 GNUNET_MQ_handler_end ()
335 };
336 struct GNUNET_MessageHeader *sm;
337 struct GNUNET_MQ_Envelope *env;
338
339 if (NULL != sh->mq)
549 { 340 {
550 GNUNET_CLIENT_disconnect (sh->client); 341 GNUNET_MQ_destroy (sh->mq);
551 sh->client = NULL; 342 sh->mq = NULL;
552 } 343 }
553 GNUNET_CONTAINER_multipeermap_iterate (sh->peers, 344 GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
554 &cleanup_send_cb, 345 &cleanup_send_cb,
555 sh); 346 sh);
556 LOG (GNUNET_ERROR_TYPE_DEBUG, 347 LOG (GNUNET_ERROR_TYPE_DEBUG,
557 "Connecting to DV service\n"); 348 "Connecting to DV service\n");
558 sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg); 349 sh->mq = GNUNET_CLIENT_connecT (sh->cfg,
559 if (NULL == sh->client) 350 "dv",
351 handlers,
352 &mq_error_handler,
353 sh);
354 if (NULL == sh->mq)
560 { 355 {
561 GNUNET_break (0); 356 GNUNET_break (0);
562 return; 357 return;
563 } 358 }
564 sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client, 359 env = GNUNET_MQ_msg (sm,
565 sizeof (struct GNUNET_MessageHeader), 360 GNUNET_MESSAGE_TYPE_DV_START);
566 GNUNET_TIME_UNIT_FOREVER_REL, 361 GNUNET_MQ_send (sh->mq,
567 GNUNET_YES, 362 env);
568 &transmit_start,
569 sh);
570} 363}
571 364
572 365
@@ -598,7 +391,8 @@ GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
598 sh->distance_cb = distance_cb; 391 sh->distance_cb = distance_cb;
599 sh->disconnect_cb = disconnect_cb; 392 sh->disconnect_cb = disconnect_cb;
600 sh->message_cb = message_cb; 393 sh->message_cb = message_cb;
601 sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES); 394 sh->peers = GNUNET_CONTAINER_multipeermap_create (128,
395 GNUNET_YES);
602 reconnect (sh); 396 reconnect (sh);
603 return sh; 397 return sh;
604} 398}
@@ -612,26 +406,12 @@ GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
612void 406void
613GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) 407GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
614{ 408{
615 struct GNUNET_DV_TransmitHandle *pos;
616
617 if (NULL == sh) 409 if (NULL == sh)
618 return; 410 return;
619 if (NULL != sh->th) 411 if (NULL != sh->mq)
620 {
621 GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
622 sh->th = NULL;
623 }
624 while (NULL != (pos = sh->th_head))
625 {
626 GNUNET_CONTAINER_DLL_remove (sh->th_head,
627 sh->th_tail,
628 pos);
629 GNUNET_free (pos);
630 }
631 if (NULL != sh->client)
632 { 412 {
633 GNUNET_CLIENT_disconnect (sh->client); 413 GNUNET_MQ_destroy (sh->mq);
634 sh->client = NULL; 414 sh->mq = NULL;
635 } 415 }
636 GNUNET_CONTAINER_multipeermap_iterate (sh->peers, 416 GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
637 &cleanup_send_cb, 417 &cleanup_send_cb,
@@ -647,87 +427,40 @@ GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
647 * @param sh service handle 427 * @param sh service handle
648 * @param target intended recpient 428 * @param target intended recpient
649 * @param msg message payload 429 * @param msg message payload
650 * @param cb function to invoke when done
651 * @param cb_cls closure for @a cb
652 * @return handle to cancel the operation
653 */ 430 */
654struct GNUNET_DV_TransmitHandle * 431void
655GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, 432GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
656 const struct GNUNET_PeerIdentity *target, 433 const struct GNUNET_PeerIdentity *target,
657 const struct GNUNET_MessageHeader *msg, 434 const struct GNUNET_MessageHeader *msg)
658 GNUNET_DV_MessageSentCallback cb,
659 void *cb_cls)
660{ 435{
661 struct GNUNET_DV_TransmitHandle *th;
662 struct GNUNET_DV_SendMessage *sm; 436 struct GNUNET_DV_SendMessage *sm;
663 struct ConnectedPeer *peer; 437 struct ConnectedPeer *peer;
438 struct GNUNET_MQ_Envelope *env;
664 439
665 if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 440 if (ntohs (msg->size) + sizeof (*sm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
666 { 441 {
667 GNUNET_break (0); 442 GNUNET_break (0);
668 return NULL; 443 return;
669 } 444 }
670 LOG (GNUNET_ERROR_TYPE_DEBUG, 445 LOG (GNUNET_ERROR_TYPE_DEBUG,
671 "Asked to send %u bytes of type %u to %s via %p\n", 446 "Asked to send %u bytes of type %u to %s\n",
672 (unsigned int) ntohs (msg->size), 447 (unsigned int) ntohs (msg->size),
673 (unsigned int) ntohs (msg->type), 448 (unsigned int) ntohs (msg->type),
674 GNUNET_i2s (target), 449 GNUNET_i2s (target));
675 sh->client);
676 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, 450 peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
677 target); 451 target);
678 if (NULL == peer) 452 if (NULL == peer)
679 { 453 {
680 GNUNET_break (0); 454 GNUNET_break (0);
681 return NULL; 455 return;
682 } 456 }
683 th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) + 457 GNUNET_assert (NULL != sh->mq);
684 sizeof (struct GNUNET_DV_SendMessage) + 458 env = GNUNET_MQ_msg_nested_mh (sm,
685 ntohs (msg->size)); 459 GNUNET_MESSAGE_TYPE_DV_SEND,
686 th->sh = sh; 460 msg);
687 th->target = peer; 461 sm->target = *target;
688 th->cb = cb; 462 GNUNET_MQ_send (sh->mq,
689 th->cb_cls = cb_cls; 463 env);
690 th->msg = (const struct GNUNET_MessageHeader *) &th[1];
691 sm = (struct GNUNET_DV_SendMessage *) &th[1];
692 sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
693 sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
694 ntohs (msg->size));
695 if (0 == sh->uid_gen)
696 sh->uid_gen = 1;
697 th->uid = sh->uid_gen;
698 sm->uid = htonl (sh->uid_gen++);
699 /* use memcpy here as 'target' may not be sufficiently aligned */
700 memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
701 memcpy (&sm[1], msg, ntohs (msg->size));
702 GNUNET_CONTAINER_DLL_insert_tail (sh->th_head,
703 sh->th_tail,
704 th);
705 start_transmit (sh);
706 return th;
707}
708
709
710/**
711 * Abort send operation (naturally, the message may have
712 * already been transmitted; this only stops the 'cb'
713 * from being called again).
714 *
715 * @param th send operation to cancel
716 */
717void
718GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
719{
720 struct GNUNET_DV_ServiceHandle *sh = th->sh;
721
722 if (NULL == th->msg)
723 GNUNET_CONTAINER_DLL_remove (th->target->head,
724 th->target->tail,
725 th);
726 else
727 GNUNET_CONTAINER_DLL_remove (sh->th_head,
728 sh->th_tail,
729 th);
730 GNUNET_free (th);
731} 464}
732 465
733 466
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index af6ddb3d9..d103612a8 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -149,11 +149,6 @@ struct PendingMessage
149 */ 149 */
150 struct GNUNET_PeerIdentity next_target; 150 struct GNUNET_PeerIdentity next_target;
151 151
152 /**
153 * Unique ID of the message.
154 */
155 uint32_t uid;
156
157}; 152};
158 153
159 154
@@ -480,33 +475,6 @@ send_control_to_plugin (const struct GNUNET_MessageHeader *message)
480 475
481 476
482/** 477/**
483 * Give an (N)ACK message to the plugin, we transmitted a message for it.
484 *
485 * @param target peer that received the message
486 * @param uid plugin-chosen UID for the message
487 * @param nack #GNUNET_NO to send ACK, #GNUNET_YES to send NACK
488 */
489static void
490send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
491 uint32_t uid,
492 int nack)
493{
494 struct GNUNET_DV_AckMessage ack_msg;
495
496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
497 "Delivering ACK for message to peer `%s'\n",
498 GNUNET_i2s (target));
499 ack_msg.header.size = htons (sizeof (ack_msg));
500 ack_msg.header.type = htons ((GNUNET_YES == nack)
501 ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK
502 : GNUNET_MESSAGE_TYPE_DV_SEND_ACK);
503 ack_msg.uid = htonl (uid);
504 ack_msg.target = *target;
505 send_control_to_plugin (&ack_msg.header);
506}
507
508
509/**
510 * Send a DISTANCE_CHANGED message to the plugin. 478 * Send a DISTANCE_CHANGED message to the plugin.
511 * 479 *
512 * @param peer peer with a changed distance 480 * @param peer peer with a changed distance
@@ -613,16 +581,6 @@ core_transmit_notify (void *cls, size_t size, void *buf)
613 dn->pm_tail, 581 dn->pm_tail,
614 pending); 582 pending);
615 memcpy (&cbuf[off], pending->msg, msize); 583 memcpy (&cbuf[off], pending->msg, msize);
616 if (0 != pending->uid)
617 {
618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619 "Acking transmission of %u bytes to %s with plugin\n",
620 (unsigned int) msize,
621 GNUNET_i2s (&pending->next_target));
622 send_ack_to_plugin (&pending->next_target,
623 pending->uid,
624 GNUNET_NO);
625 }
626 GNUNET_free (pending); 584 GNUNET_free (pending);
627 off += msize; 585 off += msize;
628 } 586 }
@@ -649,7 +607,6 @@ core_transmit_notify (void *cls, size_t size, void *buf)
649 * 607 *
650 * @param target where to send the message 608 * @param target where to send the message
651 * @param distance distance to the @a sender 609 * @param distance distance to the @a sender
652 * @param uid unique ID for the message
653 * @param sender original sender of the message 610 * @param sender original sender of the message
654 * @param actual_target ultimate recipient for the message 611 * @param actual_target ultimate recipient for the message
655 * @param payload payload of the message 612 * @param payload payload of the message
@@ -657,7 +614,6 @@ core_transmit_notify (void *cls, size_t size, void *buf)
657static void 614static void
658forward_payload (struct DirectNeighbor *target, 615forward_payload (struct DirectNeighbor *target,
659 uint32_t distance, 616 uint32_t distance,
660 uint32_t uid,
661 const struct GNUNET_PeerIdentity *sender, 617 const struct GNUNET_PeerIdentity *sender,
662 const struct GNUNET_PeerIdentity *actual_target, 618 const struct GNUNET_PeerIdentity *actual_target,
663 const struct GNUNET_MessageHeader *payload) 619 const struct GNUNET_MessageHeader *payload)
@@ -667,7 +623,6 @@ forward_payload (struct DirectNeighbor *target,
667 size_t msize; 623 size_t msize;
668 624
669 if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) && 625 if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
670 (0 == uid) &&
671 (0 != memcmp (sender, 626 (0 != memcmp (sender,
672 &my_identity, 627 &my_identity,
673 sizeof (struct GNUNET_PeerIdentity))) ) 628 sizeof (struct GNUNET_PeerIdentity))) )
@@ -686,7 +641,6 @@ forward_payload (struct DirectNeighbor *target,
686 } 641 }
687 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 642 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
688 pm->next_target = target->peer; 643 pm->next_target = target->peer;
689 pm->uid = uid;
690 pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; 644 pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
691 rm = (struct RouteMessage *) &pm[1]; 645 rm = (struct RouteMessage *) &pm[1];
692 rm->header.size = htons ((uint16_t) msize); 646 rm->header.size = htons ((uint16_t) msize);
@@ -1888,7 +1842,6 @@ handle_dv_route_message (void *cls,
1888 GNUNET_i2s (&neighbor->peer)); 1842 GNUNET_i2s (&neighbor->peer));
1889 forward_payload (neighbor, 1843 forward_payload (neighbor,
1890 distance + 1, 1844 distance + 1,
1891 0,
1892 &rm->sender, 1845 &rm->sender,
1893 &rm->target, 1846 &rm->target,
1894 payload); 1847 payload);
@@ -1920,7 +1873,6 @@ handle_dv_send_message (void *cls,
1920 return; 1873 return;
1921 } 1874 }
1922 msg = (const struct GNUNET_DV_SendMessage *) message; 1875 msg = (const struct GNUNET_DV_SendMessage *) message;
1923 GNUNET_break (0 != ntohl (msg->uid));
1924 payload = (const struct GNUNET_MessageHeader *) &msg[1]; 1876 payload = (const struct GNUNET_MessageHeader *) &msg[1];
1925 if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) 1877 if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
1926 { 1878 {
@@ -1940,7 +1892,6 @@ handle_dv_send_message (void *cls,
1940 GNUNET_STATISTICS_update (stats, 1892 GNUNET_STATISTICS_update (stats,
1941 "# local messages discarded (no route)", 1893 "# local messages discarded (no route)",
1942 1, GNUNET_NO); 1894 1, GNUNET_NO);
1943 send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES);
1944 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1895 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1945 return; 1896 return;
1946 } 1897 }
@@ -1952,7 +1903,6 @@ handle_dv_send_message (void *cls,
1952 1903
1953 forward_payload (route->next_hop, 1904 forward_payload (route->next_hop,
1954 0 /* first hop, distance is zero */, 1905 0 /* first hop, distance is zero */,
1955 htonl (msg->uid),
1956 &my_identity, 1906 &my_identity,
1957 &msg->target, 1907 &msg->target,
1958 payload); 1908 payload);
diff --git a/src/dv/plugin_transport_dv.c b/src/dv/plugin_transport_dv.c
index 7293d9fb8..0c72cea3f 100644
--- a/src/dv/plugin_transport_dv.c
+++ b/src/dv/plugin_transport_dv.c
@@ -45,51 +45,6 @@ struct Plugin;
45 45
46 46
47/** 47/**
48 * An active request for transmission via DV.
49 */
50struct PendingRequest
51{
52
53 /**
54 * This is a DLL.
55 */
56 struct PendingRequest *next;
57
58 /**
59 * This is a DLL.
60 */
61 struct PendingRequest *prev;
62
63 /**
64 * Continuation function to call once the transmission buffer
65 * has again space available. NULL if there is no
66 * continuation to call.
67 */
68 GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
69
70 /**
71 * Closure for @e transmit_cont.
72 */
73 void *transmit_cont_cls;
74
75 /**
76 * Transmission handle from DV client library.
77 */
78 struct GNUNET_DV_TransmitHandle *th;
79
80 /**
81 * Session of this request.
82 */
83 struct GNUNET_ATS_Session *session;
84
85 /**
86 * Number of bytes to transmit.
87 */
88 size_t size;
89};
90
91
92/**
93 * Session handle for connections. 48 * Session handle for connections.
94 */ 49 */
95struct GNUNET_ATS_Session 50struct GNUNET_ATS_Session
@@ -100,16 +55,6 @@ struct GNUNET_ATS_Session
100 struct Plugin *plugin; 55 struct Plugin *plugin;
101 56
102 /** 57 /**
103 * Head of pending requests.
104 */
105 struct PendingRequest *pr_head;
106
107 /**
108 * Tail of pending requests.
109 */
110 struct PendingRequest *pr_tail;
111
112 /**
113 * Address we use for the other peer. 58 * Address we use for the other peer.
114 */ 59 */
115 struct GNUNET_HELLO_Address *address; 60 struct GNUNET_HELLO_Address *address;
@@ -449,7 +394,6 @@ static void
449free_session (struct GNUNET_ATS_Session *session) 394free_session (struct GNUNET_ATS_Session *session)
450{ 395{
451 struct Plugin *plugin = session->plugin; 396 struct Plugin *plugin = session->plugin;
452 struct PendingRequest *pr;
453 397
454 GNUNET_assert (GNUNET_YES == 398 GNUNET_assert (GNUNET_YES ==
455 GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, 399 GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
@@ -470,20 +414,6 @@ free_session (struct GNUNET_ATS_Session *session)
470 session); 414 session);
471 session->active = GNUNET_NO; 415 session->active = GNUNET_NO;
472 } 416 }
473 while (NULL != (pr = session->pr_head))
474 {
475 GNUNET_CONTAINER_DLL_remove (session->pr_head,
476 session->pr_tail,
477 pr);
478 GNUNET_DV_send_cancel (pr->th);
479 pr->th = NULL;
480 if (NULL != pr->transmit_cont)
481 pr->transmit_cont (pr->transmit_cont_cls,
482 &session->sender,
483 GNUNET_SYSERR,
484 pr->size, 0);
485 GNUNET_free (pr);
486 }
487 GNUNET_HELLO_address_free (session->address); 417 GNUNET_HELLO_address_free (session->address);
488 GNUNET_free (session); 418 GNUNET_free (session);
489} 419}
@@ -515,31 +445,6 @@ handle_dv_disconnect (void *cls,
515 445
516 446
517/** 447/**
518 * Function called once the delivery of a message has been successful.
519 * Clean up the pending request, and call continuations.
520 *
521 * @param cls closure
522 */
523static void
524send_finished (void *cls)
525{
526 struct PendingRequest *pr = cls;
527 struct GNUNET_ATS_Session *session = pr->session;
528
529 pr->th = NULL;
530 GNUNET_CONTAINER_DLL_remove (session->pr_head,
531 session->pr_tail,
532 pr);
533 if (NULL != pr->transmit_cont)
534 pr->transmit_cont (pr->transmit_cont_cls,
535 &session->sender,
536 GNUNET_OK,
537 pr->size, 0);
538 GNUNET_free (pr);
539}
540
541
542/**
543 * Function that can be used by the transport service to transmit 448 * Function that can be used by the transport service to transmit
544 * a message using the plugin. 449 * a message using the plugin.
545 * 450 *
@@ -565,10 +470,10 @@ dv_plugin_send (void *cls,
565 size_t msgbuf_size, 470 size_t msgbuf_size,
566 unsigned int priority, 471 unsigned int priority,
567 struct GNUNET_TIME_Relative timeout, 472 struct GNUNET_TIME_Relative timeout,
568 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) 473 GNUNET_TRANSPORT_TransmitContinuation cont,
474 void *cont_cls)
569{ 475{
570 struct Plugin *plugin = cls; 476 struct Plugin *plugin = cls;
571 struct PendingRequest *pr;
572 const struct GNUNET_MessageHeader *msg; 477 const struct GNUNET_MessageHeader *msg;
573 struct GNUNET_MessageHeader *box; 478 struct GNUNET_MessageHeader *box;
574 479
@@ -585,20 +490,13 @@ dv_plugin_send (void *cls,
585 memcpy (&box[1], msgbuf, msgbuf_size); 490 memcpy (&box[1], msgbuf, msgbuf_size);
586 msg = box; 491 msg = box;
587 } 492 }
588 pr = GNUNET_new (struct PendingRequest); 493 GNUNET_DV_send (plugin->dvh,
589 pr->transmit_cont = cont; 494 &session->sender,
590 pr->transmit_cont_cls = cont_cls; 495 msg);
591 pr->session = session; 496 cont (cont_cls,
592 pr->size = msgbuf_size; 497 &session->sender,
593 GNUNET_CONTAINER_DLL_insert_tail (session->pr_head, 498 GNUNET_OK,
594 session->pr_tail, 499 msgbuf_size, 0);
595 pr);
596
597 pr->th = GNUNET_DV_send (plugin->dvh,
598 &session->sender,
599 msg,
600 &send_finished,
601 pr);
602 GNUNET_free_non_null (box); 500 GNUNET_free_non_null (box);
603 return 0; /* DV */ 501 return 0; /* DV */
604} 502}
@@ -618,26 +516,11 @@ dv_plugin_disconnect_peer (void *cls,
618{ 516{
619 struct Plugin *plugin = cls; 517 struct Plugin *plugin = cls;
620 struct GNUNET_ATS_Session *session; 518 struct GNUNET_ATS_Session *session;
621 struct PendingRequest *pr;
622 519
623 session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions, 520 session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions,
624 target); 521 target);
625 if (NULL == session) 522 if (NULL == session)
626 return; /* nothing to do */ 523 return; /* nothing to do */
627 while (NULL != (pr = session->pr_head))
628 {
629 GNUNET_CONTAINER_DLL_remove (session->pr_head,
630 session->pr_tail,
631 pr);
632 GNUNET_DV_send_cancel (pr->th);
633 pr->th = NULL;
634 if (NULL != pr->transmit_cont)
635 pr->transmit_cont (pr->transmit_cont_cls,
636 &session->sender,
637 GNUNET_SYSERR,
638 pr->size, 0);
639 GNUNET_free (pr);
640 }
641 session->active = GNUNET_NO; 524 session->active = GNUNET_NO;
642} 525}
643 526
@@ -655,22 +538,6 @@ static int
655dv_plugin_disconnect_session (void *cls, 538dv_plugin_disconnect_session (void *cls,
656 struct GNUNET_ATS_Session *session) 539 struct GNUNET_ATS_Session *session)
657{ 540{
658 struct PendingRequest *pr;
659
660 while (NULL != (pr = session->pr_head))
661 {
662 GNUNET_CONTAINER_DLL_remove (session->pr_head,
663 session->pr_tail,
664 pr);
665 GNUNET_DV_send_cancel (pr->th);
666 pr->th = NULL;
667 if (NULL != pr->transmit_cont)
668 pr->transmit_cont (pr->transmit_cont_cls,
669 &session->sender,
670 GNUNET_SYSERR,
671 pr->size, 0);
672 GNUNET_free (pr);
673 }
674 session->active = GNUNET_NO; 541 session->active = GNUNET_NO;
675 return GNUNET_OK; 542 return GNUNET_OK;
676} 543}
@@ -691,9 +558,11 @@ dv_plugin_disconnect_session (void *cls,
691 * @param asc_cls closure for @a asc 558 * @param asc_cls closure for @a asc
692 */ 559 */
693static void 560static void
694dv_plugin_address_pretty_printer (void *cls, const char *type, 561dv_plugin_address_pretty_printer (void *cls,
562 const char *type,
695 const void *addr, 563 const void *addr,
696 size_t addrlen, int numeric, 564 size_t addrlen,
565 int numeric,
697 struct GNUNET_TIME_Relative timeout, 566 struct GNUNET_TIME_Relative timeout,
698 GNUNET_TRANSPORT_AddressStringCallback asc, 567 GNUNET_TRANSPORT_AddressStringCallback asc,
699 void *asc_cls) 568 void *asc_cls)