aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c959
1 files changed, 959 insertions, 0 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
new file mode 100644
index 000000000..d446516bd
--- /dev/null
+++ b/src/transport/transport_api2_communication.c
@@ -0,0 +1,959 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17*/
18
19/**
20 * @file transport/transport_api2_communication.c
21 * @brief implementation of the gnunet_transport_communication_service.h API
22 * @author Christian Grothoff
23 */
24#include "platform.h"
25#include "gnunet_util_lib.h"
26#include "gnunet_protocols.h"
27#include "gnunet_transport_communication_service.h"
28#include "transport.h"
29
30
31/**
32 * How many messages do we keep at most in the queue to the
33 * transport service before we start to drop (default,
34 * can be changed via the configuration file).
35 */
36#define DEFAULT_MAX_QUEUE_LENGTH 16
37
38
39/**
40 * Information we track per packet to enable flow control.
41 */
42struct FlowControl
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct FlowControl *next;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct FlowControl *prev;
53
54 /**
55 * Function to call once the message was processed.
56 */
57 GNUNET_TRANSPORT_MessageCompletedCallback cb;
58
59 /**
60 * Closure for @e cb
61 */
62 void *cb_cls;
63
64 /**
65 * Which peer is this about?
66 */
67 struct GNUNET_PeerIdentity sender;
68
69 /**
70 * More-or-less unique ID for the message.
71 */
72 uint64_t id;
73};
74
75
76/**
77 * Information we track per message to tell the transport about
78 * success or failures.
79 */
80struct AckPending
81{
82 /**
83 * Kept in a DLL.
84 */
85 struct AckPending *next;
86
87 /**
88 * Kept in a DLL.
89 */
90 struct AckPending *prev;
91
92 /**
93 * Which peer is this about?
94 */
95 struct GNUNET_PeerIdentity receiver;
96
97 /**
98 * More-or-less unique ID for the message.
99 */
100 uint64_t mid;
101};
102
103
104/**
105 * Opaque handle to the transport service for communicators.
106 */
107struct GNUNET_TRANSPORT_CommunicatorHandle
108{
109 /**
110 * Head of DLL of addresses this communicator offers to the transport service.
111 */
112 struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
113
114 /**
115 * Tail of DLL of addresses this communicator offers to the transport service.
116 */
117 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
118
119 /**
120 * DLL of messages awaiting flow control confirmation (ack).
121 */
122 struct FlowControl *fc_head;
123
124 /**
125 * DLL of messages awaiting flow control confirmation (ack).
126 */
127 struct FlowControl *fc_tail;
128
129 /**
130 * DLL of messages awaiting transmission confirmation (ack).
131 */
132 struct AckPending *ap_head;
133
134 /**
135 * DLL of messages awaiting transmission confirmation (ack).
136 */
137 struct AckPending *ac_tail;
138
139 /**
140 * DLL of queues we offer.
141 */
142 struct QueueHandle *queue_head;
143
144 /**
145 * DLL of queues we offer.
146 */
147 struct QueueHandle *queue_tail;
148
149 /**
150 * Our configuration.
151 */
152 const struct GNUNET_CONFIGURATION_Handle *cfg;
153
154 /**
155 * Name of the communicator.
156 */
157 const char *name;
158
159 /**
160 * Function to call when the transport service wants us to initiate
161 * a communication channel with another peer.
162 */
163 GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
164
165 /**
166 * Closure for @e mq_init.
167 */
168 void *mq_init_cls;
169
170 /**
171 * Maximum permissable queue length.
172 */
173 unsigned long long max_queue_length;
174
175 /**
176 * Flow-control identifier generator.
177 */
178 uint64_t fc_gen;
179
180 /**
181 * MTU of the communicator
182 */
183 size_t mtu;
184
185 /**
186 * Internal UUID for the address used in communication with the
187 * transport service.
188 */
189 uint32_t aid_gen;
190
191 /**
192 * Queue identifier generator.
193 */
194 uint32_t queue_gen;
195
196};
197
198
199/**
200 * Handle returned to identify the internal data structure the transport
201 * API has created to manage a message queue to a particular peer.
202 */
203struct GNUNET_TRANSPORT_QueueHandle
204{
205 /**
206 * Handle this queue belongs to.
207 */
208 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
209
210 /**
211 * Which peer we can communciate with.
212 */
213 struct GNUNET_PeerIdentity peer;
214
215 /**
216 * Address used by the communication queue.
217 */
218 char *address;
219
220 /**
221 * Network type of the communciation queue.
222 */
223 enum GNUNET_ATS_Network_Type nt;
224
225 /**
226 * The queue itself.
227 */
228 struct GNUNET_MQ_Handle *mq;
229
230 /**
231 * ID for this queue when talking to the transport service.
232 */
233 uint32_t queue_id;
234
235};
236
237
238/**
239 * Internal representation of an address a communicator is
240 * currently providing for the transport service.
241 */
242struct GNUNET_TRANSPORT_AddressIdentifier
243{
244
245 /**
246 * Kept in a DLL.
247 */
248 struct GNUNET_TRANSPORT_AddressIdentifier *next;
249
250 /**
251 * Kept in a DLL.
252 */
253 struct GNUNET_TRANSPORT_AddressIdentifier *prev;
254
255 /**
256 * Transport handle where the address was added.
257 */
258 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
259
260 /**
261 * The actual address.
262 */
263 char *address;
264
265 /**
266 * When does the address expire? (Expected lifetime of the
267 * address.)
268 */
269 struct GNUNET_TIME_Relative expiration;
270
271 /**
272 * Internal UUID for the address used in communication with the
273 * transport service.
274 */
275 uint32_t aid;
276
277 /**
278 * Network type for the address.
279 */
280 enum GNUNET_ATS_Network_Type nt;
281
282};
283
284
285/**
286 * (re)connect our communicator to the transport service
287 *
288 * @param ch handle to reconnect
289 */
290static void
291reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
292
293
294/**
295 * Send message to the transport service about address @a ai
296 * being now available.
297 *
298 * @param ai address to add
299 */
300static void
301send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
302{
303 struct GNUNET_MQ_Envelope *env;
304 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
305
306 if (NULL == ai->ch->mq)
307 return;
308 env = GNUNET_MQ_msg_extra (aam,
309 strlen (ai->address) + 1,
310 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
311 aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration);
312 aam->nt = htonl ((uint32_t) ai->nt);
313 memcpy (&aam[1],
314 ai->address,
315 strlen (ai->address) + 1);
316 GNUNET_MQ_send (ai->ch->mq,
317 env);
318}
319
320
321/**
322 * Send message to the transport service about address @a ai
323 * being no longer available.
324 *
325 * @param ai address to delete
326 */
327static void
328send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
329{
330 struct GNUNET_MQ_Envelope *env;
331 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
332
333 if (NULL == ai->ch->mq)
334 return;
335 env = GNUNET_MQ_msg (dam,
336 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
337 dam.aid = htonl (ai->aid);
338 GNUNET_MQ_send (ai->ch->mq,
339 env);
340}
341
342
343/**
344 * Send message to the transport service about queue @a qh
345 * being now available.
346 *
347 * @param qh queue to add
348 */
349static void
350send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
351{
352 struct GNUNET_MQ_Envelope *env;
353 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
354
355 if (NULL == ai->ch->mq)
356 return;
357 env = GNUNET_MQ_msg_extra (aqm,
358 strlen (ai->address) + 1,
359 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
360 aqm.receiver = qh->peer;
361 aqm.nt = htonl ((uint32_t) qh->nt);
362 aqm.qid = htonl (qh->qid);
363 memcpy (&aqm[1],
364 ai->address,
365 strlen (ai->address) + 1);
366 GNUNET_MQ_send (ai->ch->mq,
367 env);
368}
369
370
371/**
372 * Send message to the transport service about queue @a qh
373 * being no longer available.
374 *
375 * @param qh queue to delete
376 */
377static void
378send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
379{
380 struct GNUNET_MQ_Envelope *env;
381 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
382
383 if (NULL == ai->ch->mq)
384 return;
385 env = GNUNET_MQ_msg (dqm,
386 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
387 dqm.qid = htonl (qh->qid);
388 dqm.receiver = qh->peer;
389 GNUNET_MQ_send (ai->ch->mq,
390 env);
391}
392
393
394/**
395 * Disconnect from the transport service. Purges
396 * all flow control entries as we will no longer receive
397 * the ACKs. Purges the ack pending entries as the
398 * transport will no longer expect the confirmations.
399 *
400 * @param ch service to disconnect from
401 */
402static void
403disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
404{
405 struct FlowControl *fcn;
406 struct AckPending *apn;
407
408 for (struct FlowControl *fc = ch->fc_head;
409 NULL != fc;
410 fc = fcn)
411 {
412 fcn = fc->next;
413 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
414 ch->fc_tail,
415 fc);
416 fc->cb (fc->cb_cls,
417 GNUNET_SYSERR);
418 GNUNET_free (fc);
419 }
420 for (struct AckPending *ap = ch->ap_head;
421 NULL != ap;
422 ap = apn)
423 {
424 apn = ap->next;
425 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
426 ch->ap_tail,
427 ap);
428 GNUNET_free (ap);
429 }
430 if (NULL == ch->mq)
431 return;
432 GNUNET_MQ_destroy (ch->mq);
433 ch->mq = NULL;
434}
435
436
437/**
438 * Function called on MQ errors.
439 */
440static void
441error_handler (void *cls,
442 enum GNUNET_MQ_Error error)
443{
444 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
445
446 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447 "MQ failure, reconnecting to transport service.\n");
448 disconnect (ch);
449 /* TODO: maybe do this with exponential backoff/delay */
450 reconnect (ch);
451}
452
453
454/**
455 * Transport service acknowledged a message we gave it
456 * (with flow control enabled). Tell the communicator.
457 *
458 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
459 * @param incoming_ack the ack
460 */
461static void
462handle_incoming_ack (void *cls,
463 struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
464{
465 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
466
467 for (struct FlowControl *fc = ch->fc_head;
468 NULL != fc;
469 fc = fc->next)
470 {
471 if ( (fc->id == incoming_ack->fc_id) &&
472 (0 == memcmp (&fc->sender,
473 incoming_ack->sender,
474 sizeof (struct GNUNET_PeerIdentity))) )
475 {
476 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
477 ch->fc_tail,
478 fc);
479 fc->cb (fc->cb_cls,
480 GNUNET_OK);
481 GNUNET_free (fc);
482 return;
483 }
484 }
485 GNUNET_break (0);
486 disconnect (ch);
487 /* TODO: maybe do this with exponential backoff/delay */
488 reconnect (ch);
489}
490
491
492/**
493 * Transport service wants us to create a queue. Check if @a cq
494 * is well-formed.
495 *
496 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
497 * @param cq the queue creation request
498 * @return #GNUNET_OK if @a smt is well-formed
499 */
500static int
501check_create_queue (void *cls,
502 struct GNUNET_TRANSPORT_CreateQueue *cq)
503{
504 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505 const char *addr = (const char *) &cq[1];
506
507 if ( (0 == len) ||
508 ('\0' != addr[len-1]) )
509 {
510 GNUNET_break (0);
511 return GNUNET_SYSERR;
512 }
513 return GNUNET_OK;
514}
515
516
517/**
518 * Transport service wants us to create a queue. Tell the communicator.
519 *
520 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
521 * @param cq the queue creation request
522 */
523static void
524handle_create_queue (void *cls,
525 struct GNUNET_TRANSPORT_CreateQueue *cq)
526{
527 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528 const char *addr = (const char *) &cq[1];
529
530 if (GNUNET_OK !=
531 ch->mq_init (ch->mq_init_cls,
532 &cq->receiver,
533 addr))
534 {
535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536 "Address `%s' invalid for this communicator\n",
537 addr);
538 // TODO: do we notify the transport!?
539 }
540}
541
542
543/**
544 * Transport service wants us to send a message. Check if @a smt
545 * is well-formed.
546 *
547 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
548 * @param smt the transmission request
549 * @return #GNUNET_OK if @a smt is well-formed
550 */
551static int
552check_send_msg (void *cls,
553 struct GNUNET_TRANSPORT_SendMessageTo *smt)
554{
555 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
557
558 if (ntohs (mh->size) != len)
559 {
560 GNUNET_break (0);
561 return GNUNET_SYSERR;
562 }
563 return GNUNET_OK;
564}
565
566
567/**
568 * Notify transport service about @a status of a message with
569 * @a mid sent to @a receiver.
570 *
571 * @param ch handle
572 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
573 * @param receiver which peer was the receiver
574 * @param mid message that the ack is about
575 */
576static void
577send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
578 int status,
579 const struct GNUNET_PeerIdentity *receiver,
580 uint64_t mid)
581{
582 struct GNUNET_MQ_Envelope *env;
583 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
584
585 env = GNUNET_MQ_msg (ack,
586 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587 ack->status = htonl (GNUNET_OK);
588 ack->mid = ap->mid;
589 ack->receiver = ap->receiver;
590 GNUNET_MQ_send (ch->mq,
591 env);
592}
593
594
595/**
596 * Message queue transmission by communicator was successful,
597 * notify transport service.
598 *
599 * @param cls an `struct AckPending *`
600 */
601static void
602send_ack_cb (void *cls)
603{
604 struct AckPending *ap = cls;
605 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
606
607 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
608 ch->ap_tail,
609 ap);
610 send_ack (ch,
611 GNUNET_OK,
612 &ap->receiver,
613 ap->mid);
614 GNUNET_free (ap);
615}
616
617
618/**
619 * Transport service wants us to send a message. Tell the communicator.
620 *
621 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
622 * @param smt the transmission request
623 */
624static void
625handle_send_msg (void *cls,
626 struct GNUNET_TRANSPORT_SendMessageTo *smt)
627{
628 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629 const struct GNUNET_MessageHeader *mh;
630 struct GNUNET_MQ_Envelope *env;
631 struct AckPending *ap;
632 struct QueueHandle *qh;
633
634 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
635 if ( (qh->queue_id == smt->qid) &&
636 (0 == memcmp (&qh->peer,
637 &smt->target,
638 sizeof (struct GNUNET_PeerIdentity))) )
639 break;
640 if (NULL == qh)
641 {
642 /* queue is already gone, tell transport this one failed */
643 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
644 "Transmission failed, queue no longer exists.\n");
645 send_ack (ch,
646 GNUNET_NO,
647 &smt->receiver,
648 smt->mid);
649 return;
650 }
651 ap = GNUNET_new (struct AckPending);
652 ap->ch = ch;
653 ap->receiver = smt->receiver;
654 ap->mid = smt->mid;
655 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
656 cp->ap_tail,
657 ap);
658 mh = (const struct GNUNET_MessageHeader *) &smt[1];
659 env = GNUNET_MQ_msg_copy (mh);
660 GNUNET_MQ_notify_sent (env,
661 &send_ack_cb,
662 ap);
663 GNUNET_MQ_send (qh->mq,
664 env);
665}
666
667
668/**
669 * (re)connect our communicator to the transport service
670 *
671 * @param ch handle to reconnect
672 */
673static void
674reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
675{
676 struct GNUNET_MQ_MessageHandler handlers[] = {
677 GNUNET_MQ_hd_fixed_size (incoming_ack,
678 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
679 struct GNUNET_TRANSPORT_IncomingMessageAck,
680 ch),
681 GNUNET_MQ_hd_var_size (create_queue,
682 GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
683 struct GNUNET_TRANSPORT_CreateQueue,
684 ch),
685 GNUNET_MQ_hd_var_size (send_msg,
686 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
687 struct GNUNET_TRANSPORT_SendMessageTo,
688 ch),
689 GNUNET_MQ_handler_end()
690 };
691
692 ch->mq = GNUNET_CLIENT_connect (cfg,
693 "transport",
694 handlers,
695 &error_handler,
696 ch);
697 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
698 NULL != ai;
699 ai = ai->next)
700 send_add_address (ai);
701 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
702 NULL != qh;
703 qh = qh->next)
704 send_add_queue (qh);
705}
706
707
708/**
709 * Connect to the transport service.
710 *
711 * @param cfg configuration to use
712 * @param name name of the communicator that is connecting
713 * @param mtu maximum message size supported by communicator, 0 if
714 * sending is not supported, SIZE_MAX for no MTU
715 * @param mq_init function to call to initialize a message queue given
716 * the address of another peer, can be NULL if the
717 * communicator only supports receiving messages
718 * @param mq_init_cls closure for @a mq_init
719 * @return NULL on error
720 */
721struct GNUNET_TRANSPORT_CommunicatorHandle *
722GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
723 const char *name,
724 size_t mtu,
725 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
726 void *mq_init_cls)
727{
728 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
729
730 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
731 ch->cfg = cfg;
732 ch->name = name;
733 ch->mtu = mtu;
734 ch->mq_init = mq_init;
735 ch->mq_init_cls = mq_init_cls;
736 reconnect (ch);
737 if (GNUNET_OK !=
738 GNUNET_CONFIGURATION_get_value_number (cfg,
739 name,
740 "MAX_QUEUE_LENGTH",
741 &ch->max_queue_length))
742 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
743 if (NULL == ch->mq)
744 {
745 GNUNET_free (ch);
746 return NULL;
747 }
748 return ch;
749}
750
751
752/**
753 * Disconnect from the transport service.
754 *
755 * @param ch handle returned from connect
756 */
757void
758GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
759{
760 disconnect (ch);
761 while (NULL != ch->ai_head)
762 {
763 GNUNET_break (0); /* communicator forgot to remove address, warn! */
764 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
765 }
766 GNUNET_free (ch);
767}
768
769
770/* ************************* Receiving *************************** */
771
772
773/**
774 * Notify transport service that the communicator has received
775 * a message.
776 *
777 * @param ch connection to transport service
778 * @param sender presumed sender of the message (details to be checked
779 * by higher layers)
780 * @param msg the message
781 * @param cb function to call once handling the message is done, NULL if
782 * flow control is not supported by this communicator
783 * @param cb_cls closure for @a cb
784 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
785 * immediately dropped due to memory limitations (communicator
786 * should try to apply back pressure),
787 * #GNUNET_SYSERR if the message could not be delivered because
788 * the tranport service is not yet up
789 */
790int
791GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
792 const struct GNUNET_PeerIdentity *sender,
793 const struct GNUNET_MessageHeader *msg,
794 GNUNET_TRANSPORT_MessageCompletedCallback cb,
795 void *cb_cls)
796{
797 struct GNUNET_MQ_Envelope *env;
798 struct GNUNET_TRANSPORT_IncomingMessage *im;
799 uint16_t msize;
800
801 if (NULL == ai->ch->mq)
802 return GNUNET_SYSERR;
803 if (NULL != cb)
804 {
805 struct FlowControl *fc;
806
807 im->fc_on = htonl (GNUNET_YES);
808 im->fc_id = ai->ch->fc_gen++;
809 fc = GNUNET_new (struct FlowControl);
810 fc->sender = *sender;
811 fc->id = im->fc_id;
812 fc->cb = cb;
813 fc->cb_cls = cb_cls;
814 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
815 ch->fc_tail,
816 fc);
817 }
818 else
819 {
820 if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
821 {
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Dropping message: transprot is too slow, queue length %u exceeded\n",
824 ch->max_queue_length);
825 return GNUNET_NO;
826 }
827 }
828
829 msize = ntohs (msg->size);
830 env = GNUNET_MQ_msg_extra (im,
831 msize,
832 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
833 if (NULL == env)
834 {
835 GNUNET_break (0);
836 return GNUNET_SYSERR;
837 }
838 im->sender = *sender;
839 memcpy (&im[1],
840 msg,
841 msize);
842 GNUNET_MQ_send (ai->ch->mq,
843 env);
844 return GNUNET_OK;
845}
846
847
848/* ************************* Discovery *************************** */
849
850
851/**
852 * Notify transport service that an MQ became available due to an
853 * "inbound" connection or because the communicator discovered the
854 * presence of another peer.
855 *
856 * @param ch connection to transport service
857 * @param peer peer with which we can now communicate
858 * @param address address in human-readable format, 0-terminated, UTF-8
859 * @param nt which network type does the @a address belong to?
860 * @param mq message queue of the @a peer
861 * @return API handle identifying the new MQ
862 */
863struct GNUNET_TRANSPORT_QueueHandle *
864GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
865 const struct GNUNET_PeerIdentity *peer,
866 const char *address,
867 enum GNUNET_ATS_Network_Type nt,
868 struct GNUNET_MQ_Handle *mq)
869{
870 struct GNUNET_TRANSPORT_QueueHandle *qh;
871
872 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
873 qh->ch = ch;
874 qh->peer = *peer;
875 qh->address = GNUNET_strdup (address);
876 qh->nt = nt;
877 qh->mq = mq;
878 qh->queue_id = ch->queue_gen++;
879 GNUNET_CONTAINER_DLL_insert (ch->queue_head,
880 ch->queue_tail,
881 qh);
882 send_add_queue (qh);
883 return qh;
884}
885
886
887/**
888 * Notify transport service that an MQ became unavailable due to a
889 * disconnect or timeout.
890 *
891 * @param qh handle for the queue that must be invalidated
892 */
893void
894GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
895{
896 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
897
898 send_del_queue (qh);
899 GNUNET_CONTAINER_DLL_remove (ch->queue_head,
900 ch->queue_tail,
901 qh);
902 GNUNET_MQ_destroy (qh->mq);
903 GNUNET_free (qh->address);
904 GNUNET_free (qh);
905}
906
907
908/**
909 * Notify transport service about an address that this communicator
910 * provides for this peer.
911 *
912 * @param ch connection to transport service
913 * @param address our address in human-readable format, 0-terminated, UTF-8
914 * @param nt which network type does the address belong to?
915 * @param expiration when does the communicator forsee this address expiring?
916 */
917struct GNUNET_TRANSPORT_AddressIdentifier *
918GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
919 const char *address,
920 enum GNUNET_ATS_Network_Type nt,
921 struct GNUNET_TIME_Relative expiration)
922{
923 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
924
925 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
926 ai->ch = ch;
927 ai->address = GNUNET_strdup (address);
928 ai->nt = nt;
929 ai->expiration = expiration;
930 ai->aid = handle->aid_gen++;
931 GNUNET_CONTAINER_DLL_insert (handle->ai_head,
932 handle->ai_tail,
933 ai);
934 send_add_address (ai);
935 return ai;
936}
937
938
939/**
940 * Notify transport service about an address that this communicator no
941 * longer provides for this peer.
942 *
943 * @param ai address that is no longer provided
944 */
945void
946GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
947{
948 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
949
950 send_del_address (ai);
951 GNUNET_CONTAINER_DLL_remove (ch->ai_head,
952 ch->ai_tail,
953 ai);
954 GNUNET_free (ai->address);
955 GNUNET_free (ai);
956}
957
958
959/* end of transport_api2_communication.c */