aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c1115
1 files changed, 0 insertions, 1115 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
deleted file mode 100644
index 35ef039da..000000000
--- a/src/transport/transport_api2_communication.c
+++ /dev/null
@@ -1,1115 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/transport_api2_communication.c
23 * @brief implementation of the gnunet_transport_communication_service.h API
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_transport_communication_service.h"
30#include "gnunet_ats_transport_service.h"
31#include "transport.h"
32
33
34/**
35 * How many messages do we keep at most in the queue to the
36 * transport service before we start to drop (default,
37 * can be changed via the configuration file).
38 */
39#define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42/**
43 * Information we track per packet to enable flow control.
44 */
45struct FlowControl
46{
47 /**
48 * Kept in a DLL.
49 */
50 struct FlowControl *next;
51
52 /**
53 * Kept in a DLL.
54 */
55 struct FlowControl *prev;
56
57 /**
58 * Function to call once the message was processed.
59 */
60 GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62 /**
63 * Closure for @e cb
64 */
65 void *cb_cls;
66
67 /**
68 * Which peer is this about?
69 */
70 struct GNUNET_PeerIdentity sender;
71
72 /**
73 * More-or-less unique ID for the message.
74 */
75 uint64_t id;
76};
77
78
79/**
80 * Information we track per message to tell the transport about
81 * success or failures.
82 */
83struct AckPending
84{
85 /**
86 * Kept in a DLL.
87 */
88 struct AckPending *next;
89
90 /**
91 * Kept in a DLL.
92 */
93 struct AckPending *prev;
94
95 /**
96 * Communicator this entry belongs to.
97 */
98 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100 /**
101 * Which peer is this about?
102 */
103 struct GNUNET_PeerIdentity receiver;
104
105 /**
106 * More-or-less unique ID for the message.
107 */
108 uint64_t mid;
109
110 /**
111 * Queue ID of the queue which will be used for the message.
112 */
113 uint32_t qid;
114};
115
116
117/**
118 * Opaque handle to the transport service for communicators.
119 */
120struct GNUNET_TRANSPORT_CommunicatorHandle
121{
122 /**
123 * Head of DLL of addresses this communicator offers to the transport service.
124 */
125 struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
126
127 /**
128 * Tail of DLL of addresses this communicator offers to the transport service.
129 */
130 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
131
132 /**
133 * DLL of messages awaiting flow control confirmation (ack).
134 */
135 struct FlowControl *fc_head;
136
137 /**
138 * DLL of messages awaiting flow control confirmation (ack).
139 */
140 struct FlowControl *fc_tail;
141
142 /**
143 * DLL of messages awaiting transmission confirmation (ack).
144 */
145 struct AckPending *ap_head;
146
147 /**
148 * DLL of messages awaiting transmission confirmation (ack).
149 */
150 struct AckPending *ap_tail;
151
152 /**
153 * DLL of queues we offer.
154 */
155 struct GNUNET_TRANSPORT_QueueHandle *queue_head;
156
157 /**
158 * DLL of queues we offer.
159 */
160 struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
161
162 /**
163 * Our configuration.
164 */
165 const struct GNUNET_CONFIGURATION_Handle *cfg;
166
167 /**
168 * Config section to use.
169 */
170 const char *config_section;
171
172 /**
173 * Address prefix to use.
174 */
175 const char *addr_prefix;
176
177 /**
178 * Function to call when the transport service wants us to initiate
179 * a communication channel with another peer.
180 */
181 GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
182
183 /**
184 * Closure for @e mq_init.
185 */
186 void *mq_init_cls;
187
188 /**
189 * Function to call when the transport service receives messages
190 * for a communicator (i.e. for NAT traversal or for non-bidirectional
191 * communicators).
192 */
193 GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
194
195 /**
196 * Closure for @e notify_Cb.
197 */
198 void *notify_cb_cls;
199
200 /**
201 * Queue to talk to the transport service.
202 */
203 struct GNUNET_MQ_Handle *mq;
204
205 /**
206 * Maximum permissible queue length.
207 */
208 unsigned long long max_queue_length;
209
210 /**
211 * Flow-control identifier generator.
212 */
213 uint64_t fc_gen;
214
215 /**
216 * Internal UUID for the address used in communication with the
217 * transport service.
218 */
219 uint32_t aid_gen;
220
221 /**
222 * Queue identifier generator.
223 */
224 uint32_t queue_gen;
225
226 /**
227 * Characteristics of the communicator.
228 */
229 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
230};
231
232
233/**
234 * Handle returned to identify the internal data structure the transport
235 * API has created to manage a message queue to a particular peer.
236 */
237struct GNUNET_TRANSPORT_QueueHandle
238{
239 /**
240 * Kept in a DLL.
241 */
242 struct GNUNET_TRANSPORT_QueueHandle *next;
243
244 /**
245 * Kept in a DLL.
246 */
247 struct GNUNET_TRANSPORT_QueueHandle *prev;
248
249 /**
250 * Handle this queue belongs to.
251 */
252 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
253
254 /**
255 * Address used by the communication queue.
256 */
257 char *address;
258
259 /**
260 * The queue itself.
261 */
262 struct GNUNET_MQ_Handle *mq;
263
264 /**
265 * Which peer we can communciate with.
266 */
267 struct GNUNET_PeerIdentity peer;
268
269 /**
270 * Network type of the communication queue.
271 */
272 enum GNUNET_NetworkType nt;
273
274 /**
275 * Communication status of the queue.
276 */
277 enum GNUNET_TRANSPORT_ConnectionStatus cs;
278
279 /**
280 * ID for this queue when talking to the transport service.
281 */
282 uint32_t queue_id;
283
284 /**
285 * Maximum transmission unit for the queue.
286 */
287 uint32_t mtu;
288
289 /**
290 * Queue length.
291 */
292 uint64_t q_len;
293 /**
294 * Queue priority.
295 */
296 uint32_t priority;
297};
298
299
300/**
301 * Internal representation of an address a communicator is
302 * currently providing for the transport service.
303 */
304struct GNUNET_TRANSPORT_AddressIdentifier
305{
306 /**
307 * Kept in a DLL.
308 */
309 struct GNUNET_TRANSPORT_AddressIdentifier *next;
310
311 /**
312 * Kept in a DLL.
313 */
314 struct GNUNET_TRANSPORT_AddressIdentifier *prev;
315
316 /**
317 * Transport handle where the address was added.
318 */
319 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
320
321 /**
322 * The actual address.
323 */
324 char *address;
325
326 /**
327 * When does the address expire? (Expected lifetime of the
328 * address.)
329 */
330 struct GNUNET_TIME_Relative expiration;
331
332 /**
333 * Internal UUID for the address used in communication with the
334 * transport service.
335 */
336 uint32_t aid;
337
338 /**
339 * Network type for the address.
340 */
341 enum GNUNET_NetworkType nt;
342};
343
344
345/**
346 * (re)connect our communicator to the transport service
347 *
348 * @param ch handle to reconnect
349 */
350static void
351reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
352
353
354/**
355 * Send message to the transport service about address @a ai
356 * being now available.
357 *
358 * @param ai address to add
359 */
360static void
361send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
362{
363 struct GNUNET_MQ_Envelope *env;
364 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
365
366 if (NULL == ai->ch->mq)
367 return;
368 env = GNUNET_MQ_msg_extra (aam,
369 strlen (ai->address) + 1,
370 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
371 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
372 aam->nt = htonl ((uint32_t) ai->nt);
373 memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
374 GNUNET_MQ_send (ai->ch->mq, env);
375}
376
377
378/**
379 * Send message to the transport service about address @a ai
380 * being no longer available.
381 *
382 * @param ai address to delete
383 */
384static void
385send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
386{
387 struct GNUNET_MQ_Envelope *env;
388 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
389
390 if (NULL == ai->ch->mq)
391 return;
392 env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
393 dam->aid = htonl (ai->aid);
394 GNUNET_MQ_send (ai->ch->mq, env);
395}
396
397
398/**
399 * Send message to the transport service about queue @a qh
400 * being now available.
401 *
402 * @param qh queue to add
403 */
404static void
405send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
406{
407 struct GNUNET_MQ_Envelope *env;
408 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
409
410 if (NULL == qh->ch->mq)
411 return;
412 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413 "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
414 env = GNUNET_MQ_msg_extra (aqm,
415 strlen (qh->address) + 1,
416 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
417 aqm->qid = htonl (qh->queue_id);
418 aqm->receiver = qh->peer;
419 aqm->nt = htonl ((uint32_t) qh->nt);
420 aqm->mtu = htonl (qh->mtu);
421 aqm->q_len = GNUNET_htonll (qh->q_len);
422 aqm->priority = htonl (qh->priority);
423 aqm->cs = htonl ((uint32_t) qh->cs);
424 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
425 GNUNET_MQ_send (qh->ch->mq, env);
426}
427
428
429/**
430 * Send message to the transport service about queue @a qh
431 * updated.
432 *
433 * @param qh queue to add
434 */
435static void
436send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
437{
438 struct GNUNET_MQ_Envelope *env;
439 struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
440
441 if (NULL == qh->ch->mq)
442 return;
443 env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
444 uqm->qid = htonl (qh->queue_id);
445 uqm->receiver = qh->peer;
446 uqm->nt = htonl ((uint32_t) qh->nt);
447 uqm->mtu = htonl (qh->mtu);
448 uqm->q_len = GNUNET_htonll (qh->q_len);
449 uqm->priority = htonl (qh->priority);
450 uqm->cs = htonl ((uint32_t) qh->cs);
451 GNUNET_MQ_send (qh->ch->mq, env);
452}
453
454
455/**
456 * Send message to the transport service about queue @a qh
457 * being no longer available.
458 *
459 * @param qh queue to delete
460 */
461static void
462send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
463{
464 struct GNUNET_MQ_Envelope *env;
465 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
466
467 if (NULL == qh->ch->mq)
468 return;
469 env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
470 dqm->qid = htonl (qh->queue_id);
471 dqm->receiver = qh->peer;
472 GNUNET_MQ_send (qh->ch->mq, env);
473}
474
475
476/**
477 * Disconnect from the transport service. Purges
478 * all flow control entries as we will no longer receive
479 * the ACKs. Purges the ack pending entries as the
480 * transport will no longer expect the confirmations.
481 *
482 * @param ch service to disconnect from
483 */
484static void
485disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
486{
487 struct FlowControl *fcn;
488 struct AckPending *apn;
489
490 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
491 {
492 fcn = fc->next;
493 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
494 fc->cb (fc->cb_cls, GNUNET_SYSERR);
495 GNUNET_free (fc);
496 }
497 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
498 {
499 apn = ap->next;
500 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
501 GNUNET_free (ap);
502 }
503 if (NULL == ch->mq)
504 return;
505 GNUNET_MQ_destroy (ch->mq);
506 ch->mq = NULL;
507}
508
509
510/**
511 * Function called on MQ errors.
512 */
513static void
514error_handler (void *cls, enum GNUNET_MQ_Error error)
515{
516 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
517
518 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
519 "MQ failure %d, reconnecting to transport service.\n",
520 error);
521 disconnect (ch);
522 /* TODO: maybe do this with exponential backoff/delay */
523 reconnect (ch);
524}
525
526
527/**
528 * Transport service acknowledged a message we gave it
529 * (with flow control enabled). Tell the communicator.
530 *
531 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
532 * @param incoming_ack the ack
533 */
534static void
535handle_incoming_ack (
536 void *cls,
537 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
538{
539 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
540
541 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
542 {
543 if ((fc->id == incoming_ack->fc_id) &&
544 (0 == memcmp (&fc->sender,
545 &incoming_ack->sender,
546 sizeof(struct GNUNET_PeerIdentity))))
547 {
548 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549 "Done with message with flow control id %" PRIu64
550 " for sender %s from sender %s\n",
551 incoming_ack->fc_id,
552 GNUNET_i2s (&fc->sender),
553 GNUNET_i2s (&incoming_ack->sender));
554 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
555 fc->cb (fc->cb_cls, GNUNET_OK);
556 GNUNET_free (fc);
557 return;
558 }
559 }
560 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
561 "Message with flow control id %" PRIu64
562 " from sender %s not found\n",
563 incoming_ack->fc_id,
564 GNUNET_i2s (&incoming_ack->sender));
565 GNUNET_break (0);
566 disconnect (ch);
567 /* TODO: maybe do this with exponential backoff/delay */
568 reconnect (ch);
569}
570
571
572/**
573 * Transport service wants us to create a queue. Check if @a cq
574 * is well-formed.
575 *
576 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
577 * @param cq the queue creation request
578 * @return #GNUNET_OK if @a smt is well-formed
579 */
580static int
581check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
582{
583 (void) cls;
584 GNUNET_MQ_check_zero_termination (cq);
585 return GNUNET_OK;
586}
587
588
589/**
590 * Transport service wants us to create a queue. Tell the communicator.
591 *
592 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
593 * @param cq the queue creation request
594 */
595static void
596handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
597{
598 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
599 const char *addr = (const char *) &cq[1];
600 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
601 struct GNUNET_MQ_Envelope *env;
602
603 if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
604 {
605 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
606 "Address `%s' invalid for this communicator\n",
607 addr);
608 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
609 }
610 else
611 {
612 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
613 }
614 cqr->request_id = cq->request_id;
615 GNUNET_MQ_send (ch->mq, env);
616}
617
618
619/**
620 * Transport service wants us to send a message. Check if @a smt
621 * is well-formed.
622 *
623 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
624 * @param smt the transmission request
625 * @return #GNUNET_OK if @a smt is well-formed
626 */
627static int
628check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
629{
630 (void) cls;
631 GNUNET_MQ_check_boxed_message (smt);
632 return GNUNET_OK;
633}
634
635
636/**
637 * Notify transport service about @a status of a message with
638 * @a mid sent to @a receiver.
639 *
640 * @param ch handle
641 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
642 * @param receiver which peer was the receiver
643 * @param mid message that the ack is about
644 */
645static void
646send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
647 int status,
648 const struct GNUNET_PeerIdentity *receiver,
649 uint64_t mid,
650 uint32_t qid)
651{
652 struct GNUNET_MQ_Envelope *env;
653 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
654
655 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
656 ack->status = htonl (status);
657 ack->mid = mid;
658 ack->qid = qid;
659 ack->receiver = *receiver;
660 GNUNET_MQ_send (ch->mq, env);
661}
662
663
664/**
665 * Message queue transmission by communicator was successful,
666 * notify transport service.
667 *
668 * @param cls an `struct AckPending *`
669 */
670static void
671send_ack_cb (void *cls)
672{
673 struct AckPending *ap = cls;
674 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
675
676 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
677 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid, ap->qid);
678 GNUNET_free (ap);
679}
680
681
682/**
683 * Transport service wants us to send a message. Tell the communicator.
684 *
685 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
686 * @param smt the transmission request
687 */
688static void
689handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
690{
691 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
692 const struct GNUNET_MessageHeader *mh;
693 struct GNUNET_MQ_Envelope *env;
694 struct AckPending *ap;
695 struct GNUNET_TRANSPORT_QueueHandle *qh;
696
697 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
698 if ((qh->queue_id == ntohl (smt->qid)) &&
699 (0 == memcmp (&qh->peer,
700 &smt->receiver,
701 sizeof(struct GNUNET_PeerIdentity))))
702 break;
703 if (NULL == qh)
704 {
705 /* queue is already gone, tell transport this one failed */
706 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
707 "Transmission failed, queue no longer exists.\n");
708 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid, smt->qid);
709 return;
710 }
711 ap = GNUNET_new (struct AckPending);
712 ap->ch = ch;
713 ap->receiver = smt->receiver;
714 ap->mid = smt->mid;
715 ap->qid = smt->qid;
716 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
717 mh = (const struct GNUNET_MessageHeader *) &smt[1];
718 env = GNUNET_MQ_msg_copy (mh);
719 GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
720 GNUNET_MQ_send (qh->mq, env);
721}
722
723
724/**
725 * Transport service gives us backchannel message. Check if @a bi
726 * is well-formed.
727 *
728 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
729 * @param bi the backchannel message
730 * @return #GNUNET_OK if @a smt is well-formed
731 */
732static int
733check_backchannel_incoming (
734 void *cls,
735 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
736{
737 (void) cls;
738 GNUNET_MQ_check_boxed_message (bi);
739 return GNUNET_OK;
740}
741
742
743/**
744 * Transport service gives us backchannel message. Handle it.
745 *
746 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
747 * @param bi the backchannel message
748 */
749static void
750handle_backchannel_incoming (
751 void *cls,
752 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
753{
754 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
755 if (NULL != ch->notify_cb)
756 ch->notify_cb (ch->notify_cb_cls,
757 &bi->pid,
758 (const struct GNUNET_MessageHeader *) &bi[1]);
759 else
760 GNUNET_log (
761 GNUNET_ERROR_TYPE_INFO,
762 _ ("Dropped backchanel message: handler not provided by communicator\n"));
763}
764
765
766/**
767 * (re)connect our communicator to the transport service
768 *
769 * @param ch handle to reconnect
770 */
771static void
772reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
773{
774 struct GNUNET_MQ_MessageHandler handlers[] =
775 { GNUNET_MQ_hd_fixed_size (incoming_ack,
776 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
777 struct GNUNET_TRANSPORT_IncomingMessageAck,
778 ch),
779 GNUNET_MQ_hd_var_size (create_queue,
780 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
781 struct GNUNET_TRANSPORT_CreateQueue,
782 ch),
783 GNUNET_MQ_hd_var_size (send_msg,
784 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
785 struct GNUNET_TRANSPORT_SendMessageTo,
786 ch),
787 GNUNET_MQ_hd_var_size (
788 backchannel_incoming,
789 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
790 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
791 ch),
792 GNUNET_MQ_handler_end () };
793 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
794 struct GNUNET_MQ_Envelope *env;
795
796 ch->mq =
797 GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
798 if (NULL == ch->mq)
799 return;
800 env = GNUNET_MQ_msg_extra (cam,
801 strlen (ch->addr_prefix) + 1,
802 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
803 cam->cc = htonl ((uint32_t) ch->cc);
804 memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
805 GNUNET_MQ_send (ch->mq, env);
806 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
807 ai = ai->next)
808 send_add_address (ai);
809 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
810 qh = qh->next)
811 send_add_queue (qh);
812}
813
814
815struct GNUNET_TRANSPORT_CommunicatorHandle *
816GNUNET_TRANSPORT_communicator_connect (
817 const struct GNUNET_CONFIGURATION_Handle *cfg,
818 const char *config_section,
819 const char *addr_prefix,
820 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
821 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
822 void *mq_init_cls,
823 GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
824 void *notify_cb_cls)
825{
826 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
827
828 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
829 ch->cfg = cfg;
830 ch->config_section = config_section;
831 ch->addr_prefix = addr_prefix;
832 ch->mq_init = mq_init;
833 ch->mq_init_cls = mq_init_cls;
834 ch->notify_cb = notify_cb;
835 ch->notify_cb_cls = notify_cb_cls;
836 ch->cc = cc;
837 reconnect (ch);
838 if (GNUNET_OK !=
839 GNUNET_CONFIGURATION_get_value_number (cfg,
840 config_section,
841 "MAX_QUEUE_LENGTH",
842 &ch->max_queue_length))
843 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
844 if (NULL == ch->mq)
845 {
846 GNUNET_free (ch);
847 return NULL;
848 }
849 return ch;
850}
851
852
853/**
854 * Disconnect from the transport service.
855 *
856 * @param ch handle returned from connect
857 */
858void
859GNUNET_TRANSPORT_communicator_disconnect (
860 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
861{
862 disconnect (ch);
863 while (NULL != ch->ai_head)
864 {
865 GNUNET_break (0); /* communicator forgot to remove address, warn! */
866 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
867 }
868 GNUNET_free (ch);
869}
870
871
872/* ************************* Receiving *************************** */
873
874
875int
876GNUNET_TRANSPORT_communicator_receive (
877 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
878 const struct GNUNET_PeerIdentity *sender,
879 const struct GNUNET_MessageHeader *msg,
880 struct GNUNET_TIME_Relative expected_addr_validity,
881 GNUNET_TRANSPORT_MessageCompletedCallback cb,
882 void *cb_cls)
883{
884 struct GNUNET_MQ_Envelope *env;
885 struct GNUNET_TRANSPORT_IncomingMessage *im;
886 uint16_t msize;
887
888
889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890 "communicator receive\n");
891
892 if (NULL == ch->mq)
893 return GNUNET_SYSERR;
894 if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
895 {
896 GNUNET_log (
897 GNUNET_ERROR_TYPE_WARNING,
898 "Dropping message: transport is too slow, queue length %llu exceeded\n",
899 ch->max_queue_length);
900 return GNUNET_NO;
901 }
902
903 msize = ntohs (msg->size);
904 env =
905 GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
906 if (NULL == env)
907 {
908 GNUNET_break (0);
909 return GNUNET_SYSERR;
910 }
911 im->expected_address_validity =
912 GNUNET_TIME_relative_hton (expected_addr_validity);
913 im->sender = *sender;
914 // FIXME: this is expensive, would be better if we would
915 // re-design the API to allow us to create the envelope first,
916 // and then have the application fill in the body so we do
917 // not have to memcpy()
918 memcpy (&im[1], msg, msize);
919 im->fc_on = htonl (GNUNET_NO);
920 if (NULL != cb)
921 {
922 struct FlowControl *fc;
923
924 im->fc_on = htonl (GNUNET_YES);
925 im->fc_id = ch->fc_gen++;
926 fc = GNUNET_new (struct FlowControl);
927 fc->sender = *sender;
928 fc->id = im->fc_id;
929 fc->cb = cb;
930 fc->cb_cls = cb_cls;
931 GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933 "Created flow control id %" PRIu64 " for sender %s\n",
934 fc->id,
935 GNUNET_i2s (&fc->sender));
936 }
937 GNUNET_MQ_send (ch->mq, env);
938 return GNUNET_OK;
939}
940
941
942/* ************************* Discovery *************************** */
943
944
945struct GNUNET_TRANSPORT_QueueHandle *
946GNUNET_TRANSPORT_communicator_mq_add (
947 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
948 const struct GNUNET_PeerIdentity *peer,
949 const char *address,
950 uint32_t mtu,
951 uint64_t q_len,
952 uint32_t priority,
953 enum GNUNET_NetworkType nt,
954 enum GNUNET_TRANSPORT_ConnectionStatus cs,
955 struct GNUNET_MQ_Handle *mq)
956{
957 struct GNUNET_TRANSPORT_QueueHandle *qh;
958
959 // Do not notify the service if there is no intial capacity.
960 GNUNET_assert (0 < q_len);
961
962 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
963 qh->ch = ch;
964 qh->peer = *peer;
965 qh->address = GNUNET_strdup (address);
966 qh->nt = nt;
967 qh->mtu = mtu;
968 qh->q_len = q_len;
969 qh->priority = priority;
970 qh->cs = cs;
971 qh->mq = mq;
972 qh->queue_id = ch->queue_gen++;
973 GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
974 send_add_queue (qh);
975 return qh;
976}
977
978
979void
980GNUNET_TRANSPORT_communicator_mq_update (
981 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
982 const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
983 uint64_t q_len,
984 uint32_t priority)
985{
986 struct GNUNET_TRANSPORT_QueueHandle *qh;
987
988 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
989 {
990 if (u_qh == qh)
991 break;
992 }
993 GNUNET_assert (NULL != qh);
994 qh->q_len = q_len;
995 qh->priority = priority;
996 send_update_queue (qh);
997}
998
999
1000/**
1001 * Notify transport service that an MQ became unavailable due to a
1002 * disconnect or timeout.
1003 *
1004 * @param qh handle for the queue that must be invalidated
1005 */
1006void
1007GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
1008{
1009 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
1010
1011 send_del_queue (qh);
1012 GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
1013 GNUNET_MQ_destroy (qh->mq);
1014 GNUNET_free (qh->address);
1015 GNUNET_free (qh);
1016}
1017
1018
1019/**
1020 * Notify transport service about an address that this communicator
1021 * provides for this peer.
1022 *
1023 * @param ch connection to transport service
1024 * @param address our address in human-readable format, 0-terminated, UTF-8
1025 * @param nt which network type does the address belong to?
1026 * @param expiration when does the communicator forsee this address expiring?
1027 */
1028struct GNUNET_TRANSPORT_AddressIdentifier *
1029GNUNET_TRANSPORT_communicator_address_add (
1030 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1031 const char *address,
1032 enum GNUNET_NetworkType nt,
1033 struct GNUNET_TIME_Relative expiration)
1034{
1035 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1036
1037 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
1038 ai->ch = ch;
1039 ai->address = GNUNET_strdup (address);
1040 ai->nt = nt;
1041 ai->expiration = expiration;
1042 ai->aid = ch->aid_gen++;
1043 GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
1044 send_add_address (ai);
1045 return ai;
1046}
1047
1048
1049/**
1050 * Notify transport service about an address that this communicator no
1051 * longer provides for this peer.
1052 *
1053 * @param ai address that is no longer provided
1054 */
1055void
1056GNUNET_TRANSPORT_communicator_address_remove (
1057 struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1058{
1059 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1060
1061 send_del_address (ai);
1062 GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1063 GNUNET_free (ai->address);
1064 GNUNET_free (ai);
1065 ai = NULL;
1066}
1067
1068
1069/**
1070 * Notify transport service that this communicator no longer provides all its addresses for this peer.
1071 *
1072 * @param ch The communicator handle.
1073 */
1074void
1075GNUNET_TRANSPORT_communicator_address_remove_all (
1076 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
1077{
1078 struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
1079 while (NULL != ai)
1080 {
1081 struct GNUNET_TRANSPORT_AddressIdentifier *ai_next = ai->next;
1082 GNUNET_TRANSPORT_communicator_address_remove (ai);
1083 ai = ai_next;
1084 }
1085}
1086
1087
1088/* ************************* Backchannel *************************** */
1089
1090
1091void
1092GNUNET_TRANSPORT_communicator_notify (
1093 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1094 const struct GNUNET_PeerIdentity *pid,
1095 const char *comm,
1096 const struct GNUNET_MessageHeader *header)
1097{
1098 struct GNUNET_MQ_Envelope *env;
1099 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1100 size_t slen = strlen (comm) + 1;
1101 uint16_t mlen = ntohs (header->size);
1102
1103 GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
1104 env =
1105 GNUNET_MQ_msg_extra (cb,
1106 slen + mlen,
1107 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1108 cb->pid = *pid;
1109 memcpy (&cb[1], header, mlen);
1110 memcpy (((char *) &cb[1]) + mlen, comm, slen);
1111 GNUNET_MQ_send (ch->mq, env);
1112}
1113
1114
1115/* end of transport_api2_communication.c */