diff options
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r-- | src/transport/transport_api2_communication.c | 959 |
1 files changed, 959 insertions, 0 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c new file mode 100644 index 000000000..d446516bd --- /dev/null +++ b/src/transport/transport_api2_communication.c | |||
@@ -0,0 +1,959 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2018 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | */ | ||
18 | |||
19 | /** | ||
20 | * @file transport/transport_api2_communication.c | ||
21 | * @brief implementation of the gnunet_transport_communication_service.h API | ||
22 | * @author Christian Grothoff | ||
23 | */ | ||
24 | #include "platform.h" | ||
25 | #include "gnunet_util_lib.h" | ||
26 | #include "gnunet_protocols.h" | ||
27 | #include "gnunet_transport_communication_service.h" | ||
28 | #include "transport.h" | ||
29 | |||
30 | |||
31 | /** | ||
32 | * How many messages do we keep at most in the queue to the | ||
33 | * transport service before we start to drop (default, | ||
34 | * can be changed via the configuration file). | ||
35 | */ | ||
36 | #define DEFAULT_MAX_QUEUE_LENGTH 16 | ||
37 | |||
38 | |||
39 | /** | ||
40 | * Information we track per packet to enable flow control. | ||
41 | */ | ||
42 | struct FlowControl | ||
43 | { | ||
44 | /** | ||
45 | * Kept in a DLL. | ||
46 | */ | ||
47 | struct FlowControl *next; | ||
48 | |||
49 | /** | ||
50 | * Kept in a DLL. | ||
51 | */ | ||
52 | struct FlowControl *prev; | ||
53 | |||
54 | /** | ||
55 | * Function to call once the message was processed. | ||
56 | */ | ||
57 | GNUNET_TRANSPORT_MessageCompletedCallback cb; | ||
58 | |||
59 | /** | ||
60 | * Closure for @e cb | ||
61 | */ | ||
62 | void *cb_cls; | ||
63 | |||
64 | /** | ||
65 | * Which peer is this about? | ||
66 | */ | ||
67 | struct GNUNET_PeerIdentity sender; | ||
68 | |||
69 | /** | ||
70 | * More-or-less unique ID for the message. | ||
71 | */ | ||
72 | uint64_t id; | ||
73 | }; | ||
74 | |||
75 | |||
76 | /** | ||
77 | * Information we track per message to tell the transport about | ||
78 | * success or failures. | ||
79 | */ | ||
80 | struct AckPending | ||
81 | { | ||
82 | /** | ||
83 | * Kept in a DLL. | ||
84 | */ | ||
85 | struct AckPending *next; | ||
86 | |||
87 | /** | ||
88 | * Kept in a DLL. | ||
89 | */ | ||
90 | struct AckPending *prev; | ||
91 | |||
92 | /** | ||
93 | * Which peer is this about? | ||
94 | */ | ||
95 | struct GNUNET_PeerIdentity receiver; | ||
96 | |||
97 | /** | ||
98 | * More-or-less unique ID for the message. | ||
99 | */ | ||
100 | uint64_t mid; | ||
101 | }; | ||
102 | |||
103 | |||
104 | /** | ||
105 | * Opaque handle to the transport service for communicators. | ||
106 | */ | ||
107 | struct GNUNET_TRANSPORT_CommunicatorHandle | ||
108 | { | ||
109 | /** | ||
110 | * Head of DLL of addresses this communicator offers to the transport service. | ||
111 | */ | ||
112 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_head; | ||
113 | |||
114 | /** | ||
115 | * Tail of DLL of addresses this communicator offers to the transport service. | ||
116 | */ | ||
117 | struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; | ||
118 | |||
119 | /** | ||
120 | * DLL of messages awaiting flow control confirmation (ack). | ||
121 | */ | ||
122 | struct FlowControl *fc_head; | ||
123 | |||
124 | /** | ||
125 | * DLL of messages awaiting flow control confirmation (ack). | ||
126 | */ | ||
127 | struct FlowControl *fc_tail; | ||
128 | |||
129 | /** | ||
130 | * DLL of messages awaiting transmission confirmation (ack). | ||
131 | */ | ||
132 | struct AckPending *ap_head; | ||
133 | |||
134 | /** | ||
135 | * DLL of messages awaiting transmission confirmation (ack). | ||
136 | */ | ||
137 | struct AckPending *ac_tail; | ||
138 | |||
139 | /** | ||
140 | * DLL of queues we offer. | ||
141 | */ | ||
142 | struct QueueHandle *queue_head; | ||
143 | |||
144 | /** | ||
145 | * DLL of queues we offer. | ||
146 | */ | ||
147 | struct QueueHandle *queue_tail; | ||
148 | |||
149 | /** | ||
150 | * Our configuration. | ||
151 | */ | ||
152 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
153 | |||
154 | /** | ||
155 | * Name of the communicator. | ||
156 | */ | ||
157 | const char *name; | ||
158 | |||
159 | /** | ||
160 | * Function to call when the transport service wants us to initiate | ||
161 | * a communication channel with another peer. | ||
162 | */ | ||
163 | GNUNET_TRANSPORT_CommunicatorMqInit mq_init; | ||
164 | |||
165 | /** | ||
166 | * Closure for @e mq_init. | ||
167 | */ | ||
168 | void *mq_init_cls; | ||
169 | |||
170 | /** | ||
171 | * Maximum permissable queue length. | ||
172 | */ | ||
173 | unsigned long long max_queue_length; | ||
174 | |||
175 | /** | ||
176 | * Flow-control identifier generator. | ||
177 | */ | ||
178 | uint64_t fc_gen; | ||
179 | |||
180 | /** | ||
181 | * MTU of the communicator | ||
182 | */ | ||
183 | size_t mtu; | ||
184 | |||
185 | /** | ||
186 | * Internal UUID for the address used in communication with the | ||
187 | * transport service. | ||
188 | */ | ||
189 | uint32_t aid_gen; | ||
190 | |||
191 | /** | ||
192 | * Queue identifier generator. | ||
193 | */ | ||
194 | uint32_t queue_gen; | ||
195 | |||
196 | }; | ||
197 | |||
198 | |||
199 | /** | ||
200 | * Handle returned to identify the internal data structure the transport | ||
201 | * API has created to manage a message queue to a particular peer. | ||
202 | */ | ||
203 | struct GNUNET_TRANSPORT_QueueHandle | ||
204 | { | ||
205 | /** | ||
206 | * Handle this queue belongs to. | ||
207 | */ | ||
208 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
209 | |||
210 | /** | ||
211 | * Which peer we can communciate with. | ||
212 | */ | ||
213 | struct GNUNET_PeerIdentity peer; | ||
214 | |||
215 | /** | ||
216 | * Address used by the communication queue. | ||
217 | */ | ||
218 | char *address; | ||
219 | |||
220 | /** | ||
221 | * Network type of the communciation queue. | ||
222 | */ | ||
223 | enum GNUNET_ATS_Network_Type nt; | ||
224 | |||
225 | /** | ||
226 | * The queue itself. | ||
227 | */ | ||
228 | struct GNUNET_MQ_Handle *mq; | ||
229 | |||
230 | /** | ||
231 | * ID for this queue when talking to the transport service. | ||
232 | */ | ||
233 | uint32_t queue_id; | ||
234 | |||
235 | }; | ||
236 | |||
237 | |||
238 | /** | ||
239 | * Internal representation of an address a communicator is | ||
240 | * currently providing for the transport service. | ||
241 | */ | ||
242 | struct GNUNET_TRANSPORT_AddressIdentifier | ||
243 | { | ||
244 | |||
245 | /** | ||
246 | * Kept in a DLL. | ||
247 | */ | ||
248 | struct GNUNET_TRANSPORT_AddressIdentifier *next; | ||
249 | |||
250 | /** | ||
251 | * Kept in a DLL. | ||
252 | */ | ||
253 | struct GNUNET_TRANSPORT_AddressIdentifier *prev; | ||
254 | |||
255 | /** | ||
256 | * Transport handle where the address was added. | ||
257 | */ | ||
258 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
259 | |||
260 | /** | ||
261 | * The actual address. | ||
262 | */ | ||
263 | char *address; | ||
264 | |||
265 | /** | ||
266 | * When does the address expire? (Expected lifetime of the | ||
267 | * address.) | ||
268 | */ | ||
269 | struct GNUNET_TIME_Relative expiration; | ||
270 | |||
271 | /** | ||
272 | * Internal UUID for the address used in communication with the | ||
273 | * transport service. | ||
274 | */ | ||
275 | uint32_t aid; | ||
276 | |||
277 | /** | ||
278 | * Network type for the address. | ||
279 | */ | ||
280 | enum GNUNET_ATS_Network_Type nt; | ||
281 | |||
282 | }; | ||
283 | |||
284 | |||
285 | /** | ||
286 | * (re)connect our communicator to the transport service | ||
287 | * | ||
288 | * @param ch handle to reconnect | ||
289 | */ | ||
290 | static void | ||
291 | reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch); | ||
292 | |||
293 | |||
294 | /** | ||
295 | * Send message to the transport service about address @a ai | ||
296 | * being now available. | ||
297 | * | ||
298 | * @param ai address to add | ||
299 | */ | ||
300 | static void | ||
301 | send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | ||
302 | { | ||
303 | struct GNUNET_MQ_Envelope *env; | ||
304 | struct GNUNET_TRANSPORT_AddAddressMessage *aam; | ||
305 | |||
306 | if (NULL == ai->ch->mq) | ||
307 | return; | ||
308 | env = GNUNET_MQ_msg_extra (aam, | ||
309 | strlen (ai->address) + 1, | ||
310 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); | ||
311 | aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration); | ||
312 | aam->nt = htonl ((uint32_t) ai->nt); | ||
313 | memcpy (&aam[1], | ||
314 | ai->address, | ||
315 | strlen (ai->address) + 1); | ||
316 | GNUNET_MQ_send (ai->ch->mq, | ||
317 | env); | ||
318 | } | ||
319 | |||
320 | |||
321 | /** | ||
322 | * Send message to the transport service about address @a ai | ||
323 | * being no longer available. | ||
324 | * | ||
325 | * @param ai address to delete | ||
326 | */ | ||
327 | static void | ||
328 | send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | ||
329 | { | ||
330 | struct GNUNET_MQ_Envelope *env; | ||
331 | struct GNUNET_TRANSPORT_DelAddressMessage *dam; | ||
332 | |||
333 | if (NULL == ai->ch->mq) | ||
334 | return; | ||
335 | env = GNUNET_MQ_msg (dam, | ||
336 | GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); | ||
337 | dam.aid = htonl (ai->aid); | ||
338 | GNUNET_MQ_send (ai->ch->mq, | ||
339 | env); | ||
340 | } | ||
341 | |||
342 | |||
343 | /** | ||
344 | * Send message to the transport service about queue @a qh | ||
345 | * being now available. | ||
346 | * | ||
347 | * @param qh queue to add | ||
348 | */ | ||
349 | static void | ||
350 | send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
351 | { | ||
352 | struct GNUNET_MQ_Envelope *env; | ||
353 | struct GNUNET_TRANSPORT_AddQueueMessage *aqm; | ||
354 | |||
355 | if (NULL == ai->ch->mq) | ||
356 | return; | ||
357 | env = GNUNET_MQ_msg_extra (aqm, | ||
358 | strlen (ai->address) + 1, | ||
359 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE); | ||
360 | aqm.receiver = qh->peer; | ||
361 | aqm.nt = htonl ((uint32_t) qh->nt); | ||
362 | aqm.qid = htonl (qh->qid); | ||
363 | memcpy (&aqm[1], | ||
364 | ai->address, | ||
365 | strlen (ai->address) + 1); | ||
366 | GNUNET_MQ_send (ai->ch->mq, | ||
367 | env); | ||
368 | } | ||
369 | |||
370 | |||
371 | /** | ||
372 | * Send message to the transport service about queue @a qh | ||
373 | * being no longer available. | ||
374 | * | ||
375 | * @param qh queue to delete | ||
376 | */ | ||
377 | static void | ||
378 | send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
379 | { | ||
380 | struct GNUNET_MQ_Envelope *env; | ||
381 | struct GNUNET_TRANSPORT_DelQueueMessage *dqm; | ||
382 | |||
383 | if (NULL == ai->ch->mq) | ||
384 | return; | ||
385 | env = GNUNET_MQ_msg (dqm, | ||
386 | GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE); | ||
387 | dqm.qid = htonl (qh->qid); | ||
388 | dqm.receiver = qh->peer; | ||
389 | GNUNET_MQ_send (ai->ch->mq, | ||
390 | env); | ||
391 | } | ||
392 | |||
393 | |||
394 | /** | ||
395 | * Disconnect from the transport service. Purges | ||
396 | * all flow control entries as we will no longer receive | ||
397 | * the ACKs. Purges the ack pending entries as the | ||
398 | * transport will no longer expect the confirmations. | ||
399 | * | ||
400 | * @param ch service to disconnect from | ||
401 | */ | ||
402 | static void | ||
403 | disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
404 | { | ||
405 | struct FlowControl *fcn; | ||
406 | struct AckPending *apn; | ||
407 | |||
408 | for (struct FlowControl *fc = ch->fc_head; | ||
409 | NULL != fc; | ||
410 | fc = fcn) | ||
411 | { | ||
412 | fcn = fc->next; | ||
413 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, | ||
414 | ch->fc_tail, | ||
415 | fc); | ||
416 | fc->cb (fc->cb_cls, | ||
417 | GNUNET_SYSERR); | ||
418 | GNUNET_free (fc); | ||
419 | } | ||
420 | for (struct AckPending *ap = ch->ap_head; | ||
421 | NULL != ap; | ||
422 | ap = apn) | ||
423 | { | ||
424 | apn = ap->next; | ||
425 | GNUNET_CONTAINER_DLL_remove (ch->ap_head, | ||
426 | ch->ap_tail, | ||
427 | ap); | ||
428 | GNUNET_free (ap); | ||
429 | } | ||
430 | if (NULL == ch->mq) | ||
431 | return; | ||
432 | GNUNET_MQ_destroy (ch->mq); | ||
433 | ch->mq = NULL; | ||
434 | } | ||
435 | |||
436 | |||
437 | /** | ||
438 | * Function called on MQ errors. | ||
439 | */ | ||
440 | static void | ||
441 | error_handler (void *cls, | ||
442 | enum GNUNET_MQ_Error error) | ||
443 | { | ||
444 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
445 | |||
446 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
447 | "MQ failure, reconnecting to transport service.\n"); | ||
448 | disconnect (ch); | ||
449 | /* TODO: maybe do this with exponential backoff/delay */ | ||
450 | reconnect (ch); | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Transport service acknowledged a message we gave it | ||
456 | * (with flow control enabled). Tell the communicator. | ||
457 | * | ||
458 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
459 | * @param incoming_ack the ack | ||
460 | */ | ||
461 | static void | ||
462 | handle_incoming_ack (void *cls, | ||
463 | struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) | ||
464 | { | ||
465 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
466 | |||
467 | for (struct FlowControl *fc = ch->fc_head; | ||
468 | NULL != fc; | ||
469 | fc = fc->next) | ||
470 | { | ||
471 | if ( (fc->id == incoming_ack->fc_id) && | ||
472 | (0 == memcmp (&fc->sender, | ||
473 | incoming_ack->sender, | ||
474 | sizeof (struct GNUNET_PeerIdentity))) ) | ||
475 | { | ||
476 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, | ||
477 | ch->fc_tail, | ||
478 | fc); | ||
479 | fc->cb (fc->cb_cls, | ||
480 | GNUNET_OK); | ||
481 | GNUNET_free (fc); | ||
482 | return; | ||
483 | } | ||
484 | } | ||
485 | GNUNET_break (0); | ||
486 | disconnect (ch); | ||
487 | /* TODO: maybe do this with exponential backoff/delay */ | ||
488 | reconnect (ch); | ||
489 | } | ||
490 | |||
491 | |||
492 | /** | ||
493 | * Transport service wants us to create a queue. Check if @a cq | ||
494 | * is well-formed. | ||
495 | * | ||
496 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
497 | * @param cq the queue creation request | ||
498 | * @return #GNUNET_OK if @a smt is well-formed | ||
499 | */ | ||
500 | static int | ||
501 | check_create_queue (void *cls, | ||
502 | struct GNUNET_TRANSPORT_CreateQueue *cq) | ||
503 | { | ||
504 | uint16_t len = ntohs (cq->header.size) - sizeof (*cq); | ||
505 | const char *addr = (const char *) &cq[1]; | ||
506 | |||
507 | if ( (0 == len) || | ||
508 | ('\0' != addr[len-1]) ) | ||
509 | { | ||
510 | GNUNET_break (0); | ||
511 | return GNUNET_SYSERR; | ||
512 | } | ||
513 | return GNUNET_OK; | ||
514 | } | ||
515 | |||
516 | |||
517 | /** | ||
518 | * Transport service wants us to create a queue. Tell the communicator. | ||
519 | * | ||
520 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
521 | * @param cq the queue creation request | ||
522 | */ | ||
523 | static void | ||
524 | handle_create_queue (void *cls, | ||
525 | struct GNUNET_TRANSPORT_CreateQueue *cq) | ||
526 | { | ||
527 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
528 | const char *addr = (const char *) &cq[1]; | ||
529 | |||
530 | if (GNUNET_OK != | ||
531 | ch->mq_init (ch->mq_init_cls, | ||
532 | &cq->receiver, | ||
533 | addr)) | ||
534 | { | ||
535 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
536 | "Address `%s' invalid for this communicator\n", | ||
537 | addr); | ||
538 | // TODO: do we notify the transport!? | ||
539 | } | ||
540 | } | ||
541 | |||
542 | |||
543 | /** | ||
544 | * Transport service wants us to send a message. Check if @a smt | ||
545 | * is well-formed. | ||
546 | * | ||
547 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
548 | * @param smt the transmission request | ||
549 | * @return #GNUNET_OK if @a smt is well-formed | ||
550 | */ | ||
551 | static int | ||
552 | check_send_msg (void *cls, | ||
553 | struct GNUNET_TRANSPORT_SendMessageTo *smt) | ||
554 | { | ||
555 | uint16_t len = ntohs (smt->header.size) - sizeof (*smt); | ||
556 | const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; | ||
557 | |||
558 | if (ntohs (mh->size) != len) | ||
559 | { | ||
560 | GNUNET_break (0); | ||
561 | return GNUNET_SYSERR; | ||
562 | } | ||
563 | return GNUNET_OK; | ||
564 | } | ||
565 | |||
566 | |||
567 | /** | ||
568 | * Notify transport service about @a status of a message with | ||
569 | * @a mid sent to @a receiver. | ||
570 | * | ||
571 | * @param ch handle | ||
572 | * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure | ||
573 | * @param receiver which peer was the receiver | ||
574 | * @param mid message that the ack is about | ||
575 | */ | ||
576 | static void | ||
577 | send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
578 | int status, | ||
579 | const struct GNUNET_PeerIdentity *receiver, | ||
580 | uint64_t mid) | ||
581 | { | ||
582 | struct GNUNET_MQ_Envelope *env; | ||
583 | struct GNUNET_TRANSPORT_SendMessageToAck *ack; | ||
584 | |||
585 | env = GNUNET_MQ_msg (ack, | ||
586 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); | ||
587 | ack->status = htonl (GNUNET_OK); | ||
588 | ack->mid = ap->mid; | ||
589 | ack->receiver = ap->receiver; | ||
590 | GNUNET_MQ_send (ch->mq, | ||
591 | env); | ||
592 | } | ||
593 | |||
594 | |||
595 | /** | ||
596 | * Message queue transmission by communicator was successful, | ||
597 | * notify transport service. | ||
598 | * | ||
599 | * @param cls an `struct AckPending *` | ||
600 | */ | ||
601 | static void | ||
602 | send_ack_cb (void *cls) | ||
603 | { | ||
604 | struct AckPending *ap = cls; | ||
605 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; | ||
606 | |||
607 | GNUNET_CONTAINER_DLL_remove (ch->ap_head, | ||
608 | ch->ap_tail, | ||
609 | ap); | ||
610 | send_ack (ch, | ||
611 | GNUNET_OK, | ||
612 | &ap->receiver, | ||
613 | ap->mid); | ||
614 | GNUNET_free (ap); | ||
615 | } | ||
616 | |||
617 | |||
618 | /** | ||
619 | * Transport service wants us to send a message. Tell the communicator. | ||
620 | * | ||
621 | * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` | ||
622 | * @param smt the transmission request | ||
623 | */ | ||
624 | static void | ||
625 | handle_send_msg (void *cls, | ||
626 | struct GNUNET_TRANSPORT_SendMessageTo *smt) | ||
627 | { | ||
628 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | ||
629 | const struct GNUNET_MessageHeader *mh; | ||
630 | struct GNUNET_MQ_Envelope *env; | ||
631 | struct AckPending *ap; | ||
632 | struct QueueHandle *qh; | ||
633 | |||
634 | for (qh = ch->queue_head;NULL != qh; qh = qh->next) | ||
635 | if ( (qh->queue_id == smt->qid) && | ||
636 | (0 == memcmp (&qh->peer, | ||
637 | &smt->target, | ||
638 | sizeof (struct GNUNET_PeerIdentity))) ) | ||
639 | break; | ||
640 | if (NULL == qh) | ||
641 | { | ||
642 | /* queue is already gone, tell transport this one failed */ | ||
643 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
644 | "Transmission failed, queue no longer exists.\n"); | ||
645 | send_ack (ch, | ||
646 | GNUNET_NO, | ||
647 | &smt->receiver, | ||
648 | smt->mid); | ||
649 | return; | ||
650 | } | ||
651 | ap = GNUNET_new (struct AckPending); | ||
652 | ap->ch = ch; | ||
653 | ap->receiver = smt->receiver; | ||
654 | ap->mid = smt->mid; | ||
655 | GNUNET_CONTAINER_DLL_insert (ch->ap_head, | ||
656 | cp->ap_tail, | ||
657 | ap); | ||
658 | mh = (const struct GNUNET_MessageHeader *) &smt[1]; | ||
659 | env = GNUNET_MQ_msg_copy (mh); | ||
660 | GNUNET_MQ_notify_sent (env, | ||
661 | &send_ack_cb, | ||
662 | ap); | ||
663 | GNUNET_MQ_send (qh->mq, | ||
664 | env); | ||
665 | } | ||
666 | |||
667 | |||
668 | /** | ||
669 | * (re)connect our communicator to the transport service | ||
670 | * | ||
671 | * @param ch handle to reconnect | ||
672 | */ | ||
673 | static void | ||
674 | reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
675 | { | ||
676 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
677 | GNUNET_MQ_hd_fixed_size (incoming_ack, | ||
678 | GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, | ||
679 | struct GNUNET_TRANSPORT_IncomingMessageAck, | ||
680 | ch), | ||
681 | GNUNET_MQ_hd_var_size (create_queue, | ||
682 | GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE, | ||
683 | struct GNUNET_TRANSPORT_CreateQueue, | ||
684 | ch), | ||
685 | GNUNET_MQ_hd_var_size (send_msg, | ||
686 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG, | ||
687 | struct GNUNET_TRANSPORT_SendMessageTo, | ||
688 | ch), | ||
689 | GNUNET_MQ_handler_end() | ||
690 | }; | ||
691 | |||
692 | ch->mq = GNUNET_CLIENT_connect (cfg, | ||
693 | "transport", | ||
694 | handlers, | ||
695 | &error_handler, | ||
696 | ch); | ||
697 | for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; | ||
698 | NULL != ai; | ||
699 | ai = ai->next) | ||
700 | send_add_address (ai); | ||
701 | for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; | ||
702 | NULL != qh; | ||
703 | qh = qh->next) | ||
704 | send_add_queue (qh); | ||
705 | } | ||
706 | |||
707 | |||
708 | /** | ||
709 | * Connect to the transport service. | ||
710 | * | ||
711 | * @param cfg configuration to use | ||
712 | * @param name name of the communicator that is connecting | ||
713 | * @param mtu maximum message size supported by communicator, 0 if | ||
714 | * sending is not supported, SIZE_MAX for no MTU | ||
715 | * @param mq_init function to call to initialize a message queue given | ||
716 | * the address of another peer, can be NULL if the | ||
717 | * communicator only supports receiving messages | ||
718 | * @param mq_init_cls closure for @a mq_init | ||
719 | * @return NULL on error | ||
720 | */ | ||
721 | struct GNUNET_TRANSPORT_CommunicatorHandle * | ||
722 | GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
723 | const char *name, | ||
724 | size_t mtu, | ||
725 | GNUNET_TRANSPORT_CommunicatorMqInit mq_init, | ||
726 | void *mq_init_cls) | ||
727 | { | ||
728 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
729 | |||
730 | ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); | ||
731 | ch->cfg = cfg; | ||
732 | ch->name = name; | ||
733 | ch->mtu = mtu; | ||
734 | ch->mq_init = mq_init; | ||
735 | ch->mq_init_cls = mq_init_cls; | ||
736 | reconnect (ch); | ||
737 | if (GNUNET_OK != | ||
738 | GNUNET_CONFIGURATION_get_value_number (cfg, | ||
739 | name, | ||
740 | "MAX_QUEUE_LENGTH", | ||
741 | &ch->max_queue_length)) | ||
742 | ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; | ||
743 | if (NULL == ch->mq) | ||
744 | { | ||
745 | GNUNET_free (ch); | ||
746 | return NULL; | ||
747 | } | ||
748 | return ch; | ||
749 | } | ||
750 | |||
751 | |||
752 | /** | ||
753 | * Disconnect from the transport service. | ||
754 | * | ||
755 | * @param ch handle returned from connect | ||
756 | */ | ||
757 | void | ||
758 | GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | ||
759 | { | ||
760 | disconnect (ch); | ||
761 | while (NULL != ch->ai_head) | ||
762 | { | ||
763 | GNUNET_break (0); /* communicator forgot to remove address, warn! */ | ||
764 | GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); | ||
765 | } | ||
766 | GNUNET_free (ch); | ||
767 | } | ||
768 | |||
769 | |||
770 | /* ************************* Receiving *************************** */ | ||
771 | |||
772 | |||
773 | /** | ||
774 | * Notify transport service that the communicator has received | ||
775 | * a message. | ||
776 | * | ||
777 | * @param ch connection to transport service | ||
778 | * @param sender presumed sender of the message (details to be checked | ||
779 | * by higher layers) | ||
780 | * @param msg the message | ||
781 | * @param cb function to call once handling the message is done, NULL if | ||
782 | * flow control is not supported by this communicator | ||
783 | * @param cb_cls closure for @a cb | ||
784 | * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was | ||
785 | * immediately dropped due to memory limitations (communicator | ||
786 | * should try to apply back pressure), | ||
787 | * #GNUNET_SYSERR if the message could not be delivered because | ||
788 | * the tranport service is not yet up | ||
789 | */ | ||
790 | int | ||
791 | GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
792 | const struct GNUNET_PeerIdentity *sender, | ||
793 | const struct GNUNET_MessageHeader *msg, | ||
794 | GNUNET_TRANSPORT_MessageCompletedCallback cb, | ||
795 | void *cb_cls) | ||
796 | { | ||
797 | struct GNUNET_MQ_Envelope *env; | ||
798 | struct GNUNET_TRANSPORT_IncomingMessage *im; | ||
799 | uint16_t msize; | ||
800 | |||
801 | if (NULL == ai->ch->mq) | ||
802 | return GNUNET_SYSERR; | ||
803 | if (NULL != cb) | ||
804 | { | ||
805 | struct FlowControl *fc; | ||
806 | |||
807 | im->fc_on = htonl (GNUNET_YES); | ||
808 | im->fc_id = ai->ch->fc_gen++; | ||
809 | fc = GNUNET_new (struct FlowControl); | ||
810 | fc->sender = *sender; | ||
811 | fc->id = im->fc_id; | ||
812 | fc->cb = cb; | ||
813 | fc->cb_cls = cb_cls; | ||
814 | GNUNET_CONTAINER_DLL_insert (ch->fc_head, | ||
815 | ch->fc_tail, | ||
816 | fc); | ||
817 | } | ||
818 | else | ||
819 | { | ||
820 | if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) | ||
821 | { | ||
822 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
823 | "Dropping message: transprot is too slow, queue length %u exceeded\n", | ||
824 | ch->max_queue_length); | ||
825 | return GNUNET_NO; | ||
826 | } | ||
827 | } | ||
828 | |||
829 | msize = ntohs (msg->size); | ||
830 | env = GNUNET_MQ_msg_extra (im, | ||
831 | msize, | ||
832 | GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG); | ||
833 | if (NULL == env) | ||
834 | { | ||
835 | GNUNET_break (0); | ||
836 | return GNUNET_SYSERR; | ||
837 | } | ||
838 | im->sender = *sender; | ||
839 | memcpy (&im[1], | ||
840 | msg, | ||
841 | msize); | ||
842 | GNUNET_MQ_send (ai->ch->mq, | ||
843 | env); | ||
844 | return GNUNET_OK; | ||
845 | } | ||
846 | |||
847 | |||
848 | /* ************************* Discovery *************************** */ | ||
849 | |||
850 | |||
851 | /** | ||
852 | * Notify transport service that an MQ became available due to an | ||
853 | * "inbound" connection or because the communicator discovered the | ||
854 | * presence of another peer. | ||
855 | * | ||
856 | * @param ch connection to transport service | ||
857 | * @param peer peer with which we can now communicate | ||
858 | * @param address address in human-readable format, 0-terminated, UTF-8 | ||
859 | * @param nt which network type does the @a address belong to? | ||
860 | * @param mq message queue of the @a peer | ||
861 | * @return API handle identifying the new MQ | ||
862 | */ | ||
863 | struct GNUNET_TRANSPORT_QueueHandle * | ||
864 | GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
865 | const struct GNUNET_PeerIdentity *peer, | ||
866 | const char *address, | ||
867 | enum GNUNET_ATS_Network_Type nt, | ||
868 | struct GNUNET_MQ_Handle *mq) | ||
869 | { | ||
870 | struct GNUNET_TRANSPORT_QueueHandle *qh; | ||
871 | |||
872 | qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle); | ||
873 | qh->ch = ch; | ||
874 | qh->peer = *peer; | ||
875 | qh->address = GNUNET_strdup (address); | ||
876 | qh->nt = nt; | ||
877 | qh->mq = mq; | ||
878 | qh->queue_id = ch->queue_gen++; | ||
879 | GNUNET_CONTAINER_DLL_insert (ch->queue_head, | ||
880 | ch->queue_tail, | ||
881 | qh); | ||
882 | send_add_queue (qh); | ||
883 | return qh; | ||
884 | } | ||
885 | |||
886 | |||
887 | /** | ||
888 | * Notify transport service that an MQ became unavailable due to a | ||
889 | * disconnect or timeout. | ||
890 | * | ||
891 | * @param qh handle for the queue that must be invalidated | ||
892 | */ | ||
893 | void | ||
894 | GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
895 | { | ||
896 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; | ||
897 | |||
898 | send_del_queue (qh); | ||
899 | GNUNET_CONTAINER_DLL_remove (ch->queue_head, | ||
900 | ch->queue_tail, | ||
901 | qh); | ||
902 | GNUNET_MQ_destroy (qh->mq); | ||
903 | GNUNET_free (qh->address); | ||
904 | GNUNET_free (qh); | ||
905 | } | ||
906 | |||
907 | |||
908 | /** | ||
909 | * Notify transport service about an address that this communicator | ||
910 | * provides for this peer. | ||
911 | * | ||
912 | * @param ch connection to transport service | ||
913 | * @param address our address in human-readable format, 0-terminated, UTF-8 | ||
914 | * @param nt which network type does the address belong to? | ||
915 | * @param expiration when does the communicator forsee this address expiring? | ||
916 | */ | ||
917 | struct GNUNET_TRANSPORT_AddressIdentifier * | ||
918 | GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
919 | const char *address, | ||
920 | enum GNUNET_ATS_Network_Type nt, | ||
921 | struct GNUNET_TIME_Relative expiration) | ||
922 | { | ||
923 | struct GNUNET_TRANSPORT_AddressIdentifier *ai; | ||
924 | |||
925 | ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier); | ||
926 | ai->ch = ch; | ||
927 | ai->address = GNUNET_strdup (address); | ||
928 | ai->nt = nt; | ||
929 | ai->expiration = expiration; | ||
930 | ai->aid = handle->aid_gen++; | ||
931 | GNUNET_CONTAINER_DLL_insert (handle->ai_head, | ||
932 | handle->ai_tail, | ||
933 | ai); | ||
934 | send_add_address (ai); | ||
935 | return ai; | ||
936 | } | ||
937 | |||
938 | |||
939 | /** | ||
940 | * Notify transport service about an address that this communicator no | ||
941 | * longer provides for this peer. | ||
942 | * | ||
943 | * @param ai address that is no longer provided | ||
944 | */ | ||
945 | void | ||
946 | GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | ||
947 | { | ||
948 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; | ||
949 | |||
950 | send_del_address (ai); | ||
951 | GNUNET_CONTAINER_DLL_remove (ch->ai_head, | ||
952 | ch->ai_tail, | ||
953 | ai); | ||
954 | GNUNET_free (ai->address); | ||
955 | GNUNET_free (ai); | ||
956 | } | ||
957 | |||
958 | |||
959 | /* end of transport_api2_communication.c */ | ||