aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-29 23:01:30 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-29 23:01:30 +0000
commitff1c357933910f707cdf13bb6ef705ef0ae90960 (patch)
treef503553309d25ed47983e1e15a1b462ff089ac04 /src/transport/transport_api.c
parent88295b5a3adc0f17ffd5fa4d2c1fafc632db6dab (diff)
downloadgnunet-ff1c357933910f707cdf13bb6ef705ef0ae90960.tar.gz
gnunet-ff1c357933910f707cdf13bb6ef705ef0ae90960.zip
-finally able to remove old transport API transmission logic
Diffstat (limited to 'src/transport/transport_api.c')
-rw-r--r--src/transport/transport_api.c1334
1 files changed, 0 insertions, 1334 deletions
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
deleted file mode 100644
index 83b7732e4..000000000
--- a/src/transport/transport_api.c
+++ /dev/null
@@ -1,1334 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2013, 2016 GNUnet e.V.
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19*/
20
21/**
22 * @file transport/transport_api.c
23 * @brief library to access the low-level P2P IO service
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - test test test
28 */
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_constants.h"
32#include "gnunet_arm_service.h"
33#include "gnunet_hello_lib.h"
34#include "gnunet_protocols.h"
35#include "gnunet_transport_service.h"
36#include "transport.h"
37
38#define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)
39
40/**
41 * If we could not send any payload to a peer for this amount of
42 * time, we print a warning.
43 */
44#define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
45
46/**
47 * How large to start with for the hashmap of neighbours.
48 */
49#define STARTING_NEIGHBOURS_SIZE 16
50
51/**
52 * Handle for a message that should be transmitted to the service.
53 * Used for both control messages and normal messages.
54 */
55struct GNUNET_TRANSPORT_TransmitHandle
56{
57
58 /**
59 * We keep all requests in a DLL.
60 */
61 struct GNUNET_TRANSPORT_TransmitHandle *next;
62
63 /**
64 * We keep all requests in a DLL.
65 */
66 struct GNUNET_TRANSPORT_TransmitHandle *prev;
67
68 /**
69 * Neighbour for this handle, NULL for control messages.
70 */
71 struct Neighbour *neighbour;
72
73 /**
74 * Function to call when @e notify_size bytes are available
75 * for transmission.
76 */
77 GNUNET_TRANSPORT_TransmitReadyNotify notify;
78
79 /**
80 * Closure for @e notify.
81 */
82 void *notify_cls;
83
84 /**
85 * Time at which this request was originally scheduled.
86 */
87 struct GNUNET_TIME_Absolute request_start;
88
89 /**
90 * Timeout for this request, 0 for control messages.
91 */
92 struct GNUNET_TIME_Absolute timeout;
93
94 /**
95 * Task to trigger request timeout if the request is stalled due to
96 * congestion.
97 */
98 struct GNUNET_SCHEDULER_Task *timeout_task;
99
100 /**
101 * How many bytes is our notify callback waiting for?
102 */
103 size_t notify_size;
104
105};
106
107
108/**
109 * Entry in hash table of all of our current (connected) neighbours.
110 */
111struct Neighbour
112{
113 /**
114 * Overall transport handle.
115 */
116 struct GNUNET_TRANSPORT_Handle *h;
117
118 /**
119 * Active transmit handle or NULL.
120 */
121 struct GNUNET_TRANSPORT_TransmitHandle *th;
122
123 /**
124 * Identity of this neighbour.
125 */
126 struct GNUNET_PeerIdentity id;
127
128 /**
129 * Outbound bandwidh tracker.
130 */
131 struct GNUNET_BANDWIDTH_Tracker out_tracker;
132
133 /**
134 * Entry in our readyness heap (which is sorted by @e next_ready
135 * value). NULL if there is no pending transmission request for
136 * this neighbour or if we're waiting for @e is_ready to become
137 * true AFTER the @e out_tracker suggested that this peer's quota
138 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139 * we should immediately go back into the heap).
140 */
141 struct GNUNET_CONTAINER_HeapNode *hn;
142
143 /**
144 * Last time when this peer received payload from us.
145 */
146 struct GNUNET_TIME_Absolute last_payload;
147
148 /**
149 * Task to trigger warnings if we do not get SEND_OK after a while.
150 */
151 struct GNUNET_SCHEDULER_Task *unready_warn_task;
152
153 /**
154 * Is this peer currently ready to receive a message?
155 */
156 int is_ready;
157
158 /**
159 * Sending consumed more bytes on wire than payload was announced
160 * This overhead is added to the delay of next sending operation
161 */
162 size_t traffic_overhead;
163};
164
165
166
167/**
168 * Handle for the transport service (includes all of the
169 * state for the transport service).
170 */
171struct GNUNET_TRANSPORT_Handle
172{
173
174 /**
175 * Closure for the callbacks.
176 */
177 void *cls;
178
179 /**
180 * Function to call for received data.
181 */
182 GNUNET_TRANSPORT_ReceiveCallback rec;
183
184 /**
185 * function to call on connect events
186 */
187 GNUNET_TRANSPORT_NotifyConnect nc_cb;
188
189 /**
190 * function to call on disconnect events
191 */
192 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
193
194 /**
195 * function to call on excess bandwidth events
196 */
197 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
198
199 /**
200 * The current HELLO message for this peer. Updated
201 * whenever transports change their addresses.
202 */
203 struct GNUNET_MessageHeader *my_hello;
204
205 /**
206 * My client connection to the transport service.
207 */
208 struct GNUNET_MQ_Handle *mq;
209
210 /**
211 * My configuration.
212 */
213 const struct GNUNET_CONFIGURATION_Handle *cfg;
214
215 /**
216 * Hash map of the current connected neighbours of this peer.
217 * Maps peer identities to `struct Neighbour` entries.
218 */
219 struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
220
221 /**
222 * Heap sorting peers with pending messages by the timestamps that
223 * specify when we could next send a message to the respective peer.
224 * Excludes control messages (which can always go out immediately).
225 * Maps time stamps to `struct Neighbour` entries.
226 */
227 struct GNUNET_CONTAINER_Heap *ready_heap;
228
229 /**
230 * Peer identity as assumed by this process, or all zeros.
231 */
232 struct GNUNET_PeerIdentity self;
233
234 /**
235 * ID of the task trying to reconnect to the service.
236 */
237 struct GNUNET_SCHEDULER_Task *reconnect_task;
238
239 /**
240 * ID of the task trying to trigger transmission for a peer while
241 * maintaining bandwidth quotas. In use if there are no control
242 * messages and the smallest entry in the @e ready_heap has a time
243 * stamp in the future.
244 */
245 struct GNUNET_SCHEDULER_Task *quota_task;
246
247 /**
248 * Delay until we try to reconnect.
249 */
250 struct GNUNET_TIME_Relative reconnect_delay;
251
252 /**
253 * Should we check that @e self matches what the service thinks?
254 * (if #GNUNET_NO, then @e self is all zeros!).
255 */
256 int check_self;
257
258 /**
259 * Reconnect in progress
260 */
261 int reconnecting;
262};
263
264
265/**
266 * Schedule the task to send one message, either from the control
267 * list or the peer message queues to the service.
268 *
269 * @param h transport service to schedule a transmission for
270 */
271static void
272schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
273
274
275/**
276 * Function that will schedule the job that will try
277 * to connect us again to the client.
278 *
279 * @param h transport service to reconnect
280 */
281static void
282disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
283
284
285/**
286 * A neighbour has not gotten a SEND_OK in a while. Print a warning.
287 *
288 * @param cls the `struct Neighbour`
289 */
290static void
291do_warn_unready (void *cls)
292{
293 struct Neighbour *n = cls;
294 struct GNUNET_TIME_Relative delay;
295
296 delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
297 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
298 "Lacking SEND_OK, no payload could be send to %s for %s\n",
299 GNUNET_i2s (&n->id),
300 GNUNET_STRINGS_relative_time_to_string (delay,
301 GNUNET_YES));
302 n->unready_warn_task
303 = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
304 &do_warn_unready,
305 n);
306}
307
308
309/**
310 * Get the neighbour list entry for the given peer
311 *
312 * @param h our context
313 * @param peer peer to look up
314 * @return NULL if no such peer entry exists
315 */
316static struct Neighbour *
317neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
318 const struct GNUNET_PeerIdentity *peer)
319{
320 return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
321 peer);
322}
323
324
325/**
326 * The outbound quota has changed in a way that may require
327 * us to reset the timeout. Update the timeout.
328 *
329 * @param cls the `struct Neighbour` for which the timeout changed
330 */
331static void
332outbound_bw_tracker_update (void *cls)
333{
334 struct Neighbour *n = cls;
335 struct GNUNET_TIME_Relative delay;
336
337 if (NULL == n->hn)
338 return;
339 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
340 n->th->notify_size + n->traffic_overhead);
341 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 "New outbound delay %s us\n",
343 GNUNET_STRINGS_relative_time_to_string (delay,
344 GNUNET_NO));
345 GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
346 n->hn,
347 delay.rel_value_us);
348 schedule_transmission (n->h);
349}
350
351
352/**
353 * Function called by the bandwidth tracker if we have excess
354 * bandwidth.
355 *
356 * @param cls the `struct Neighbour` that has excess bandwidth
357 */
358static void
359notify_excess_cb (void *cls)
360{
361 struct Neighbour *n = cls;
362 struct GNUNET_TRANSPORT_Handle *h = n->h;
363
364 if (NULL != h->neb_cb)
365 h->neb_cb (h->cls,
366 &n->id);
367}
368
369
370/**
371 * Add neighbour to our list
372 *
373 * @return NULL if this API is currently disconnecting from the service
374 */
375static struct Neighbour *
376neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
377 const struct GNUNET_PeerIdentity *pid)
378{
379 struct Neighbour *n;
380
381 LOG (GNUNET_ERROR_TYPE_DEBUG,
382 "Creating entry for neighbour `%s'.\n",
383 GNUNET_i2s (pid));
384 n = GNUNET_new (struct Neighbour);
385 n->id = *pid;
386 n->h = h;
387 n->is_ready = GNUNET_YES;
388 n->traffic_overhead = 0;
389 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
390 &outbound_bw_tracker_update,
391 n,
392 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
393 MAX_BANDWIDTH_CARRY_S,
394 &notify_excess_cb,
395 n);
396 GNUNET_assert (GNUNET_OK ==
397 GNUNET_CONTAINER_multipeermap_put (h->neighbours,
398 &n->id,
399 n,
400 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
401 return n;
402}
403
404
405/**
406 * Iterator over hash map entries, for deleting state of a neighbour.
407 *
408 * @param cls the `struct GNUNET_TRANSPORT_Handle *`
409 * @param key peer identity
410 * @param value value in the hash map, the neighbour entry to delete
411 * @return #GNUNET_YES if we should continue to
412 * iterate,
413 * #GNUNET_NO if not.
414 */
415static int
416neighbour_delete (void *cls,
417 const struct GNUNET_PeerIdentity *key,
418 void *value)
419{
420 struct GNUNET_TRANSPORT_Handle *handle = cls;
421 struct Neighbour *n = value;
422
423 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 "Dropping entry for neighbour `%s'.\n",
425 GNUNET_i2s (key));
426 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
427 if (NULL != handle->nd_cb)
428 handle->nd_cb (handle->cls,
429 &n->id);
430 if (NULL != n->unready_warn_task)
431 {
432 GNUNET_SCHEDULER_cancel (n->unready_warn_task);
433 n->unready_warn_task = NULL;
434 }
435 GNUNET_assert (NULL == n->th);
436 GNUNET_assert (NULL == n->hn);
437 GNUNET_assert (GNUNET_YES ==
438 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
439 key,
440 n));
441 GNUNET_free (n);
442 return GNUNET_YES;
443}
444
445
446/**
447 * Generic error handler, called with the appropriate
448 * error code and the same closure specified at the creation of
449 * the message queue.
450 * Not every message queue implementation supports an error handler.
451 *
452 * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
453 * @param error error code
454 */
455static void
456mq_error_handler (void *cls,
457 enum GNUNET_MQ_Error error)
458{
459 struct GNUNET_TRANSPORT_Handle *h = cls;
460
461 LOG (GNUNET_ERROR_TYPE_DEBUG,
462 "Error receiving from transport service, disconnecting temporarily.\n");
463 h->reconnecting = GNUNET_YES;
464 disconnect_and_schedule_reconnect (h);
465}
466
467
468/**
469 * Function we use for checking incoming HELLO messages.
470 *
471 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
472 * @param msg message received
473 * @return #GNUNET_OK if message is well-formed
474 */
475static int
476check_hello (void *cls,
477 const struct GNUNET_MessageHeader *msg)
478{
479 struct GNUNET_PeerIdentity me;
480
481 if (GNUNET_OK !=
482 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
483 &me))
484 {
485 GNUNET_break (0);
486 return GNUNET_SYSERR;
487 }
488 LOG (GNUNET_ERROR_TYPE_DEBUG,
489 "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
490 (unsigned int) ntohs (msg->size),
491 GNUNET_i2s (&me));
492 return GNUNET_OK;
493}
494
495
496/**
497 * Function we use for handling incoming HELLO messages.
498 *
499 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
500 * @param msg message received
501 */
502static void
503handle_hello (void *cls,
504 const struct GNUNET_MessageHeader *msg)
505{
506 struct GNUNET_TRANSPORT_Handle *h = cls;
507
508 GNUNET_free_non_null (h->my_hello);
509 h->my_hello = GNUNET_copy_message (msg);
510}
511
512
513/**
514 * Function we use for handling incoming connect messages.
515 *
516 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
517 * @param cim message received
518 */
519static void
520handle_connect (void *cls,
521 const struct ConnectInfoMessage *cim)
522{
523 struct GNUNET_TRANSPORT_Handle *h = cls;
524 struct Neighbour *n;
525
526 LOG (GNUNET_ERROR_TYPE_DEBUG,
527 "Receiving CONNECT message for `%s'.\n",
528 GNUNET_i2s (&cim->id));
529 n = neighbour_find (h, &cim->id);
530 if (NULL != n)
531 {
532 GNUNET_break (0);
533 h->reconnecting = GNUNET_YES;
534 disconnect_and_schedule_reconnect (h);
535 return;
536 }
537 n = neighbour_add (h,
538 &cim->id);
539 LOG (GNUNET_ERROR_TYPE_DEBUG,
540 "Receiving CONNECT message for `%s' with quota %u\n",
541 GNUNET_i2s (&cim->id),
542 ntohl (cim->quota_out.value__));
543 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
544 cim->quota_out);
545 if (NULL != h->nc_cb)
546 h->nc_cb (h->cls,
547 &n->id);
548}
549
550
551/**
552 * Function we use for handling incoming disconnect messages.
553 *
554 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
555 * @param dim message received
556 */
557static void
558handle_disconnect (void *cls,
559 const struct DisconnectInfoMessage *dim)
560{
561 struct GNUNET_TRANSPORT_Handle *h = cls;
562 struct Neighbour *n;
563
564 GNUNET_break (ntohl (dim->reserved) == 0);
565 LOG (GNUNET_ERROR_TYPE_DEBUG,
566 "Receiving DISCONNECT message for `%s'.\n",
567 GNUNET_i2s (&dim->peer));
568 n = neighbour_find (h, &dim->peer);
569 if (NULL == n)
570 {
571 GNUNET_break (0);
572 h->reconnecting = GNUNET_YES;
573 disconnect_and_schedule_reconnect (h);
574 return;
575 }
576 neighbour_delete (h,
577 &dim->peer,
578 n);
579}
580
581
582/**
583 * Function we use for handling incoming send-ok messages.
584 *
585 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
586 * @param okm message received
587 */
588static void
589handle_send_ok (void *cls,
590 const struct SendOkMessage *okm)
591{
592 struct GNUNET_TRANSPORT_Handle *h = cls;
593 struct Neighbour *n;
594 uint32_t bytes_msg;
595 uint32_t bytes_physical;
596
597 bytes_msg = ntohl (okm->bytes_msg);
598 bytes_physical = ntohl (okm->bytes_physical);
599 LOG (GNUNET_ERROR_TYPE_DEBUG,
600 "Receiving SEND_OK message, transmission to %s %s.\n",
601 GNUNET_i2s (&okm->peer),
602 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
603
604 n = neighbour_find (h,
605 &okm->peer);
606 if (NULL == n)
607 {
608 /* We should never get a 'SEND_OK' for a peer that we are not
609 connected to */
610 GNUNET_break (0);
611 h->reconnecting = GNUNET_YES;
612 disconnect_and_schedule_reconnect (h);
613 return;
614 }
615 if (bytes_physical > bytes_msg)
616 {
617 LOG (GNUNET_ERROR_TYPE_DEBUG,
618 "Overhead for %u byte message was %u\n",
619 bytes_msg,
620 bytes_physical - bytes_msg);
621 n->traffic_overhead += bytes_physical - bytes_msg;
622 }
623 GNUNET_break (GNUNET_NO == n->is_ready);
624 n->is_ready = GNUNET_YES;
625 if (NULL != n->unready_warn_task)
626 {
627 GNUNET_SCHEDULER_cancel (n->unready_warn_task);
628 n->unready_warn_task = NULL;
629 }
630 if ((NULL != n->th) && (NULL == n->hn))
631 {
632 GNUNET_assert (NULL != n->th->timeout_task);
633 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
634 n->th->timeout_task = NULL;
635 /* we've been waiting for this (congestion, not quota,
636 * caused delayed transmission) */
637 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
638 n,
639 0);
640 }
641 schedule_transmission (h);
642}
643
644
645/**
646 * Function we use for checking incoming "inbound" messages.
647 *
648 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
649 * @param im message received
650 */
651static int
652check_recv (void *cls,
653 const struct InboundMessage *im)
654{
655 const struct GNUNET_MessageHeader *imm;
656 uint16_t size;
657
658 size = ntohs (im->header.size);
659 if (size <
660 sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
661 {
662 GNUNET_break (0);
663 return GNUNET_SYSERR;
664 }
665 imm = (const struct GNUNET_MessageHeader *) &im[1];
666 if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
667 {
668 GNUNET_break (0);
669 return GNUNET_SYSERR;
670 }
671 return GNUNET_OK;
672}
673
674
675/**
676 * Function we use for handling incoming messages.
677 *
678 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
679 * @param im message received
680 */
681static void
682handle_recv (void *cls,
683 const struct InboundMessage *im)
684{
685 struct GNUNET_TRANSPORT_Handle *h = cls;
686 const struct GNUNET_MessageHeader *imm
687 = (const struct GNUNET_MessageHeader *) &im[1];
688 struct Neighbour *n;
689
690 LOG (GNUNET_ERROR_TYPE_DEBUG,
691 "Received message of type %u with %u bytes from `%s'.\n",
692 (unsigned int) ntohs (imm->type),
693 (unsigned int) ntohs (imm->size),
694 GNUNET_i2s (&im->peer));
695 n = neighbour_find (h, &im->peer);
696 if (NULL == n)
697 {
698 GNUNET_break (0);
699 h->reconnecting = GNUNET_YES;
700 disconnect_and_schedule_reconnect (h);
701 return;
702 }
703 if (NULL != h->rec)
704 h->rec (h->cls,
705 &im->peer,
706 imm);
707}
708
709
710/**
711 * Function we use for handling incoming set quota messages.
712 *
713 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
714 * @param msg message received
715 */
716static void
717handle_set_quota (void *cls,
718 const struct QuotaSetMessage *qm)
719{
720 struct GNUNET_TRANSPORT_Handle *h = cls;
721 struct Neighbour *n;
722
723 n = neighbour_find (h, &qm->peer);
724 if (NULL == n)
725 {
726 GNUNET_break (0);
727 h->reconnecting = GNUNET_YES;
728 disconnect_and_schedule_reconnect (h);
729 return;
730 }
731 LOG (GNUNET_ERROR_TYPE_DEBUG,
732 "Receiving SET_QUOTA message for `%s' with quota %u\n",
733 GNUNET_i2s (&qm->peer),
734 ntohl (qm->quota.value__));
735 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
736 qm->quota);
737}
738
739
740/**
741 * A transmission request could not be satisfied because of
742 * network congestion. Notify the initiator and clean up.
743 *
744 * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
745 */
746static void
747timeout_request_due_to_congestion (void *cls)
748{
749 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
750 struct Neighbour *n = th->neighbour;
751 struct GNUNET_TIME_Relative delay;
752
753 n->th->timeout_task = NULL;
754 delay = GNUNET_TIME_absolute_get_duration (th->request_start);
755 LOG (GNUNET_ERROR_TYPE_WARNING,
756 "Discarding %u bytes of payload message after %s delay due to congestion\n",
757 th->notify_size,
758 GNUNET_STRINGS_relative_time_to_string (delay,
759 GNUNET_YES));
760 GNUNET_assert (th == n->th);
761 GNUNET_assert (NULL == n->hn);
762 n->th = NULL;
763 th->notify (th->notify_cls,
764 0,
765 NULL);
766 GNUNET_free (th);
767}
768
769
770/**
771 * Transmit ready message(s) to service.
772 *
773 * @param h handle to transport
774 */
775static void
776transmit_ready (struct GNUNET_TRANSPORT_Handle *h)
777{
778 struct GNUNET_TRANSPORT_TransmitHandle *th;
779 struct GNUNET_TIME_Relative delay;
780 struct Neighbour *n;
781 struct OutboundMessage *obm;
782 struct GNUNET_MQ_Envelope *env;
783 size_t mret;
784
785 GNUNET_assert (NULL != h->mq);
786 while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
787 {
788 th = n->th;
789 if (GNUNET_YES != n->is_ready)
790 {
791 /* peer not ready, wait for notification! */
792 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
793 n->hn = NULL;
794 GNUNET_assert (NULL == n->th->timeout_task);
795 th->timeout_task
796 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
797 (th->timeout),
798 &timeout_request_due_to_congestion,
799 th);
800 continue;
801 }
802 if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
803 th->notify_size).rel_value_us > 0)
804 break; /* too early */
805 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
806 n->hn = NULL;
807 n->th = NULL;
808 env = GNUNET_MQ_msg_extra (obm,
809 th->notify_size,
810 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
811 mret = th->notify (th->notify_cls,
812 th->notify_size,
813 &obm[1]);
814 if (0 == mret)
815 {
816 GNUNET_free (th);
817 GNUNET_MQ_discard (env);
818 continue;
819 }
820 obm->header.size = htons (mret + sizeof (*obm));
821 if (NULL != n->unready_warn_task)
822 n->unready_warn_task
823 = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
824 &do_warn_unready,
825 n);
826 n->last_payload = GNUNET_TIME_absolute_get ();
827 n->is_ready = GNUNET_NO;
828 obm->reserved = htonl (0);
829 obm->timeout =
830 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
831 (th->timeout));
832 obm->peer = n->id;
833 GNUNET_MQ_send (h->mq,
834 env);
835 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
836 mret);
837 delay = GNUNET_TIME_absolute_get_duration (th->request_start);
838 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
839 LOG (GNUNET_ERROR_TYPE_WARNING,
840 "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
841 mret,
842 GNUNET_i2s (&n->id),
843 GNUNET_STRINGS_relative_time_to_string (delay,
844 GNUNET_YES),
845 (unsigned int) n->out_tracker.available_bytes_per_s__);
846 else
847 LOG (GNUNET_ERROR_TYPE_DEBUG,
848 "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
849 mret,
850 GNUNET_i2s (&n->id),
851 GNUNET_STRINGS_relative_time_to_string (delay,
852 GNUNET_YES),
853 (unsigned int) n->out_tracker.available_bytes_per_s__);
854 GNUNET_free (th);
855 }
856 /* if there are more pending messages, try to schedule those */
857 schedule_transmission (h);
858}
859
860
861/**
862 * Schedule the task to send one message, either from the control
863 * list or the peer message queues to the service.
864 *
865 * @param cls transport service to schedule a transmission for
866 */
867static void
868schedule_transmission_task (void *cls)
869{
870 struct GNUNET_TRANSPORT_Handle *h = cls;
871 struct GNUNET_TRANSPORT_TransmitHandle *th;
872 struct Neighbour *n;
873
874 h->quota_task = NULL;
875 GNUNET_assert (NULL != h->mq);
876 /* destroy all requests that have timed out */
877 while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
878 (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
879 {
880 /* notify client that the request could not be satisfied within
881 * the given time constraints */
882 th = n->th;
883 n->th = NULL;
884 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
885 n->hn = NULL;
886 LOG (GNUNET_ERROR_TYPE_DEBUG,
887 "Signalling timeout for transmission to peer %s due to congestion\n",
888 GNUNET_i2s (&n->id));
889 GNUNET_assert (0 == th->notify (th->notify_cls,
890 0,
891 NULL));
892 GNUNET_free (th);
893 }
894 n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
895 if (NULL == n)
896 return; /* no pending messages */
897 LOG (GNUNET_ERROR_TYPE_DEBUG,
898 "Calling notify_transmit_ready\n");
899 transmit_ready (h);
900}
901
902
903/**
904 * Schedule the task to send one message, either from the control
905 * list or the peer message queues to the service.
906 *
907 * @param h transport service to schedule a transmission for
908 */
909static void
910schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
911{
912 struct GNUNET_TIME_Relative delay;
913 struct Neighbour *n;
914
915 GNUNET_assert (NULL != h->mq);
916 if (NULL != h->quota_task)
917 {
918 GNUNET_SCHEDULER_cancel (h->quota_task);
919 h->quota_task = NULL;
920 }
921 if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
922 {
923 delay =
924 GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
925 n->th->notify_size + n->traffic_overhead);
926 n->traffic_overhead = 0;
927 }
928 else
929 {
930 LOG (GNUNET_ERROR_TYPE_DEBUG,
931 "No work to be done, not scheduling transmission.\n");
932 return; /* no work to be done */
933 }
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Scheduling next transmission to service in %s\n",
936 GNUNET_STRINGS_relative_time_to_string (delay,
937 GNUNET_YES));
938 h->quota_task =
939 GNUNET_SCHEDULER_add_delayed (delay,
940 &schedule_transmission_task,
941 h);
942}
943
944
945/**
946 * Try again to connect to transport service.
947 *
948 * @param cls the handle to the transport service
949 */
950static void
951reconnect (void *cls)
952{
953 GNUNET_MQ_hd_var_size (hello,
954 GNUNET_MESSAGE_TYPE_HELLO,
955 struct GNUNET_MessageHeader);
956 GNUNET_MQ_hd_fixed_size (connect,
957 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
958 struct ConnectInfoMessage);
959 GNUNET_MQ_hd_fixed_size (disconnect,
960 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
961 struct DisconnectInfoMessage);
962 GNUNET_MQ_hd_fixed_size (send_ok,
963 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
964 struct SendOkMessage);
965 GNUNET_MQ_hd_var_size (recv,
966 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
967 struct InboundMessage);
968 GNUNET_MQ_hd_fixed_size (set_quota,
969 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
970 struct QuotaSetMessage);
971 struct GNUNET_TRANSPORT_Handle *h = cls;
972 struct GNUNET_MQ_MessageHandler handlers[] = {
973 make_hello_handler (h),
974 make_connect_handler (h),
975 make_disconnect_handler (h),
976 make_send_ok_handler (h),
977 make_recv_handler (h),
978 make_set_quota_handler (h),
979 GNUNET_MQ_handler_end ()
980 };
981 struct GNUNET_MQ_Envelope *env;
982 struct StartMessage *s;
983 uint32_t options;
984
985 h->reconnect_task = NULL;
986 LOG (GNUNET_ERROR_TYPE_DEBUG,
987 "Connecting to transport service.\n");
988 GNUNET_assert (NULL == h->mq);
989 h->reconnecting = GNUNET_NO;
990 h->mq = GNUNET_CLIENT_connecT (h->cfg,
991 "transport",
992 handlers,
993 &mq_error_handler,
994 h);
995 if (NULL == h->mq)
996 return;
997 env = GNUNET_MQ_msg (s,
998 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
999 options = 0;
1000 if (h->check_self)
1001 options |= 1;
1002 if (NULL != h->rec)
1003 options |= 2;
1004 s->options = htonl (options);
1005 s->self = h->self;
1006 GNUNET_MQ_send (h->mq,
1007 env);
1008}
1009
1010
1011/**
1012 * Function that will schedule the job that will try
1013 * to connect us again to the client.
1014 *
1015 * @param h transport service to reconnect
1016 */
1017static void
1018disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1019{
1020 GNUNET_assert (NULL == h->reconnect_task);
1021 if (NULL != h->mq)
1022 {
1023 GNUNET_MQ_destroy (h->mq);
1024 h->mq = NULL;
1025 }
1026 /* Forget about all neighbours that we used to be connected to */
1027 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1028 &neighbour_delete,
1029 h);
1030 if (NULL != h->quota_task)
1031 {
1032 GNUNET_SCHEDULER_cancel (h->quota_task);
1033 h->quota_task = NULL;
1034 }
1035 LOG (GNUNET_ERROR_TYPE_DEBUG,
1036 "Scheduling task to reconnect to transport service in %s.\n",
1037 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1038 GNUNET_YES));
1039 h->reconnect_task =
1040 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1041 &reconnect,
1042 h);
1043 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1044}
1045
1046
1047/**
1048 * Set transport metrics for a peer and a direction.
1049 *
1050 * @param handle transport handle
1051 * @param peer the peer to set the metric for
1052 * @param prop the performance metrics to set
1053 * @param delay_in inbound delay to introduce
1054 * @param delay_out outbound delay to introduce
1055 *
1056 * Note: Delay restrictions in receiving direction will be enforced
1057 * with one message delay.
1058 */
1059void
1060GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1061 const struct GNUNET_PeerIdentity *peer,
1062 const struct GNUNET_ATS_Properties *prop,
1063 struct GNUNET_TIME_Relative delay_in,
1064 struct GNUNET_TIME_Relative delay_out)
1065{
1066 struct GNUNET_MQ_Envelope *env;
1067 struct TrafficMetricMessage *msg;
1068
1069 if (NULL == handle->mq)
1070 return;
1071 env = GNUNET_MQ_msg (msg,
1072 GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1073 msg->reserved = htonl (0);
1074 msg->peer = *peer;
1075 GNUNET_ATS_properties_hton (&msg->properties,
1076 prop);
1077 msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1078 msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1079 GNUNET_MQ_send (handle->mq,
1080 env);
1081}
1082
1083
1084/**
1085 * Checks if a given peer is connected to us
1086 *
1087 * @param handle connection to transport service
1088 * @param peer the peer to check
1089 * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1090 */
1091int
1092GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1093 const struct GNUNET_PeerIdentity *peer)
1094{
1095 if (GNUNET_YES ==
1096 GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1097 peer))
1098 return GNUNET_YES;
1099 return GNUNET_NO;
1100}
1101
1102
1103/**
1104 * Connect to the transport service. Note that the connection may
1105 * complete (or fail) asynchronously.
1106 *
1107 * @param cfg configuration to use
1108 * @param self our own identity (API should check that it matches
1109 * the identity found by transport), or NULL (no check)
1110 * @param cls closure for the callbacks
1111 * @param rec receive function to call
1112 * @param nc function to call on connect events
1113 * @param nd function to call on disconnect events
1114 * @return NULL on error
1115 */
1116struct GNUNET_TRANSPORT_Handle *
1117GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1118 const struct GNUNET_PeerIdentity *self,
1119 void *cls,
1120 GNUNET_TRANSPORT_ReceiveCallback rec,
1121 GNUNET_TRANSPORT_NotifyConnect nc,
1122 GNUNET_TRANSPORT_NotifyDisconnect nd)
1123{
1124 return GNUNET_TRANSPORT_connect2 (cfg,
1125 self,
1126 cls,
1127 rec,
1128 nc,
1129 nd,
1130 NULL);
1131}
1132
1133
1134/**
1135 * Connect to the transport service. Note that the connection may
1136 * complete (or fail) asynchronously.
1137 *
1138 * @param cfg configuration to use
1139 * @param self our own identity (API should check that it matches
1140 * the identity found by transport), or NULL (no check)
1141 * @param cls closure for the callbacks
1142 * @param rec receive function to call
1143 * @param nc function to call on connect events
1144 * @param nd function to call on disconnect events
1145 * @param neb function to call if we have excess bandwidth to a peer
1146 * @return NULL on error
1147 */
1148struct GNUNET_TRANSPORT_Handle *
1149GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1150 const struct GNUNET_PeerIdentity *self,
1151 void *cls,
1152 GNUNET_TRANSPORT_ReceiveCallback rec,
1153 GNUNET_TRANSPORT_NotifyConnect nc,
1154 GNUNET_TRANSPORT_NotifyDisconnect nd,
1155 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1156{
1157 struct GNUNET_TRANSPORT_Handle *h;
1158
1159 h = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1160 if (NULL != self)
1161 {
1162 h->self = *self;
1163 h->check_self = GNUNET_YES;
1164 }
1165 h->cfg = cfg;
1166 h->cls = cls;
1167 h->rec = rec;
1168 h->nc_cb = nc;
1169 h->nd_cb = nd;
1170 h->neb_cb = neb;
1171 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1172 LOG (GNUNET_ERROR_TYPE_DEBUG,
1173 "Connecting to transport service.\n");
1174 reconnect (h);
1175 if (NULL == h->mq)
1176 {
1177 GNUNET_free (h);
1178 return NULL;
1179 }
1180 h->neighbours =
1181 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1182 GNUNET_YES);
1183 h->ready_heap =
1184 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1185 return h;
1186}
1187
1188
1189/**
1190 * Disconnect from the transport service.
1191 *
1192 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1193 */
1194void
1195GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1196{
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Transport disconnect called!\n");
1199 /* this disconnects all neighbours... */
1200 if (NULL == handle->reconnect_task)
1201 disconnect_and_schedule_reconnect (handle);
1202 /* and now we stop trying to connect again... */
1203 if (NULL != handle->reconnect_task)
1204 {
1205 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1206 handle->reconnect_task = NULL;
1207 }
1208 GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1209 handle->neighbours = NULL;
1210 if (NULL != handle->quota_task)
1211 {
1212 GNUNET_SCHEDULER_cancel (handle->quota_task);
1213 handle->quota_task = NULL;
1214 }
1215 GNUNET_free_non_null (handle->my_hello);
1216 handle->my_hello = NULL;
1217 GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1218 handle->ready_heap = NULL;
1219 GNUNET_free (handle);
1220}
1221
1222
1223/**
1224 * Check if we could queue a message of the given size for
1225 * transmission. The transport service will take both its
1226 * internal buffers and bandwidth limits imposed by the
1227 * other peer into consideration when answering this query.
1228 *
1229 * @param handle connection to transport service
1230 * @param target who should receive the message
1231 * @param size how big is the message we want to transmit?
1232 * @param timeout after how long should we give up (and call
1233 * notify with buf NULL and size 0)?
1234 * @param notify function to call when we are ready to
1235 * send such a message
1236 * @param notify_cls closure for @a notify
1237 * @return NULL if someone else is already waiting to be notified
1238 * non-NULL if the notify callback was queued (can be used to cancel
1239 * using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1240 */
1241struct GNUNET_TRANSPORT_TransmitHandle *
1242GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1243 const struct GNUNET_PeerIdentity *target,
1244 size_t size,
1245 struct GNUNET_TIME_Relative timeout,
1246 GNUNET_TRANSPORT_TransmitReadyNotify notify,
1247 void *notify_cls)
1248{
1249 struct Neighbour *n;
1250 struct GNUNET_TRANSPORT_TransmitHandle *th;
1251 struct GNUNET_TIME_Relative delay;
1252
1253 n = neighbour_find (handle, target);
1254 if (NULL == n)
1255 {
1256 /* only use this function
1257 * once a connection has been established */
1258 GNUNET_assert (0);
1259 return NULL;
1260 }
1261 if (NULL != n->th)
1262 {
1263 /* attempt to send two messages at the same time to the same peer */
1264 GNUNET_assert (0);
1265 return NULL;
1266 }
1267 GNUNET_assert (NULL == n->hn);
1268 th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1269 th->neighbour = n;
1270 th->notify = notify;
1271 th->notify_cls = notify_cls;
1272 th->request_start = GNUNET_TIME_absolute_get ();
1273 th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1274 th->notify_size = size;
1275 n->th = th;
1276 /* calculate when our transmission should be ready */
1277 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1278 size + n->traffic_overhead);
1279 n->traffic_overhead = 0;
1280 if (delay.rel_value_us > timeout.rel_value_us)
1281 delay.rel_value_us = 0; /* notify immediately (with failure) */
1282 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1283 LOG (GNUNET_ERROR_TYPE_WARNING,
1284 "At bandwidth %u byte/s next transmission to %s in %s\n",
1285 (unsigned int) n->out_tracker.available_bytes_per_s__,
1286 GNUNET_i2s (target),
1287 GNUNET_STRINGS_relative_time_to_string (delay,
1288 GNUNET_YES));
1289 else
1290 LOG (GNUNET_ERROR_TYPE_DEBUG,
1291 "At bandwidth %u byte/s next transmission to %s in %s\n",
1292 (unsigned int) n->out_tracker.available_bytes_per_s__,
1293 GNUNET_i2s (target),
1294 GNUNET_STRINGS_relative_time_to_string (delay,
1295 GNUNET_YES));
1296 n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1297 n,
1298 delay.rel_value_us);
1299 schedule_transmission (handle);
1300 return th;
1301}
1302
1303
1304/**
1305 * Cancel the specified transmission-ready notification.
1306 *
1307 * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1308 */
1309void
1310GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1311{
1312 struct Neighbour *n;
1313
1314 GNUNET_assert (NULL == th->next);
1315 GNUNET_assert (NULL == th->prev);
1316 n = th->neighbour;
1317 GNUNET_assert (th == n->th);
1318 n->th = NULL;
1319 if (NULL != n->hn)
1320 {
1321 GNUNET_CONTAINER_heap_remove_node (n->hn);
1322 n->hn = NULL;
1323 }
1324 else
1325 {
1326 GNUNET_assert (NULL != th->timeout_task);
1327 GNUNET_SCHEDULER_cancel (th->timeout_task);
1328 th->timeout_task = NULL;
1329 }
1330 GNUNET_free (th);
1331}
1332
1333
1334/* end of transport_api.c */