aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c1165
1 files changed, 0 insertions, 1165 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
deleted file mode 100644
index 2a80db87b..000000000
--- a/src/transport/transport_api2_communication.c
+++ /dev/null
@@ -1,1165 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/transport_api2_communication.c
23 * @brief implementation of the gnunet_transport_communication_service.h API
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_transport_communication_service.h"
30#include "gnunet_ats_transport_service.h"
31#include "transport.h"
32
33
34/**
35 * How many messages do we keep at most in the queue to the
36 * transport service before we start to drop (default,
37 * can be changed via the configuration file).
38 */
39#define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42/**
43 * Information we track per packet to enable flow control.
44 */
45struct FlowControl
46{
47 /**
48 * Kept in a DLL.
49 */
50 struct FlowControl *next;
51
52 /**
53 * Kept in a DLL.
54 */
55 struct FlowControl *prev;
56
57 /**
58 * Function to call once the message was processed.
59 */
60 GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62 /**
63 * Closure for @e cb
64 */
65 void *cb_cls;
66
67 /**
68 * Which peer is this about?
69 */
70 struct GNUNET_PeerIdentity sender;
71
72 /**
73 * More-or-less unique ID for the message.
74 */
75 uint64_t id;
76};
77
78
79/**
80 * Information we track per message to tell the transport about
81 * success or failures.
82 */
83struct AckPending
84{
85 /**
86 * Kept in a DLL.
87 */
88 struct AckPending *next;
89
90 /**
91 * Kept in a DLL.
92 */
93 struct AckPending *prev;
94
95 /**
96 * Communicator this entry belongs to.
97 */
98 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100 /**
101 * Which peer is this about?
102 */
103 struct GNUNET_PeerIdentity receiver;
104
105 /**
106 * More-or-less unique ID for the message.
107 */
108 uint64_t mid;
109};
110
111
112/**
113 * Opaque handle to the transport service for communicators.
114 */
115struct GNUNET_TRANSPORT_CommunicatorHandle
116{
117 /**
118 * Head of DLL of addresses this communicator offers to the transport service.
119 */
120 struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122 /**
123 * Tail of DLL of addresses this communicator offers to the transport service.
124 */
125 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127 /**
128 * DLL of messages awaiting flow control confirmation (ack).
129 */
130 struct FlowControl *fc_head;
131
132 /**
133 * DLL of messages awaiting flow control confirmation (ack).
134 */
135 struct FlowControl *fc_tail;
136
137 /**
138 * DLL of messages awaiting transmission confirmation (ack).
139 */
140 struct AckPending *ap_head;
141
142 /**
143 * DLL of messages awaiting transmission confirmation (ack).
144 */
145 struct AckPending *ap_tail;
146
147 /**
148 * DLL of queues we offer.
149 */
150 struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152 /**
153 * DLL of queues we offer.
154 */
155 struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157 /**
158 * Our configuration.
159 */
160 const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162 /**
163 * Config section to use.
164 */
165 const char *config_section;
166
167 /**
168 * Address prefix to use.
169 */
170 const char *addr_prefix;
171
172 /**
173 * Function to call when the transport service wants us to initiate
174 * a communication channel with another peer.
175 */
176 GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178 /**
179 * Closure for @e mq_init.
180 */
181 void *mq_init_cls;
182
183 /**
184 * Function to call when the transport service receives messages
185 * for a communicator (i.e. for NAT traversal or for non-bidirectional
186 * communicators).
187 */
188 GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190 /**
191 * Closure for @e notify_Cb.
192 */
193 void *notify_cb_cls;
194
195 /**
196 * Queue to talk to the transport service.
197 */
198 struct GNUNET_MQ_Handle *mq;
199
200 /**
201 * Maximum permissible queue length.
202 */
203 unsigned long long max_queue_length;
204
205 /**
206 * Flow-control identifier generator.
207 */
208 uint64_t fc_gen;
209
210 /**
211 * Internal UUID for the address used in communication with the
212 * transport service.
213 */
214 uint32_t aid_gen;
215
216 /**
217 * Queue identifier generator.
218 */
219 uint32_t queue_gen;
220
221 /**
222 * Characteristics of the communicator.
223 */
224 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225};
226
227
228/**
229 * Handle returned to identify the internal data structure the transport
230 * API has created to manage a message queue to a particular peer.
231 */
232struct GNUNET_TRANSPORT_QueueHandle
233{
234 /**
235 * Kept in a DLL.
236 */
237 struct GNUNET_TRANSPORT_QueueHandle *next;
238
239 /**
240 * Kept in a DLL.
241 */
242 struct GNUNET_TRANSPORT_QueueHandle *prev;
243
244 /**
245 * Handle this queue belongs to.
246 */
247 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
248
249 /**
250 * Address used by the communication queue.
251 */
252 char *address;
253
254 /**
255 * The queue itself.
256 */
257 struct GNUNET_MQ_Handle *mq;
258
259 /**
260 * Which peer we can communciate with.
261 */
262 struct GNUNET_PeerIdentity peer;
263
264 /**
265 * Network type of the communication queue.
266 */
267 enum GNUNET_NetworkType nt;
268
269 /**
270 * Communication status of the queue.
271 */
272 enum GNUNET_TRANSPORT_ConnectionStatus cs;
273
274 /**
275 * ID for this queue when talking to the transport service.
276 */
277 uint32_t queue_id;
278
279 /**
280 * Maximum transmission unit for the queue.
281 */
282 uint32_t mtu;
283
284 /**
285 * Queue length.
286 */
287 uint64_t q_len;
288 /**
289 * Queue priority.
290 */
291 uint32_t priority;
292};
293
294
295/**
296 * Internal representation of an address a communicator is
297 * currently providing for the transport service.
298 */
299struct GNUNET_TRANSPORT_AddressIdentifier
300{
301 /**
302 * Kept in a DLL.
303 */
304 struct GNUNET_TRANSPORT_AddressIdentifier *next;
305
306 /**
307 * Kept in a DLL.
308 */
309 struct GNUNET_TRANSPORT_AddressIdentifier *prev;
310
311 /**
312 * Transport handle where the address was added.
313 */
314 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
315
316 /**
317 * The actual address.
318 */
319 char *address;
320
321 /**
322 * When does the address expire? (Expected lifetime of the
323 * address.)
324 */
325 struct GNUNET_TIME_Relative expiration;
326
327 /**
328 * Internal UUID for the address used in communication with the
329 * transport service.
330 */
331 uint32_t aid;
332
333 /**
334 * Network type for the address.
335 */
336 enum GNUNET_NetworkType nt;
337};
338
339
340/**
341 * (re)connect our communicator to the transport service
342 *
343 * @param ch handle to reconnect
344 */
345static void
346reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
347
348
349/**
350 * Send message to the transport service about address @a ai
351 * being now available.
352 *
353 * @param ai address to add
354 */
355static void
356send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
357{
358 struct GNUNET_MQ_Envelope *env;
359 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
360
361 if (NULL == ai->ch->mq)
362 return;
363 env = GNUNET_MQ_msg_extra (aam,
364 strlen (ai->address) + 1,
365 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
366 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
367 aam->nt = htonl ((uint32_t) ai->nt);
368 memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
369 GNUNET_MQ_send (ai->ch->mq, env);
370}
371
372
373/**
374 * Send message to the transport service about address @a ai
375 * being no longer available.
376 *
377 * @param ai address to delete
378 */
379static void
380send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
381{
382 struct GNUNET_MQ_Envelope *env;
383 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
384
385 if (NULL == ai->ch->mq)
386 return;
387 env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
388 dam->aid = htonl (ai->aid);
389 GNUNET_MQ_send (ai->ch->mq, env);
390}
391
392
393/**
394 * Send message to the transport service about queue @a qh
395 * being now available.
396 *
397 * @param qh queue to add
398 */
399static void
400send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
401{
402 struct GNUNET_MQ_Envelope *env;
403 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
404
405 if (NULL == qh->ch->mq)
406 return;
407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
408 "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
409 env = GNUNET_MQ_msg_extra (aqm,
410 strlen (qh->address) + 1,
411 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
412 aqm->qid = htonl (qh->queue_id);
413 aqm->receiver = qh->peer;
414 aqm->nt = htonl ((uint32_t) qh->nt);
415 aqm->mtu = htonl (qh->mtu);
416 aqm->q_len = GNUNET_htonll (qh->q_len);
417 aqm->priority = htonl (qh->priority);
418 aqm->cs = htonl ((uint32_t) qh->cs);
419 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
420 GNUNET_MQ_send (qh->ch->mq, env);
421}
422
423
424/**
425 * Send message to the transport service about queue @a qh
426 * updated.
427 *
428 * @param qh queue to add
429 */
430static void
431send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
432{
433 struct GNUNET_MQ_Envelope *env;
434 struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
435
436 if (NULL == qh->ch->mq)
437 return;
438 env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
439 uqm->qid = htonl (qh->queue_id);
440 uqm->receiver = qh->peer;
441 uqm->nt = htonl ((uint32_t) qh->nt);
442 uqm->mtu = htonl (qh->mtu);
443 uqm->q_len = GNUNET_htonll (qh->q_len);
444 uqm->priority = htonl (qh->priority);
445 uqm->cs = htonl ((uint32_t) qh->cs);
446 GNUNET_MQ_send (qh->ch->mq, env);
447}
448
449
450/**
451 * Send message to the transport service about queue @a qh
452 * being no longer available.
453 *
454 * @param qh queue to delete
455 */
456static void
457send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
458{
459 struct GNUNET_MQ_Envelope *env;
460 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
461
462 if (NULL == qh->ch->mq)
463 return;
464 env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
465 dqm->qid = htonl (qh->queue_id);
466 dqm->receiver = qh->peer;
467 GNUNET_MQ_send (qh->ch->mq, env);
468}
469
470
471/**
472 * Disconnect from the transport service. Purges
473 * all flow control entries as we will no longer receive
474 * the ACKs. Purges the ack pending entries as the
475 * transport will no longer expect the confirmations.
476 *
477 * @param ch service to disconnect from
478 */
479static void
480disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
481{
482 struct FlowControl *fcn;
483 struct AckPending *apn;
484
485 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
486 {
487 fcn = fc->next;
488 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
489 fc->cb (fc->cb_cls, GNUNET_SYSERR);
490 GNUNET_free (fc);
491 }
492 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
493 {
494 apn = ap->next;
495 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
496 GNUNET_free (ap);
497 }
498 if (NULL == ch->mq)
499 return;
500 GNUNET_MQ_destroy (ch->mq);
501 ch->mq = NULL;
502}
503
504
505/**
506 * Function called on MQ errors.
507 */
508static void
509error_handler (void *cls, enum GNUNET_MQ_Error error)
510{
511 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
512
513 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
514 "MQ failure %d, reconnecting to transport service.\n",
515 error);
516 disconnect (ch);
517 /* TODO: maybe do this with exponential backoff/delay */
518 reconnect (ch);
519}
520
521
522/**
523 * Transport service acknowledged a message we gave it
524 * (with flow control enabled). Tell the communicator.
525 *
526 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
527 * @param incoming_ack the ack
528 */
529static void
530handle_incoming_ack (
531 void *cls,
532 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
533{
534 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
535
536 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
537 {
538 if ((fc->id == incoming_ack->fc_id) &&
539 (0 == memcmp (&fc->sender,
540 &incoming_ack->sender,
541 sizeof(struct GNUNET_PeerIdentity))))
542 {
543 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
544 fc->cb (fc->cb_cls, GNUNET_OK);
545 GNUNET_free (fc);
546 return;
547 }
548 }
549 GNUNET_break (0);
550 disconnect (ch);
551 /* TODO: maybe do this with exponential backoff/delay */
552 reconnect (ch);
553}
554
555
556/**
557 * Transport service wants us to create a queue. Check if @a cq
558 * is well-formed.
559 *
560 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
561 * @param cq the queue creation request
562 * @return #GNUNET_OK if @a smt is well-formed
563 */
564static int
565check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
566{
567 (void) cls;
568 GNUNET_MQ_check_zero_termination (cq);
569 return GNUNET_OK;
570}
571
572
573/**
574 * Transport service wants us to create a queue. Tell the communicator.
575 *
576 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
577 * @param cq the queue creation request
578 */
579static void
580handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
581{
582 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
583 const char *addr = (const char *) &cq[1];
584 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
585 struct GNUNET_MQ_Envelope *env;
586
587 if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
588 {
589 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
590 "Address `%s' invalid for this communicator\n",
591 addr);
592 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
593 }
594 else
595 {
596 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
597 }
598 cqr->request_id = cq->request_id;
599 GNUNET_MQ_send (ch->mq, env);
600}
601
602
603/**
604 * Transport service wants us to send a message. Check if @a smt
605 * is well-formed.
606 *
607 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
608 * @param smt the transmission request
609 * @return #GNUNET_OK if @a smt is well-formed
610 */
611static int
612check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
613{
614 (void) cls;
615 GNUNET_MQ_check_boxed_message (smt);
616 return GNUNET_OK;
617}
618
619
620/**
621 * Notify transport service about @a status of a message with
622 * @a mid sent to @a receiver.
623 *
624 * @param ch handle
625 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
626 * @param receiver which peer was the receiver
627 * @param mid message that the ack is about
628 */
629static void
630send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
631 int status,
632 const struct GNUNET_PeerIdentity *receiver,
633 uint64_t mid)
634{
635 struct GNUNET_MQ_Envelope *env;
636 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
637
638 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
639 ack->status = htonl (status);
640 ack->mid = mid;
641 ack->receiver = *receiver;
642 GNUNET_MQ_send (ch->mq, env);
643}
644
645
646/**
647 * Message queue transmission by communicator was successful,
648 * notify transport service.
649 *
650 * @param cls an `struct AckPending *`
651 */
652static void
653send_ack_cb (void *cls)
654{
655 struct AckPending *ap = cls;
656 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
657
658 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
659 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
660 GNUNET_free (ap);
661}
662
663
664/**
665 * Transport service wants us to send a message. Tell the communicator.
666 *
667 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
668 * @param smt the transmission request
669 */
670static void
671handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
672{
673 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
674 const struct GNUNET_MessageHeader *mh;
675 struct GNUNET_MQ_Envelope *env;
676 struct AckPending *ap;
677 struct GNUNET_TRANSPORT_QueueHandle *qh;
678
679 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
680 if ((qh->queue_id == ntohl (smt->qid)) &&
681 (0 == memcmp (&qh->peer,
682 &smt->receiver,
683 sizeof(struct GNUNET_PeerIdentity))))
684 break;
685 if (NULL == qh)
686 {
687 /* queue is already gone, tell transport this one failed */
688 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
689 "Transmission failed, queue no longer exists.\n");
690 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
691 return;
692 }
693 ap = GNUNET_new (struct AckPending);
694 ap->ch = ch;
695 ap->receiver = smt->receiver;
696 ap->mid = smt->mid;
697 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
698 mh = (const struct GNUNET_MessageHeader *) &smt[1];
699 env = GNUNET_MQ_msg_copy (mh);
700 GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
701 GNUNET_MQ_send (qh->mq, env);
702}
703
704
705/**
706 * Transport service gives us backchannel message. Check if @a bi
707 * is well-formed.
708 *
709 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
710 * @param bi the backchannel message
711 * @return #GNUNET_OK if @a smt is well-formed
712 */
713static int
714check_backchannel_incoming (
715 void *cls,
716 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
717{
718 (void) cls;
719 GNUNET_MQ_check_boxed_message (bi);
720 return GNUNET_OK;
721}
722
723
724/**
725 * Transport service gives us backchannel message. Handle it.
726 *
727 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
728 * @param bi the backchannel message
729 */
730static void
731handle_backchannel_incoming (
732 void *cls,
733 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
734{
735 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
736 if (NULL != ch->notify_cb)
737 ch->notify_cb (ch->notify_cb_cls,
738 &bi->pid,
739 (const struct GNUNET_MessageHeader *) &bi[1]);
740 else
741 GNUNET_log (
742 GNUNET_ERROR_TYPE_INFO,
743 _ ("Dropped backchanel message: handler not provided by communicator\n"));
744}
745
746
747/**
748 * (re)connect our communicator to the transport service
749 *
750 * @param ch handle to reconnect
751 */
752static void
753reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
754{
755 struct GNUNET_MQ_MessageHandler handlers[] =
756 { GNUNET_MQ_hd_fixed_size (incoming_ack,
757 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
758 struct GNUNET_TRANSPORT_IncomingMessageAck,
759 ch),
760 GNUNET_MQ_hd_var_size (create_queue,
761 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
762 struct GNUNET_TRANSPORT_CreateQueue,
763 ch),
764 GNUNET_MQ_hd_var_size (send_msg,
765 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
766 struct GNUNET_TRANSPORT_SendMessageTo,
767 ch),
768 GNUNET_MQ_hd_var_size (
769 backchannel_incoming,
770 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
771 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
772 ch),
773 GNUNET_MQ_handler_end () };
774 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
775 struct GNUNET_MQ_Envelope *env;
776
777 ch->mq =
778 GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
779 if (NULL == ch->mq)
780 return;
781 env = GNUNET_MQ_msg_extra (cam,
782 strlen (ch->addr_prefix) + 1,
783 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
784 cam->cc = htonl ((uint32_t) ch->cc);
785 memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
786 GNUNET_MQ_send (ch->mq, env);
787 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
788 ai = ai->next)
789 send_add_address (ai);
790 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
791 qh = qh->next)
792 send_add_queue (qh);
793}
794
795
796/**
797 * Connect to the transport service.
798 *
799 * @param cfg configuration to use
800 * @param config_section section of the configuration to use for options
801 * @param addr_prefix address prefix for addresses supported by this
802 * communicator, could be NULL for incoming-only communicators
803 * @param cc what characteristics does the communicator have?
804 * @param mtu maximum message size supported by communicator, 0 if
805 * sending is not supported, SIZE_MAX for no MTU
806 * @param mq_init function to call to initialize a message queue given
807 * the address of another peer, can be NULL if the
808 * communicator only supports receiving messages
809 * @param mq_init_cls closure for @a mq_init
810 * @param notify_cb function to pass backchannel messages to communicator
811 * @param notify_cb_cls closure for @a notify_cb
812 * @return NULL on error
813 */
814struct GNUNET_TRANSPORT_CommunicatorHandle *
815GNUNET_TRANSPORT_communicator_connect (
816 const struct GNUNET_CONFIGURATION_Handle *cfg,
817 const char *config_section,
818 const char *addr_prefix,
819 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
820 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
821 void *mq_init_cls,
822 GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
823 void *notify_cb_cls)
824{
825 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
826
827 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
828 ch->cfg = cfg;
829 ch->config_section = config_section;
830 ch->addr_prefix = addr_prefix;
831 ch->mq_init = mq_init;
832 ch->mq_init_cls = mq_init_cls;
833 ch->notify_cb = notify_cb;
834 ch->notify_cb_cls = notify_cb_cls;
835 ch->cc = cc;
836 reconnect (ch);
837 if (GNUNET_OK !=
838 GNUNET_CONFIGURATION_get_value_number (cfg,
839 config_section,
840 "MAX_QUEUE_LENGTH",
841 &ch->max_queue_length))
842 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
843 if (NULL == ch->mq)
844 {
845 GNUNET_free (ch);
846 return NULL;
847 }
848 return ch;
849}
850
851
852/**
853 * Disconnect from the transport service.
854 *
855 * @param ch handle returned from connect
856 */
857void
858GNUNET_TRANSPORT_communicator_disconnect (
859 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
860{
861 disconnect (ch);
862 while (NULL != ch->ai_head)
863 {
864 GNUNET_break (0); /* communicator forgot to remove address, warn! */
865 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
866 }
867 GNUNET_free (ch);
868}
869
870
871/* ************************* Receiving *************************** */
872
873
874/**
875 * Notify transport service that the communicator has received
876 * a message.
877 *
878 * @param ch connection to transport service
879 * @param sender presumed sender of the message (details to be checked
880 * by higher layers)
881 * @param msg the message
882 * @param expected_addr_validity how long does the communicator believe it
883 * will continue to be able to receive messages from the same address
884 * on which it received this message?
885 * @param cb function to call once handling the message is done, NULL if
886 * flow control is not supported by this communicator
887 * @param cb_cls closure for @a cb
888 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
889 * immediately dropped due to memory limitations (communicator
890 * should try to apply back pressure),
891 * #GNUNET_SYSERR if the message could not be delivered because
892 * the transport service is not yet up
893 */
894int
895GNUNET_TRANSPORT_communicator_receive (
896 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
897 const struct GNUNET_PeerIdentity *sender,
898 const struct GNUNET_MessageHeader *msg,
899 struct GNUNET_TIME_Relative expected_addr_validity,
900 GNUNET_TRANSPORT_MessageCompletedCallback cb,
901 void *cb_cls)
902{
903 struct GNUNET_MQ_Envelope *env;
904 struct GNUNET_TRANSPORT_IncomingMessage *im;
905 uint16_t msize;
906
907
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909 "communicator receive\n");
910
911 if (NULL == ch->mq)
912 return GNUNET_SYSERR;
913 if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
914 {
915 GNUNET_log (
916 GNUNET_ERROR_TYPE_WARNING,
917 "Dropping message: transport is too slow, queue length %llu exceeded\n",
918 ch->max_queue_length);
919 return GNUNET_NO;
920 }
921
922 msize = ntohs (msg->size);
923 env =
924 GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
925 if (NULL == env)
926 {
927 GNUNET_break (0);
928 return GNUNET_SYSERR;
929 }
930 im->expected_address_validity =
931 GNUNET_TIME_relative_hton (expected_addr_validity);
932 im->sender = *sender;
933 // FIXME: this is expensive, would be better if we would
934 // re-design the API to allow us to create the envelope first,
935 // and then have the application fill in the body so we do
936 // not have to memcpy()
937 memcpy (&im[1], msg, msize);
938 im->fc_on = htonl (GNUNET_NO);
939 if (NULL != cb)
940 {
941 struct FlowControl *fc;
942
943 im->fc_on = htonl (GNUNET_YES);
944 im->fc_id = ch->fc_gen++;
945 fc = GNUNET_new (struct FlowControl);
946 fc->sender = *sender;
947 fc->id = im->fc_id;
948 fc->cb = cb;
949 fc->cb_cls = cb_cls;
950 GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
951 }
952 GNUNET_MQ_send (ch->mq, env);
953 return GNUNET_OK;
954}
955
956
957/* ************************* Discovery *************************** */
958
959
960/**
961 * Notify transport service that an MQ became available due to an
962 * "inbound" connection or because the communicator discovered the
963 * presence of another peer.
964 *
965 * @param ch connection to transport service
966 * @param peer peer with which we can now communicate
967 * @param address address in human-readable format, 0-terminated, UTF-8
968 * @param mtu maximum message size supported by queue, 0 if
969 * sending is not supported, SIZE_MAX for no MTU
970 * @param q_len number of messages that can be send through this queue
971 * @param priority queue priority. Queues with highest priority should be
972 * used
973 * @param nt which network type does the @a address belong to?
974 * @param cc what characteristics does the communicator have?
975 * @param cs what is the connection status of the queue?
976 * @param mq message queue of the @a peer
977 * @return API handle identifying the new MQ
978 */
979struct GNUNET_TRANSPORT_QueueHandle *
980GNUNET_TRANSPORT_communicator_mq_add (
981 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
982 const struct GNUNET_PeerIdentity *peer,
983 const char *address,
984 uint32_t mtu,
985 uint64_t q_len,
986 uint32_t priority,
987 enum GNUNET_NetworkType nt,
988 enum GNUNET_TRANSPORT_ConnectionStatus cs,
989 struct GNUNET_MQ_Handle *mq)
990{
991 struct GNUNET_TRANSPORT_QueueHandle *qh;
992
993 // Do not notify the service if there is no intial capacity.
994 GNUNET_assert (0 < q_len);
995
996 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
997 qh->ch = ch;
998 qh->peer = *peer;
999 qh->address = GNUNET_strdup (address);
1000 qh->nt = nt;
1001 qh->mtu = mtu;
1002 qh->q_len = q_len;
1003 qh->priority = priority;
1004 qh->cs = cs;
1005 qh->mq = mq;
1006 qh->queue_id = ch->queue_gen++;
1007 GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
1008 send_add_queue (qh);
1009 return qh;
1010}
1011
1012
1013/**
1014 * Notify transport service that an MQ was updated
1015 *
1016 * @param ch connection to transport service
1017 * @param qh the queue to update
1018 * @param q_len number of messages that can be send through this queue
1019 * @param priority queue priority. Queues with highest priority should be
1020 * used
1021 */
1022void
1023GNUNET_TRANSPORT_communicator_mq_update (
1024 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1025 const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
1026 uint64_t q_len,
1027 uint32_t priority)
1028{
1029 struct GNUNET_TRANSPORT_QueueHandle *qh;
1030
1031 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
1032 {
1033 if (u_qh == qh)
1034 break;
1035 }
1036 GNUNET_assert (NULL != qh);
1037 qh->q_len = q_len;
1038 qh->priority = priority;
1039 send_update_queue (qh);
1040}
1041
1042
1043/**
1044 * Notify transport service that an MQ became unavailable due to a
1045 * disconnect or timeout.
1046 *
1047 * @param qh handle for the queue that must be invalidated
1048 */
1049void
1050GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
1051{
1052 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
1053
1054 send_del_queue (qh);
1055 GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
1056 GNUNET_MQ_destroy (qh->mq);
1057 GNUNET_free (qh->address);
1058 GNUNET_free (qh);
1059}
1060
1061
1062/**
1063 * Notify transport service about an address that this communicator
1064 * provides for this peer.
1065 *
1066 * @param ch connection to transport service
1067 * @param address our address in human-readable format, 0-terminated, UTF-8
1068 * @param nt which network type does the address belong to?
1069 * @param expiration when does the communicator forsee this address expiring?
1070 */
1071struct GNUNET_TRANSPORT_AddressIdentifier *
1072GNUNET_TRANSPORT_communicator_address_add (
1073 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1074 const char *address,
1075 enum GNUNET_NetworkType nt,
1076 struct GNUNET_TIME_Relative expiration)
1077{
1078 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1079
1080 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
1081 ai->ch = ch;
1082 ai->address = GNUNET_strdup (address);
1083 ai->nt = nt;
1084 ai->expiration = expiration;
1085 ai->aid = ch->aid_gen++;
1086 GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
1087 send_add_address (ai);
1088 return ai;
1089}
1090
1091/**
1092 * Notify transport service about an address that this communicator no
1093 * longer provides for this peer.
1094 *
1095 * @param ai address that is no longer provided
1096 */
1097void
1098GNUNET_TRANSPORT_communicator_address_remove (
1099 struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1100{
1101 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1102
1103 send_del_address (ai);
1104 GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1105 GNUNET_free (ai->address);
1106 GNUNET_free (ai);
1107}
1108
1109/**
1110 * Notify transport service that this communicator no longer provides all its addresses for this peer.
1111 *
1112 * @param ch The communicator handle.
1113 */
1114void
1115GNUNET_TRANSPORT_communicator_address_remove_all (
1116 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
1117{
1118 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
1119 ai = ai->next)
1120 GNUNET_TRANSPORT_communicator_address_remove (ai);
1121}
1122
1123
1124/* ************************* Backchannel *************************** */
1125
1126
1127/**
1128 * The communicator asks the transport service to route a message via
1129 * a different path to another communicator service at another peer.
1130 * This must only be done for special control traffic (as there is no
1131 * flow control for this API), such as acknowledgements, and generally
1132 * only be done if the communicator is uni-directional (i.e. cannot
1133 * send the message back itself).
1134 *
1135 * @param ch handle of this communicator
1136 * @param pid peer to send the message to
1137 * @param comm name of the communicator to send the message to
1138 * @param header header of the message to transmit and pass via the
1139 * notify-API to @a pid's communicator @a comm
1140 */
1141void
1142GNUNET_TRANSPORT_communicator_notify (
1143 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1144 const struct GNUNET_PeerIdentity *pid,
1145 const char *comm,
1146 const struct GNUNET_MessageHeader *header)
1147{
1148 struct GNUNET_MQ_Envelope *env;
1149 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1150 size_t slen = strlen (comm) + 1;
1151 uint16_t mlen = ntohs (header->size);
1152
1153 GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
1154 env =
1155 GNUNET_MQ_msg_extra (cb,
1156 slen + mlen,
1157 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1158 cb->pid = *pid;
1159 memcpy (&cb[1], header, mlen);
1160 memcpy (((char *) &cb[1]) + mlen, comm, slen);
1161 GNUNET_MQ_send (ch->mq, env);
1162}
1163
1164
1165/* end of transport_api2_communication.c */