aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c565
1 files changed, 546 insertions, 19 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index e33c5f444..d446516bd 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -29,6 +29,79 @@
29 29
30 30
31/** 31/**
32 * How many messages do we keep at most in the queue to the
33 * transport service before we start to drop (default,
34 * can be changed via the configuration file).
35 */
36#define DEFAULT_MAX_QUEUE_LENGTH 16
37
38
39/**
40 * Information we track per packet to enable flow control.
41 */
42struct FlowControl
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct FlowControl *next;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct FlowControl *prev;
53
54 /**
55 * Function to call once the message was processed.
56 */
57 GNUNET_TRANSPORT_MessageCompletedCallback cb;
58
59 /**
60 * Closure for @e cb
61 */
62 void *cb_cls;
63
64 /**
65 * Which peer is this about?
66 */
67 struct GNUNET_PeerIdentity sender;
68
69 /**
70 * More-or-less unique ID for the message.
71 */
72 uint64_t id;
73};
74
75
76/**
77 * Information we track per message to tell the transport about
78 * success or failures.
79 */
80struct AckPending
81{
82 /**
83 * Kept in a DLL.
84 */
85 struct AckPending *next;
86
87 /**
88 * Kept in a DLL.
89 */
90 struct AckPending *prev;
91
92 /**
93 * Which peer is this about?
94 */
95 struct GNUNET_PeerIdentity receiver;
96
97 /**
98 * More-or-less unique ID for the message.
99 */
100 uint64_t mid;
101};
102
103
104/**
32 * Opaque handle to the transport service for communicators. 105 * Opaque handle to the transport service for communicators.
33 */ 106 */
34struct GNUNET_TRANSPORT_CommunicatorHandle 107struct GNUNET_TRANSPORT_CommunicatorHandle
@@ -44,6 +117,36 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
44 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; 117 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
45 118
46 /** 119 /**
120 * DLL of messages awaiting flow control confirmation (ack).
121 */
122 struct FlowControl *fc_head;
123
124 /**
125 * DLL of messages awaiting flow control confirmation (ack).
126 */
127 struct FlowControl *fc_tail;
128
129 /**
130 * DLL of messages awaiting transmission confirmation (ack).
131 */
132 struct AckPending *ap_head;
133
134 /**
135 * DLL of messages awaiting transmission confirmation (ack).
136 */
137 struct AckPending *ac_tail;
138
139 /**
140 * DLL of queues we offer.
141 */
142 struct QueueHandle *queue_head;
143
144 /**
145 * DLL of queues we offer.
146 */
147 struct QueueHandle *queue_tail;
148
149 /**
47 * Our configuration. 150 * Our configuration.
48 */ 151 */
49 const struct GNUNET_CONFIGURATION_Handle *cfg; 152 const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -65,6 +168,16 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
65 void *mq_init_cls; 168 void *mq_init_cls;
66 169
67 /** 170 /**
171 * Maximum permissable queue length.
172 */
173 unsigned long long max_queue_length;
174
175 /**
176 * Flow-control identifier generator.
177 */
178 uint64_t fc_gen;
179
180 /**
68 * MTU of the communicator 181 * MTU of the communicator
69 */ 182 */
70 size_t mtu; 183 size_t mtu;
@@ -74,10 +187,53 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
74 * transport service. 187 * transport service.
75 */ 188 */
76 uint32_t aid_gen; 189 uint32_t aid_gen;
190
191 /**
192 * Queue identifier generator.
193 */
194 uint32_t queue_gen;
77 195
78}; 196};
79 197
80 198
199/**
200 * Handle returned to identify the internal data structure the transport
201 * API has created to manage a message queue to a particular peer.
202 */
203struct GNUNET_TRANSPORT_QueueHandle
204{
205 /**
206 * Handle this queue belongs to.
207 */
208 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
209
210 /**
211 * Which peer we can communciate with.
212 */
213 struct GNUNET_PeerIdentity peer;
214
215 /**
216 * Address used by the communication queue.
217 */
218 char *address;
219
220 /**
221 * Network type of the communciation queue.
222 */
223 enum GNUNET_ATS_Network_Type nt;
224
225 /**
226 * The queue itself.
227 */
228 struct GNUNET_MQ_Handle *mq;
229
230 /**
231 * ID for this queue when talking to the transport service.
232 */
233 uint32_t queue_id;
234
235};
236
81 237
82/** 238/**
83 * Internal representation of an address a communicator is 239 * Internal representation of an address a communicator is
@@ -185,6 +341,100 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
185 341
186 342
187/** 343/**
344 * Send message to the transport service about queue @a qh
345 * being now available.
346 *
347 * @param qh queue to add
348 */
349static void
350send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
351{
352 struct GNUNET_MQ_Envelope *env;
353 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
354
355 if (NULL == ai->ch->mq)
356 return;
357 env = GNUNET_MQ_msg_extra (aqm,
358 strlen (ai->address) + 1,
359 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
360 aqm.receiver = qh->peer;
361 aqm.nt = htonl ((uint32_t) qh->nt);
362 aqm.qid = htonl (qh->qid);
363 memcpy (&aqm[1],
364 ai->address,
365 strlen (ai->address) + 1);
366 GNUNET_MQ_send (ai->ch->mq,
367 env);
368}
369
370
371/**
372 * Send message to the transport service about queue @a qh
373 * being no longer available.
374 *
375 * @param qh queue to delete
376 */
377static void
378send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
379{
380 struct GNUNET_MQ_Envelope *env;
381 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
382
383 if (NULL == ai->ch->mq)
384 return;
385 env = GNUNET_MQ_msg (dqm,
386 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
387 dqm.qid = htonl (qh->qid);
388 dqm.receiver = qh->peer;
389 GNUNET_MQ_send (ai->ch->mq,
390 env);
391}
392
393
394/**
395 * Disconnect from the transport service. Purges
396 * all flow control entries as we will no longer receive
397 * the ACKs. Purges the ack pending entries as the
398 * transport will no longer expect the confirmations.
399 *
400 * @param ch service to disconnect from
401 */
402static void
403disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
404{
405 struct FlowControl *fcn;
406 struct AckPending *apn;
407
408 for (struct FlowControl *fc = ch->fc_head;
409 NULL != fc;
410 fc = fcn)
411 {
412 fcn = fc->next;
413 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
414 ch->fc_tail,
415 fc);
416 fc->cb (fc->cb_cls,
417 GNUNET_SYSERR);
418 GNUNET_free (fc);
419 }
420 for (struct AckPending *ap = ch->ap_head;
421 NULL != ap;
422 ap = apn)
423 {
424 apn = ap->next;
425 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
426 ch->ap_tail,
427 ap);
428 GNUNET_free (ap);
429 }
430 if (NULL == ch->mq)
431 return;
432 GNUNET_MQ_destroy (ch->mq);
433 ch->mq = NULL;
434}
435
436
437/**
188 * Function called on MQ errors. 438 * Function called on MQ errors.
189 */ 439 */
190static void 440static void
@@ -192,15 +442,230 @@ error_handler (void *cls,
192 enum GNUNET_MQ_Error error) 442 enum GNUNET_MQ_Error error)
193{ 443{
194 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 444 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
445
446 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447 "MQ failure, reconnecting to transport service.\n");
448 disconnect (ch);
449 /* TODO: maybe do this with exponential backoff/delay */
450 reconnect (ch);
451}
452
453
454/**
455 * Transport service acknowledged a message we gave it
456 * (with flow control enabled). Tell the communicator.
457 *
458 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
459 * @param incoming_ack the ack
460 */
461static void
462handle_incoming_ack (void *cls,
463 struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
464{
465 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
195 466
196 GNUNET_MQ_destroy (ch->mq); 467 for (struct FlowControl *fc = ch->fc_head;
197 ch->mq = NULL; 468 NULL != fc;
469 fc = fc->next)
470 {
471 if ( (fc->id == incoming_ack->fc_id) &&
472 (0 == memcmp (&fc->sender,
473 incoming_ack->sender,
474 sizeof (struct GNUNET_PeerIdentity))) )
475 {
476 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
477 ch->fc_tail,
478 fc);
479 fc->cb (fc->cb_cls,
480 GNUNET_OK);
481 GNUNET_free (fc);
482 return;
483 }
484 }
485 GNUNET_break (0);
486 disconnect (ch);
198 /* TODO: maybe do this with exponential backoff/delay */ 487 /* TODO: maybe do this with exponential backoff/delay */
199 reconnect (ch); 488 reconnect (ch);
200} 489}
201 490
202 491
203/** 492/**
493 * Transport service wants us to create a queue. Check if @a cq
494 * is well-formed.
495 *
496 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
497 * @param cq the queue creation request
498 * @return #GNUNET_OK if @a smt is well-formed
499 */
500static int
501check_create_queue (void *cls,
502 struct GNUNET_TRANSPORT_CreateQueue *cq)
503{
504 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505 const char *addr = (const char *) &cq[1];
506
507 if ( (0 == len) ||
508 ('\0' != addr[len-1]) )
509 {
510 GNUNET_break (0);
511 return GNUNET_SYSERR;
512 }
513 return GNUNET_OK;
514}
515
516
517/**
518 * Transport service wants us to create a queue. Tell the communicator.
519 *
520 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
521 * @param cq the queue creation request
522 */
523static void
524handle_create_queue (void *cls,
525 struct GNUNET_TRANSPORT_CreateQueue *cq)
526{
527 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528 const char *addr = (const char *) &cq[1];
529
530 if (GNUNET_OK !=
531 ch->mq_init (ch->mq_init_cls,
532 &cq->receiver,
533 addr))
534 {
535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536 "Address `%s' invalid for this communicator\n",
537 addr);
538 // TODO: do we notify the transport!?
539 }
540}
541
542
543/**
544 * Transport service wants us to send a message. Check if @a smt
545 * is well-formed.
546 *
547 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
548 * @param smt the transmission request
549 * @return #GNUNET_OK if @a smt is well-formed
550 */
551static int
552check_send_msg (void *cls,
553 struct GNUNET_TRANSPORT_SendMessageTo *smt)
554{
555 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
557
558 if (ntohs (mh->size) != len)
559 {
560 GNUNET_break (0);
561 return GNUNET_SYSERR;
562 }
563 return GNUNET_OK;
564}
565
566
567/**
568 * Notify transport service about @a status of a message with
569 * @a mid sent to @a receiver.
570 *
571 * @param ch handle
572 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
573 * @param receiver which peer was the receiver
574 * @param mid message that the ack is about
575 */
576static void
577send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
578 int status,
579 const struct GNUNET_PeerIdentity *receiver,
580 uint64_t mid)
581{
582 struct GNUNET_MQ_Envelope *env;
583 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
584
585 env = GNUNET_MQ_msg (ack,
586 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587 ack->status = htonl (GNUNET_OK);
588 ack->mid = ap->mid;
589 ack->receiver = ap->receiver;
590 GNUNET_MQ_send (ch->mq,
591 env);
592}
593
594
595/**
596 * Message queue transmission by communicator was successful,
597 * notify transport service.
598 *
599 * @param cls an `struct AckPending *`
600 */
601static void
602send_ack_cb (void *cls)
603{
604 struct AckPending *ap = cls;
605 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
606
607 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
608 ch->ap_tail,
609 ap);
610 send_ack (ch,
611 GNUNET_OK,
612 &ap->receiver,
613 ap->mid);
614 GNUNET_free (ap);
615}
616
617
618/**
619 * Transport service wants us to send a message. Tell the communicator.
620 *
621 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
622 * @param smt the transmission request
623 */
624static void
625handle_send_msg (void *cls,
626 struct GNUNET_TRANSPORT_SendMessageTo *smt)
627{
628 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629 const struct GNUNET_MessageHeader *mh;
630 struct GNUNET_MQ_Envelope *env;
631 struct AckPending *ap;
632 struct QueueHandle *qh;
633
634 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
635 if ( (qh->queue_id == smt->qid) &&
636 (0 == memcmp (&qh->peer,
637 &smt->target,
638 sizeof (struct GNUNET_PeerIdentity))) )
639 break;
640 if (NULL == qh)
641 {
642 /* queue is already gone, tell transport this one failed */
643 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
644 "Transmission failed, queue no longer exists.\n");
645 send_ack (ch,
646 GNUNET_NO,
647 &smt->receiver,
648 smt->mid);
649 return;
650 }
651 ap = GNUNET_new (struct AckPending);
652 ap->ch = ch;
653 ap->receiver = smt->receiver;
654 ap->mid = smt->mid;
655 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
656 cp->ap_tail,
657 ap);
658 mh = (const struct GNUNET_MessageHeader *) &smt[1];
659 env = GNUNET_MQ_msg_copy (mh);
660 GNUNET_MQ_notify_sent (env,
661 &send_ack_cb,
662 ap);
663 GNUNET_MQ_send (qh->mq,
664 env);
665}
666
667
668/**
204 * (re)connect our communicator to the transport service 669 * (re)connect our communicator to the transport service
205 * 670 *
206 * @param ch handle to reconnect 671 * @param ch handle to reconnect
@@ -209,6 +674,18 @@ static void
209reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 674reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
210{ 675{
211 struct GNUNET_MQ_MessageHandler handlers[] = { 676 struct GNUNET_MQ_MessageHandler handlers[] = {
677 GNUNET_MQ_hd_fixed_size (incoming_ack,
678 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
679 struct GNUNET_TRANSPORT_IncomingMessageAck,
680 ch),
681 GNUNET_MQ_hd_var_size (create_queue,
682 GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
683 struct GNUNET_TRANSPORT_CreateQueue,
684 ch),
685 GNUNET_MQ_hd_var_size (send_msg,
686 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
687 struct GNUNET_TRANSPORT_SendMessageTo,
688 ch),
212 GNUNET_MQ_handler_end() 689 GNUNET_MQ_handler_end()
213 }; 690 };
214 691
@@ -217,10 +694,14 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
217 handlers, 694 handlers,
218 &error_handler, 695 &error_handler,
219 ch); 696 ch);
220 for (struct GNUNET_TRANSPORT_AddressIdentifier ai = ch->ai_head; 697 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
221 NULL != ai; 698 NULL != ai;
222 ai = ai->next) 699 ai = ai->next)
223 send_add_address (ai); 700 send_add_address (ai);
701 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
702 NULL != qh;
703 qh = qh->next)
704 send_add_queue (qh);
224} 705}
225 706
226 707
@@ -253,6 +734,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
253 ch->mq_init = mq_init; 734 ch->mq_init = mq_init;
254 ch->mq_init_cls = mq_init_cls; 735 ch->mq_init_cls = mq_init_cls;
255 reconnect (ch); 736 reconnect (ch);
737 if (GNUNET_OK !=
738 GNUNET_CONFIGURATION_get_value_number (cfg,
739 name,
740 "MAX_QUEUE_LENGTH",
741 &ch->max_queue_length))
742 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
256 if (NULL == ch->mq) 743 if (NULL == ch->mq)
257 { 744 {
258 GNUNET_free (ch); 745 GNUNET_free (ch);
@@ -270,12 +757,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
270void 757void
271GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 758GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
272{ 759{
760 disconnect (ch);
273 while (NULL != ch->ai_head) 761 while (NULL != ch->ai_head)
274 { 762 {
275 GNUNET_break (0); /* communicator forgot to remove address, warn! */ 763 GNUNET_break (0); /* communicator forgot to remove address, warn! */
276 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); 764 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
277 } 765 }
278 GNUNET_MQ_destroy (ch->mq);
279 GNUNET_free (ch); 766 GNUNET_free (ch);
280} 767}
281 768
@@ -297,8 +784,8 @@ GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHa
297 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was 784 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
298 * immediately dropped due to memory limitations (communicator 785 * immediately dropped due to memory limitations (communicator
299 * should try to apply back pressure), 786 * should try to apply back pressure),
300 * #GNUNET_SYSERR if the message is ill formed and communicator 787 * #GNUNET_SYSERR if the message could not be delivered because
301 * should try to reset stream 788 * the tranport service is not yet up
302 */ 789 */
303int 790int
304GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 791GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
@@ -312,7 +799,33 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
312 uint16_t msize; 799 uint16_t msize;
313 800
314 if (NULL == ai->ch->mq) 801 if (NULL == ai->ch->mq)
315 return; 802 return GNUNET_SYSERR;
803 if (NULL != cb)
804 {
805 struct FlowControl *fc;
806
807 im->fc_on = htonl (GNUNET_YES);
808 im->fc_id = ai->ch->fc_gen++;
809 fc = GNUNET_new (struct FlowControl);
810 fc->sender = *sender;
811 fc->id = im->fc_id;
812 fc->cb = cb;
813 fc->cb_cls = cb_cls;
814 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
815 ch->fc_tail,
816 fc);
817 }
818 else
819 {
820 if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
821 {
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Dropping message: transprot is too slow, queue length %u exceeded\n",
824 ch->max_queue_length);
825 return GNUNET_NO;
826 }
827 }
828
316 msize = ntohs (msg->size); 829 msize = ntohs (msg->size);
317 env = GNUNET_MQ_msg_extra (im, 830 env = GNUNET_MQ_msg_extra (im,
318 msize, 831 msize,
@@ -320,7 +833,7 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
320 if (NULL == env) 833 if (NULL == env)
321 { 834 {
322 GNUNET_break (0); 835 GNUNET_break (0);
323 return; 836 return GNUNET_SYSERR;
324 } 837 }
325 im->sender = *sender; 838 im->sender = *sender;
326 memcpy (&im[1], 839 memcpy (&im[1],
@@ -328,19 +841,12 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
328 msize); 841 msize);
329 GNUNET_MQ_send (ai->ch->mq, 842 GNUNET_MQ_send (ai->ch->mq,
330 env); 843 env);
844 return GNUNET_OK;
331} 845}
332 846
333 847
334/* ************************* Discovery *************************** */ 848/* ************************* Discovery *************************** */
335 849
336/**
337 * Handle returned to identify the internal data structure the transport
338 * API has created to manage a message queue to a particular peer.
339 */
340struct GNUNET_TRANSPORT_QueueHandle
341{
342};
343
344 850
345/** 851/**
346 * Notify transport service that an MQ became available due to an 852 * Notify transport service that an MQ became available due to an
@@ -361,6 +867,20 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle
361 enum GNUNET_ATS_Network_Type nt, 867 enum GNUNET_ATS_Network_Type nt,
362 struct GNUNET_MQ_Handle *mq) 868 struct GNUNET_MQ_Handle *mq)
363{ 869{
870 struct GNUNET_TRANSPORT_QueueHandle *qh;
871
872 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
873 qh->ch = ch;
874 qh->peer = *peer;
875 qh->address = GNUNET_strdup (address);
876 qh->nt = nt;
877 qh->mq = mq;
878 qh->queue_id = ch->queue_gen++;
879 GNUNET_CONTAINER_DLL_insert (ch->queue_head,
880 ch->queue_tail,
881 qh);
882 send_add_queue (qh);
883 return qh;
364} 884}
365 885
366 886
@@ -373,11 +893,18 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle
373void 893void
374GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) 894GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
375{ 895{
896 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
897
898 send_del_queue (qh);
899 GNUNET_CONTAINER_DLL_remove (ch->queue_head,
900 ch->queue_tail,
901 qh);
902 GNUNET_MQ_destroy (qh->mq);
903 GNUNET_free (qh->address);
904 GNUNET_free (qh);
376} 905}
377 906
378 907
379
380
381/** 908/**
382 * Notify transport service about an address that this communicator 909 * Notify transport service about an address that this communicator
383 * provides for this peer. 910 * provides for this peer.
@@ -421,10 +948,10 @@ GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIde
421 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; 948 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
422 949
423 send_del_address (ai); 950 send_del_address (ai);
424 GNUNET_free (ai->address);
425 GNUNET_CONTAINER_DLL_remove (ch->ai_head, 951 GNUNET_CONTAINER_DLL_remove (ch->ai_head,
426 ch->ai_tail, 952 ch->ai_tail,
427 ai); 953 ai);
954 GNUNET_free (ai->address);
428 GNUNET_free (ai); 955 GNUNET_free (ai);
429} 956}
430 957