diff options
Diffstat (limited to 'src/transport/transport_api2_core.c')
-rw-r--r-- | src/transport/transport_api2_core.c | 938 |
1 files changed, 938 insertions, 0 deletions
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c new file mode 100644 index 000000000..78d8dcce0 --- /dev/null +++ b/src/transport/transport_api2_core.c | |||
@@ -0,0 +1,938 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2009-2013, 2016, 2018 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | */ | ||
18 | |||
19 | /** | ||
20 | * @file transport/transport_api_core.c | ||
21 | * @brief library to access the transport service for message exchange | ||
22 | * @author Christian Grothoff | ||
23 | */ | ||
24 | #include "platform.h" | ||
25 | #include "gnunet_util_lib.h" | ||
26 | #include "gnunet_constants.h" | ||
27 | #include "gnunet_arm_service.h" | ||
28 | #include "gnunet_hello_lib.h" | ||
29 | #include "gnunet_protocols.h" | ||
30 | #include "gnunet_transport_core_service.h" | ||
31 | #include "transport.h" | ||
32 | |||
33 | #define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) | ||
34 | |||
35 | /** | ||
36 | * How large to start with for the hashmap of neighbours. | ||
37 | */ | ||
38 | #define STARTING_NEIGHBOURS_SIZE 16 | ||
39 | |||
40 | |||
41 | /** | ||
42 | * Entry in hash table of all of our current (connected) neighbours. | ||
43 | */ | ||
44 | struct Neighbour | ||
45 | { | ||
46 | |||
47 | /** | ||
48 | * Identity of this neighbour. | ||
49 | */ | ||
50 | struct GNUNET_PeerIdentity id; | ||
51 | |||
52 | /** | ||
53 | * Overall transport handle. | ||
54 | */ | ||
55 | struct GNUNET_TRANSPORT_CoreHandle *h; | ||
56 | |||
57 | /** | ||
58 | * Active message queue for the peer. | ||
59 | */ | ||
60 | struct GNUNET_MQ_Handle *mq; | ||
61 | |||
62 | /** | ||
63 | * Envelope with the message we are currently transmitting (or NULL). | ||
64 | */ | ||
65 | struct GNUNET_MQ_Envelope *env; | ||
66 | |||
67 | /** | ||
68 | * Closure for @e mq handlers. | ||
69 | */ | ||
70 | void *handlers_cls; | ||
71 | |||
72 | /** | ||
73 | * Entry in our readyness heap (which is sorted by @e next_ready | ||
74 | * value). NULL if there is no pending transmission request for | ||
75 | * this neighbour or if we're waiting for @e is_ready to become | ||
76 | * true AFTER the @e out_tracker suggested that this peer's quota | ||
77 | * has been satisfied (so once @e is_ready goes to #GNUNET_YES, | ||
78 | * we should immediately go back into the heap). | ||
79 | */ | ||
80 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
81 | |||
82 | /** | ||
83 | * Task to trigger MQ when we have enough bandwidth for the | ||
84 | * next transmission. | ||
85 | */ | ||
86 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
87 | |||
88 | /** | ||
89 | * Outbound bandwidh tracker. | ||
90 | */ | ||
91 | struct GNUNET_BANDWIDTH_Tracker out_tracker; | ||
92 | |||
93 | /** | ||
94 | * Sending consumed more bytes on wire than payload was announced | ||
95 | * This overhead is added to the delay of next sending operation | ||
96 | */ | ||
97 | unsigned long long traffic_overhead; | ||
98 | |||
99 | /** | ||
100 | * Is this peer currently ready to receive a message? | ||
101 | */ | ||
102 | int is_ready; | ||
103 | |||
104 | /** | ||
105 | * Size of the message in @e env. | ||
106 | */ | ||
107 | uint16_t env_size; | ||
108 | |||
109 | }; | ||
110 | |||
111 | |||
112 | |||
113 | /** | ||
114 | * Handle for the transport service (includes all of the | ||
115 | * state for the transport service). | ||
116 | */ | ||
117 | struct GNUNET_TRANSPORT_CoreHandle | ||
118 | { | ||
119 | |||
120 | /** | ||
121 | * Closure for the callbacks. | ||
122 | */ | ||
123 | void *cls; | ||
124 | |||
125 | /** | ||
126 | * Functions to call for received data (template for | ||
127 | * new message queues). | ||
128 | */ | ||
129 | struct GNUNET_MQ_MessageHandler *handlers; | ||
130 | |||
131 | /** | ||
132 | * function to call on connect events | ||
133 | */ | ||
134 | GNUNET_TRANSPORT_NotifyConnect nc_cb; | ||
135 | |||
136 | /** | ||
137 | * function to call on disconnect events | ||
138 | */ | ||
139 | GNUNET_TRANSPORT_NotifyDisconnect nd_cb; | ||
140 | |||
141 | /** | ||
142 | * function to call on excess bandwidth events | ||
143 | */ | ||
144 | GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb; | ||
145 | |||
146 | /** | ||
147 | * My client connection to the transport service. | ||
148 | */ | ||
149 | struct GNUNET_MQ_Handle *mq; | ||
150 | |||
151 | /** | ||
152 | * My configuration. | ||
153 | */ | ||
154 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
155 | |||
156 | /** | ||
157 | * Hash map of the current connected neighbours of this peer. | ||
158 | * Maps peer identities to `struct Neighbour` entries. | ||
159 | */ | ||
160 | struct GNUNET_CONTAINER_MultiPeerMap *neighbours; | ||
161 | |||
162 | /** | ||
163 | * Peer identity as assumed by this process, or all zeros. | ||
164 | */ | ||
165 | struct GNUNET_PeerIdentity self; | ||
166 | |||
167 | /** | ||
168 | * ID of the task trying to reconnect to the service. | ||
169 | */ | ||
170 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
171 | |||
172 | /** | ||
173 | * Delay until we try to reconnect. | ||
174 | */ | ||
175 | struct GNUNET_TIME_Relative reconnect_delay; | ||
176 | |||
177 | /** | ||
178 | * Should we check that @e self matches what the service thinks? | ||
179 | * (if #GNUNET_NO, then @e self is all zeros!). | ||
180 | */ | ||
181 | int check_self; | ||
182 | |||
183 | }; | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Function that will schedule the job that will try | ||
188 | * to connect us again to the client. | ||
189 | * | ||
190 | * @param h transport service to reconnect | ||
191 | */ | ||
192 | static void | ||
193 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h); | ||
194 | |||
195 | |||
196 | /** | ||
197 | * Get the neighbour list entry for the given peer | ||
198 | * | ||
199 | * @param h our context | ||
200 | * @param peer peer to look up | ||
201 | * @return NULL if no such peer entry exists | ||
202 | */ | ||
203 | static struct Neighbour * | ||
204 | neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, | ||
205 | const struct GNUNET_PeerIdentity *peer) | ||
206 | { | ||
207 | return GNUNET_CONTAINER_multipeermap_get (h->neighbours, | ||
208 | peer); | ||
209 | } | ||
210 | |||
211 | |||
212 | /** | ||
213 | * Function called by the bandwidth tracker if we have excess | ||
214 | * bandwidth. | ||
215 | * | ||
216 | * @param cls the `struct Neighbour` that has excess bandwidth | ||
217 | */ | ||
218 | static void | ||
219 | notify_excess_cb (void *cls) | ||
220 | { | ||
221 | struct Neighbour *n = cls; | ||
222 | struct GNUNET_TRANSPORT_CoreHandle *h = n->h; | ||
223 | |||
224 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
225 | "Notifying CORE that more bandwidth is available for %s\n", | ||
226 | GNUNET_i2s (&n->id)); | ||
227 | |||
228 | if (NULL != h->neb_cb) | ||
229 | h->neb_cb (h->cls, | ||
230 | &n->id, | ||
231 | n->handlers_cls); | ||
232 | } | ||
233 | |||
234 | |||
235 | /** | ||
236 | * Iterator over hash map entries, for deleting state of a neighbour. | ||
237 | * | ||
238 | * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *` | ||
239 | * @param key peer identity | ||
240 | * @param value value in the hash map, the neighbour entry to delete | ||
241 | * @return #GNUNET_YES if we should continue to | ||
242 | * iterate, | ||
243 | * #GNUNET_NO if not. | ||
244 | */ | ||
245 | static int | ||
246 | neighbour_delete (void *cls, | ||
247 | const struct GNUNET_PeerIdentity *key, | ||
248 | void *value) | ||
249 | { | ||
250 | struct GNUNET_TRANSPORT_CoreHandle *handle = cls; | ||
251 | struct Neighbour *n = value; | ||
252 | |||
253 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
254 | "Dropping entry for neighbour `%s'.\n", | ||
255 | GNUNET_i2s (key)); | ||
256 | GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); | ||
257 | if (NULL != handle->nd_cb) | ||
258 | handle->nd_cb (handle->cls, | ||
259 | &n->id, | ||
260 | n->handlers_cls); | ||
261 | if (NULL != n->timeout_task) | ||
262 | { | ||
263 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
264 | n->timeout_task = NULL; | ||
265 | } | ||
266 | if (NULL != n->env) | ||
267 | { | ||
268 | GNUNET_MQ_send_cancel (n->env); | ||
269 | n->env = NULL; | ||
270 | } | ||
271 | GNUNET_MQ_destroy (n->mq); | ||
272 | GNUNET_assert (NULL == n->mq); | ||
273 | GNUNET_assert (GNUNET_YES == | ||
274 | GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, | ||
275 | key, | ||
276 | n)); | ||
277 | GNUNET_free (n); | ||
278 | return GNUNET_YES; | ||
279 | } | ||
280 | |||
281 | |||
282 | /** | ||
283 | * Generic error handler, called with the appropriate | ||
284 | * error code and the same closure specified at the creation of | ||
285 | * the message queue. | ||
286 | * Not every message queue implementation supports an error handler. | ||
287 | * | ||
288 | * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *` | ||
289 | * @param error error code | ||
290 | */ | ||
291 | static void | ||
292 | mq_error_handler (void *cls, | ||
293 | enum GNUNET_MQ_Error error) | ||
294 | { | ||
295 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
296 | |||
297 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
298 | "Error receiving from transport service, disconnecting temporarily.\n"); | ||
299 | disconnect_and_schedule_reconnect (h); | ||
300 | } | ||
301 | |||
302 | |||
303 | /** | ||
304 | * A message from the handler's message queue to a neighbour was | ||
305 | * transmitted. Now trigger (possibly delayed) notification of the | ||
306 | * neighbour's message queue that we are done and thus ready for | ||
307 | * the next message. | ||
308 | * | ||
309 | * @param cls the `struct Neighbour` where the message was sent | ||
310 | */ | ||
311 | static void | ||
312 | notify_send_done_fin (void *cls) | ||
313 | { | ||
314 | struct Neighbour *n = cls; | ||
315 | |||
316 | n->timeout_task = NULL; | ||
317 | n->is_ready = GNUNET_YES; | ||
318 | GNUNET_MQ_impl_send_continue (n->mq); | ||
319 | } | ||
320 | |||
321 | |||
322 | /** | ||
323 | * A message from the handler's message queue to a neighbour was | ||
324 | * transmitted. Now trigger (possibly delayed) notification of the | ||
325 | * neighbour's message queue that we are done and thus ready for | ||
326 | * the next message. | ||
327 | * | ||
328 | * @param cls the `struct Neighbour` where the message was sent | ||
329 | */ | ||
330 | static void | ||
331 | notify_send_done (void *cls) | ||
332 | { | ||
333 | struct Neighbour *n = cls; | ||
334 | struct GNUNET_TIME_Relative delay; | ||
335 | |||
336 | n->timeout_task = NULL; | ||
337 | if (NULL != n->env) | ||
338 | { | ||
339 | GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, | ||
340 | n->env_size + n->traffic_overhead); | ||
341 | n->env = NULL; | ||
342 | n->traffic_overhead = 0; | ||
343 | } | ||
344 | delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, | ||
345 | 128); | ||
346 | if (0 == delay.rel_value_us) | ||
347 | { | ||
348 | n->is_ready = GNUNET_YES; | ||
349 | GNUNET_MQ_impl_send_continue (n->mq); | ||
350 | return; | ||
351 | } | ||
352 | GNUNET_MQ_impl_send_in_flight (n->mq); | ||
353 | /* cannot send even a small message without violating | ||
354 | quota, wait a before allowing MQ to send next message */ | ||
355 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
356 | ¬ify_send_done_fin, | ||
357 | n); | ||
358 | } | ||
359 | |||
360 | |||
361 | /** | ||
362 | * Implement sending functionality of a message queue. | ||
363 | * Called one message at a time. Should send the @a msg | ||
364 | * to the transport service and then notify the queue | ||
365 | * once we are ready for the next one. | ||
366 | * | ||
367 | * @param mq the message queue | ||
368 | * @param msg the message to send | ||
369 | * @param impl_state state of the implementation | ||
370 | */ | ||
371 | static void | ||
372 | mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
373 | const struct GNUNET_MessageHeader *msg, | ||
374 | void *impl_state) | ||
375 | { | ||
376 | struct Neighbour *n = impl_state; | ||
377 | struct GNUNET_TRANSPORT_CoreHandle *h = n->h; | ||
378 | struct OutboundMessage *obm; | ||
379 | uint16_t msize; | ||
380 | |||
381 | GNUNET_assert (GNUNET_YES == n->is_ready); | ||
382 | msize = ntohs (msg->size); | ||
383 | if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm)) | ||
384 | { | ||
385 | GNUNET_break (0); | ||
386 | GNUNET_MQ_impl_send_continue (mq); | ||
387 | return; | ||
388 | } | ||
389 | GNUNET_assert (NULL == n->env); | ||
390 | n->env = GNUNET_MQ_msg_nested_mh (obm, | ||
391 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, | ||
392 | msg); | ||
393 | obm->reserved = htonl (0); | ||
394 | obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ | ||
395 | obm->peer = n->id; | ||
396 | GNUNET_assert (NULL == n->timeout_task); | ||
397 | n->is_ready = GNUNET_NO; | ||
398 | n->env_size = ntohs (msg->size); | ||
399 | GNUNET_MQ_notify_sent (n->env, | ||
400 | ¬ify_send_done, | ||
401 | n); | ||
402 | GNUNET_MQ_send (h->mq, | ||
403 | n->env); | ||
404 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
405 | "Queued message of type %u for neighbour `%s'.\n", | ||
406 | ntohs (msg->type), | ||
407 | GNUNET_i2s (&n->id)); | ||
408 | } | ||
409 | |||
410 | |||
411 | /** | ||
412 | * Handle destruction of a message queue. Implementations must not | ||
413 | * free @a mq, but should take care of @a impl_state. | ||
414 | * | ||
415 | * @param mq the message queue to destroy | ||
416 | * @param impl_state state of the implementation | ||
417 | */ | ||
418 | static void | ||
419 | mq_destroy_impl (struct GNUNET_MQ_Handle *mq, | ||
420 | void *impl_state) | ||
421 | { | ||
422 | struct Neighbour *n = impl_state; | ||
423 | |||
424 | GNUNET_assert (mq == n->mq); | ||
425 | n->mq = NULL; | ||
426 | } | ||
427 | |||
428 | |||
429 | /** | ||
430 | * Implementation function that cancels the currently sent message. | ||
431 | * Should basically undo whatever #mq_send_impl() did. | ||
432 | * | ||
433 | * @param mq message queue | ||
434 | * @param impl_state state specific to the implementation | ||
435 | */ | ||
436 | static void | ||
437 | mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | ||
438 | void *impl_state) | ||
439 | { | ||
440 | struct Neighbour *n = impl_state; | ||
441 | |||
442 | GNUNET_assert (GNUNET_NO == n->is_ready); | ||
443 | if (NULL != n->env) | ||
444 | { | ||
445 | GNUNET_MQ_send_cancel (n->env); | ||
446 | n->env = NULL; | ||
447 | } | ||
448 | |||
449 | n->is_ready = GNUNET_YES; | ||
450 | } | ||
451 | |||
452 | |||
453 | /** | ||
454 | * We had an error processing a message we forwarded from a peer to | ||
455 | * the CORE service. We should just complain about it but otherwise | ||
456 | * continue processing. | ||
457 | * | ||
458 | * @param cls closure | ||
459 | * @param error error code | ||
460 | */ | ||
461 | static void | ||
462 | peer_mq_error_handler (void *cls, | ||
463 | enum GNUNET_MQ_Error error) | ||
464 | { | ||
465 | /* struct Neighbour *n = cls; */ | ||
466 | |||
467 | GNUNET_break_op (0); | ||
468 | } | ||
469 | |||
470 | |||
471 | /** | ||
472 | * The outbound quota has changed in a way that may require | ||
473 | * us to reset the timeout. Update the timeout. | ||
474 | * | ||
475 | * @param cls the `struct Neighbour` for which the timeout changed | ||
476 | */ | ||
477 | static void | ||
478 | outbound_bw_tracker_update (void *cls) | ||
479 | { | ||
480 | struct Neighbour *n = cls; | ||
481 | struct GNUNET_TIME_Relative delay; | ||
482 | |||
483 | if (NULL == n->timeout_task) | ||
484 | return; | ||
485 | delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, | ||
486 | 128); | ||
487 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
488 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
489 | ¬ify_send_done, | ||
490 | n); | ||
491 | } | ||
492 | |||
493 | |||
494 | /** | ||
495 | * Function we use for handling incoming connect messages. | ||
496 | * | ||
497 | * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` | ||
498 | * @param cim message received | ||
499 | */ | ||
500 | static void | ||
501 | handle_connect (void *cls, | ||
502 | const struct ConnectInfoMessage *cim) | ||
503 | { | ||
504 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
505 | struct Neighbour *n; | ||
506 | |||
507 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
508 | "Receiving CONNECT message for `%s' with quota %u\n", | ||
509 | GNUNET_i2s (&cim->id), | ||
510 | ntohl (cim->quota_out.value__)); | ||
511 | n = neighbour_find (h, | ||
512 | &cim->id); | ||
513 | if (NULL != n) | ||
514 | { | ||
515 | GNUNET_break (0); | ||
516 | disconnect_and_schedule_reconnect (h); | ||
517 | return; | ||
518 | } | ||
519 | n = GNUNET_new (struct Neighbour); | ||
520 | n->id = cim->id; | ||
521 | n->h = h; | ||
522 | n->is_ready = GNUNET_YES; | ||
523 | n->traffic_overhead = 0; | ||
524 | GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker, | ||
525 | &outbound_bw_tracker_update, | ||
526 | n, | ||
527 | GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, | ||
528 | MAX_BANDWIDTH_CARRY_S, | ||
529 | ¬ify_excess_cb, | ||
530 | n); | ||
531 | GNUNET_assert (GNUNET_OK == | ||
532 | GNUNET_CONTAINER_multipeermap_put (h->neighbours, | ||
533 | &n->id, | ||
534 | n, | ||
535 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
536 | |||
537 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
538 | cim->quota_out); | ||
539 | n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, | ||
540 | &mq_destroy_impl, | ||
541 | &mq_cancel_impl, | ||
542 | n, | ||
543 | h->handlers, | ||
544 | &peer_mq_error_handler, | ||
545 | n); | ||
546 | if (NULL != h->nc_cb) | ||
547 | { | ||
548 | n->handlers_cls = h->nc_cb (h->cls, | ||
549 | &n->id, | ||
550 | n->mq); | ||
551 | GNUNET_MQ_set_handlers_closure (n->mq, | ||
552 | n->handlers_cls); | ||
553 | } | ||
554 | } | ||
555 | |||
556 | |||
557 | /** | ||
558 | * Function we use for handling incoming disconnect messages. | ||
559 | * | ||
560 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
561 | * @param dim message received | ||
562 | */ | ||
563 | static void | ||
564 | handle_disconnect (void *cls, | ||
565 | const struct DisconnectInfoMessage *dim) | ||
566 | { | ||
567 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
568 | struct Neighbour *n; | ||
569 | |||
570 | GNUNET_break (ntohl (dim->reserved) == 0); | ||
571 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
572 | "Receiving DISCONNECT message for `%s'.\n", | ||
573 | GNUNET_i2s (&dim->peer)); | ||
574 | n = neighbour_find (h, | ||
575 | &dim->peer); | ||
576 | if (NULL == n) | ||
577 | { | ||
578 | GNUNET_break (0); | ||
579 | disconnect_and_schedule_reconnect (h); | ||
580 | return; | ||
581 | } | ||
582 | GNUNET_assert (GNUNET_YES == | ||
583 | neighbour_delete (h, | ||
584 | &dim->peer, | ||
585 | n)); | ||
586 | } | ||
587 | |||
588 | |||
589 | /** | ||
590 | * Function we use for handling incoming send-ok messages. | ||
591 | * | ||
592 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
593 | * @param okm message received | ||
594 | */ | ||
595 | static void | ||
596 | handle_send_ok (void *cls, | ||
597 | const struct SendOkMessage *okm) | ||
598 | { | ||
599 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
600 | struct Neighbour *n; | ||
601 | uint32_t bytes_msg; | ||
602 | uint32_t bytes_physical; | ||
603 | |||
604 | bytes_msg = ntohl (okm->bytes_msg); | ||
605 | bytes_physical = ntohl (okm->bytes_physical); | ||
606 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
607 | "Receiving SEND_OK message, transmission to %s %s.\n", | ||
608 | GNUNET_i2s (&okm->peer), | ||
609 | (GNUNET_OK == ntohl (okm->success)) | ||
610 | ? "succeeded" | ||
611 | : "failed"); | ||
612 | n = neighbour_find (h, | ||
613 | &okm->peer); | ||
614 | if (NULL == n) | ||
615 | { | ||
616 | /* We should never get a 'SEND_OK' for a peer that we are not | ||
617 | connected to */ | ||
618 | GNUNET_break (0); | ||
619 | disconnect_and_schedule_reconnect (h); | ||
620 | return; | ||
621 | } | ||
622 | if (bytes_physical > bytes_msg) | ||
623 | { | ||
624 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
625 | "Overhead for %u byte message was %u\n", | ||
626 | (unsigned int) bytes_msg, | ||
627 | (unsigned int) (bytes_physical - bytes_msg)); | ||
628 | n->traffic_overhead += bytes_physical - bytes_msg; | ||
629 | } | ||
630 | } | ||
631 | |||
632 | |||
633 | /** | ||
634 | * Function we use for checking incoming "inbound" messages. | ||
635 | * | ||
636 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
637 | * @param im message received | ||
638 | */ | ||
639 | static int | ||
640 | check_recv (void *cls, | ||
641 | const struct InboundMessage *im) | ||
642 | { | ||
643 | const struct GNUNET_MessageHeader *imm; | ||
644 | uint16_t size; | ||
645 | |||
646 | size = ntohs (im->header.size) - sizeof (*im); | ||
647 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
648 | { | ||
649 | GNUNET_break (0); | ||
650 | return GNUNET_SYSERR; | ||
651 | } | ||
652 | imm = (const struct GNUNET_MessageHeader *) &im[1]; | ||
653 | if (ntohs (imm->size) != size) | ||
654 | { | ||
655 | GNUNET_break (0); | ||
656 | return GNUNET_SYSERR; | ||
657 | } | ||
658 | return GNUNET_OK; | ||
659 | } | ||
660 | |||
661 | |||
662 | /** | ||
663 | * Function we use for handling incoming messages. | ||
664 | * | ||
665 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
666 | * @param im message received | ||
667 | */ | ||
668 | static void | ||
669 | handle_recv (void *cls, | ||
670 | const struct InboundMessage *im) | ||
671 | { | ||
672 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
673 | const struct GNUNET_MessageHeader *imm | ||
674 | = (const struct GNUNET_MessageHeader *) &im[1]; | ||
675 | struct Neighbour *n; | ||
676 | |||
677 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
678 | "Received message of type %u with %u bytes from `%s'.\n", | ||
679 | (unsigned int) ntohs (imm->type), | ||
680 | (unsigned int) ntohs (imm->size), | ||
681 | GNUNET_i2s (&im->peer)); | ||
682 | n = neighbour_find (h, | ||
683 | &im->peer); | ||
684 | if (NULL == n) | ||
685 | { | ||
686 | GNUNET_break (0); | ||
687 | disconnect_and_schedule_reconnect (h); | ||
688 | return; | ||
689 | } | ||
690 | GNUNET_MQ_inject_message (n->mq, | ||
691 | imm); | ||
692 | } | ||
693 | |||
694 | |||
695 | /** | ||
696 | * Function we use for handling incoming set quota messages. | ||
697 | * | ||
698 | * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` | ||
699 | * @param msg message received | ||
700 | */ | ||
701 | static void | ||
702 | handle_set_quota (void *cls, | ||
703 | const struct QuotaSetMessage *qm) | ||
704 | { | ||
705 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
706 | struct Neighbour *n; | ||
707 | |||
708 | n = neighbour_find (h, | ||
709 | &qm->peer); | ||
710 | if (NULL == n) | ||
711 | { | ||
712 | GNUNET_break (0); | ||
713 | disconnect_and_schedule_reconnect (h); | ||
714 | return; | ||
715 | } | ||
716 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
717 | "Receiving SET_QUOTA message for `%s' with quota %u\n", | ||
718 | GNUNET_i2s (&qm->peer), | ||
719 | (unsigned int) ntohl (qm->quota.value__)); | ||
720 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
721 | qm->quota); | ||
722 | } | ||
723 | |||
724 | |||
725 | /** | ||
726 | * Try again to connect to transport service. | ||
727 | * | ||
728 | * @param cls the handle to the transport service | ||
729 | */ | ||
730 | static void | ||
731 | reconnect (void *cls) | ||
732 | { | ||
733 | struct GNUNET_TRANSPORT_CoreHandle *h = cls; | ||
734 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
735 | GNUNET_MQ_hd_fixed_size (connect, | ||
736 | GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, | ||
737 | struct ConnectInfoMessage, | ||
738 | h), | ||
739 | GNUNET_MQ_hd_fixed_size (disconnect, | ||
740 | GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, | ||
741 | struct DisconnectInfoMessage, | ||
742 | h), | ||
743 | GNUNET_MQ_hd_fixed_size (send_ok, | ||
744 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, | ||
745 | struct SendOkMessage, | ||
746 | h), | ||
747 | GNUNET_MQ_hd_var_size (recv, | ||
748 | GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, | ||
749 | struct InboundMessage, | ||
750 | h), | ||
751 | GNUNET_MQ_hd_fixed_size (set_quota, | ||
752 | GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, | ||
753 | struct QuotaSetMessage, | ||
754 | h), | ||
755 | GNUNET_MQ_handler_end () | ||
756 | }; | ||
757 | struct GNUNET_MQ_Envelope *env; | ||
758 | struct StartMessage *s; | ||
759 | uint32_t options; | ||
760 | |||
761 | h->reconnect_task = NULL; | ||
762 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
763 | "Connecting to transport service.\n"); | ||
764 | GNUNET_assert (NULL == h->mq); | ||
765 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
766 | "transport", | ||
767 | handlers, | ||
768 | &mq_error_handler, | ||
769 | h); | ||
770 | if (NULL == h->mq) | ||
771 | return; | ||
772 | env = GNUNET_MQ_msg (s, | ||
773 | GNUNET_MESSAGE_TYPE_TRANSPORT_START); | ||
774 | options = 0; | ||
775 | if (h->check_self) | ||
776 | options |= 1; | ||
777 | if (NULL != h->handlers) | ||
778 | options |= 2; | ||
779 | s->options = htonl (options); | ||
780 | s->self = h->self; | ||
781 | GNUNET_MQ_send (h->mq, | ||
782 | env); | ||
783 | } | ||
784 | |||
785 | |||
786 | /** | ||
787 | * Disconnect from the transport service. | ||
788 | * | ||
789 | * @param h transport service to reconnect | ||
790 | */ | ||
791 | static void | ||
792 | disconnect (struct GNUNET_TRANSPORT_CoreHandle *h) | ||
793 | { | ||
794 | GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, | ||
795 | &neighbour_delete, | ||
796 | h); | ||
797 | if (NULL != h->mq) | ||
798 | { | ||
799 | GNUNET_MQ_destroy (h->mq); | ||
800 | h->mq = NULL; | ||
801 | } | ||
802 | } | ||
803 | |||
804 | |||
805 | /** | ||
806 | * Function that will schedule the job that will try | ||
807 | * to connect us again to the client. | ||
808 | * | ||
809 | * @param h transport service to reconnect | ||
810 | */ | ||
811 | static void | ||
812 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) | ||
813 | { | ||
814 | GNUNET_assert (NULL == h->reconnect_task); | ||
815 | disconnect (h); | ||
816 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
817 | "Scheduling task to reconnect to transport service in %s.\n", | ||
818 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, | ||
819 | GNUNET_YES)); | ||
820 | h->reconnect_task = | ||
821 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | ||
822 | &reconnect, | ||
823 | h); | ||
824 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
825 | } | ||
826 | |||
827 | |||
828 | /** | ||
829 | * Checks if a given peer is connected to us and get the message queue. | ||
830 | * | ||
831 | * @param handle connection to transport service | ||
832 | * @param peer the peer to check | ||
833 | * @return NULL if disconnected, otherwise message queue for @a peer | ||
834 | */ | ||
835 | struct GNUNET_MQ_Handle * | ||
836 | GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, | ||
837 | const struct GNUNET_PeerIdentity *peer) | ||
838 | { | ||
839 | struct Neighbour *n; | ||
840 | |||
841 | n = neighbour_find (handle, | ||
842 | peer); | ||
843 | if (NULL == n) | ||
844 | return NULL; | ||
845 | return n->mq; | ||
846 | } | ||
847 | |||
848 | |||
849 | /** | ||
850 | * Connect to the transport service. Note that the connection may | ||
851 | * complete (or fail) asynchronously. | ||
852 | * | ||
853 | * @param cfg configuration to use | ||
854 | * @param self our own identity (API should check that it matches | ||
855 | * the identity found by transport), or NULL (no check) | ||
856 | * @param cls closure for the callbacks | ||
857 | * @param rec receive function to call | ||
858 | * @param nc function to call on connect events | ||
859 | * @param nd function to call on disconnect events | ||
860 | * @param neb function to call if we have excess bandwidth to a peer | ||
861 | * @return NULL on error | ||
862 | */ | ||
863 | struct GNUNET_TRANSPORT_CoreHandle * | ||
864 | GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
865 | const struct GNUNET_PeerIdentity *self, | ||
866 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
867 | void *cls, | ||
868 | GNUNET_TRANSPORT_NotifyConnect nc, | ||
869 | GNUNET_TRANSPORT_NotifyDisconnect nd, | ||
870 | GNUNET_TRANSPORT_NotifyExcessBandwidth neb) | ||
871 | { | ||
872 | struct GNUNET_TRANSPORT_CoreHandle *h; | ||
873 | unsigned int i; | ||
874 | |||
875 | h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle); | ||
876 | if (NULL != self) | ||
877 | { | ||
878 | h->self = *self; | ||
879 | h->check_self = GNUNET_YES; | ||
880 | } | ||
881 | h->cfg = cfg; | ||
882 | h->cls = cls; | ||
883 | h->nc_cb = nc; | ||
884 | h->nd_cb = nd; | ||
885 | h->neb_cb = neb; | ||
886 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
887 | if (NULL != handlers) | ||
888 | { | ||
889 | for (i=0;NULL != handlers[i].cb; i++) ; | ||
890 | h->handlers = GNUNET_new_array (i + 1, | ||
891 | struct GNUNET_MQ_MessageHandler); | ||
892 | GNUNET_memcpy (h->handlers, | ||
893 | handlers, | ||
894 | i * sizeof (struct GNUNET_MQ_MessageHandler)); | ||
895 | } | ||
896 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
897 | "Connecting to transport service\n"); | ||
898 | reconnect (h); | ||
899 | if (NULL == h->mq) | ||
900 | { | ||
901 | GNUNET_free_non_null (h->handlers); | ||
902 | GNUNET_free (h); | ||
903 | return NULL; | ||
904 | } | ||
905 | h->neighbours = | ||
906 | GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, | ||
907 | GNUNET_YES); | ||
908 | return h; | ||
909 | } | ||
910 | |||
911 | |||
912 | /** | ||
913 | * Disconnect from the transport service. | ||
914 | * | ||
915 | * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() | ||
916 | */ | ||
917 | void | ||
918 | GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) | ||
919 | { | ||
920 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
921 | "Transport disconnect called!\n"); | ||
922 | /* this disconnects all neighbours... */ | ||
923 | disconnect (handle); | ||
924 | /* and now we stop trying to connect again... */ | ||
925 | if (NULL != handle->reconnect_task) | ||
926 | { | ||
927 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | ||
928 | handle->reconnect_task = NULL; | ||
929 | } | ||
930 | GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours); | ||
931 | handle->neighbours = NULL; | ||
932 | GNUNET_free_non_null (handle->handlers); | ||
933 | handle->handlers = NULL; | ||
934 | GNUNET_free (handle); | ||
935 | } | ||
936 | |||
937 | |||
938 | /* end of transport_api_core.c */ | ||