diff options
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r-- | src/transport/transport_api2_communication.c | 1103 |
1 files changed, 0 insertions, 1103 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c deleted file mode 100644 index 079982ca5..000000000 --- a/src/transport/transport_api2_communication.c +++ /dev/null | |||
@@ -1,1103 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2018 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file transport/transport_api2_communication.c | ||
23 | * @brief implementation of the gnunet_transport_communication_service.h API | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "gnunet_protocols.h" | ||
29 | #include "gnunet_transport_communication_service.h" | ||
30 | #include "gnunet_ats_transport_service.h" | ||
31 | #include "transport.h" | ||
32 | |||
33 | |||
34 | /** | ||
35 | * How many messages do we keep at most in the queue to the | ||
36 | * transport service before we start to drop (default, | ||
37 | * can be changed via the configuration file). | ||
38 | */ | ||
39 | #define DEFAULT_MAX_QUEUE_LENGTH 16 | ||
40 | |||
41 | |||
42 | /** | ||
43 | * Information we track per packet to enable flow control. | ||
44 | */ | ||
45 | struct FlowControl | ||
46 | { | ||
47 | /** | ||
48 | * Kept in a DLL. | ||
49 | */ | ||
50 | struct FlowControl *next; | ||
51 | |||
52 | /** | ||
53 | * Kept in a DLL. | ||
54 | */ | ||
55 | struct FlowControl *prev; | ||
56 | |||
57 | /** | ||
58 | * Function to call once the message was processed. | ||
59 | */ | ||
60 | GNUNET_TRANSPORT_MessageCompletedCallback cb; | ||
61 | |||
62 | /** | ||
63 | * Closure for @e cb | ||
64 | */ | ||
65 | void *cb_cls; | ||
66 | |||
67 | /** | ||
68 | * Which peer is this about? | ||
69 | */ | ||
70 | struct GNUNET_PeerIdentity sender; | ||
71 | |||
72 | /** | ||
73 | * More-or-less unique ID for the message. | ||
74 | */ | ||
75 | uint64_t id; | ||
76 | }; | ||
77 | |||
78 | |||
79 | /** | ||
80 | * Information we track per message to tell the transport about | ||
81 | * success or failures. | ||
82 | */ | ||
83 | struct AckPending | ||
84 | { | ||
85 | /** | ||
86 | * Kept in a DLL. | ||
87 | */ | ||
88 | struct AckPending *next; | ||
89 | |||
90 | /** | ||
91 | * Kept in a DLL. | ||
92 | */ | ||
93 | struct AckPending *prev; | ||
94 | |||
95 | /** | ||
96 | * Communicator this entry belongs to. | ||
97 | */ | ||
98 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
99 | |||
100 | /** | ||
101 | * Which peer is this about? | ||
102 | */ | ||
103 | struct GNUNET_PeerIdentity receiver; | ||
104 | |||
105 | /** | ||
106 | * More-or-less unique ID for the message. | ||
107 | */ | ||
108 | uint64_t mid; | ||
109 | }; | ||
110 | |||
111 | |||
112 | /** | ||
113 | * Opaque handle to the transport service for communicators. | ||
114 | */ | ||
115 | struct GNUNET_TRANSPORT_CommunicatorHandle | ||
116 | { | ||
117 | /** | ||
118 | * Head of DLL of addresses this communicator offers to the transport service. | ||
119 | */ | ||
120 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_head; | ||
121 | |||
122 | /** | ||
123 | * Tail of DLL of addresses this communicator offers to the transport service. | ||
124 | */ | ||
125 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; | ||
126 | |||
127 | /** | ||
128 | * DLL of messages awaiting flow control confirmation (ack). | ||
129 | */ | ||
130 | struct FlowControl *fc_head; | ||
131 | |||
132 | /** | ||
133 | * DLL of messages awaiting flow control confirmation (ack). | ||
134 | */ | ||
135 | struct FlowControl *fc_tail; | ||
136 | |||
137 | /** | ||
138 | * DLL of messages awaiting transmission confirmation (ack). | ||
139 | */ | ||
140 | struct AckPending *ap_head; | ||
141 | |||
142 | /** | ||
143 | * DLL of messages awaiting transmission confirmation (ack). | ||
144 | */ | ||
145 | struct AckPending *ap_tail; | ||
146 | |||
147 | /** | ||
148 | * DLL of queues we offer. | ||
149 | */ | ||
150 | struct GNUNET_TRANSPORT_QueueHandle *queue_head; | ||
151 | |||
152 | /** | ||
153 | * DLL of queues we offer. | ||
154 | */ | ||
155 | struct GNUNET_TRANSPORT_QueueHandle *queue_tail; | ||
156 | |||
157 | /** | ||
158 | * Our configuration. | ||
159 | */ | ||
160 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
161 | |||
162 | /** | ||
163 | * Config section to use. | ||
164 | */ | ||
165 | const char *config_section; | ||
166 | |||
167 | /** | ||
168 | * Address prefix to use. | ||
169 | */ | ||
170 | const char *addr_prefix; | ||
171 | |||
172 | /** | ||
173 | * Function to call when the transport service wants us to initiate | ||
174 | * a communication channel with another peer. | ||
175 | */ | ||
176 | GNUNET_TRANSPORT_CommunicatorMqInit mq_init; | ||
177 | |||
178 | /** | ||
179 | * Closure for @e mq_init. | ||
180 | */ | ||
181 | void *mq_init_cls; | ||
182 | |||
183 | /** | ||
184 | * Function to call when the transport service receives messages | ||
185 | * for a communicator (i.e. for NAT traversal or for non-bidirectional | ||
186 | * communicators). | ||
187 | */ | ||
188 | GNUNET_TRANSPORT_CommunicatorNotify notify_cb; | ||
189 | |||
190 | /** | ||
191 | * Closure for @e notify_Cb. | ||
192 | */ | ||
193 | void *notify_cb_cls; | ||
194 | |||
195 | /** | ||
196 | * Queue to talk to the transport service. | ||
197 | */ | ||
198 | struct GNUNET_MQ_Handle *mq; | ||
199 | |||
200 | /** | ||
201 | * Maximum permissible queue length. | ||
202 | */ | ||
203 | unsigned long long max_queue_length; | ||
204 | |||
205 | /** | ||
206 | * Flow-control identifier generator. | ||
207 | */ | ||
208 | uint64_t fc_gen; | ||
209 | |||
210 | /** | ||
211 | * Internal UUID for the address used in communication with the | ||
212 | * transport service. | ||
213 | */ | ||
214 | uint32_t aid_gen; | ||
215 | |||
216 | /** | ||
217 | * Queue identifier generator. | ||
218 | */ | ||
219 | uint32_t queue_gen; | ||
220 | |||
221 | /** | ||
222 | * Characteristics of the communicator. | ||
223 | */ | ||
224 | enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; | ||
225 | }; | ||
226 | |||
227 | |||
228 | /** | ||
229 | * Handle returned to identify the internal data structure the transport | ||
230 | * API has created to manage a message queue to a particular peer. | ||
231 | */ | ||
232 | struct GNUNET_TRANSPORT_QueueHandle | ||
233 | { | ||
234 | /** | ||
235 | * Kept in a DLL. | ||
236 | */ | ||
237 | struct GNUNET_TRANSPORT_QueueHandle *next; | ||
238 | |||
239 | /** | ||
240 | * Kept in a DLL. | ||
241 | */ | ||
242 | struct GNUNET_TRANSPORT_QueueHandle *prev; | ||
243 | |||
244 | /** | ||
245 | * Handle this queue belongs to. | ||
246 | */ | ||
247 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
248 | |||
249 | /** | ||
250 | * Address used by the communication queue. | ||
251 | */ | ||
252 | char *address; | ||
253 | |||
254 | /** | ||
255 | * The queue itself. | ||
256 | */ | ||
257 | struct GNUNET_MQ_Handle *mq; | ||
258 | |||
259 | /** | ||
260 | * Which peer we can communciate with. | ||
261 | */ | ||
262 | struct GNUNET_PeerIdentity peer; | ||
263 | |||
264 | /** | ||
265 | * Network type of the communication queue. | ||
266 | */ | ||
267 | enum GNUNET_NetworkType nt; | ||
268 | |||
269 | /** | ||
270 | * Communication status of the queue. | ||
271 | */ | ||
272 | enum GNUNET_TRANSPORT_ConnectionStatus cs; | ||
273 | |||
274 | /** | ||
275 | * ID for this queue when talking to the transport service. | ||
276 | */ | ||
277 | uint32_t queue_id; | ||
278 | |||
279 | /** | ||
280 | * Maximum transmission unit for the queue. | ||
281 | */ | ||
282 | uint32_t mtu; | ||
283 | |||
284 | /** | ||
285 | * Queue length. | ||
286 | */ | ||
287 | uint64_t q_len; | ||
288 | /** | ||
289 | * Queue priority. | ||
290 | */ | ||
291 | uint32_t priority; | ||
292 | }; | ||
293 | |||
294 | |||
295 | /** | ||
296 | * Internal representation of an address a communicator is | ||
297 | * currently providing for the transport service. | ||
298 | */ | ||
299 | struct GNUNET_TRANSPORT_AddressIdentifier | ||
300 | { | ||
301 | /** | ||
302 | * Kept in a DLL. | ||
303 | */ | ||
304 | struct GNUNET_TRANSPORT_AddressIdentifier *next; | ||
305 | |||
306 | /** | ||
307 | * Kept in a DLL. | ||
308 | */ | ||
309 | struct GNUNET_TRANSPORT_AddressIdentifier *prev; | ||
310 | |||
311 | /** | ||
312 | * Transport handle where the address was added. | ||
313 | */ | ||
314 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
315 | |||
316 | /** | ||
317 | * The actual address. | ||
318 | */ | ||
319 | char *address; | ||
320 | |||
321 | /** | ||
322 | * When does the address expire? (Expected lifetime of the | ||
323 | * address.) | ||
324 | */ | ||
325 | struct GNUNET_TIME_Relative expiration; | ||
326 | |||
327 | /** | ||
328 | * Internal UUID for the address used in communication with the | ||
329 | * transport service. | ||
330 | */ | ||
331 | uint32_t aid; | ||
332 | |||
333 | /** | ||
334 | * Network type for the address. | ||
335 | */ | ||
336 | enum GNUNET_NetworkType nt; | ||
337 | }; | ||
338 | |||
339 | |||
340 | /** | ||
341 | * (re)connect our communicator to the transport service | ||
342 | * | ||
343 | * @param ch handle to reconnect | ||
344 | */ | ||
345 | static void | ||
346 | reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch); | ||
347 | |||
348 | |||
349 | /** | ||
350 | * Send message to the transport service about address @a ai | ||
351 | * being now available. | ||
352 | * | ||
353 | * @param ai address to add | ||
354 | */ | ||
355 | static void | ||
356 | send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | ||
357 | { | ||
358 | struct GNUNET_MQ_Envelope *env; | ||
359 | struct GNUNET_TRANSPORT_AddAddressMessage *aam; | ||
360 | |||
361 | if (NULL == ai->ch->mq) | ||
362 | return; | ||
363 | env = GNUNET_MQ_msg_extra (aam, | ||
364 | strlen (ai->address) + 1, | ||
365 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); | ||
366 | aam->expiration = GNUNET_TIME_relative_hton (ai->expiration); | ||
367 | aam->nt = htonl ((uint32_t) ai->nt); | ||
368 | memcpy (&aam[1], ai->address, strlen (ai->address) + 1); | ||
369 | GNUNET_MQ_send (ai->ch->mq, env); | ||
370 | } | ||
371 | |||
372 | |||
373 | /** | ||
374 | * Send message to the transport service about address @a ai | ||
375 | * being no longer available. | ||
376 | * | ||
377 | * @param ai address to delete | ||
378 | */ | ||
379 | static void | ||
380 | send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | ||
381 | { | ||
382 | struct GNUNET_MQ_Envelope *env; | ||
383 | struct GNUNET_TRANSPORT_DelAddressMessage *dam; | ||
384 | |||
385 | if (NULL == ai->ch->mq) | ||
386 | return; | ||
387 | env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); | ||
388 | dam->aid = htonl (ai->aid); | ||
389 | GNUNET_MQ_send (ai->ch->mq, env); | ||
390 | } | ||
391 | |||
392 | |||
393 | /** | ||
394 | * Send message to the transport service about queue @a qh | ||
395 | * being now available. | ||
396 | * | ||
397 | * @param qh queue to add | ||
398 | */ | ||
399 | static void | ||
400 | send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
401 | { | ||
402 | struct GNUNET_MQ_Envelope *env; | ||
403 | struct GNUNET_TRANSPORT_AddQueueMessage *aqm; | ||
404 | |||
405 | if (NULL == qh->ch->mq) | ||
406 | return; | ||
407 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
408 | "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n"); | ||
409 | env = GNUNET_MQ_msg_extra (aqm, | ||
410 | strlen (qh->address) + 1, | ||
411 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); | ||
412 | aqm->qid = htonl (qh->queue_id); | ||
413 | aqm->receiver = qh->peer; | ||
414 | aqm->nt = htonl ((uint32_t) qh->nt); | ||
415 | aqm->mtu = htonl (qh->mtu); | ||
416 | aqm->q_len = GNUNET_htonll (qh->q_len); | ||
417 | aqm->priority = htonl (qh->priority); | ||
418 | aqm->cs = htonl ((uint32_t) qh->cs); | ||
419 | memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); | ||
420 | GNUNET_MQ_send (qh->ch->mq, env); | ||
421 | } | ||
422 | |||
423 | |||
424 | /** | ||
425 | * Send message to the transport service about queue @a qh | ||
426 | * updated. | ||
427 | * | ||
428 | * @param qh queue to add | ||
429 | */ | ||
430 | static void | ||
431 | send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
432 | { | ||
433 | struct GNUNET_MQ_Envelope *env; | ||
434 | struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm; | ||
435 | |||
436 | if (NULL == qh->ch->mq) | ||
437 | return; | ||
438 | env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE); | ||
439 | uqm->qid = htonl (qh->queue_id); | ||
440 | uqm->receiver = qh->peer; | ||
441 | uqm->nt = htonl ((uint32_t) qh->nt); | ||
442 | uqm->mtu = htonl (qh->mtu); | ||
443 | uqm->q_len = GNUNET_htonll (qh->q_len); | ||
444 | uqm->priority = htonl (qh->priority); | ||
445 | uqm->cs = htonl ((uint32_t) qh->cs); | ||
446 | GNUNET_MQ_send (qh->ch->mq, env); | ||
447 | } | ||
448 | |||
449 | |||
450 | /** | ||
451 | * Send message to the transport service about queue @a qh | ||
452 | * being no longer available. | ||
453 | * | ||
454 | * @param qh queue to delete | ||
455 | */ | ||
456 | static void | ||
457 | send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
458 | { | ||
459 | struct GNUNET_MQ_Envelope *env; | ||
460 | struct GNUNET_TRANSPORT_DelQueueMessage *dqm; | ||
461 | |||
462 | if (NULL == qh->ch->mq) | ||
463 | return; | ||
464 | env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN); | ||
465 | dqm->qid = htonl (qh->queue_id); | ||
466 | dqm->receiver = qh->peer; | ||
467 | GNUNET_MQ_send (qh->ch->mq, env); | ||
468 | } | ||
469 | |||
470 | |||
471 | /** | ||
472 | * Disconnect from the transport service. Purges | ||
473 | * all flow control entries as we will no longer receive | ||
474 | * the ACKs. Purges the ack pending entries as the | ||
475 | * transport will no longer expect the confirmations. | ||
476 | * | ||
477 | * @param ch service to disconnect from | ||
478 | */ | ||
479 | static void | ||
480 | disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
481 | { | ||
482 | struct FlowControl *fcn; | ||
483 | struct AckPending *apn; | ||
484 | |||
485 | for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn) | ||
486 | { | ||
487 | fcn = fc->next; | ||
488 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc); | ||
489 | fc->cb (fc->cb_cls, GNUNET_SYSERR); | ||
490 | GNUNET_free (fc); | ||
491 | } | ||
492 | for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn) | ||
493 | { | ||
494 | apn = ap->next; | ||
495 | GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap); | ||
496 | GNUNET_free (ap); | ||
497 | } | ||
498 | if (NULL == ch->mq) | ||
499 | return; | ||
500 | GNUNET_MQ_destroy (ch->mq); | ||
501 | ch->mq = NULL; | ||
502 | } | ||
503 | |||
504 | |||
505 | /** | ||
506 | * Function called on MQ errors. | ||
507 | */ | ||
508 | static void | ||
509 | error_handler (void *cls, enum GNUNET_MQ_Error error) | ||
510 | { | ||
511 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
512 | |||
513 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
514 | "MQ failure %d, reconnecting to transport service.\n", | ||
515 | error); | ||
516 | disconnect (ch); | ||
517 | /* TODO: maybe do this with exponential backoff/delay */ | ||
518 | reconnect (ch); | ||
519 | } | ||
520 | |||
521 | |||
522 | /** | ||
523 | * Transport service acknowledged a message we gave it | ||
524 | * (with flow control enabled). Tell the communicator. | ||
525 | * | ||
526 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
527 | * @param incoming_ack the ack | ||
528 | */ | ||
529 | static void | ||
530 | handle_incoming_ack ( | ||
531 | void *cls, | ||
532 | const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) | ||
533 | { | ||
534 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
535 | |||
536 | for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next) | ||
537 | { | ||
538 | if ((fc->id == incoming_ack->fc_id) && | ||
539 | (0 == memcmp (&fc->sender, | ||
540 | &incoming_ack->sender, | ||
541 | sizeof(struct GNUNET_PeerIdentity)))) | ||
542 | { | ||
543 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
544 | "Done with message with flow control id %lu for sender %s from sender %s\n", | ||
545 | incoming_ack->fc_id, | ||
546 | GNUNET_i2s (&fc->sender), | ||
547 | GNUNET_i2s (&incoming_ack->sender)); | ||
548 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc); | ||
549 | fc->cb (fc->cb_cls, GNUNET_OK); | ||
550 | GNUNET_free (fc); | ||
551 | return; | ||
552 | } | ||
553 | } | ||
554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
555 | "Message with flow control id %lu from sender %s not found\n", | ||
556 | incoming_ack->fc_id, | ||
557 | GNUNET_i2s (&incoming_ack->sender)); | ||
558 | GNUNET_break (0); | ||
559 | disconnect (ch); | ||
560 | /* TODO: maybe do this with exponential backoff/delay */ | ||
561 | reconnect (ch); | ||
562 | } | ||
563 | |||
564 | |||
565 | /** | ||
566 | * Transport service wants us to create a queue. Check if @a cq | ||
567 | * is well-formed. | ||
568 | * | ||
569 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
570 | * @param cq the queue creation request | ||
571 | * @return #GNUNET_OK if @a smt is well-formed | ||
572 | */ | ||
573 | static int | ||
574 | check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq) | ||
575 | { | ||
576 | (void) cls; | ||
577 | GNUNET_MQ_check_zero_termination (cq); | ||
578 | return GNUNET_OK; | ||
579 | } | ||
580 | |||
581 | |||
582 | /** | ||
583 | * Transport service wants us to create a queue. Tell the communicator. | ||
584 | * | ||
585 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
586 | * @param cq the queue creation request | ||
587 | */ | ||
588 | static void | ||
589 | handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq) | ||
590 | { | ||
591 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
592 | const char *addr = (const char *) &cq[1]; | ||
593 | struct GNUNET_TRANSPORT_CreateQueueResponse *cqr; | ||
594 | struct GNUNET_MQ_Envelope *env; | ||
595 | |||
596 | if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr)) | ||
597 | { | ||
598 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
599 | "Address `%s' invalid for this communicator\n", | ||
600 | addr); | ||
601 | env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL); | ||
602 | } | ||
603 | else | ||
604 | { | ||
605 | env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK); | ||
606 | } | ||
607 | cqr->request_id = cq->request_id; | ||
608 | GNUNET_MQ_send (ch->mq, env); | ||
609 | } | ||
610 | |||
611 | |||
612 | /** | ||
613 | * Transport service wants us to send a message. Check if @a smt | ||
614 | * is well-formed. | ||
615 | * | ||
616 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
617 | * @param smt the transmission request | ||
618 | * @return #GNUNET_OK if @a smt is well-formed | ||
619 | */ | ||
620 | static int | ||
621 | check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt) | ||
622 | { | ||
623 | (void) cls; | ||
624 | GNUNET_MQ_check_boxed_message (smt); | ||
625 | return GNUNET_OK; | ||
626 | } | ||
627 | |||
628 | |||
629 | /** | ||
630 | * Notify transport service about @a status of a message with | ||
631 | * @a mid sent to @a receiver. | ||
632 | * | ||
633 | * @param ch handle | ||
634 | * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure | ||
635 | * @param receiver which peer was the receiver | ||
636 | * @param mid message that the ack is about | ||
637 | */ | ||
638 | static void | ||
639 | send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
640 | int status, | ||
641 | const struct GNUNET_PeerIdentity *receiver, | ||
642 | uint64_t mid) | ||
643 | { | ||
644 | struct GNUNET_MQ_Envelope *env; | ||
645 | struct GNUNET_TRANSPORT_SendMessageToAck *ack; | ||
646 | |||
647 | env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); | ||
648 | ack->status = htonl (status); | ||
649 | ack->mid = mid; | ||
650 | ack->receiver = *receiver; | ||
651 | GNUNET_MQ_send (ch->mq, env); | ||
652 | } | ||
653 | |||
654 | |||
655 | /** | ||
656 | * Message queue transmission by communicator was successful, | ||
657 | * notify transport service. | ||
658 | * | ||
659 | * @param cls an `struct AckPending *` | ||
660 | */ | ||
661 | static void | ||
662 | send_ack_cb (void *cls) | ||
663 | { | ||
664 | struct AckPending *ap = cls; | ||
665 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; | ||
666 | |||
667 | GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap); | ||
668 | send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid); | ||
669 | GNUNET_free (ap); | ||
670 | } | ||
671 | |||
672 | |||
673 | /** | ||
674 | * Transport service wants us to send a message. Tell the communicator. | ||
675 | * | ||
676 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
677 | * @param smt the transmission request | ||
678 | */ | ||
679 | static void | ||
680 | handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt) | ||
681 | { | ||
682 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
683 | const struct GNUNET_MessageHeader *mh; | ||
684 | struct GNUNET_MQ_Envelope *env; | ||
685 | struct AckPending *ap; | ||
686 | struct GNUNET_TRANSPORT_QueueHandle *qh; | ||
687 | |||
688 | for (qh = ch->queue_head; NULL != qh; qh = qh->next) | ||
689 | if ((qh->queue_id == ntohl (smt->qid)) && | ||
690 | (0 == memcmp (&qh->peer, | ||
691 | &smt->receiver, | ||
692 | sizeof(struct GNUNET_PeerIdentity)))) | ||
693 | break; | ||
694 | if (NULL == qh) | ||
695 | { | ||
696 | /* queue is already gone, tell transport this one failed */ | ||
697 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
698 | "Transmission failed, queue no longer exists.\n"); | ||
699 | send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid); | ||
700 | return; | ||
701 | } | ||
702 | ap = GNUNET_new (struct AckPending); | ||
703 | ap->ch = ch; | ||
704 | ap->receiver = smt->receiver; | ||
705 | ap->mid = smt->mid; | ||
706 | GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap); | ||
707 | mh = (const struct GNUNET_MessageHeader *) &smt[1]; | ||
708 | env = GNUNET_MQ_msg_copy (mh); | ||
709 | GNUNET_MQ_notify_sent (env, &send_ack_cb, ap); | ||
710 | GNUNET_MQ_send (qh->mq, env); | ||
711 | } | ||
712 | |||
713 | |||
714 | /** | ||
715 | * Transport service gives us backchannel message. Check if @a bi | ||
716 | * is well-formed. | ||
717 | * | ||
718 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
719 | * @param bi the backchannel message | ||
720 | * @return #GNUNET_OK if @a smt is well-formed | ||
721 | */ | ||
722 | static int | ||
723 | check_backchannel_incoming ( | ||
724 | void *cls, | ||
725 | const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) | ||
726 | { | ||
727 | (void) cls; | ||
728 | GNUNET_MQ_check_boxed_message (bi); | ||
729 | return GNUNET_OK; | ||
730 | } | ||
731 | |||
732 | |||
733 | /** | ||
734 | * Transport service gives us backchannel message. Handle it. | ||
735 | * | ||
736 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
737 | * @param bi the backchannel message | ||
738 | */ | ||
739 | static void | ||
740 | handle_backchannel_incoming ( | ||
741 | void *cls, | ||
742 | const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) | ||
743 | { | ||
744 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
745 | if (NULL != ch->notify_cb) | ||
746 | ch->notify_cb (ch->notify_cb_cls, | ||
747 | &bi->pid, | ||
748 | (const struct GNUNET_MessageHeader *) &bi[1]); | ||
749 | else | ||
750 | GNUNET_log ( | ||
751 | GNUNET_ERROR_TYPE_INFO, | ||
752 | _ ("Dropped backchanel message: handler not provided by communicator\n")); | ||
753 | } | ||
754 | |||
755 | |||
756 | /** | ||
757 | * (re)connect our communicator to the transport service | ||
758 | * | ||
759 | * @param ch handle to reconnect | ||
760 | */ | ||
761 | static void | ||
762 | reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
763 | { | ||
764 | struct GNUNET_MQ_MessageHandler handlers[] = | ||
765 | { GNUNET_MQ_hd_fixed_size (incoming_ack, | ||
766 | GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, | ||
767 | struct GNUNET_TRANSPORT_IncomingMessageAck, | ||
768 | ch), | ||
769 | GNUNET_MQ_hd_var_size (create_queue, | ||
770 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE, | ||
771 | struct GNUNET_TRANSPORT_CreateQueue, | ||
772 | ch), | ||
773 | GNUNET_MQ_hd_var_size (send_msg, | ||
774 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG, | ||
775 | struct GNUNET_TRANSPORT_SendMessageTo, | ||
776 | ch), | ||
777 | GNUNET_MQ_hd_var_size ( | ||
778 | backchannel_incoming, | ||
779 | GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING, | ||
780 | struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming, | ||
781 | ch), | ||
782 | GNUNET_MQ_handler_end () }; | ||
783 | struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam; | ||
784 | struct GNUNET_MQ_Envelope *env; | ||
785 | |||
786 | ch->mq = | ||
787 | GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch); | ||
788 | if (NULL == ch->mq) | ||
789 | return; | ||
790 | env = GNUNET_MQ_msg_extra (cam, | ||
791 | strlen (ch->addr_prefix) + 1, | ||
792 | GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR); | ||
793 | cam->cc = htonl ((uint32_t) ch->cc); | ||
794 | memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1); | ||
795 | GNUNET_MQ_send (ch->mq, env); | ||
796 | for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai; | ||
797 | ai = ai->next) | ||
798 | send_add_address (ai); | ||
799 | for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh; | ||
800 | qh = qh->next) | ||
801 | send_add_queue (qh); | ||
802 | } | ||
803 | |||
804 | |||
805 | struct GNUNET_TRANSPORT_CommunicatorHandle * | ||
806 | GNUNET_TRANSPORT_communicator_connect ( | ||
807 | const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
808 | const char *config_section, | ||
809 | const char *addr_prefix, | ||
810 | enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, | ||
811 | GNUNET_TRANSPORT_CommunicatorMqInit mq_init, | ||
812 | void *mq_init_cls, | ||
813 | GNUNET_TRANSPORT_CommunicatorNotify notify_cb, | ||
814 | void *notify_cb_cls) | ||
815 | { | ||
816 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
817 | |||
818 | ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); | ||
819 | ch->cfg = cfg; | ||
820 | ch->config_section = config_section; | ||
821 | ch->addr_prefix = addr_prefix; | ||
822 | ch->mq_init = mq_init; | ||
823 | ch->mq_init_cls = mq_init_cls; | ||
824 | ch->notify_cb = notify_cb; | ||
825 | ch->notify_cb_cls = notify_cb_cls; | ||
826 | ch->cc = cc; | ||
827 | reconnect (ch); | ||
828 | if (GNUNET_OK != | ||
829 | GNUNET_CONFIGURATION_get_value_number (cfg, | ||
830 | config_section, | ||
831 | "MAX_QUEUE_LENGTH", | ||
832 | &ch->max_queue_length)) | ||
833 | ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; | ||
834 | if (NULL == ch->mq) | ||
835 | { | ||
836 | GNUNET_free (ch); | ||
837 | return NULL; | ||
838 | } | ||
839 | return ch; | ||
840 | } | ||
841 | |||
842 | |||
843 | /** | ||
844 | * Disconnect from the transport service. | ||
845 | * | ||
846 | * @param ch handle returned from connect | ||
847 | */ | ||
848 | void | ||
849 | GNUNET_TRANSPORT_communicator_disconnect ( | ||
850 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
851 | { | ||
852 | disconnect (ch); | ||
853 | while (NULL != ch->ai_head) | ||
854 | { | ||
855 | GNUNET_break (0); /* communicator forgot to remove address, warn! */ | ||
856 | GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); | ||
857 | } | ||
858 | GNUNET_free (ch); | ||
859 | } | ||
860 | |||
861 | |||
862 | /* ************************* Receiving *************************** */ | ||
863 | |||
864 | |||
865 | int | ||
866 | GNUNET_TRANSPORT_communicator_receive ( | ||
867 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
868 | const struct GNUNET_PeerIdentity *sender, | ||
869 | const struct GNUNET_MessageHeader *msg, | ||
870 | struct GNUNET_TIME_Relative expected_addr_validity, | ||
871 | GNUNET_TRANSPORT_MessageCompletedCallback cb, | ||
872 | void *cb_cls) | ||
873 | { | ||
874 | struct GNUNET_MQ_Envelope *env; | ||
875 | struct GNUNET_TRANSPORT_IncomingMessage *im; | ||
876 | uint16_t msize; | ||
877 | |||
878 | |||
879 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
880 | "communicator receive\n"); | ||
881 | |||
882 | if (NULL == ch->mq) | ||
883 | return GNUNET_SYSERR; | ||
884 | if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)) | ||
885 | { | ||
886 | GNUNET_log ( | ||
887 | GNUNET_ERROR_TYPE_WARNING, | ||
888 | "Dropping message: transport is too slow, queue length %llu exceeded\n", | ||
889 | ch->max_queue_length); | ||
890 | return GNUNET_NO; | ||
891 | } | ||
892 | |||
893 | msize = ntohs (msg->size); | ||
894 | env = | ||
895 | GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG); | ||
896 | if (NULL == env) | ||
897 | { | ||
898 | GNUNET_break (0); | ||
899 | return GNUNET_SYSERR; | ||
900 | } | ||
901 | im->expected_address_validity = | ||
902 | GNUNET_TIME_relative_hton (expected_addr_validity); | ||
903 | im->sender = *sender; | ||
904 | // FIXME: this is expensive, would be better if we would | ||
905 | // re-design the API to allow us to create the envelope first, | ||
906 | // and then have the application fill in the body so we do | ||
907 | // not have to memcpy() | ||
908 | memcpy (&im[1], msg, msize); | ||
909 | im->fc_on = htonl (GNUNET_NO); | ||
910 | if (NULL != cb) | ||
911 | { | ||
912 | struct FlowControl *fc; | ||
913 | |||
914 | im->fc_on = htonl (GNUNET_YES); | ||
915 | im->fc_id = ch->fc_gen++; | ||
916 | fc = GNUNET_new (struct FlowControl); | ||
917 | fc->sender = *sender; | ||
918 | fc->id = im->fc_id; | ||
919 | fc->cb = cb; | ||
920 | fc->cb_cls = cb_cls; | ||
921 | GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc); | ||
922 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
923 | "Created flow control id %lu for sender %s\n", | ||
924 | fc->id, | ||
925 | GNUNET_i2s (&fc->sender)); | ||
926 | } | ||
927 | GNUNET_MQ_send (ch->mq, env); | ||
928 | return GNUNET_OK; | ||
929 | } | ||
930 | |||
931 | |||
932 | /* ************************* Discovery *************************** */ | ||
933 | |||
934 | |||
935 | struct GNUNET_TRANSPORT_QueueHandle * | ||
936 | GNUNET_TRANSPORT_communicator_mq_add ( | ||
937 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
938 | const struct GNUNET_PeerIdentity *peer, | ||
939 | const char *address, | ||
940 | uint32_t mtu, | ||
941 | uint64_t q_len, | ||
942 | uint32_t priority, | ||
943 | enum GNUNET_NetworkType nt, | ||
944 | enum GNUNET_TRANSPORT_ConnectionStatus cs, | ||
945 | struct GNUNET_MQ_Handle *mq) | ||
946 | { | ||
947 | struct GNUNET_TRANSPORT_QueueHandle *qh; | ||
948 | |||
949 | // Do not notify the service if there is no intial capacity. | ||
950 | GNUNET_assert (0 < q_len); | ||
951 | |||
952 | qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle); | ||
953 | qh->ch = ch; | ||
954 | qh->peer = *peer; | ||
955 | qh->address = GNUNET_strdup (address); | ||
956 | qh->nt = nt; | ||
957 | qh->mtu = mtu; | ||
958 | qh->q_len = q_len; | ||
959 | qh->priority = priority; | ||
960 | qh->cs = cs; | ||
961 | qh->mq = mq; | ||
962 | qh->queue_id = ch->queue_gen++; | ||
963 | GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh); | ||
964 | send_add_queue (qh); | ||
965 | return qh; | ||
966 | } | ||
967 | |||
968 | |||
969 | void | ||
970 | GNUNET_TRANSPORT_communicator_mq_update ( | ||
971 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
972 | const struct GNUNET_TRANSPORT_QueueHandle *u_qh, | ||
973 | uint64_t q_len, | ||
974 | uint32_t priority) | ||
975 | { | ||
976 | struct GNUNET_TRANSPORT_QueueHandle *qh; | ||
977 | |||
978 | for (qh = ch->queue_head; NULL != qh; qh = qh->next) | ||
979 | { | ||
980 | if (u_qh == qh) | ||
981 | break; | ||
982 | } | ||
983 | GNUNET_assert (NULL != qh); | ||
984 | qh->q_len = q_len; | ||
985 | qh->priority = priority; | ||
986 | send_update_queue (qh); | ||
987 | } | ||
988 | |||
989 | |||
990 | /** | ||
991 | * Notify transport service that an MQ became unavailable due to a | ||
992 | * disconnect or timeout. | ||
993 | * | ||
994 | * @param qh handle for the queue that must be invalidated | ||
995 | */ | ||
996 | void | ||
997 | GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
998 | { | ||
999 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; | ||
1000 | |||
1001 | send_del_queue (qh); | ||
1002 | GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh); | ||
1003 | GNUNET_MQ_destroy (qh->mq); | ||
1004 | GNUNET_free (qh->address); | ||
1005 | GNUNET_free (qh); | ||
1006 | } | ||
1007 | |||
1008 | |||
1009 | /** | ||
1010 | * Notify transport service about an address that this communicator | ||
1011 | * provides for this peer. | ||
1012 | * | ||
1013 | * @param ch connection to transport service | ||
1014 | * @param address our address in human-readable format, 0-terminated, UTF-8 | ||
1015 | * @param nt which network type does the address belong to? | ||
1016 | * @param expiration when does the communicator forsee this address expiring? | ||
1017 | */ | ||
1018 | struct GNUNET_TRANSPORT_AddressIdentifier * | ||
1019 | GNUNET_TRANSPORT_communicator_address_add ( | ||
1020 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
1021 | const char *address, | ||
1022 | enum GNUNET_NetworkType nt, | ||
1023 | struct GNUNET_TIME_Relative expiration) | ||
1024 | { | ||
1025 | struct GNUNET_TRANSPORT_AddressIdentifier *ai; | ||
1026 | |||
1027 | ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier); | ||
1028 | ai->ch = ch; | ||
1029 | ai->address = GNUNET_strdup (address); | ||
1030 | ai->nt = nt; | ||
1031 | ai->expiration = expiration; | ||
1032 | ai->aid = ch->aid_gen++; | ||
1033 | GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai); | ||
1034 | send_add_address (ai); | ||
1035 | return ai; | ||
1036 | } | ||
1037 | |||
1038 | /** | ||
1039 | * Notify transport service about an address that this communicator no | ||
1040 | * longer provides for this peer. | ||
1041 | * | ||
1042 | * @param ai address that is no longer provided | ||
1043 | */ | ||
1044 | void | ||
1045 | GNUNET_TRANSPORT_communicator_address_remove ( | ||
1046 | struct GNUNET_TRANSPORT_AddressIdentifier *ai) | ||
1047 | { | ||
1048 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; | ||
1049 | |||
1050 | send_del_address (ai); | ||
1051 | GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai); | ||
1052 | GNUNET_free (ai->address); | ||
1053 | GNUNET_free (ai); | ||
1054 | ai = NULL; | ||
1055 | } | ||
1056 | |||
1057 | /** | ||
1058 | * Notify transport service that this communicator no longer provides all its addresses for this peer. | ||
1059 | * | ||
1060 | * @param ch The communicator handle. | ||
1061 | */ | ||
1062 | void | ||
1063 | GNUNET_TRANSPORT_communicator_address_remove_all ( | ||
1064 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
1065 | { | ||
1066 | struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; | ||
1067 | while (NULL != ai) | ||
1068 | { | ||
1069 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_next = ai->next; | ||
1070 | GNUNET_TRANSPORT_communicator_address_remove (ai); | ||
1071 | ai = ai_next; | ||
1072 | } | ||
1073 | } | ||
1074 | |||
1075 | |||
1076 | /* ************************* Backchannel *************************** */ | ||
1077 | |||
1078 | |||
1079 | void | ||
1080 | GNUNET_TRANSPORT_communicator_notify ( | ||
1081 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
1082 | const struct GNUNET_PeerIdentity *pid, | ||
1083 | const char *comm, | ||
1084 | const struct GNUNET_MessageHeader *header) | ||
1085 | { | ||
1086 | struct GNUNET_MQ_Envelope *env; | ||
1087 | struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb; | ||
1088 | size_t slen = strlen (comm) + 1; | ||
1089 | uint16_t mlen = ntohs (header->size); | ||
1090 | |||
1091 | GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX); | ||
1092 | env = | ||
1093 | GNUNET_MQ_msg_extra (cb, | ||
1094 | slen + mlen, | ||
1095 | GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL); | ||
1096 | cb->pid = *pid; | ||
1097 | memcpy (&cb[1], header, mlen); | ||
1098 | memcpy (((char *) &cb[1]) + mlen, comm, slen); | ||
1099 | GNUNET_MQ_send (ch->mq, env); | ||
1100 | } | ||
1101 | |||
1102 | |||
1103 | /* end of transport_api2_communication.c */ | ||