diff options
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/transport_api_core.c | 940 |
1 files changed, 940 insertions, 0 deletions
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c new file mode 100644 index 000000000..3150e6deb --- /dev/null +++ b/src/transport/transport_api_core.c | |||
@@ -0,0 +1,940 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2009-2013, 2016 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, | ||
18 | Boston, MA 02110-1301, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file transport/transport_api_core.c | ||
23 | * @brief library to access the transport service for message exchange | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "gnunet_constants.h" | ||
29 | #include "gnunet_arm_service.h" | ||
30 | #include "gnunet_hello_lib.h" | ||
31 | #include "gnunet_protocols.h" | ||
32 | #include "gnunet_transport_core_service.h" | ||
33 | #include "transport.h" | ||
34 | |||
35 | #define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) | ||
36 | |||
37 | /** | ||
38 | * If we could not send any payload to a peer for this amount of | ||
39 | * time, we print a warning. | ||
40 | */ | ||
41 | #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES | ||
42 | |||
43 | /** | ||
44 | * How large to start with for the hashmap of neighbours. | ||
45 | */ | ||
46 | #define STARTING_NEIGHBOURS_SIZE 16 | ||
47 | |||
48 | |||
49 | /** | ||
50 | * Entry in hash table of all of our current (connected) neighbours. | ||
51 | */ | ||
52 | struct Neighbour | ||
53 | { | ||
54 | /** | ||
55 | * Overall transport handle. | ||
56 | */ | ||
57 | struct GNUNET_TRANSPORT_CoreHandle *h; | ||
58 | |||
59 | /** | ||
60 | * Active message queue for the peer. | ||
61 | */ | ||
62 | struct GNUNET_MQ_Handle *mq; | ||
63 | |||
64 | /** | ||
65 | * Envelope with the message we are currently transmitting (or NULL). | ||
66 | */ | ||
67 | struct GNUNET_MQ_Envelope *env; | ||
68 | |||
69 | /** | ||
70 | * Closure for @e mq handlers. | ||
71 | */ | ||
72 | void *handlers_cls; | ||
73 | |||
74 | /** | ||
75 | * Identity of this neighbour. | ||
76 | */ | ||
77 | struct GNUNET_PeerIdentity id; | ||
78 | |||
79 | /** | ||
80 | * Outbound bandwidh tracker. | ||
81 | */ | ||
82 | struct GNUNET_BANDWIDTH_Tracker out_tracker; | ||
83 | |||
84 | /** | ||
85 | * Entry in our readyness heap (which is sorted by @e next_ready | ||
86 | * value). NULL if there is no pending transmission request for | ||
87 | * this neighbour or if we're waiting for @e is_ready to become | ||
88 | * true AFTER the @e out_tracker suggested that this peer's quota | ||
89 | * has been satisfied (so once @e is_ready goes to #GNUNET_YES, | ||
90 | * we should immediately go back into the heap). | ||
91 | */ | ||
92 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
93 | |||
94 | /** | ||
95 | * Task to trigger MQ when we have enough bandwidth for the | ||
96 | * next transmission. | ||
97 | */ | ||
98 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
99 | |||
100 | /** | ||
101 | * Sending consumed more bytes on wire than payload was announced | ||
102 | * This overhead is added to the delay of next sending operation | ||
103 | */ | ||
104 | unsigned long long traffic_overhead; | ||
105 | |||
106 | /** | ||
107 | * Is this peer currently ready to receive a message? | ||
108 | */ | ||
109 | int is_ready; | ||
110 | |||
111 | /** | ||
112 | * Size of the message in @e env. | ||
113 | */ | ||
114 | uint16_t env_size; | ||
115 | |||
116 | }; | ||
117 | |||
118 | |||
119 | |||
120 | /** | ||
121 | * Handle for the transport service (includes all of the | ||
122 | * state for the transport service). | ||
123 | */ | ||
124 | struct GNUNET_TRANSPORT_Handle | ||
125 | { | ||
126 | |||
127 | /** | ||
128 | * Closure for the callbacks. | ||
129 | */ | ||
130 | void *cls; | ||
131 | |||
132 | /** | ||
133 | * Functions to call for received data (template for | ||
134 | * new message queues). | ||
135 | */ | ||
136 | struct GNUNET_MQ_MessageHandler *handlers; | ||
137 | |||
138 | /** | ||
139 | * function to call on connect events | ||
140 | */ | ||
141 | GNUNET_TRANSPORT_NotifyConnecT nc_cb; | ||
142 | |||
143 | /** | ||
144 | * function to call on disconnect events | ||
145 | */ | ||
146 | GNUNET_TRANSPORT_NotifyDisconnecT nd_cb; | ||
147 | |||
148 | /** | ||
149 | * function to call on excess bandwidth events | ||
150 | */ | ||
151 | GNUNET_TRANSPORT_NotifyExcessBandwidtH neb_cb; | ||
152 | |||
153 | /** | ||
154 | * My client connection to the transport service. | ||
155 | */ | ||
156 | struct GNUNET_MQ_Handle *mq; | ||
157 | |||
158 | /** | ||
159 | * My configuration. | ||
160 | */ | ||
161 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
162 | |||
163 | /** | ||
164 | * Hash map of the current connected neighbours of this peer. | ||
165 | * Maps peer identities to `struct Neighbour` entries. | ||
166 | */ | ||
167 | struct GNUNET_CONTAINER_MultiPeerMap *neighbours; | ||
168 | |||
169 | /** | ||
170 | * Peer identity as assumed by this process, or all zeros. | ||
171 | */ | ||
172 | struct GNUNET_PeerIdentity self; | ||
173 | |||
174 | /** | ||
175 | * ID of the task trying to reconnect to the service. | ||
176 | */ | ||
177 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
178 | |||
179 | /** | ||
180 | * Delay until we try to reconnect. | ||
181 | */ | ||
182 | struct GNUNET_TIME_Relative reconnect_delay; | ||
183 | |||
184 | /** | ||
185 | * Should we check that @e self matches what the service thinks? | ||
186 | * (if #GNUNET_NO, then @e self is all zeros!). | ||
187 | */ | ||
188 | int check_self; | ||
189 | |||
190 | }; | ||
191 | |||
192 | |||
193 | /** | ||
194 | * Schedule the task to send one message, either from the control | ||
195 | * list or the peer message queues to the service. | ||
196 | * | ||
197 | * @param h transport service to schedule a transmission for | ||
198 | */ | ||
199 | static void | ||
200 | schedule_transmission (struct GNUNET_TRANSPORT_Handle *h); | ||
201 | |||
202 | |||
203 | /** | ||
204 | * Function that will schedule the job that will try | ||
205 | * to connect us again to the client. | ||
206 | * | ||
207 | * @param h transport service to reconnect | ||
208 | */ | ||
209 | static void | ||
210 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h); | ||
211 | |||
212 | |||
213 | /** | ||
214 | * Get the neighbour list entry for the given peer | ||
215 | * | ||
216 | * @param h our context | ||
217 | * @param peer peer to look up | ||
218 | * @return NULL if no such peer entry exists | ||
219 | */ | ||
220 | static struct Neighbour * | ||
221 | neighbour_find (struct GNUNET_TRANSPORT_Handle *h, | ||
222 | const struct GNUNET_PeerIdentity *peer) | ||
223 | { | ||
224 | return GNUNET_CONTAINER_multipeermap_get (h->neighbours, | ||
225 | peer); | ||
226 | } | ||
227 | |||
228 | |||
229 | /** | ||
230 | * Function called by the bandwidth tracker if we have excess | ||
231 | * bandwidth. | ||
232 | * | ||
233 | * @param cls the `struct Neighbour` that has excess bandwidth | ||
234 | */ | ||
235 | static void | ||
236 | notify_excess_cb (void *cls) | ||
237 | { | ||
238 | struct Neighbour *n = cls; | ||
239 | struct GNUNET_TRANSPORT_Handle *h = n->h; | ||
240 | |||
241 | if (NULL != h->neb_cb) | ||
242 | h->neb_cb (h->cls, | ||
243 | &n->id); | ||
244 | } | ||
245 | |||
246 | |||
247 | /** | ||
248 | * Iterator over hash map entries, for deleting state of a neighbour. | ||
249 | * | ||
250 | * @param cls the `struct GNUNET_TRANSPORT_Handle *` | ||
251 | * @param key peer identity | ||
252 | * @param value value in the hash map, the neighbour entry to delete | ||
253 | * @return #GNUNET_YES if we should continue to | ||
254 | * iterate, | ||
255 | * #GNUNET_NO if not. | ||
256 | */ | ||
257 | static int | ||
258 | neighbour_delete (void *cls, | ||
259 | const struct GNUNET_PeerIdentity *key, | ||
260 | void *value) | ||
261 | { | ||
262 | struct GNUNET_TRANSPORT_Handle *handle = cls; | ||
263 | struct Neighbour *n = value; | ||
264 | |||
265 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
266 | "Dropping entry for neighbour `%s'.\n", | ||
267 | GNUNET_i2s (key)); | ||
268 | GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); | ||
269 | if (NULL != handle->nd_cb) | ||
270 | handle->nd_cb (handle->cls, | ||
271 | &n->id, | ||
272 | n->handlers_cls); | ||
273 | if (NULL != n->timeout_task) | ||
274 | { | ||
275 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
276 | n->timeout_task = NULL; | ||
277 | } | ||
278 | GNUNET_MQ_destroy (n->mq); | ||
279 | if (NULL != n->env) | ||
280 | { | ||
281 | GNUNET_MQ_send_cancel (n->env); | ||
282 | n->env = NULL; | ||
283 | } | ||
284 | GNUNET_assert (NULL == n->mq); | ||
285 | GNUNET_assert (GNUNET_YES == | ||
286 | GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, | ||
287 | key, | ||
288 | n)); | ||
289 | GNUNET_free (n); | ||
290 | return GNUNET_YES; | ||
291 | } | ||
292 | |||
293 | |||
294 | /** | ||
295 | * Generic error handler, called with the appropriate | ||
296 | * error code and the same closure specified at the creation of | ||
297 | * the message queue. | ||
298 | * Not every message queue implementation supports an error handler. | ||
299 | * | ||
300 | * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *` | ||
301 | * @param error error code | ||
302 | */ | ||
303 | static void | ||
304 | mq_error_handler (void *cls, | ||
305 | enum GNUNET_MQ_Error error) | ||
306 | { | ||
307 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
308 | |||
309 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
310 | "Error receiving from transport service, disconnecting temporarily.\n"); | ||
311 | disconnect_and_schedule_reconnect (h); | ||
312 | } | ||
313 | |||
314 | |||
315 | /** | ||
316 | * Function we use for checking incoming HELLO messages. | ||
317 | * | ||
318 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
319 | * @param msg message received | ||
320 | * @return #GNUNET_OK if message is well-formed | ||
321 | */ | ||
322 | static int | ||
323 | check_hello (void *cls, | ||
324 | const struct GNUNET_MessageHeader *msg) | ||
325 | { | ||
326 | struct GNUNET_PeerIdentity me; | ||
327 | |||
328 | if (GNUNET_OK != | ||
329 | GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, | ||
330 | &me)) | ||
331 | { | ||
332 | GNUNET_break (0); | ||
333 | return GNUNET_SYSERR; | ||
334 | } | ||
335 | return GNUNET_OK; | ||
336 | } | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Function we use for handling incoming HELLO messages. | ||
341 | * | ||
342 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
343 | * @param msg message received | ||
344 | */ | ||
345 | static void | ||
346 | handle_hello (void *cls, | ||
347 | const struct GNUNET_MessageHeader *msg) | ||
348 | { | ||
349 | /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ | ||
350 | } | ||
351 | |||
352 | |||
353 | /** | ||
354 | * A message from the handler's message queue to a neighbour was | ||
355 | * transmitted. Now trigger (possibly delayed) notification of the | ||
356 | * neighbour's message queue that we are done and thus ready for | ||
357 | * the next message. | ||
358 | * | ||
359 | * @param cls the `struct Neighbour` where the message was sent | ||
360 | */ | ||
361 | static void | ||
362 | notify_send_done (void *cls) | ||
363 | { | ||
364 | struct Neighbour *n = cls; | ||
365 | struct GNUNET_TIME_Relative delay; | ||
366 | |||
367 | n->timeout_task = NULL; | ||
368 | if (NULL != env) | ||
369 | { | ||
370 | GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, | ||
371 | n->env_size + n->traffic_overhead); | ||
372 | n->traffic_overhead = 0; | ||
373 | n->env = NULL; | ||
374 | } | ||
375 | delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, | ||
376 | 128); | ||
377 | if (0 == delay.rel_value_us) | ||
378 | { | ||
379 | n->is_ready = GNUNET_YES; | ||
380 | GNUNET_MQ_impl_send_continue (mq); | ||
381 | return; | ||
382 | } | ||
383 | /* cannot send even a small message without violating | ||
384 | quota, wait a before notifying MQ */ | ||
385 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (¬ify_send_done, | ||
386 | n); | ||
387 | } | ||
388 | |||
389 | |||
390 | /** | ||
391 | * Implement sending functionality of a message queue. | ||
392 | * Called one message at a time. Should send the @a msg | ||
393 | * to the transport service and then notify the queue | ||
394 | * once we are ready for the next one. | ||
395 | * | ||
396 | * @param mq the message queue | ||
397 | * @param msg the message to send | ||
398 | * @param impl_state state of the implementation | ||
399 | */ | ||
400 | static void | ||
401 | mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
402 | const struct GNUNET_MessageHeader *msg, | ||
403 | void *impl_state) | ||
404 | { | ||
405 | struct Neighbour *n = impl_state; | ||
406 | struct GNUNET_TRANSPORT_CoreHandle *h = n->h; | ||
407 | struct OutboundMessage *obm; | ||
408 | struct GNUNET_MQ_Envelope *env; | ||
409 | uint16_t msize; | ||
410 | |||
411 | GNUNET_assert (GNUNET_YES == n->is_ready); | ||
412 | msize = ntohs (msg->size); | ||
413 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*obm)) | ||
414 | { | ||
415 | GNUNET_break (0); | ||
416 | GNUNET_MQ_impl_send_continue (mq); | ||
417 | return; | ||
418 | } | ||
419 | n->env = GNUNET_MQ_msg_nested_mh (obm, | ||
420 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, | ||
421 | msg); | ||
422 | obm->reserved = htonl (0); | ||
423 | obm->timeout = | ||
424 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining | ||
425 | (th->timeout)); | ||
426 | obm->peer = n->id; | ||
427 | GNUNET_assert (NULL == n->timeout_task); | ||
428 | n->is_ready = GNUNET_NO; | ||
429 | n->env_size = ntohs (msg->size); | ||
430 | GNUNET_MQ_notify_sent (env, | ||
431 | ¬ify_send_done, | ||
432 | n); | ||
433 | GNUNET_MQ_send (h->mq, | ||
434 | env); | ||
435 | } | ||
436 | |||
437 | |||
438 | /** | ||
439 | * Handle destruction of a message queue. Implementations must not | ||
440 | * free @a mq, but should take care of @a impl_state. | ||
441 | * | ||
442 | * @param mq the message queue to destroy | ||
443 | * @param impl_state state of the implementation | ||
444 | */ | ||
445 | static void | ||
446 | mq_destroy_impl (struct GNUNET_MQ_Handle *mq, | ||
447 | void *impl_state) | ||
448 | { | ||
449 | struct Neighbour *n = impl_state; | ||
450 | |||
451 | GNUNET_assert (mq == n->mq); | ||
452 | n->mq = NULL; | ||
453 | } | ||
454 | |||
455 | |||
456 | /** | ||
457 | * Implementation function that cancels the currently sent message. | ||
458 | * Should basically undo whatever #mq_send_impl() did. | ||
459 | * | ||
460 | * @param mq message queue | ||
461 | * @param impl_state state specific to the implementation | ||
462 | */ | ||
463 | static void | ||
464 | mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | ||
465 | void *impl_state) | ||
466 | { | ||
467 | struct Neighbour *n = impl_state; | ||
468 | |||
469 | GNUNET_assert (GNUNET_NO == n->is_ready); | ||
470 | if (NULL != n->env) | ||
471 | { | ||
472 | GNUNET_MQ_send_cancel (n->env); | ||
473 | n->env = NULL; | ||
474 | } | ||
475 | |||
476 | n->is_ready = GNUNET_YES; | ||
477 | } | ||
478 | |||
479 | |||
480 | /** | ||
481 | * We had an error processing a message we forwarded from a peer to | ||
482 | * the CORE service. We should just complain about it but otherwise | ||
483 | * continue processing. | ||
484 | * | ||
485 | * @param cls closure | ||
486 | * @param error error code | ||
487 | */ | ||
488 | static void | ||
489 | peer_mq_error_handler (void *cls, | ||
490 | enum GNUNET_MQ_Error error) | ||
491 | { | ||
492 | struct Neighbour *n = cls; | ||
493 | |||
494 | GNUNET_break_op (0); | ||
495 | } | ||
496 | |||
497 | |||
498 | /** | ||
499 | * Function we use for handling incoming connect messages. | ||
500 | * | ||
501 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
502 | * @param cim message received | ||
503 | */ | ||
504 | static void | ||
505 | handle_connect (void *cls, | ||
506 | const struct ConnectInfoMessage *cim) | ||
507 | { | ||
508 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
509 | struct Neighbour *n; | ||
510 | |||
511 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
512 | "Receiving CONNECT message for `%s' with quota %u\n", | ||
513 | GNUNET_i2s (&cim->id), | ||
514 | ntohl (cim->quota_out.value__)); | ||
515 | n = neighbour_find (h, &cim->id); | ||
516 | if (NULL != n) | ||
517 | { | ||
518 | GNUNET_break (0); | ||
519 | disconnect_and_schedule_reconnect (h); | ||
520 | return; | ||
521 | } | ||
522 | n = GNUNET_new (struct Neighbour); | ||
523 | n->id = cim->id; | ||
524 | n->h = h; | ||
525 | n->is_ready = GNUNET_YES; | ||
526 | n->traffic_overhead = 0; | ||
527 | GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker, | ||
528 | &outbound_bw_tracker_update, | ||
529 | n, | ||
530 | GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, | ||
531 | MAX_BANDWIDTH_CARRY_S, | ||
532 | ¬ify_excess_cb, | ||
533 | n); | ||
534 | GNUNET_assert (GNUNET_OK == | ||
535 | GNUNET_CONTAINER_multipeermap_put (h->neighbours, | ||
536 | &n->id, | ||
537 | n, | ||
538 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
539 | |||
540 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
541 | cim->quota_out); | ||
542 | n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, | ||
543 | &mq_destroy_impl, | ||
544 | &mq_cancel_impl, | ||
545 | n, | ||
546 | h->handlers, | ||
547 | &peer_mq_error_handler, | ||
548 | n); | ||
549 | if (NULL != h->nc_cb) | ||
550 | { | ||
551 | n->handlers_cls = h->nc_cb (h->cls, | ||
552 | &n->id, | ||
553 | n->mq); | ||
554 | GNUNET_MQ_set_handlers_closure (n->mq, | ||
555 | n->handlers_cls); | ||
556 | } | ||
557 | } | ||
558 | |||
559 | |||
560 | /** | ||
561 | * Function we use for handling incoming disconnect messages. | ||
562 | * | ||
563 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
564 | * @param dim message received | ||
565 | */ | ||
566 | static void | ||
567 | handle_disconnect (void *cls, | ||
568 | const struct DisconnectInfoMessage *dim) | ||
569 | { | ||
570 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
571 | struct Neighbour *n; | ||
572 | |||
573 | GNUNET_break (ntohl (dim->reserved) == 0); | ||
574 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
575 | "Receiving DISCONNECT message for `%s'.\n", | ||
576 | GNUNET_i2s (&dim->peer)); | ||
577 | n = neighbour_find (h, &dim->peer); | ||
578 | if (NULL == n) | ||
579 | { | ||
580 | GNUNET_break (0); | ||
581 | disconnect_and_schedule_reconnect (h); | ||
582 | return; | ||
583 | } | ||
584 | GNUNET_assert (GNUNET_YES == | ||
585 | neighbour_delete (h, | ||
586 | &dim->peer, | ||
587 | n)); | ||
588 | } | ||
589 | |||
590 | |||
591 | /** | ||
592 | * Function we use for handling incoming send-ok messages. | ||
593 | * | ||
594 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
595 | * @param okm message received | ||
596 | */ | ||
597 | static void | ||
598 | handle_send_ok (void *cls, | ||
599 | const struct SendOkMessage *okm) | ||
600 | { | ||
601 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
602 | struct Neighbour *n; | ||
603 | uint32_t bytes_msg; | ||
604 | uint32_t bytes_physical; | ||
605 | |||
606 | bytes_msg = ntohl (okm->bytes_msg); | ||
607 | bytes_physical = ntohl (okm->bytes_physical); | ||
608 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
609 | "Receiving SEND_OK message, transmission to %s %s.\n", | ||
610 | GNUNET_i2s (&okm->peer), | ||
611 | ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); | ||
612 | n = neighbour_find (h, | ||
613 | &okm->peer); | ||
614 | if (NULL == n) | ||
615 | { | ||
616 | /* We should never get a 'SEND_OK' for a peer that we are not | ||
617 | connected to */ | ||
618 | GNUNET_break (0); | ||
619 | disconnect_and_schedule_reconnect (h); | ||
620 | return; | ||
621 | } | ||
622 | if (bytes_physical > bytes_msg) | ||
623 | { | ||
624 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
625 | "Overhead for %u byte message was %u\n", | ||
626 | bytes_msg, | ||
627 | bytes_physical - bytes_msg); | ||
628 | n->traffic_overhead += bytes_physical - bytes_msg; | ||
629 | } | ||
630 | } | ||
631 | |||
632 | |||
633 | /** | ||
634 | * Function we use for checking incoming "inbound" messages. | ||
635 | * | ||
636 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
637 | * @param im message received | ||
638 | */ | ||
639 | static int | ||
640 | check_recv (void *cls, | ||
641 | const struct InboundMessage *im) | ||
642 | { | ||
643 | const struct GNUNET_MessageHeader *imm; | ||
644 | uint16_t size; | ||
645 | |||
646 | size = ntohs (im->header.size); | ||
647 | if (size < | ||
648 | sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)) | ||
649 | { | ||
650 | GNUNET_break (0); | ||
651 | return GNUNET_SYSERR; | ||
652 | } | ||
653 | imm = (const struct GNUNET_MessageHeader *) &im[1]; | ||
654 | if (ntohs (imm->size) + sizeof (struct InboundMessage) != size) | ||
655 | { | ||
656 | GNUNET_break (0); | ||
657 | return GNUNET_SYSERR; | ||
658 | } | ||
659 | return GNUNET_OK; | ||
660 | } | ||
661 | |||
662 | |||
663 | /** | ||
664 | * Function we use for handling incoming messages. | ||
665 | * | ||
666 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
667 | * @param im message received | ||
668 | */ | ||
669 | static void | ||
670 | handle_recv (void *cls, | ||
671 | const struct InboundMessage *im) | ||
672 | { | ||
673 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
674 | const struct GNUNET_MessageHeader *imm | ||
675 | = (const struct GNUNET_MessageHeader *) &im[1]; | ||
676 | struct Neighbour *n; | ||
677 | |||
678 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
679 | "Received message of type %u with %u bytes from `%s'.\n", | ||
680 | (unsigned int) ntohs (imm->type), | ||
681 | (unsigned int) ntohs (imm->size), | ||
682 | GNUNET_i2s (&im->peer)); | ||
683 | n = neighbour_find (h, &im->peer); | ||
684 | if (NULL == n) | ||
685 | { | ||
686 | GNUNET_break (0); | ||
687 | disconnect_and_schedule_reconnect (h); | ||
688 | return; | ||
689 | } | ||
690 | GNUNET_MQ_inject_message (n->mq, | ||
691 | imm); | ||
692 | } | ||
693 | |||
694 | |||
695 | /** | ||
696 | * Function we use for handling incoming set quota messages. | ||
697 | * | ||
698 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
699 | * @param msg message received | ||
700 | */ | ||
701 | static void | ||
702 | handle_set_quota (void *cls, | ||
703 | const struct QuotaSetMessage *qm) | ||
704 | { | ||
705 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
706 | struct Neighbour *n; | ||
707 | |||
708 | n = neighbour_find (h, &qm->peer); | ||
709 | if (NULL == n) | ||
710 | { | ||
711 | GNUNET_break (0); | ||
712 | disconnect_and_schedule_reconnect (h); | ||
713 | return; | ||
714 | } | ||
715 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
716 | "Receiving SET_QUOTA message for `%s' with quota %u\n", | ||
717 | GNUNET_i2s (&qm->peer), | ||
718 | ntohl (qm->quota.value__)); | ||
719 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
720 | qm->quota); | ||
721 | } | ||
722 | |||
723 | |||
724 | /** | ||
725 | * Try again to connect to transport service. | ||
726 | * | ||
727 | * @param cls the handle to the transport service | ||
728 | */ | ||
729 | static void | ||
730 | reconnect (void *cls) | ||
731 | { | ||
732 | GNUNET_MQ_hd_var_size (hello, | ||
733 | GNUNET_MESSAGE_TYPE_HELLO, | ||
734 | struct GNUNET_MessageHeader); | ||
735 | GNUNET_MQ_hd_fixed_size (connect, | ||
736 | GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, | ||
737 | struct ConnectInfoMessage); | ||
738 | GNUNET_MQ_hd_fixed_size (disconnect, | ||
739 | GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, | ||
740 | struct DisconnectInfoMessage); | ||
741 | GNUNET_MQ_hd_fixed_size (send_ok, | ||
742 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, | ||
743 | struct SendOkMessage); | ||
744 | GNUNET_MQ_hd_var_size (recv, | ||
745 | GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, | ||
746 | struct InboundMessage); | ||
747 | GNUNET_MQ_hd_fixed_size (set_quota, | ||
748 | GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, | ||
749 | struct QuotaSetMessage); | ||
750 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
751 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
752 | make_hello_handler (h), | ||
753 | make_connect_handler (h), | ||
754 | make_disconnect_handler (h), | ||
755 | make_send_ok_handler (h), | ||
756 | make_recv_handler (h), | ||
757 | make_set_quota_handler (h), | ||
758 | GNUNET_MQ_handler_end () | ||
759 | }; | ||
760 | struct GNUNET_MQ_Envelope *env; | ||
761 | struct StartMessage *s; | ||
762 | uint32_t options; | ||
763 | |||
764 | h->reconnect_task = NULL; | ||
765 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
766 | "Connecting to transport service.\n"); | ||
767 | GNUNET_assert (NULL == h->mq); | ||
768 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
769 | "transport", | ||
770 | handlers, | ||
771 | &mq_error_handler, | ||
772 | h); | ||
773 | if (NULL == h->mq) | ||
774 | return; | ||
775 | env = GNUNET_MQ_msg (s, | ||
776 | GNUNET_MESSAGE_TYPE_TRANSPORT_START); | ||
777 | options = 0; | ||
778 | if (h->check_self) | ||
779 | options |= 1; | ||
780 | if (NULL != h->rec) | ||
781 | options |= 2; | ||
782 | s->options = htonl (options); | ||
783 | s->self = h->self; | ||
784 | GNUNET_MQ_send (h->mq, | ||
785 | env); | ||
786 | } | ||
787 | |||
788 | |||
789 | /** | ||
790 | * Function that will schedule the job that will try | ||
791 | * to connect us again to the client. | ||
792 | * | ||
793 | * @param h transport service to reconnect | ||
794 | */ | ||
795 | static void | ||
796 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) | ||
797 | { | ||
798 | GNUNET_assert (NULL == h->reconnect_task); | ||
799 | if (NULL != h->mq) | ||
800 | { | ||
801 | GNUNET_MQ_destroy (h->mq); | ||
802 | h->mq = NULL; | ||
803 | } | ||
804 | /* Forget about all neighbours that we used to be connected to */ | ||
805 | GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, | ||
806 | &neighbour_delete, | ||
807 | h); | ||
808 | if (NULL != h->quota_task) | ||
809 | { | ||
810 | GNUNET_SCHEDULER_cancel (h->quota_task); | ||
811 | h->quota_task = NULL; | ||
812 | } | ||
813 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
814 | "Scheduling task to reconnect to transport service in %s.\n", | ||
815 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, | ||
816 | GNUNET_YES)); | ||
817 | h->reconnect_task = | ||
818 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | ||
819 | &reconnect, | ||
820 | h); | ||
821 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
822 | } | ||
823 | |||
824 | |||
825 | /** | ||
826 | * Checks if a given peer is connected to us and get the message queue. | ||
827 | * | ||
828 | * @param handle connection to transport service | ||
829 | * @param peer the peer to check | ||
830 | * @return NULL if disconnected, otherwise message queue for @a peer | ||
831 | */ | ||
832 | struct GNUNET_MQ_Handle * | ||
833 | GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_Handle *handle, | ||
834 | const struct GNUNET_PeerIdentity *peer) | ||
835 | { | ||
836 | struct Neighbour *n; | ||
837 | |||
838 | n = neighbour_find (handle, | ||
839 | peer); | ||
840 | if (NULL == n) | ||
841 | return NULL; | ||
842 | return n->mq; | ||
843 | } | ||
844 | |||
845 | |||
846 | /** | ||
847 | * Connect to the transport service. Note that the connection may | ||
848 | * complete (or fail) asynchronously. | ||
849 | * | ||
850 | * @param cfg configuration to use | ||
851 | * @param self our own identity (API should check that it matches | ||
852 | * the identity found by transport), or NULL (no check) | ||
853 | * @param cls closure for the callbacks | ||
854 | * @param rec receive function to call | ||
855 | * @param nc function to call on connect events | ||
856 | * @param nd function to call on disconnect events | ||
857 | * @param neb function to call if we have excess bandwidth to a peer | ||
858 | * @return NULL on error | ||
859 | */ | ||
860 | struct GNUNET_TRANSPORT_CoreHandle * | ||
861 | GNUNET_TRANSPORT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
862 | const struct GNUNET_PeerIdentity *self, | ||
863 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
864 | void *cls, | ||
865 | GNUNET_TRANSPORT_NotifyConnect nc, | ||
866 | GNUNET_TRANSPORT_NotifyDisconnect nd, | ||
867 | GNUNET_TRANSPORT_NotifyExcessBandwidth neb) | ||
868 | { | ||
869 | struct GNUNET_TRANSPORT_CoreHandle *h; | ||
870 | unsigned int i; | ||
871 | |||
872 | h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle); | ||
873 | if (NULL != self) | ||
874 | { | ||
875 | h->self = *self; | ||
876 | h->check_self = GNUNET_YES; | ||
877 | } | ||
878 | h->cfg = cfg; | ||
879 | h->cls = cls; | ||
880 | h->nc_cb = nc; | ||
881 | h->nd_cb = nd; | ||
882 | h->neb_cb = neb; | ||
883 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
884 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
885 | "Connecting to transport service.\n"); | ||
886 | reconnect (h); | ||
887 | if (NULL == h->mq) | ||
888 | { | ||
889 | GNUNET_free (h); | ||
890 | return NULL; | ||
891 | } | ||
892 | if (NULL != handlers) | ||
893 | { | ||
894 | for (i=0;NULL != handlers[i].cb; i++) ; | ||
895 | h->handlers = GNUNET_new_array (i + 1, | ||
896 | struct GNUNET_MQ_MessageHandler); | ||
897 | memcpy (h->handlers, | ||
898 | handlers, | ||
899 | i * sizeof (struct GNUNET_MQ_MessageHandler)); | ||
900 | } | ||
901 | h->neighbours = | ||
902 | GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, | ||
903 | GNUNET_YES); | ||
904 | return h; | ||
905 | } | ||
906 | |||
907 | |||
908 | /** | ||
909 | * Disconnect from the transport service. | ||
910 | * | ||
911 | * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect() | ||
912 | */ | ||
913 | void | ||
914 | GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) | ||
915 | { | ||
916 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
917 | "Transport disconnect called!\n"); | ||
918 | /* this disconnects all neighbours... */ | ||
919 | if (NULL == handle->reconnect_task) | ||
920 | disconnect_and_schedule_reconnect (handle); | ||
921 | /* and now we stop trying to connect again... */ | ||
922 | if (NULL != handle->reconnect_task) | ||
923 | { | ||
924 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | ||
925 | handle->reconnect_task = NULL; | ||
926 | } | ||
927 | GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours); | ||
928 | handle->neighbours = NULL; | ||
929 | if (NULL != handle->quota_task) | ||
930 | { | ||
931 | GNUNET_SCHEDULER_cancel (handle->quota_task); | ||
932 | handle->quota_task = NULL; | ||
933 | } | ||
934 | GNUNET_free_non_null (handle->handlers); | ||
935 | handle->handlers = NULL; | ||
936 | GNUNET_free (handle); | ||
937 | } | ||
938 | |||
939 | |||
940 | /* end of transport_api_core.c */ | ||