diff options
author | Christian Grothoff <christian@grothoff.org> | 2018-11-15 09:27:35 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2018-11-15 09:27:35 +0100 |
commit | ad1244d958b76cb249028c7ad87ff84df49293ff (patch) | |
tree | 3b5c5aec34804ffd6cd0fab2f17bfad4b1651489 /src/transport/transport_api2_core.c | |
parent | 75c351e315e2e8578ed7eaec4323d66d3db901d8 (diff) | |
download | gnunet-ad1244d958b76cb249028c7ad87ff84df49293ff.tar.gz gnunet-ad1244d958b76cb249028c7ad87ff84df49293ff.zip |
make copy of transport_api_core.c
Diffstat (limited to 'src/transport/transport_api2_core.c')
-rw-r--r-- | src/transport/transport_api2_core.c | 970 |
1 files changed, 970 insertions, 0 deletions
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c new file mode 100644 index 000000000..b7edc3cc1 --- /dev/null +++ b/src/transport/transport_api2_core.c | |||
@@ -0,0 +1,970 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2009-2013, 2016 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | */ | ||
18 | |||
19 | /** | ||
20 | * @file transport/transport_api_core.c | ||
21 | * @brief library to access the transport service for message exchange | ||
22 | * @author Christian Grothoff | ||
23 | */ | ||
24 | #include "platform.h" | ||
25 | #include "gnunet_util_lib.h" | ||
26 | #include "gnunet_constants.h" | ||
27 | #include "gnunet_arm_service.h" | ||
28 | #include "gnunet_hello_lib.h" | ||
29 | #include "gnunet_protocols.h" | ||
30 | #include "gnunet_transport_core_service.h" | ||
31 | #include "transport.h" | ||
32 | |||
33 | #define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) | ||
34 | |||
35 | /** | ||
36 | * If we could not send any payload to a peer for this amount of | ||
37 | * time, we print a warning. | ||
38 | */ | ||
39 | #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES | ||
40 | |||
41 | /** | ||
42 | * How large to start with for the hashmap of neighbours. | ||
43 | */ | ||
44 | #define STARTING_NEIGHBOURS_SIZE 16 | ||
45 | |||
46 | |||
47 | /** | ||
48 | * Entry in hash table of all of our current (connected) neighbours. | ||
49 | */ | ||
50 | struct Neighbour | ||
51 | { | ||
52 | /** | ||
53 | * Overall transport handle. | ||
54 | */ | ||
55 | struct GNUNET_TRANSPORT_CoreHandle *h; | ||
56 | |||
57 | /** | ||
58 | * Active message queue for the peer. | ||
59 | */ | ||
60 | struct GNUNET_MQ_Handle *mq; | ||
61 | |||
62 | /** | ||
63 | * Envelope with the message we are currently transmitting (or NULL). | ||
64 | */ | ||
65 | struct GNUNET_MQ_Envelope *env; | ||
66 | |||
67 | /** | ||
68 | * Closure for @e mq handlers. | ||
69 | */ | ||
70 | void *handlers_cls; | ||
71 | |||
72 | /** | ||
73 | * Identity of this neighbour. | ||
74 | */ | ||
75 | struct GNUNET_PeerIdentity id; | ||
76 | |||
77 | /** | ||
78 | * Outbound bandwidh tracker. | ||
79 | */ | ||
80 | struct GNUNET_BANDWIDTH_Tracker out_tracker; | ||
81 | |||
82 | /** | ||
83 | * Entry in our readyness heap (which is sorted by @e next_ready | ||
84 | * value). NULL if there is no pending transmission request for | ||
85 | * this neighbour or if we're waiting for @e is_ready to become | ||
86 | * true AFTER the @e out_tracker suggested that this peer's quota | ||
87 | * has been satisfied (so once @e is_ready goes to #GNUNET_YES, | ||
88 | * we should immediately go back into the heap). | ||
89 | */ | ||
90 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
91 | |||
92 | /** | ||
93 | * Task to trigger MQ when we have enough bandwidth for the | ||
94 | * next transmission. | ||
95 | */ | ||
96 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
97 | |||
98 | /** | ||
99 | * Sending consumed more bytes on wire than payload was announced | ||
100 | * This overhead is added to the delay of next sending operation | ||
101 | */ | ||
102 | unsigned long long traffic_overhead; | ||
103 | |||
104 | /** | ||
105 | * Is this peer currently ready to receive a message? | ||
106 | */ | ||
107 | int is_ready; | ||
108 | |||
109 | /** | ||
110 | * Size of the message in @e env. | ||
111 | */ | ||
112 | uint16_t env_size; | ||
113 | |||
114 | }; | ||
115 | |||
116 | |||
117 | |||
118 | /** | ||
119 | * Handle for the transport service (includes all of the | ||
120 | * state for the transport service). | ||
121 | */ | ||
122 | struct GNUNET_TRANSPORT_CoreHandle | ||
123 | { | ||
124 | |||
125 | /** | ||
126 | * Closure for the callbacks. | ||
127 | */ | ||
128 | void *cls; | ||
129 | |||
130 | /** | ||
131 | * Functions to call for received data (template for | ||
132 | * new message queues). | ||
133 | */ | ||
134 | struct GNUNET_MQ_MessageHandler *handlers; | ||
135 | |||
136 | /** | ||
137 | * function to call on connect events | ||
138 | */ | ||
139 | GNUNET_TRANSPORT_NotifyConnecT nc_cb; | ||
140 | |||
141 | /** | ||
142 | * function to call on disconnect events | ||
143 | */ | ||
144 | GNUNET_TRANSPORT_NotifyDisconnecT nd_cb; | ||
145 | |||
146 | /** | ||
147 | * function to call on excess bandwidth events | ||
148 | */ | ||
149 | GNUNET_TRANSPORT_NotifyExcessBandwidtH neb_cb; | ||
150 | |||
151 | /** | ||
152 | * My client connection to the transport service. | ||
153 | */ | ||
154 | struct GNUNET_MQ_Handle *mq; | ||
155 | |||
156 | /** | ||
157 | * My configuration. | ||
158 | */ | ||
159 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
160 | |||
161 | /** | ||
162 | * Hash map of the current connected neighbours of this peer. | ||
163 | * Maps peer identities to `struct Neighbour` entries. | ||
164 | */ | ||
165 | struct GNUNET_CONTAINER_MultiPeerMap *neighbours; | ||
166 | |||
167 | /** | ||
168 | * Peer identity as assumed by this process, or all zeros. | ||
169 | */ | ||
170 | struct GNUNET_PeerIdentity self; | ||
171 | |||
172 | /** | ||
173 | * ID of the task trying to reconnect to the service. | ||
174 | */ | ||
175 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
176 | |||
177 | /** | ||
178 | * Delay until we try to reconnect. | ||
179 | */ | ||
180 | struct GNUNET_TIME_Relative reconnect_delay; | ||
181 | |||
182 | /** | ||
183 | * Should we check that @e self matches what the service thinks? | ||
184 | * (if #GNUNET_NO, then @e self is all zeros!). | ||
185 | */ | ||
186 | int check_self; | ||
187 | |||
188 | }; | ||
189 | |||
190 | |||
191 | /** | ||
192 | * Function that will schedule the job that will try | ||
193 | * to connect us again to the client. | ||
194 | * | ||
195 | * @param h transport service to reconnect | ||
196 | */ | ||
197 | static void | ||
198 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h); | ||
199 | |||
200 | |||
201 | /** | ||
202 | * Get the neighbour list entry for the given peer | ||
203 | * | ||
204 | * @param h our context | ||
205 | * @param peer peer to look up | ||
206 | * @return NULL if no such peer entry exists | ||
207 | */ | ||
208 | static struct Neighbour * | ||
209 | neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, | ||
210 | const struct GNUNET_PeerIdentity *peer) | ||
211 | { | ||
212 | return GNUNET_CONTAINER_multipeermap_get (h->neighbours, | ||
213 | peer); | ||
214 | } | ||
215 | |||
216 | |||
217 | /** | ||
218 | * Function called by the bandwidth tracker if we have excess | ||
219 | * bandwidth. | ||
220 | * | ||
221 | * @param cls the `struct Neighbour` that has excess bandwidth | ||
222 | */ | ||
223 | static void | ||
224 | notify_excess_cb (void *cls) | ||
225 | { | ||
226 | struct Neighbour *n = cls; | ||
227 | struct GNUNET_TRANSPORT_CoreHandle *h = n->h; | ||
228 | |||
229 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
230 | "Notifying CORE that more bandwidth is available for %s\n", | ||
231 | GNUNET_i2s (&n->id)); | ||
232 | |||
233 | if (NULL != h->neb_cb) | ||
234 | h->neb_cb (h->cls, | ||
235 | &n->id, | ||
236 | n->handlers_cls); | ||
237 | } | ||
238 | |||
239 | |||
240 | /** | ||
241 | * Iterator over hash map entries, for deleting state of a neighbour. | ||
242 | * | ||
243 | * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *` | ||
244 | * @param key peer identity | ||
245 | * @param value value in the hash map, the neighbour entry to delete | ||
246 | * @return #GNUNET_YES if we should continue to | ||
247 | * iterate, | ||
248 | * #GNUNET_NO if not. | ||
249 | */ | ||
250 | static int | ||
251 | neighbour_delete (void *cls, | ||
252 | const struct GNUNET_PeerIdentity *key, | ||
253 | void *value) | ||
254 | { | ||
255 | struct GNUNET_TRANSPORT_CoreHandle *handle = cls; | ||
256 | struct Neighbour *n = value; | ||
257 | |||
258 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
259 | "Dropping entry for neighbour `%s'.\n", | ||
260 | GNUNET_i2s (key)); | ||
261 | GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); | ||
262 | if (NULL != handle->nd_cb) | ||
263 | handle->nd_cb (handle->cls, | ||
264 | &n->id, | ||
265 | n->handlers_cls); | ||
266 | if (NULL != n->timeout_task) | ||
267 | { | ||
268 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
269 | n->timeout_task = NULL; | ||
270 | } | ||
271 | if (NULL != n->env) | ||
272 | { | ||
273 | GNUNET_MQ_send_cancel (n->env); | ||
274 | n->env = NULL; | ||
275 | } | ||
276 | GNUNET_MQ_destroy (n->mq); | ||
277 | GNUNET_assert (NULL == n->mq); | ||
278 | GNUNET_assert (GNUNET_YES == | ||
279 | GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, | ||
280 | key, | ||
281 | n)); | ||
282 | GNUNET_free (n); | ||
283 | return GNUNET_YES; | ||
284 | } | ||
285 | |||
286 | |||
287 | /** | ||
288 | * Generic error handler, called with the appropriate | ||
289 | * error code and the same closure specified at the creation of | ||
290 | * the message queue. | ||
291 | * Not every message queue implementation supports an error handler. | ||
292 | * | ||
293 | * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *` | ||
294 | * @param error error code | ||
295 | */ | ||
296 | static void | ||
297 | mq_error_handler (void *cls, | ||
298 | enum GNUNET_MQ_Error error) | ||
299 | { | ||
300 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
301 | |||
302 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
303 | "Error receiving from transport service, disconnecting temporarily.\n"); | ||
304 | disconnect_and_schedule_reconnect (h); | ||
305 | } | ||
306 | |||
307 | |||
308 | /** | ||
309 | * Function we use for checking incoming HELLO messages. | ||
310 | * | ||
311 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
312 | * @param msg message received | ||
313 | * @return #GNUNET_OK if message is well-formed | ||
314 | */ | ||
315 | static int | ||
316 | check_hello (void *cls, | ||
317 | const struct GNUNET_MessageHeader *msg) | ||
318 | { | ||
319 | struct GNUNET_PeerIdentity me; | ||
320 | |||
321 | if (GNUNET_OK != | ||
322 | GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, | ||
323 | &me)) | ||
324 | { | ||
325 | GNUNET_break (0); | ||
326 | return GNUNET_SYSERR; | ||
327 | } | ||
328 | return GNUNET_OK; | ||
329 | } | ||
330 | |||
331 | |||
332 | /** | ||
333 | * Function we use for handling incoming HELLO messages. | ||
334 | * | ||
335 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
336 | * @param msg message received | ||
337 | */ | ||
338 | static void | ||
339 | handle_hello (void *cls, | ||
340 | const struct GNUNET_MessageHeader *msg) | ||
341 | { | ||
342 | /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ | ||
343 | } | ||
344 | |||
345 | |||
346 | /** | ||
347 | * A message from the handler's message queue to a neighbour was | ||
348 | * transmitted. Now trigger (possibly delayed) notification of the | ||
349 | * neighbour's message queue that we are done and thus ready for | ||
350 | * the next message. | ||
351 | * | ||
352 | * @param cls the `struct Neighbour` where the message was sent | ||
353 | */ | ||
354 | static void | ||
355 | notify_send_done_fin (void *cls) | ||
356 | { | ||
357 | struct Neighbour *n = cls; | ||
358 | |||
359 | n->timeout_task = NULL; | ||
360 | n->is_ready = GNUNET_YES; | ||
361 | GNUNET_MQ_impl_send_continue (n->mq); | ||
362 | } | ||
363 | |||
364 | |||
365 | /** | ||
366 | * A message from the handler's message queue to a neighbour was | ||
367 | * transmitted. Now trigger (possibly delayed) notification of the | ||
368 | * neighbour's message queue that we are done and thus ready for | ||
369 | * the next message. | ||
370 | * | ||
371 | * @param cls the `struct Neighbour` where the message was sent | ||
372 | */ | ||
373 | static void | ||
374 | notify_send_done (void *cls) | ||
375 | { | ||
376 | struct Neighbour *n = cls; | ||
377 | struct GNUNET_TIME_Relative delay; | ||
378 | |||
379 | n->timeout_task = NULL; | ||
380 | if (NULL != n->env) | ||
381 | { | ||
382 | GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, | ||
383 | n->env_size + n->traffic_overhead); | ||
384 | n->env = NULL; | ||
385 | n->traffic_overhead = 0; | ||
386 | } | ||
387 | delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, | ||
388 | 128); | ||
389 | if (0 == delay.rel_value_us) | ||
390 | { | ||
391 | n->is_ready = GNUNET_YES; | ||
392 | GNUNET_MQ_impl_send_continue (n->mq); | ||
393 | return; | ||
394 | } | ||
395 | GNUNET_MQ_impl_send_in_flight (n->mq); | ||
396 | /* cannot send even a small message without violating | ||
397 | quota, wait a before allowing MQ to send next message */ | ||
398 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
399 | ¬ify_send_done_fin, | ||
400 | n); | ||
401 | } | ||
402 | |||
403 | |||
404 | /** | ||
405 | * Implement sending functionality of a message queue. | ||
406 | * Called one message at a time. Should send the @a msg | ||
407 | * to the transport service and then notify the queue | ||
408 | * once we are ready for the next one. | ||
409 | * | ||
410 | * @param mq the message queue | ||
411 | * @param msg the message to send | ||
412 | * @param impl_state state of the implementation | ||
413 | */ | ||
414 | static void | ||
415 | mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
416 | const struct GNUNET_MessageHeader *msg, | ||
417 | void *impl_state) | ||
418 | { | ||
419 | struct Neighbour *n = impl_state; | ||
420 | struct GNUNET_TRANSPORT_CoreHandle *h = n->h; | ||
421 | struct OutboundMessage *obm; | ||
422 | uint16_t msize; | ||
423 | |||
424 | GNUNET_assert (GNUNET_YES == n->is_ready); | ||
425 | msize = ntohs (msg->size); | ||
426 | if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm)) | ||
427 | { | ||
428 | GNUNET_break (0); | ||
429 | GNUNET_MQ_impl_send_continue (mq); | ||
430 | return; | ||
431 | } | ||
432 | GNUNET_assert (NULL == n->env); | ||
433 | n->env = GNUNET_MQ_msg_nested_mh (obm, | ||
434 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, | ||
435 | msg); | ||
436 | obm->reserved = htonl (0); | ||
437 | obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ | ||
438 | obm->peer = n->id; | ||
439 | GNUNET_assert (NULL == n->timeout_task); | ||
440 | n->is_ready = GNUNET_NO; | ||
441 | n->env_size = ntohs (msg->size); | ||
442 | GNUNET_MQ_notify_sent (n->env, | ||
443 | ¬ify_send_done, | ||
444 | n); | ||
445 | GNUNET_MQ_send (h->mq, | ||
446 | n->env); | ||
447 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
448 | "Queued message of type %u for neighbour `%s'.\n", | ||
449 | ntohs (msg->type), | ||
450 | GNUNET_i2s (&n->id)); | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Handle destruction of a message queue. Implementations must not | ||
456 | * free @a mq, but should take care of @a impl_state. | ||
457 | * | ||
458 | * @param mq the message queue to destroy | ||
459 | * @param impl_state state of the implementation | ||
460 | */ | ||
461 | static void | ||
462 | mq_destroy_impl (struct GNUNET_MQ_Handle *mq, | ||
463 | void *impl_state) | ||
464 | { | ||
465 | struct Neighbour *n = impl_state; | ||
466 | |||
467 | GNUNET_assert (mq == n->mq); | ||
468 | n->mq = NULL; | ||
469 | } | ||
470 | |||
471 | |||
472 | /** | ||
473 | * Implementation function that cancels the currently sent message. | ||
474 | * Should basically undo whatever #mq_send_impl() did. | ||
475 | * | ||
476 | * @param mq message queue | ||
477 | * @param impl_state state specific to the implementation | ||
478 | */ | ||
479 | static void | ||
480 | mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | ||
481 | void *impl_state) | ||
482 | { | ||
483 | struct Neighbour *n = impl_state; | ||
484 | |||
485 | GNUNET_assert (GNUNET_NO == n->is_ready); | ||
486 | if (NULL != n->env) | ||
487 | { | ||
488 | GNUNET_MQ_send_cancel (n->env); | ||
489 | n->env = NULL; | ||
490 | } | ||
491 | |||
492 | n->is_ready = GNUNET_YES; | ||
493 | } | ||
494 | |||
495 | |||
496 | /** | ||
497 | * We had an error processing a message we forwarded from a peer to | ||
498 | * the CORE service. We should just complain about it but otherwise | ||
499 | * continue processing. | ||
500 | * | ||
501 | * @param cls closure | ||
502 | * @param error error code | ||
503 | */ | ||
504 | static void | ||
505 | peer_mq_error_handler (void *cls, | ||
506 | enum GNUNET_MQ_Error error) | ||
507 | { | ||
508 | /* struct Neighbour *n = cls; */ | ||
509 | |||
510 | GNUNET_break_op (0); | ||
511 | } | ||
512 | |||
513 | |||
514 | /** | ||
515 | * The outbound quota has changed in a way that may require | ||
516 | * us to reset the timeout. Update the timeout. | ||
517 | * | ||
518 | * @param cls the `struct Neighbour` for which the timeout changed | ||
519 | */ | ||
520 | static void | ||
521 | outbound_bw_tracker_update (void *cls) | ||
522 | { | ||
523 | struct Neighbour *n = cls; | ||
524 | struct GNUNET_TIME_Relative delay; | ||
525 | |||
526 | if (NULL == n->timeout_task) | ||
527 | return; | ||
528 | delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, | ||
529 | 128); | ||
530 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
531 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
532 | ¬ify_send_done, | ||
533 | n); | ||
534 | } | ||
535 | |||
536 | |||
537 | /** | ||
538 | * Function we use for handling incoming connect messages. | ||
539 | * | ||
540 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
541 | * @param cim message received | ||
542 | */ | ||
543 | static void | ||
544 | handle_connect (void *cls, | ||
545 | const struct ConnectInfoMessage *cim) | ||
546 | { | ||
547 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
548 | struct Neighbour *n; | ||
549 | |||
550 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
551 | "Receiving CONNECT message for `%s' with quota %u\n", | ||
552 | GNUNET_i2s (&cim->id), | ||
553 | ntohl (cim->quota_out.value__)); | ||
554 | n = neighbour_find (h, &cim->id); | ||
555 | if (NULL != n) | ||
556 | { | ||
557 | GNUNET_break (0); | ||
558 | disconnect_and_schedule_reconnect (h); | ||
559 | return; | ||
560 | } | ||
561 | n = GNUNET_new (struct Neighbour); | ||
562 | n->id = cim->id; | ||
563 | n->h = h; | ||
564 | n->is_ready = GNUNET_YES; | ||
565 | n->traffic_overhead = 0; | ||
566 | GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker, | ||
567 | &outbound_bw_tracker_update, | ||
568 | n, | ||
569 | GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, | ||
570 | MAX_BANDWIDTH_CARRY_S, | ||
571 | ¬ify_excess_cb, | ||
572 | n); | ||
573 | GNUNET_assert (GNUNET_OK == | ||
574 | GNUNET_CONTAINER_multipeermap_put (h->neighbours, | ||
575 | &n->id, | ||
576 | n, | ||
577 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
578 | |||
579 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
580 | cim->quota_out); | ||
581 | n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, | ||
582 | &mq_destroy_impl, | ||
583 | &mq_cancel_impl, | ||
584 | n, | ||
585 | h->handlers, | ||
586 | &peer_mq_error_handler, | ||
587 | n); | ||
588 | if (NULL != h->nc_cb) | ||
589 | { | ||
590 | n->handlers_cls = h->nc_cb (h->cls, | ||
591 | &n->id, | ||
592 | n->mq); | ||
593 | GNUNET_MQ_set_handlers_closure (n->mq, | ||
594 | n->handlers_cls); | ||
595 | } | ||
596 | } | ||
597 | |||
598 | |||
599 | /** | ||
600 | * Function we use for handling incoming disconnect messages. | ||
601 | * | ||
602 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
603 | * @param dim message received | ||
604 | */ | ||
605 | static void | ||
606 | handle_disconnect (void *cls, | ||
607 | const struct DisconnectInfoMessage *dim) | ||
608 | { | ||
609 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
610 | struct Neighbour *n; | ||
611 | |||
612 | GNUNET_break (ntohl (dim->reserved) == 0); | ||
613 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
614 | "Receiving DISCONNECT message for `%s'.\n", | ||
615 | GNUNET_i2s (&dim->peer)); | ||
616 | n = neighbour_find (h, &dim->peer); | ||
617 | if (NULL == n) | ||
618 | { | ||
619 | GNUNET_break (0); | ||
620 | disconnect_and_schedule_reconnect (h); | ||
621 | return; | ||
622 | } | ||
623 | GNUNET_assert (GNUNET_YES == | ||
624 | neighbour_delete (h, | ||
625 | &dim->peer, | ||
626 | n)); | ||
627 | } | ||
628 | |||
629 | |||
630 | /** | ||
631 | * Function we use for handling incoming send-ok messages. | ||
632 | * | ||
633 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
634 | * @param okm message received | ||
635 | */ | ||
636 | static void | ||
637 | handle_send_ok (void *cls, | ||
638 | const struct SendOkMessage *okm) | ||
639 | { | ||
640 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
641 | struct Neighbour *n; | ||
642 | uint32_t bytes_msg; | ||
643 | uint32_t bytes_physical; | ||
644 | |||
645 | bytes_msg = ntohl (okm->bytes_msg); | ||
646 | bytes_physical = ntohl (okm->bytes_physical); | ||
647 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
648 | "Receiving SEND_OK message, transmission to %s %s.\n", | ||
649 | GNUNET_i2s (&okm->peer), | ||
650 | ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); | ||
651 | n = neighbour_find (h, | ||
652 | &okm->peer); | ||
653 | if (NULL == n) | ||
654 | { | ||
655 | /* We should never get a 'SEND_OK' for a peer that we are not | ||
656 | connected to */ | ||
657 | GNUNET_break (0); | ||
658 | disconnect_and_schedule_reconnect (h); | ||
659 | return; | ||
660 | } | ||
661 | if (bytes_physical > bytes_msg) | ||
662 | { | ||
663 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
664 | "Overhead for %u byte message was %u\n", | ||
665 | bytes_msg, | ||
666 | bytes_physical - bytes_msg); | ||
667 | n->traffic_overhead += bytes_physical - bytes_msg; | ||
668 | } | ||
669 | } | ||
670 | |||
671 | |||
672 | /** | ||
673 | * Function we use for checking incoming "inbound" messages. | ||
674 | * | ||
675 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
676 | * @param im message received | ||
677 | */ | ||
678 | static int | ||
679 | check_recv (void *cls, | ||
680 | const struct InboundMessage *im) | ||
681 | { | ||
682 | const struct GNUNET_MessageHeader *imm; | ||
683 | uint16_t size; | ||
684 | |||
685 | size = ntohs (im->header.size) - sizeof (*im); | ||
686 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
687 | { | ||
688 | GNUNET_break (0); | ||
689 | return GNUNET_SYSERR; | ||
690 | } | ||
691 | imm = (const struct GNUNET_MessageHeader *) &im[1]; | ||
692 | if (ntohs (imm->size) != size) | ||
693 | { | ||
694 | GNUNET_break (0); | ||
695 | return GNUNET_SYSERR; | ||
696 | } | ||
697 | return GNUNET_OK; | ||
698 | } | ||
699 | |||
700 | |||
701 | /** | ||
702 | * Function we use for handling incoming messages. | ||
703 | * | ||
704 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
705 | * @param im message received | ||
706 | */ | ||
707 | static void | ||
708 | handle_recv (void *cls, | ||
709 | const struct InboundMessage *im) | ||
710 | { | ||
711 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
712 | const struct GNUNET_MessageHeader *imm | ||
713 | = (const struct GNUNET_MessageHeader *) &im[1]; | ||
714 | struct Neighbour *n; | ||
715 | |||
716 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
717 | "Received message of type %u with %u bytes from `%s'.\n", | ||
718 | (unsigned int) ntohs (imm->type), | ||
719 | (unsigned int) ntohs (imm->size), | ||
720 | GNUNET_i2s (&im->peer)); | ||
721 | n = neighbour_find (h, &im->peer); | ||
722 | if (NULL == n) | ||
723 | { | ||
724 | GNUNET_break (0); | ||
725 | disconnect_and_schedule_reconnect (h); | ||
726 | return; | ||
727 | } | ||
728 | GNUNET_MQ_inject_message (n->mq, | ||
729 | imm); | ||
730 | } | ||
731 | |||
732 | |||
733 | /** | ||
734 | * Function we use for handling incoming set quota messages. | ||
735 | * | ||
736 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
737 | * @param msg message received | ||
738 | */ | ||
739 | static void | ||
740 | handle_set_quota (void *cls, | ||
741 | const struct QuotaSetMessage *qm) | ||
742 | { | ||
743 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
744 | struct Neighbour *n; | ||
745 | |||
746 | n = neighbour_find (h, | ||
747 | &qm->peer); | ||
748 | if (NULL == n) | ||
749 | { | ||
750 | GNUNET_break (0); | ||
751 | disconnect_and_schedule_reconnect (h); | ||
752 | return; | ||
753 | } | ||
754 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
755 | "Receiving SET_QUOTA message for `%s' with quota %u\n", | ||
756 | GNUNET_i2s (&qm->peer), | ||
757 | ntohl (qm->quota.value__)); | ||
758 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
759 | qm->quota); | ||
760 | } | ||
761 | |||
762 | |||
763 | /** | ||
764 | * Try again to connect to transport service. | ||
765 | * | ||
766 | * @param cls the handle to the transport service | ||
767 | */ | ||
768 | static void | ||
769 | reconnect (void *cls) | ||
770 | { | ||
771 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
772 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
773 | GNUNET_MQ_hd_var_size (hello, | ||
774 | GNUNET_MESSAGE_TYPE_HELLO, | ||
775 | struct GNUNET_MessageHeader, | ||
776 | h), | ||
777 | GNUNET_MQ_hd_fixed_size (connect, | ||
778 | GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, | ||
779 | struct ConnectInfoMessage, | ||
780 | h), | ||
781 | GNUNET_MQ_hd_fixed_size (disconnect, | ||
782 | GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, | ||
783 | struct DisconnectInfoMessage, | ||
784 | h), | ||
785 | GNUNET_MQ_hd_fixed_size (send_ok, | ||
786 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, | ||
787 | struct SendOkMessage, | ||
788 | h), | ||
789 | GNUNET_MQ_hd_var_size (recv, | ||
790 | GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, | ||
791 | struct InboundMessage, | ||
792 | h), | ||
793 | GNUNET_MQ_hd_fixed_size (set_quota, | ||
794 | GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, | ||
795 | struct QuotaSetMessage, | ||
796 | h), | ||
797 | GNUNET_MQ_handler_end () | ||
798 | }; | ||
799 | struct GNUNET_MQ_Envelope *env; | ||
800 | struct StartMessage *s; | ||
801 | uint32_t options; | ||
802 | |||
803 | h->reconnect_task = NULL; | ||
804 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
805 | "Connecting to transport service.\n"); | ||
806 | GNUNET_assert (NULL == h->mq); | ||
807 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
808 | "transport", | ||
809 | handlers, | ||
810 | &mq_error_handler, | ||
811 | h); | ||
812 | if (NULL == h->mq) | ||
813 | return; | ||
814 | env = GNUNET_MQ_msg (s, | ||
815 | GNUNET_MESSAGE_TYPE_TRANSPORT_START); | ||
816 | options = 0; | ||
817 | if (h->check_self) | ||
818 | options |= 1; | ||
819 | if (NULL != h->handlers) | ||
820 | options |= 2; | ||
821 | s->options = htonl (options); | ||
822 | s->self = h->self; | ||
823 | GNUNET_MQ_send (h->mq, | ||
824 | env); | ||
825 | } | ||
826 | |||
827 | |||
828 | /** | ||
829 | * Function that will schedule the job that will try | ||
830 | * to connect us again to the client. | ||
831 | * | ||
832 | * @param h transport service to reconnect | ||
833 | */ | ||
834 | static void | ||
835 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) | ||
836 | { | ||
837 | GNUNET_assert (NULL == h->reconnect_task); | ||
838 | /* Forget about all neighbours that we used to be connected to */ | ||
839 | GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, | ||
840 | &neighbour_delete, | ||
841 | h); | ||
842 | if (NULL != h->mq) | ||
843 | { | ||
844 | GNUNET_MQ_destroy (h->mq); | ||
845 | h->mq = NULL; | ||
846 | } | ||
847 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
848 | "Scheduling task to reconnect to transport service in %s.\n", | ||
849 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, | ||
850 | GNUNET_YES)); | ||
851 | h->reconnect_task = | ||
852 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | ||
853 | &reconnect, | ||
854 | h); | ||
855 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
856 | } | ||
857 | |||
858 | |||
859 | /** | ||
860 | * Checks if a given peer is connected to us and get the message queue. | ||
861 | * | ||
862 | * @param handle connection to transport service | ||
863 | * @param peer the peer to check | ||
864 | * @return NULL if disconnected, otherwise message queue for @a peer | ||
865 | */ | ||
866 | struct GNUNET_MQ_Handle * | ||
867 | GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, | ||
868 | const struct GNUNET_PeerIdentity *peer) | ||
869 | { | ||
870 | struct Neighbour *n; | ||
871 | |||
872 | n = neighbour_find (handle, | ||
873 | peer); | ||
874 | if (NULL == n) | ||
875 | return NULL; | ||
876 | return n->mq; | ||
877 | } | ||
878 | |||
879 | |||
880 | /** | ||
881 | * Connect to the transport service. Note that the connection may | ||
882 | * complete (or fail) asynchronously. | ||
883 | * | ||
884 | * @param cfg configuration to use | ||
885 | * @param self our own identity (API should check that it matches | ||
886 | * the identity found by transport), or NULL (no check) | ||
887 | * @param cls closure for the callbacks | ||
888 | * @param rec receive function to call | ||
889 | * @param nc function to call on connect events | ||
890 | * @param nd function to call on disconnect events | ||
891 | * @param neb function to call if we have excess bandwidth to a peer | ||
892 | * @return NULL on error | ||
893 | */ | ||
894 | struct GNUNET_TRANSPORT_CoreHandle * | ||
895 | GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
896 | const struct GNUNET_PeerIdentity *self, | ||
897 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
898 | void *cls, | ||
899 | GNUNET_TRANSPORT_NotifyConnecT nc, | ||
900 | GNUNET_TRANSPORT_NotifyDisconnecT nd, | ||
901 | GNUNET_TRANSPORT_NotifyExcessBandwidtH neb) | ||
902 | { | ||
903 | struct GNUNET_TRANSPORT_CoreHandle *h; | ||
904 | unsigned int i; | ||
905 | |||
906 | h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle); | ||
907 | if (NULL != self) | ||
908 | { | ||
909 | h->self = *self; | ||
910 | h->check_self = GNUNET_YES; | ||
911 | } | ||
912 | h->cfg = cfg; | ||
913 | h->cls = cls; | ||
914 | h->nc_cb = nc; | ||
915 | h->nd_cb = nd; | ||
916 | h->neb_cb = neb; | ||
917 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
918 | if (NULL != handlers) | ||
919 | { | ||
920 | for (i=0;NULL != handlers[i].cb; i++) ; | ||
921 | h->handlers = GNUNET_new_array (i + 1, | ||
922 | struct GNUNET_MQ_MessageHandler); | ||
923 | GNUNET_memcpy (h->handlers, | ||
924 | handlers, | ||
925 | i * sizeof (struct GNUNET_MQ_MessageHandler)); | ||
926 | } | ||
927 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
928 | "Connecting to transport service\n"); | ||
929 | reconnect (h); | ||
930 | if (NULL == h->mq) | ||
931 | { | ||
932 | GNUNET_free_non_null (h->handlers); | ||
933 | GNUNET_free (h); | ||
934 | return NULL; | ||
935 | } | ||
936 | h->neighbours = | ||
937 | GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, | ||
938 | GNUNET_YES); | ||
939 | return h; | ||
940 | } | ||
941 | |||
942 | |||
943 | /** | ||
944 | * Disconnect from the transport service. | ||
945 | * | ||
946 | * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() | ||
947 | */ | ||
948 | void | ||
949 | GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) | ||
950 | { | ||
951 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
952 | "Transport disconnect called!\n"); | ||
953 | /* this disconnects all neighbours... */ | ||
954 | if (NULL == handle->reconnect_task) | ||
955 | disconnect_and_schedule_reconnect (handle); | ||
956 | /* and now we stop trying to connect again... */ | ||
957 | if (NULL != handle->reconnect_task) | ||
958 | { | ||
959 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | ||
960 | handle->reconnect_task = NULL; | ||
961 | } | ||
962 | GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours); | ||
963 | handle->neighbours = NULL; | ||
964 | GNUNET_free_non_null (handle->handlers); | ||
965 | handle->handlers = NULL; | ||
966 | GNUNET_free (handle); | ||
967 | } | ||
968 | |||
969 | |||
970 | /* end of transport_api_core.c */ | ||