aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c1103
1 files changed, 0 insertions, 1103 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
deleted file mode 100644
index 079982ca5..000000000
--- a/src/transport/transport_api2_communication.c
+++ /dev/null
@@ -1,1103 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/transport_api2_communication.c
23 * @brief implementation of the gnunet_transport_communication_service.h API
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_transport_communication_service.h"
30#include "gnunet_ats_transport_service.h"
31#include "transport.h"
32
33
34/**
35 * How many messages do we keep at most in the queue to the
36 * transport service before we start to drop (default,
37 * can be changed via the configuration file).
38 */
39#define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42/**
43 * Information we track per packet to enable flow control.
44 */
45struct FlowControl
46{
47 /**
48 * Kept in a DLL.
49 */
50 struct FlowControl *next;
51
52 /**
53 * Kept in a DLL.
54 */
55 struct FlowControl *prev;
56
57 /**
58 * Function to call once the message was processed.
59 */
60 GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62 /**
63 * Closure for @e cb
64 */
65 void *cb_cls;
66
67 /**
68 * Which peer is this about?
69 */
70 struct GNUNET_PeerIdentity sender;
71
72 /**
73 * More-or-less unique ID for the message.
74 */
75 uint64_t id;
76};
77
78
79/**
80 * Information we track per message to tell the transport about
81 * success or failures.
82 */
83struct AckPending
84{
85 /**
86 * Kept in a DLL.
87 */
88 struct AckPending *next;
89
90 /**
91 * Kept in a DLL.
92 */
93 struct AckPending *prev;
94
95 /**
96 * Communicator this entry belongs to.
97 */
98 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100 /**
101 * Which peer is this about?
102 */
103 struct GNUNET_PeerIdentity receiver;
104
105 /**
106 * More-or-less unique ID for the message.
107 */
108 uint64_t mid;
109};
110
111
112/**
113 * Opaque handle to the transport service for communicators.
114 */
115struct GNUNET_TRANSPORT_CommunicatorHandle
116{
117 /**
118 * Head of DLL of addresses this communicator offers to the transport service.
119 */
120 struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122 /**
123 * Tail of DLL of addresses this communicator offers to the transport service.
124 */
125 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127 /**
128 * DLL of messages awaiting flow control confirmation (ack).
129 */
130 struct FlowControl *fc_head;
131
132 /**
133 * DLL of messages awaiting flow control confirmation (ack).
134 */
135 struct FlowControl *fc_tail;
136
137 /**
138 * DLL of messages awaiting transmission confirmation (ack).
139 */
140 struct AckPending *ap_head;
141
142 /**
143 * DLL of messages awaiting transmission confirmation (ack).
144 */
145 struct AckPending *ap_tail;
146
147 /**
148 * DLL of queues we offer.
149 */
150 struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152 /**
153 * DLL of queues we offer.
154 */
155 struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157 /**
158 * Our configuration.
159 */
160 const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162 /**
163 * Config section to use.
164 */
165 const char *config_section;
166
167 /**
168 * Address prefix to use.
169 */
170 const char *addr_prefix;
171
172 /**
173 * Function to call when the transport service wants us to initiate
174 * a communication channel with another peer.
175 */
176 GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178 /**
179 * Closure for @e mq_init.
180 */
181 void *mq_init_cls;
182
183 /**
184 * Function to call when the transport service receives messages
185 * for a communicator (i.e. for NAT traversal or for non-bidirectional
186 * communicators).
187 */
188 GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190 /**
191 * Closure for @e notify_Cb.
192 */
193 void *notify_cb_cls;
194
195 /**
196 * Queue to talk to the transport service.
197 */
198 struct GNUNET_MQ_Handle *mq;
199
200 /**
201 * Maximum permissible queue length.
202 */
203 unsigned long long max_queue_length;
204
205 /**
206 * Flow-control identifier generator.
207 */
208 uint64_t fc_gen;
209
210 /**
211 * Internal UUID for the address used in communication with the
212 * transport service.
213 */
214 uint32_t aid_gen;
215
216 /**
217 * Queue identifier generator.
218 */
219 uint32_t queue_gen;
220
221 /**
222 * Characteristics of the communicator.
223 */
224 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225};
226
227
228/**
229 * Handle returned to identify the internal data structure the transport
230 * API has created to manage a message queue to a particular peer.
231 */
232struct GNUNET_TRANSPORT_QueueHandle
233{
234 /**
235 * Kept in a DLL.
236 */
237 struct GNUNET_TRANSPORT_QueueHandle *next;
238
239 /**
240 * Kept in a DLL.
241 */
242 struct GNUNET_TRANSPORT_QueueHandle *prev;
243
244 /**
245 * Handle this queue belongs to.
246 */
247 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
248
249 /**
250 * Address used by the communication queue.
251 */
252 char *address;
253
254 /**
255 * The queue itself.
256 */
257 struct GNUNET_MQ_Handle *mq;
258
259 /**
260 * Which peer we can communciate with.
261 */
262 struct GNUNET_PeerIdentity peer;
263
264 /**
265 * Network type of the communication queue.
266 */
267 enum GNUNET_NetworkType nt;
268
269 /**
270 * Communication status of the queue.
271 */
272 enum GNUNET_TRANSPORT_ConnectionStatus cs;
273
274 /**
275 * ID for this queue when talking to the transport service.
276 */
277 uint32_t queue_id;
278
279 /**
280 * Maximum transmission unit for the queue.
281 */
282 uint32_t mtu;
283
284 /**
285 * Queue length.
286 */
287 uint64_t q_len;
288 /**
289 * Queue priority.
290 */
291 uint32_t priority;
292};
293
294
295/**
296 * Internal representation of an address a communicator is
297 * currently providing for the transport service.
298 */
299struct GNUNET_TRANSPORT_AddressIdentifier
300{
301 /**
302 * Kept in a DLL.
303 */
304 struct GNUNET_TRANSPORT_AddressIdentifier *next;
305
306 /**
307 * Kept in a DLL.
308 */
309 struct GNUNET_TRANSPORT_AddressIdentifier *prev;
310
311 /**
312 * Transport handle where the address was added.
313 */
314 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
315
316 /**
317 * The actual address.
318 */
319 char *address;
320
321 /**
322 * When does the address expire? (Expected lifetime of the
323 * address.)
324 */
325 struct GNUNET_TIME_Relative expiration;
326
327 /**
328 * Internal UUID for the address used in communication with the
329 * transport service.
330 */
331 uint32_t aid;
332
333 /**
334 * Network type for the address.
335 */
336 enum GNUNET_NetworkType nt;
337};
338
339
340/**
341 * (re)connect our communicator to the transport service
342 *
343 * @param ch handle to reconnect
344 */
345static void
346reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
347
348
349/**
350 * Send message to the transport service about address @a ai
351 * being now available.
352 *
353 * @param ai address to add
354 */
355static void
356send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
357{
358 struct GNUNET_MQ_Envelope *env;
359 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
360
361 if (NULL == ai->ch->mq)
362 return;
363 env = GNUNET_MQ_msg_extra (aam,
364 strlen (ai->address) + 1,
365 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
366 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
367 aam->nt = htonl ((uint32_t) ai->nt);
368 memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
369 GNUNET_MQ_send (ai->ch->mq, env);
370}
371
372
373/**
374 * Send message to the transport service about address @a ai
375 * being no longer available.
376 *
377 * @param ai address to delete
378 */
379static void
380send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
381{
382 struct GNUNET_MQ_Envelope *env;
383 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
384
385 if (NULL == ai->ch->mq)
386 return;
387 env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
388 dam->aid = htonl (ai->aid);
389 GNUNET_MQ_send (ai->ch->mq, env);
390}
391
392
393/**
394 * Send message to the transport service about queue @a qh
395 * being now available.
396 *
397 * @param qh queue to add
398 */
399static void
400send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
401{
402 struct GNUNET_MQ_Envelope *env;
403 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
404
405 if (NULL == qh->ch->mq)
406 return;
407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
408 "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
409 env = GNUNET_MQ_msg_extra (aqm,
410 strlen (qh->address) + 1,
411 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
412 aqm->qid = htonl (qh->queue_id);
413 aqm->receiver = qh->peer;
414 aqm->nt = htonl ((uint32_t) qh->nt);
415 aqm->mtu = htonl (qh->mtu);
416 aqm->q_len = GNUNET_htonll (qh->q_len);
417 aqm->priority = htonl (qh->priority);
418 aqm->cs = htonl ((uint32_t) qh->cs);
419 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
420 GNUNET_MQ_send (qh->ch->mq, env);
421}
422
423
424/**
425 * Send message to the transport service about queue @a qh
426 * updated.
427 *
428 * @param qh queue to add
429 */
430static void
431send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
432{
433 struct GNUNET_MQ_Envelope *env;
434 struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
435
436 if (NULL == qh->ch->mq)
437 return;
438 env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
439 uqm->qid = htonl (qh->queue_id);
440 uqm->receiver = qh->peer;
441 uqm->nt = htonl ((uint32_t) qh->nt);
442 uqm->mtu = htonl (qh->mtu);
443 uqm->q_len = GNUNET_htonll (qh->q_len);
444 uqm->priority = htonl (qh->priority);
445 uqm->cs = htonl ((uint32_t) qh->cs);
446 GNUNET_MQ_send (qh->ch->mq, env);
447}
448
449
450/**
451 * Send message to the transport service about queue @a qh
452 * being no longer available.
453 *
454 * @param qh queue to delete
455 */
456static void
457send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
458{
459 struct GNUNET_MQ_Envelope *env;
460 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
461
462 if (NULL == qh->ch->mq)
463 return;
464 env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
465 dqm->qid = htonl (qh->queue_id);
466 dqm->receiver = qh->peer;
467 GNUNET_MQ_send (qh->ch->mq, env);
468}
469
470
471/**
472 * Disconnect from the transport service. Purges
473 * all flow control entries as we will no longer receive
474 * the ACKs. Purges the ack pending entries as the
475 * transport will no longer expect the confirmations.
476 *
477 * @param ch service to disconnect from
478 */
479static void
480disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
481{
482 struct FlowControl *fcn;
483 struct AckPending *apn;
484
485 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
486 {
487 fcn = fc->next;
488 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
489 fc->cb (fc->cb_cls, GNUNET_SYSERR);
490 GNUNET_free (fc);
491 }
492 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
493 {
494 apn = ap->next;
495 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
496 GNUNET_free (ap);
497 }
498 if (NULL == ch->mq)
499 return;
500 GNUNET_MQ_destroy (ch->mq);
501 ch->mq = NULL;
502}
503
504
505/**
506 * Function called on MQ errors.
507 */
508static void
509error_handler (void *cls, enum GNUNET_MQ_Error error)
510{
511 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
512
513 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
514 "MQ failure %d, reconnecting to transport service.\n",
515 error);
516 disconnect (ch);
517 /* TODO: maybe do this with exponential backoff/delay */
518 reconnect (ch);
519}
520
521
522/**
523 * Transport service acknowledged a message we gave it
524 * (with flow control enabled). Tell the communicator.
525 *
526 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
527 * @param incoming_ack the ack
528 */
529static void
530handle_incoming_ack (
531 void *cls,
532 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
533{
534 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
535
536 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
537 {
538 if ((fc->id == incoming_ack->fc_id) &&
539 (0 == memcmp (&fc->sender,
540 &incoming_ack->sender,
541 sizeof(struct GNUNET_PeerIdentity))))
542 {
543 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544 "Done with message with flow control id %lu for sender %s from sender %s\n",
545 incoming_ack->fc_id,
546 GNUNET_i2s (&fc->sender),
547 GNUNET_i2s (&incoming_ack->sender));
548 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
549 fc->cb (fc->cb_cls, GNUNET_OK);
550 GNUNET_free (fc);
551 return;
552 }
553 }
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555 "Message with flow control id %lu from sender %s not found\n",
556 incoming_ack->fc_id,
557 GNUNET_i2s (&incoming_ack->sender));
558 GNUNET_break (0);
559 disconnect (ch);
560 /* TODO: maybe do this with exponential backoff/delay */
561 reconnect (ch);
562}
563
564
565/**
566 * Transport service wants us to create a queue. Check if @a cq
567 * is well-formed.
568 *
569 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
570 * @param cq the queue creation request
571 * @return #GNUNET_OK if @a smt is well-formed
572 */
573static int
574check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
575{
576 (void) cls;
577 GNUNET_MQ_check_zero_termination (cq);
578 return GNUNET_OK;
579}
580
581
582/**
583 * Transport service wants us to create a queue. Tell the communicator.
584 *
585 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
586 * @param cq the queue creation request
587 */
588static void
589handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
590{
591 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
592 const char *addr = (const char *) &cq[1];
593 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
594 struct GNUNET_MQ_Envelope *env;
595
596 if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
597 {
598 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
599 "Address `%s' invalid for this communicator\n",
600 addr);
601 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
602 }
603 else
604 {
605 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
606 }
607 cqr->request_id = cq->request_id;
608 GNUNET_MQ_send (ch->mq, env);
609}
610
611
612/**
613 * Transport service wants us to send a message. Check if @a smt
614 * is well-formed.
615 *
616 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
617 * @param smt the transmission request
618 * @return #GNUNET_OK if @a smt is well-formed
619 */
620static int
621check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
622{
623 (void) cls;
624 GNUNET_MQ_check_boxed_message (smt);
625 return GNUNET_OK;
626}
627
628
629/**
630 * Notify transport service about @a status of a message with
631 * @a mid sent to @a receiver.
632 *
633 * @param ch handle
634 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
635 * @param receiver which peer was the receiver
636 * @param mid message that the ack is about
637 */
638static void
639send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
640 int status,
641 const struct GNUNET_PeerIdentity *receiver,
642 uint64_t mid)
643{
644 struct GNUNET_MQ_Envelope *env;
645 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
646
647 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
648 ack->status = htonl (status);
649 ack->mid = mid;
650 ack->receiver = *receiver;
651 GNUNET_MQ_send (ch->mq, env);
652}
653
654
655/**
656 * Message queue transmission by communicator was successful,
657 * notify transport service.
658 *
659 * @param cls an `struct AckPending *`
660 */
661static void
662send_ack_cb (void *cls)
663{
664 struct AckPending *ap = cls;
665 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
666
667 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
668 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
669 GNUNET_free (ap);
670}
671
672
673/**
674 * Transport service wants us to send a message. Tell the communicator.
675 *
676 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
677 * @param smt the transmission request
678 */
679static void
680handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
681{
682 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
683 const struct GNUNET_MessageHeader *mh;
684 struct GNUNET_MQ_Envelope *env;
685 struct AckPending *ap;
686 struct GNUNET_TRANSPORT_QueueHandle *qh;
687
688 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
689 if ((qh->queue_id == ntohl (smt->qid)) &&
690 (0 == memcmp (&qh->peer,
691 &smt->receiver,
692 sizeof(struct GNUNET_PeerIdentity))))
693 break;
694 if (NULL == qh)
695 {
696 /* queue is already gone, tell transport this one failed */
697 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
698 "Transmission failed, queue no longer exists.\n");
699 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
700 return;
701 }
702 ap = GNUNET_new (struct AckPending);
703 ap->ch = ch;
704 ap->receiver = smt->receiver;
705 ap->mid = smt->mid;
706 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
707 mh = (const struct GNUNET_MessageHeader *) &smt[1];
708 env = GNUNET_MQ_msg_copy (mh);
709 GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
710 GNUNET_MQ_send (qh->mq, env);
711}
712
713
714/**
715 * Transport service gives us backchannel message. Check if @a bi
716 * is well-formed.
717 *
718 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
719 * @param bi the backchannel message
720 * @return #GNUNET_OK if @a smt is well-formed
721 */
722static int
723check_backchannel_incoming (
724 void *cls,
725 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
726{
727 (void) cls;
728 GNUNET_MQ_check_boxed_message (bi);
729 return GNUNET_OK;
730}
731
732
733/**
734 * Transport service gives us backchannel message. Handle it.
735 *
736 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
737 * @param bi the backchannel message
738 */
739static void
740handle_backchannel_incoming (
741 void *cls,
742 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
743{
744 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
745 if (NULL != ch->notify_cb)
746 ch->notify_cb (ch->notify_cb_cls,
747 &bi->pid,
748 (const struct GNUNET_MessageHeader *) &bi[1]);
749 else
750 GNUNET_log (
751 GNUNET_ERROR_TYPE_INFO,
752 _ ("Dropped backchanel message: handler not provided by communicator\n"));
753}
754
755
756/**
757 * (re)connect our communicator to the transport service
758 *
759 * @param ch handle to reconnect
760 */
761static void
762reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
763{
764 struct GNUNET_MQ_MessageHandler handlers[] =
765 { GNUNET_MQ_hd_fixed_size (incoming_ack,
766 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
767 struct GNUNET_TRANSPORT_IncomingMessageAck,
768 ch),
769 GNUNET_MQ_hd_var_size (create_queue,
770 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
771 struct GNUNET_TRANSPORT_CreateQueue,
772 ch),
773 GNUNET_MQ_hd_var_size (send_msg,
774 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
775 struct GNUNET_TRANSPORT_SendMessageTo,
776 ch),
777 GNUNET_MQ_hd_var_size (
778 backchannel_incoming,
779 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
780 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
781 ch),
782 GNUNET_MQ_handler_end () };
783 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
784 struct GNUNET_MQ_Envelope *env;
785
786 ch->mq =
787 GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
788 if (NULL == ch->mq)
789 return;
790 env = GNUNET_MQ_msg_extra (cam,
791 strlen (ch->addr_prefix) + 1,
792 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
793 cam->cc = htonl ((uint32_t) ch->cc);
794 memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
795 GNUNET_MQ_send (ch->mq, env);
796 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
797 ai = ai->next)
798 send_add_address (ai);
799 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
800 qh = qh->next)
801 send_add_queue (qh);
802}
803
804
805struct GNUNET_TRANSPORT_CommunicatorHandle *
806GNUNET_TRANSPORT_communicator_connect (
807 const struct GNUNET_CONFIGURATION_Handle *cfg,
808 const char *config_section,
809 const char *addr_prefix,
810 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
811 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
812 void *mq_init_cls,
813 GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
814 void *notify_cb_cls)
815{
816 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
817
818 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
819 ch->cfg = cfg;
820 ch->config_section = config_section;
821 ch->addr_prefix = addr_prefix;
822 ch->mq_init = mq_init;
823 ch->mq_init_cls = mq_init_cls;
824 ch->notify_cb = notify_cb;
825 ch->notify_cb_cls = notify_cb_cls;
826 ch->cc = cc;
827 reconnect (ch);
828 if (GNUNET_OK !=
829 GNUNET_CONFIGURATION_get_value_number (cfg,
830 config_section,
831 "MAX_QUEUE_LENGTH",
832 &ch->max_queue_length))
833 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
834 if (NULL == ch->mq)
835 {
836 GNUNET_free (ch);
837 return NULL;
838 }
839 return ch;
840}
841
842
843/**
844 * Disconnect from the transport service.
845 *
846 * @param ch handle returned from connect
847 */
848void
849GNUNET_TRANSPORT_communicator_disconnect (
850 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
851{
852 disconnect (ch);
853 while (NULL != ch->ai_head)
854 {
855 GNUNET_break (0); /* communicator forgot to remove address, warn! */
856 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
857 }
858 GNUNET_free (ch);
859}
860
861
862/* ************************* Receiving *************************** */
863
864
865int
866GNUNET_TRANSPORT_communicator_receive (
867 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
868 const struct GNUNET_PeerIdentity *sender,
869 const struct GNUNET_MessageHeader *msg,
870 struct GNUNET_TIME_Relative expected_addr_validity,
871 GNUNET_TRANSPORT_MessageCompletedCallback cb,
872 void *cb_cls)
873{
874 struct GNUNET_MQ_Envelope *env;
875 struct GNUNET_TRANSPORT_IncomingMessage *im;
876 uint16_t msize;
877
878
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "communicator receive\n");
881
882 if (NULL == ch->mq)
883 return GNUNET_SYSERR;
884 if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
885 {
886 GNUNET_log (
887 GNUNET_ERROR_TYPE_WARNING,
888 "Dropping message: transport is too slow, queue length %llu exceeded\n",
889 ch->max_queue_length);
890 return GNUNET_NO;
891 }
892
893 msize = ntohs (msg->size);
894 env =
895 GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
896 if (NULL == env)
897 {
898 GNUNET_break (0);
899 return GNUNET_SYSERR;
900 }
901 im->expected_address_validity =
902 GNUNET_TIME_relative_hton (expected_addr_validity);
903 im->sender = *sender;
904 // FIXME: this is expensive, would be better if we would
905 // re-design the API to allow us to create the envelope first,
906 // and then have the application fill in the body so we do
907 // not have to memcpy()
908 memcpy (&im[1], msg, msize);
909 im->fc_on = htonl (GNUNET_NO);
910 if (NULL != cb)
911 {
912 struct FlowControl *fc;
913
914 im->fc_on = htonl (GNUNET_YES);
915 im->fc_id = ch->fc_gen++;
916 fc = GNUNET_new (struct FlowControl);
917 fc->sender = *sender;
918 fc->id = im->fc_id;
919 fc->cb = cb;
920 fc->cb_cls = cb_cls;
921 GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
922 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
923 "Created flow control id %lu for sender %s\n",
924 fc->id,
925 GNUNET_i2s (&fc->sender));
926 }
927 GNUNET_MQ_send (ch->mq, env);
928 return GNUNET_OK;
929}
930
931
932/* ************************* Discovery *************************** */
933
934
935struct GNUNET_TRANSPORT_QueueHandle *
936GNUNET_TRANSPORT_communicator_mq_add (
937 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
938 const struct GNUNET_PeerIdentity *peer,
939 const char *address,
940 uint32_t mtu,
941 uint64_t q_len,
942 uint32_t priority,
943 enum GNUNET_NetworkType nt,
944 enum GNUNET_TRANSPORT_ConnectionStatus cs,
945 struct GNUNET_MQ_Handle *mq)
946{
947 struct GNUNET_TRANSPORT_QueueHandle *qh;
948
949 // Do not notify the service if there is no intial capacity.
950 GNUNET_assert (0 < q_len);
951
952 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
953 qh->ch = ch;
954 qh->peer = *peer;
955 qh->address = GNUNET_strdup (address);
956 qh->nt = nt;
957 qh->mtu = mtu;
958 qh->q_len = q_len;
959 qh->priority = priority;
960 qh->cs = cs;
961 qh->mq = mq;
962 qh->queue_id = ch->queue_gen++;
963 GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
964 send_add_queue (qh);
965 return qh;
966}
967
968
969void
970GNUNET_TRANSPORT_communicator_mq_update (
971 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
972 const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
973 uint64_t q_len,
974 uint32_t priority)
975{
976 struct GNUNET_TRANSPORT_QueueHandle *qh;
977
978 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
979 {
980 if (u_qh == qh)
981 break;
982 }
983 GNUNET_assert (NULL != qh);
984 qh->q_len = q_len;
985 qh->priority = priority;
986 send_update_queue (qh);
987}
988
989
990/**
991 * Notify transport service that an MQ became unavailable due to a
992 * disconnect or timeout.
993 *
994 * @param qh handle for the queue that must be invalidated
995 */
996void
997GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
998{
999 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
1000
1001 send_del_queue (qh);
1002 GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
1003 GNUNET_MQ_destroy (qh->mq);
1004 GNUNET_free (qh->address);
1005 GNUNET_free (qh);
1006}
1007
1008
1009/**
1010 * Notify transport service about an address that this communicator
1011 * provides for this peer.
1012 *
1013 * @param ch connection to transport service
1014 * @param address our address in human-readable format, 0-terminated, UTF-8
1015 * @param nt which network type does the address belong to?
1016 * @param expiration when does the communicator forsee this address expiring?
1017 */
1018struct GNUNET_TRANSPORT_AddressIdentifier *
1019GNUNET_TRANSPORT_communicator_address_add (
1020 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1021 const char *address,
1022 enum GNUNET_NetworkType nt,
1023 struct GNUNET_TIME_Relative expiration)
1024{
1025 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1026
1027 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
1028 ai->ch = ch;
1029 ai->address = GNUNET_strdup (address);
1030 ai->nt = nt;
1031 ai->expiration = expiration;
1032 ai->aid = ch->aid_gen++;
1033 GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
1034 send_add_address (ai);
1035 return ai;
1036}
1037
1038/**
1039 * Notify transport service about an address that this communicator no
1040 * longer provides for this peer.
1041 *
1042 * @param ai address that is no longer provided
1043 */
1044void
1045GNUNET_TRANSPORT_communicator_address_remove (
1046 struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1047{
1048 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1049
1050 send_del_address (ai);
1051 GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1052 GNUNET_free (ai->address);
1053 GNUNET_free (ai);
1054 ai = NULL;
1055}
1056
1057/**
1058 * Notify transport service that this communicator no longer provides all its addresses for this peer.
1059 *
1060 * @param ch The communicator handle.
1061 */
1062void
1063GNUNET_TRANSPORT_communicator_address_remove_all (
1064 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
1065{
1066 struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
1067 while (NULL != ai)
1068 {
1069 struct GNUNET_TRANSPORT_AddressIdentifier *ai_next = ai->next;
1070 GNUNET_TRANSPORT_communicator_address_remove (ai);
1071 ai = ai_next;
1072 }
1073}
1074
1075
1076/* ************************* Backchannel *************************** */
1077
1078
1079void
1080GNUNET_TRANSPORT_communicator_notify (
1081 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1082 const struct GNUNET_PeerIdentity *pid,
1083 const char *comm,
1084 const struct GNUNET_MessageHeader *header)
1085{
1086 struct GNUNET_MQ_Envelope *env;
1087 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1088 size_t slen = strlen (comm) + 1;
1089 uint16_t mlen = ntohs (header->size);
1090
1091 GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
1092 env =
1093 GNUNET_MQ_msg_extra (cb,
1094 slen + mlen,
1095 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1096 cb->pid = *pid;
1097 memcpy (&cb[1], header, mlen);
1098 memcpy (((char *) &cb[1]) + mlen, comm, slen);
1099 GNUNET_MQ_send (ch->mq, env);
1100}
1101
1102
1103/* end of transport_api2_communication.c */