aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api_core.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api_core.c')
-rw-r--r--src/transport/transport_api_core.c968
1 files changed, 0 insertions, 968 deletions
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
deleted file mode 100644
index 0ffaeb444..000000000
--- a/src/transport/transport_api_core.c
+++ /dev/null
@@ -1,968 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2013, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/transport_api_core.c
23 * @brief library to access the transport service for message exchange
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_constants.h"
29#include "gnunet_arm_service.h"
30#include "gnunet_hello_lib.h"
31#include "gnunet_protocols.h"
32#include "gnunet_transport_service.h"
33#include "transport.h"
34
35#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
36
37/**
38 * If we could not send any payload to a peer for this amount of
39 * time, we print a warning.
40 */
41#define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
42
43/**
44 * How large to start with for the hashmap of neighbours.
45 */
46#define STARTING_NEIGHBOURS_SIZE 16
47
48
49/**
50 * Entry in hash table of all of our current (connected) neighbours.
51 */
52struct Neighbour
53{
54 /**
55 * Overall transport handle.
56 */
57 struct GNUNET_TRANSPORT_CoreHandle *h;
58
59 /**
60 * Active message queue for the peer.
61 */
62 struct GNUNET_MQ_Handle *mq;
63
64 /**
65 * Envelope with the message we are currently transmitting (or NULL).
66 */
67 struct GNUNET_MQ_Envelope *env;
68
69 /**
70 * Closure for @e mq handlers.
71 */
72 void *handlers_cls;
73
74 /**
75 * Identity of this neighbour.
76 */
77 struct GNUNET_PeerIdentity id;
78
79 /**
80 * Outbound bandwidh tracker.
81 */
82 struct GNUNET_BANDWIDTH_Tracker out_tracker;
83
84 /**
85 * Entry in our readiness heap (which is sorted by @e next_ready
86 * value). NULL if there is no pending transmission request for
87 * this neighbour or if we're waiting for @e is_ready to become
88 * true AFTER the @e out_tracker suggested that this peer's quota
89 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
90 * we should immediately go back into the heap).
91 */
92 struct GNUNET_CONTAINER_HeapNode *hn;
93
94 /**
95 * Task to trigger MQ when we have enough bandwidth for the
96 * next transmission.
97 */
98 struct GNUNET_SCHEDULER_Task *timeout_task;
99
100 /**
101 * Sending consumed more bytes on wire than payload was announced
102 * This overhead is added to the delay of next sending operation
103 */
104 unsigned long long traffic_overhead;
105
106 /**
107 * Is this peer currently ready to receive a message?
108 */
109 int is_ready;
110
111 /**
112 * Size of the message in @e env.
113 */
114 uint16_t env_size;
115};
116
117
118/**
119 * Handle for the transport service (includes all of the
120 * state for the transport service).
121 */
122struct GNUNET_TRANSPORT_CoreHandle
123{
124 /**
125 * Closure for the callbacks.
126 */
127 void *cls;
128
129 /**
130 * Functions to call for received data (template for
131 * new message queues).
132 */
133 struct GNUNET_MQ_MessageHandler *handlers;
134
135 /**
136 * function to call on connect events
137 */
138 GNUNET_TRANSPORT_NotifyConnect nc_cb;
139
140 /**
141 * function to call on disconnect events
142 */
143 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
144
145 /**
146 * function to call on excess bandwidth events
147 */
148 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
149
150 /**
151 * My client connection to the transport service.
152 */
153 struct GNUNET_MQ_Handle *mq;
154
155 /**
156 * My configuration.
157 */
158 const struct GNUNET_CONFIGURATION_Handle *cfg;
159
160 /**
161 * Hash map of the current connected neighbours of this peer.
162 * Maps peer identities to `struct Neighbour` entries.
163 */
164 struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
165
166 /**
167 * Peer identity as assumed by this process, or all zeros.
168 */
169 struct GNUNET_PeerIdentity self;
170
171 /**
172 * ID of the task trying to reconnect to the service.
173 */
174 struct GNUNET_SCHEDULER_Task *reconnect_task;
175
176 /**
177 * Delay until we try to reconnect.
178 */
179 struct GNUNET_TIME_Relative reconnect_delay;
180
181 /**
182 * Internal counter to check how many more receive OK messages this
183 * CORE service is allowed to send in total. Just to detect easy
184 * cases of protocol violations by the CORE implementation.
185 * NOTE: we may want to make this stronger by counting per peer
186 * instead of globally.
187 */
188 unsigned int rom_pending;
189
190 /**
191 * Should we check that @e self matches what the service thinks?
192 * (if #GNUNET_NO, then @e self is all zeros!).
193 */
194 int check_self;
195};
196
197
198/**
199 * Function that will schedule the job that will try
200 * to connect us again to the client.
201 *
202 * @param h transport service to reconnect
203 */
204static void
205disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
206
207
208/**
209 * Get the neighbour list entry for the given peer
210 *
211 * @param h our context
212 * @param peer peer to look up
213 * @return NULL if no such peer entry exists
214 */
215static struct Neighbour *
216neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
217 const struct GNUNET_PeerIdentity *peer)
218{
219 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
220}
221
222
223/**
224 * Function called by the bandwidth tracker if we have excess
225 * bandwidth.
226 *
227 * @param cls the `struct Neighbour` that has excess bandwidth
228 */
229static void
230notify_excess_cb (void *cls)
231{
232 struct Neighbour *n = cls;
233 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
234
235 LOG (GNUNET_ERROR_TYPE_DEBUG,
236 "Notifying CORE that more bandwidth is available for %s\n",
237 GNUNET_i2s (&n->id));
238
239 if (NULL != h->neb_cb)
240 h->neb_cb (h->cls, &n->id, n->handlers_cls);
241}
242
243
244/**
245 * Iterator over hash map entries, for deleting state of a neighbour.
246 *
247 * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
248 * @param key peer identity
249 * @param value value in the hash map, the neighbour entry to delete
250 * @return #GNUNET_YES if we should continue to
251 * iterate,
252 * #GNUNET_NO if not.
253 */
254static int
255neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
256{
257 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
258 struct Neighbour *n = value;
259
260 LOG (GNUNET_ERROR_TYPE_DEBUG,
261 "Dropping entry for neighbour `%s'.\n",
262 GNUNET_i2s (key));
263 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
264 if (NULL != handle->nd_cb)
265 handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
266 if (NULL != n->timeout_task)
267 {
268 GNUNET_SCHEDULER_cancel (n->timeout_task);
269 n->timeout_task = NULL;
270 }
271 if (NULL != n->env)
272 {
273 GNUNET_MQ_send_cancel (n->env);
274 n->env = NULL;
275 }
276 GNUNET_MQ_destroy (n->mq);
277 GNUNET_assert (NULL == n->mq);
278 GNUNET_assert (
279 GNUNET_YES ==
280 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
281 GNUNET_free (n);
282 return GNUNET_YES;
283}
284
285
286/**
287 * Generic error handler, called with the appropriate
288 * error code and the same closure specified at the creation of
289 * the message queue.
290 * Not every message queue implementation supports an error handler.
291 *
292 * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
293 * @param error error code
294 */
295static void
296mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
297{
298 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
299
300 LOG (GNUNET_ERROR_TYPE_ERROR,
301 "Error receiving from transport service (%d), disconnecting temporarily.\n",
302 error);
303 disconnect_and_schedule_reconnect (h);
304}
305
306
307/**
308 * Function we use for checking incoming HELLO messages.
309 *
310 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
311 * @param msg message received
312 * @return #GNUNET_OK if message is well-formed
313 */
314static int
315check_hello (void *cls, const struct GNUNET_MessageHeader *msg)
316{
317 struct GNUNET_PeerIdentity me;
318
319 if (GNUNET_OK !=
320 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
321 {
322 GNUNET_break (0);
323 return GNUNET_SYSERR;
324 }
325 return GNUNET_OK;
326}
327
328
329/**
330 * Function we use for handling incoming HELLO messages.
331 *
332 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
333 * @param msg message received
334 */
335static void
336handle_hello (void *cls, const struct GNUNET_MessageHeader *msg)
337{
338 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */
339}
340
341
342/**
343 * A message from the handler's message queue to a neighbour was
344 * transmitted. Now trigger (possibly delayed) notification of the
345 * neighbour's message queue that we are done and thus ready for
346 * the next message.
347 *
348 * @param cls the `struct Neighbour` where the message was sent
349 */
350static void
351notify_send_done_fin (void *cls)
352{
353 struct Neighbour *n = cls;
354
355 n->timeout_task = NULL;
356 n->is_ready = GNUNET_YES;
357 GNUNET_MQ_impl_send_continue (n->mq);
358}
359
360
361/**
362 * A message from the handler's message queue to a neighbour was
363 * transmitted. Now trigger (possibly delayed) notification of the
364 * neighbour's message queue that we are done and thus ready for
365 * the next message.
366 *
367 * @param cls the `struct Neighbour` where the message was sent
368 */
369static void
370notify_send_done (void *cls)
371{
372 struct Neighbour *n = cls;
373 struct GNUNET_TIME_Relative delay;
374
375 n->timeout_task = NULL;
376 if (NULL != n->env)
377 {
378 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
379 n->env_size + n->traffic_overhead);
380 n->env = NULL;
381 n->traffic_overhead = 0;
382 }
383 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
384 if (0 == delay.rel_value_us)
385 {
386 n->is_ready = GNUNET_YES;
387 GNUNET_MQ_impl_send_continue (n->mq);
388 return;
389 }
390 GNUNET_MQ_impl_send_in_flight (n->mq);
391 /* cannot send even a small message without violating
392 quota, wait a before allowing MQ to send next message */
393 n->timeout_task =
394 GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done_fin, n);
395}
396
397
398/**
399 * Implement sending functionality of a message queue.
400 * Called one message at a time. Should send the @a msg
401 * to the transport service and then notify the queue
402 * once we are ready for the next one.
403 *
404 * @param mq the message queue
405 * @param msg the message to send
406 * @param impl_state state of the implementation
407 */
408static void
409mq_send_impl (struct GNUNET_MQ_Handle *mq,
410 const struct GNUNET_MessageHeader *msg,
411 void *impl_state)
412{
413 struct Neighbour *n = impl_state;
414 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
415 struct OutboundMessage *obm;
416 uint16_t msize;
417
418 GNUNET_assert (GNUNET_YES == n->is_ready);
419 msize = ntohs (msg->size);
420 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof(*obm))
421 {
422 GNUNET_break (0);
423 GNUNET_MQ_impl_send_continue (mq);
424 return;
425 }
426 GNUNET_assert (NULL == n->env);
427 n->env =
428 GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
429 {
430 struct GNUNET_MQ_Envelope *env;
431
432 env = GNUNET_MQ_get_current_envelope (mq);
433 obm->priority = htonl ((uint32_t) GNUNET_MQ_env_get_options (env));
434 }
435 obm->timeout = GNUNET_TIME_relative_hton (
436 GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
437 obm->peer = n->id;
438 GNUNET_assert (NULL == n->timeout_task);
439 n->is_ready = GNUNET_NO;
440 n->env_size = ntohs (msg->size);
441 GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
442 GNUNET_MQ_send (h->mq, n->env);
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "Queued message of type %u for neighbour `%s'.\n",
445 ntohs (msg->type),
446 GNUNET_i2s (&n->id));
447}
448
449
450/**
451 * Handle destruction of a message queue. Implementations must not
452 * free @a mq, but should take care of @a impl_state.
453 *
454 * @param mq the message queue to destroy
455 * @param impl_state state of the implementation
456 */
457static void
458mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
459{
460 struct Neighbour *n = impl_state;
461
462 GNUNET_assert (mq == n->mq);
463 n->mq = NULL;
464}
465
466
467/**
468 * Implementation function that cancels the currently sent message.
469 * Should basically undo whatever #mq_send_impl() did.
470 *
471 * @param mq message queue
472 * @param impl_state state specific to the implementation
473 */
474static void
475mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
476{
477 struct Neighbour *n = impl_state;
478
479 GNUNET_assert (GNUNET_NO == n->is_ready);
480 if (NULL != n->env)
481 {
482 GNUNET_MQ_send_cancel (n->env);
483 n->env = NULL;
484 }
485
486 n->is_ready = GNUNET_YES;
487}
488
489
490/**
491 * We had an error processing a message we forwarded from a peer to
492 * the CORE service. We should just complain about it but otherwise
493 * continue processing.
494 *
495 * @param cls closure
496 * @param error error code
497 */
498static void
499peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
500{
501 /* struct Neighbour *n = cls; */
502
503 GNUNET_break_op (0);
504}
505
506
507/**
508 * The outbound quota has changed in a way that may require
509 * us to reset the timeout. Update the timeout.
510 *
511 * @param cls the `struct Neighbour` for which the timeout changed
512 */
513static void
514outbound_bw_tracker_update (void *cls)
515{
516 struct Neighbour *n = cls;
517 struct GNUNET_TIME_Relative delay;
518
519 if (NULL == n->timeout_task)
520 return;
521 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
522 GNUNET_SCHEDULER_cancel (n->timeout_task);
523 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done, n);
524}
525
526
527/**
528 * Function we use for handling incoming connect messages.
529 *
530 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
531 * @param cim message received
532 */
533static void
534handle_connect (void *cls, const struct ConnectInfoMessage *cim)
535{
536 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
537 struct Neighbour *n;
538
539 LOG (GNUNET_ERROR_TYPE_DEBUG,
540 "Receiving CONNECT message for `%s' with quota %u\n",
541 GNUNET_i2s (&cim->id),
542 ntohl (cim->quota_out.value__));
543 n = neighbour_find (h, &cim->id);
544 if (NULL != n)
545 {
546 GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */
547 disconnect_and_schedule_reconnect (h);
548 return;
549 }
550 n = GNUNET_new (struct Neighbour);
551 n->id = cim->id;
552 n->h = h;
553 n->is_ready = GNUNET_YES;
554 n->traffic_overhead = 0;
555 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
556 &outbound_bw_tracker_update,
557 n,
558 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
559 MAX_BANDWIDTH_CARRY_S,
560 &notify_excess_cb,
561 n);
562 GNUNET_assert (GNUNET_OK ==
563 GNUNET_CONTAINER_multipeermap_put (
564 h->neighbours,
565 &n->id,
566 n,
567 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
568
569 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out);
570 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
571 &mq_destroy_impl,
572 &mq_cancel_impl,
573 n,
574 h->handlers,
575 &peer_mq_error_handler,
576 n);
577 if (NULL != h->nc_cb)
578 {
579 n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
580 GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
581 }
582}
583
584
585/**
586 * Function we use for handling incoming disconnect messages.
587 *
588 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
589 * @param dim message received
590 */
591static void
592handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
593{
594 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
595 struct Neighbour *n;
596
597 GNUNET_break (ntohl (dim->reserved) == 0);
598 LOG (GNUNET_ERROR_TYPE_DEBUG,
599 "Receiving DISCONNECT message for `%s'.\n",
600 GNUNET_i2s (&dim->peer));
601 n = neighbour_find (h, &dim->peer);
602 if (NULL == n)
603 {
604 GNUNET_break (0);
605 disconnect_and_schedule_reconnect (h);
606 return;
607 }
608 GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
609}
610
611
612/**
613 * Function we use for handling incoming send-ok messages.
614 *
615 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
616 * @param okm message received
617 */
618static void
619handle_send_ok (void *cls, const struct SendOkMessage *okm)
620{
621 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
622 struct Neighbour *n;
623 uint32_t bytes_msg;
624 uint32_t bytes_physical;
625 uint16_t success = ntohs (okm->success);
626
627 bytes_msg = ntohs (okm->bytes_msg);
628 bytes_physical = ntohl (okm->bytes_physical);
629 LOG (GNUNET_ERROR_TYPE_DEBUG,
630 "Receiving SEND_OK message, transmission to %s %s.\n",
631 GNUNET_i2s (&okm->peer),
632 success == GNUNET_OK ? "succeeded" : "failed");
633 n = neighbour_find (h, &okm->peer);
634 if (NULL == n)
635 {
636 /* We should never get a 'SEND_OK' for a peer that we are not
637 connected to */
638 GNUNET_break (0);
639 disconnect_and_schedule_reconnect (h);
640 return;
641 }
642 if (bytes_physical > bytes_msg)
643 {
644 LOG (GNUNET_ERROR_TYPE_DEBUG,
645 "Overhead for %u byte message was %u\n",
646 bytes_msg,
647 bytes_physical - bytes_msg);
648 n->traffic_overhead += bytes_physical - bytes_msg;
649 }
650}
651
652
653/**
654 * Function we use for checking incoming "inbound" messages.
655 *
656 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
657 * @param im message received
658 */
659static int
660check_recv (void *cls, const struct InboundMessage *im)
661{
662 const struct GNUNET_MessageHeader *imm;
663 uint16_t size;
664
665 size = ntohs (im->header.size) - sizeof(*im);
666 if (size < sizeof(struct GNUNET_MessageHeader))
667 {
668 GNUNET_break (0);
669 return GNUNET_SYSERR;
670 }
671 imm = (const struct GNUNET_MessageHeader *) &im[1];
672 if (ntohs (imm->size) != size)
673 {
674 GNUNET_break (0);
675 return GNUNET_SYSERR;
676 }
677 return GNUNET_OK;
678}
679
680
681/**
682 * Function we use for handling incoming messages.
683 *
684 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
685 * @param im message received
686 */
687static void
688handle_recv (void *cls, const struct InboundMessage *im)
689{
690 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
691 const struct GNUNET_MessageHeader *imm =
692 (const struct GNUNET_MessageHeader *) &im[1];
693 struct Neighbour *n;
694
695 LOG (GNUNET_ERROR_TYPE_DEBUG,
696 "Received message of type %u with %u bytes from `%s'.\n",
697 (unsigned int) ntohs (imm->type),
698 (unsigned int) ntohs (imm->size),
699 GNUNET_i2s (&im->peer));
700 n = neighbour_find (h, &im->peer);
701 if (NULL == n)
702 {
703 GNUNET_break (0);
704 disconnect_and_schedule_reconnect (h);
705 return;
706 }
707 h->rom_pending++;
708 GNUNET_MQ_inject_message (n->mq, imm);
709}
710
711
712/**
713 * Function we use for handling incoming set quota messages.
714 *
715 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
716 * @param msg message received
717 */
718static void
719handle_set_quota (void *cls, const struct QuotaSetMessage *qm)
720{
721 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
722 struct Neighbour *n;
723
724 LOG (GNUNET_ERROR_TYPE_DEBUG,
725 "Receiving SET_QUOTA message for `%s' with quota %u\n",
726 GNUNET_i2s (&qm->peer),
727 ntohl (qm->quota.value__));
728 n = neighbour_find (h, &qm->peer);
729 if (NULL == n)
730 {
731 GNUNET_break (
732 0); /* FIXME: julius reports this assertion fails sometimes? */
733 disconnect_and_schedule_reconnect (h);
734 return;
735 }
736 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota);
737}
738
739
740/**
741 * Try again to connect to transport service.
742 *
743 * @param cls the handle to the transport service
744 */
745static void
746reconnect (void *cls)
747{
748 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
749 struct GNUNET_MQ_MessageHandler handlers[] =
750 { GNUNET_MQ_hd_var_size (hello,
751 GNUNET_MESSAGE_TYPE_HELLO,
752 struct GNUNET_MessageHeader,
753 h),
754 GNUNET_MQ_hd_fixed_size (connect,
755 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
756 struct ConnectInfoMessage,
757 h),
758 GNUNET_MQ_hd_fixed_size (disconnect,
759 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
760 struct DisconnectInfoMessage,
761 h),
762 GNUNET_MQ_hd_fixed_size (send_ok,
763 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
764 struct SendOkMessage,
765 h),
766 GNUNET_MQ_hd_var_size (recv,
767 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
768 struct InboundMessage,
769 h),
770 GNUNET_MQ_hd_fixed_size (set_quota,
771 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
772 struct QuotaSetMessage,
773 h),
774 GNUNET_MQ_handler_end () };
775 struct GNUNET_MQ_Envelope *env;
776 struct StartMessage *s;
777 uint32_t options;
778
779 h->reconnect_task = NULL;
780 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
781 GNUNET_assert (NULL == h->mq);
782 h->mq =
783 GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
784 if (NULL == h->mq)
785 return;
786 env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
787 options = 0;
788 if (h->check_self)
789 options |= 1;
790 if (NULL != h->handlers)
791 options |= 2;
792 s->options = htonl (options);
793 s->self = h->self;
794 GNUNET_MQ_send (h->mq, env);
795}
796
797
798/**
799 * Function that will schedule the job that will try
800 * to connect us again to the client.
801 *
802 * @param h transport service to reconnect
803 */
804static void
805disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
806{
807 GNUNET_assert (NULL == h->reconnect_task);
808 /* Forget about all neighbours that we used to be connected to */
809 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
810 if (NULL != h->mq)
811 {
812 GNUNET_MQ_destroy (h->mq);
813 h->mq = NULL;
814 }
815 LOG (GNUNET_ERROR_TYPE_DEBUG,
816 "Scheduling task to reconnect to transport service in %s.\n",
817 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
818 h->reconnect_task =
819 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
820 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
821}
822
823
824/**
825 * Checks if a given peer is connected to us and get the message queue.
826 *
827 * @param handle connection to transport service
828 * @param peer the peer to check
829 * @return NULL if disconnected, otherwise message queue for @a peer
830 */
831struct GNUNET_MQ_Handle *
832GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
833 const struct GNUNET_PeerIdentity *peer)
834{
835 struct Neighbour *n;
836
837 n = neighbour_find (handle, peer);
838 if (NULL == n)
839 return NULL;
840 return n->mq;
841}
842
843
844/**
845 * Connect to the transport service. Note that the connection may
846 * complete (or fail) asynchronously.
847 *
848 * @param cfg configuration to use
849 * @param self our own identity (API should check that it matches
850 * the identity found by transport), or NULL (no check)
851 * @param cls closure for the callbacks
852 * @param rec receive function to call
853 * @param nc function to call on connect events
854 * @param nd function to call on disconnect events
855 * @param neb function to call if we have excess bandwidth to a peer
856 * @return NULL on error
857 */
858struct GNUNET_TRANSPORT_CoreHandle *
859GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
860 const struct GNUNET_PeerIdentity *self,
861 const struct GNUNET_MQ_MessageHandler *handlers,
862 void *cls,
863 GNUNET_TRANSPORT_NotifyConnect nc,
864 GNUNET_TRANSPORT_NotifyDisconnect nd,
865 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
866{
867 struct GNUNET_TRANSPORT_CoreHandle *h;
868 unsigned int i;
869
870 h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
871 if (NULL != self)
872 {
873 h->self = *self;
874 h->check_self = GNUNET_YES;
875 }
876 h->cfg = cfg;
877 h->cls = cls;
878 h->nc_cb = nc;
879 h->nd_cb = nd;
880 h->neb_cb = neb;
881 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
882 if (NULL != handlers)
883 {
884 for (i = 0; NULL != handlers[i].cb; i++)
885 ;
886 h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
887 GNUNET_memcpy (h->handlers,
888 handlers,
889 i * sizeof(struct GNUNET_MQ_MessageHandler));
890 }
891 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
892 reconnect (h);
893 if (NULL == h->mq)
894 {
895 GNUNET_free (h->handlers);
896 GNUNET_free (h);
897 return NULL;
898 }
899 h->neighbours =
900 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
901 return h;
902}
903
904
905/**
906 * Disconnect from the transport service.
907 *
908 * @param handle handle to the service as returned from
909 * #GNUNET_TRANSPORT_core_connect()
910 */
911void
912GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
913{
914 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
915 /* this disconnects all neighbours... */
916 if (NULL == handle->reconnect_task)
917 disconnect_and_schedule_reconnect (handle);
918 /* and now we stop trying to connect again... */
919 if (NULL != handle->reconnect_task)
920 {
921 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
922 handle->reconnect_task = NULL;
923 }
924 GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
925 handle->neighbours = NULL;
926 GNUNET_free (handle->handlers);
927 handle->handlers = NULL;
928 GNUNET_free (handle);
929}
930
931
932/**
933 * Notification from the CORE service to the TRANSPORT service
934 * that the CORE service has finished processing a message from
935 * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
936 * and that it is thus now OK for TRANSPORT to send more messages
937 * for @a pid.
938 *
939 * Used to provide flow control, this is our equivalent to
940 * #GNUNET_SERVICE_client_continue() of an ordinary service.
941 *
942 * Note that due to the use of a window, TRANSPORT may send multiple
943 * messages destined for the same peer even without an intermediate
944 * call to this function. However, CORE must still call this function
945 * once per message received, as otherwise eventually the window will
946 * be full and TRANSPORT will stop providing messages to CORE for @a
947 * pid.
948 *
949 * @param ch core handle
950 * @param pid which peer was the message from that was fully processed by CORE
951 */
952void
953GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
954 const struct GNUNET_PeerIdentity *pid)
955{
956 struct RecvOkMessage *rom;
957 struct GNUNET_MQ_Envelope *env;
958
959 GNUNET_assert (ch->rom_pending > 0);
960 ch->rom_pending--;
961 env = GNUNET_MQ_msg (rom, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
962 rom->increase_window_delta = htonl (1);
963 rom->peer = *pid;
964 GNUNET_MQ_send (ch->mq, env);
965}
966
967
968/* end of transport_api_core.c */