diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-06-03 22:26:48 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-06-03 22:26:48 +0000 |
commit | abda91687b433defaad5425c484b142416172e2a (patch) | |
tree | 19ef20bd89cb380c53a81ee64e8a428b77a736d6 /src/transport/transport_api_new.c | |
parent | 3439e5f4f084a8d22124d6d02fcc8199ac06ba7d (diff) | |
download | gnunet-abda91687b433defaad5425c484b142416172e2a.tar.gz gnunet-abda91687b433defaad5425c484b142416172e2a.zip |
first draft for new transport api implementation
Diffstat (limited to 'src/transport/transport_api_new.c')
-rw-r--r-- | src/transport/transport_api_new.c | 1408 |
1 files changed, 1408 insertions, 0 deletions
diff --git a/src/transport/transport_api_new.c b/src/transport/transport_api_new.c new file mode 100644 index 000000000..02f3fc421 --- /dev/null +++ b/src/transport/transport_api_new.c | |||
@@ -0,0 +1,1408 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file transport/transport_api.c | ||
23 | * @brief library to access the low-level P2P IO service | ||
24 | * @author Christian Grothoff | ||
25 | * | ||
26 | * TODO: | ||
27 | * - support 'try connect' in transport service | ||
28 | * - add timeout (see FIXME) | ||
29 | * - adjust testcases to use new 'try connect' style (should be easy, breaks API compatibility!) | ||
30 | * - adjust core service to use new 'try connect' style (should be MUCH nicer there as well!) | ||
31 | * - test test test | ||
32 | */ | ||
33 | #include "platform.h" | ||
34 | #include "gnunet_bandwidth_lib.h" | ||
35 | #include "gnunet_client_lib.h" | ||
36 | #include "gnunet_constants.h" | ||
37 | #include "gnunet_container_lib.h" | ||
38 | #include "gnunet_arm_service.h" | ||
39 | #include "gnunet_hello_lib.h" | ||
40 | #include "gnunet_protocols.h" | ||
41 | #include "gnunet_server_lib.h" | ||
42 | #include "gnunet_time_lib.h" | ||
43 | #include "gnunet_transport_service.h" | ||
44 | #include "transport.h" | ||
45 | |||
46 | /** | ||
47 | * How large to start with for the hashmap of neighbours. | ||
48 | */ | ||
49 | #define STARTING_NEIGHBOURS_SIZE 16 | ||
50 | |||
51 | |||
52 | /** | ||
53 | * Handle for a message that should be transmitted to the service. | ||
54 | * Used for both control messages and normal messages. | ||
55 | */ | ||
56 | struct GNUNET_TRANSPORT_TransmitHandle | ||
57 | { | ||
58 | |||
59 | /** | ||
60 | * We keep all requests in a DLL. | ||
61 | */ | ||
62 | struct GNUNET_TRANSPORT_TransmitHandle *next; | ||
63 | |||
64 | /** | ||
65 | * We keep all requests in a DLL. | ||
66 | */ | ||
67 | struct GNUNET_TRANSPORT_TransmitHandle *prev; | ||
68 | |||
69 | /** | ||
70 | * Neighbour for this handle, NULL for control messages. | ||
71 | */ | ||
72 | struct NeighbourList *neighbour; | ||
73 | |||
74 | /** | ||
75 | * Function to call when notify_size bytes are available | ||
76 | * for transmission. | ||
77 | */ | ||
78 | GNUNET_CONNECTION_TransmitReadyNotify notify; | ||
79 | |||
80 | /** | ||
81 | * Closure for notify. | ||
82 | */ | ||
83 | void *notify_cls; | ||
84 | |||
85 | /** | ||
86 | * Timeout for this request, 0 for control messages. | ||
87 | */ | ||
88 | struct GNUNET_TIME_Absolute timeout; | ||
89 | |||
90 | /** | ||
91 | * How many bytes is our notify callback waiting for? | ||
92 | */ | ||
93 | size_t notify_size; | ||
94 | |||
95 | /** | ||
96 | * How important is this message? Not used for control messages. | ||
97 | */ | ||
98 | uint32_t priority; | ||
99 | |||
100 | }; | ||
101 | |||
102 | |||
103 | /** | ||
104 | * Entry in hash table of all of our current neighbours. | ||
105 | */ | ||
106 | struct Neighbour | ||
107 | { | ||
108 | /** | ||
109 | * Overall transport handle. | ||
110 | */ | ||
111 | struct GNUNET_TRANSPORT_Handle *h; | ||
112 | |||
113 | /** | ||
114 | * Active transmit handle or NULL. | ||
115 | */ | ||
116 | struct GNUNET_TRANSPORT_TransmitHandle *th; | ||
117 | |||
118 | /** | ||
119 | * Identity of this neighbour. | ||
120 | */ | ||
121 | struct GNUNET_PeerIdentity id; | ||
122 | |||
123 | /** | ||
124 | * Outbound bandwidh tracker. | ||
125 | */ | ||
126 | struct GNUNET_BANDWIDTH_Tracker out_tracker; | ||
127 | |||
128 | /** | ||
129 | * Entry in our readyness heap (which is sorted by 'next_ready' | ||
130 | * value). NULL if there is no pending transmission request for | ||
131 | * this neighbour or if we're waiting for 'is_ready' to become | ||
132 | * true AFTER the 'out_tracker' suggested that this peer's quota | ||
133 | * has been satisfied (so once 'is_ready' goes to GNUNET_YES, | ||
134 | * we should immediately go back into the heap). | ||
135 | */ | ||
136 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
137 | |||
138 | /** | ||
139 | * Is this peer currently ready to receive a message? | ||
140 | */ | ||
141 | int is_ready; | ||
142 | |||
143 | }; | ||
144 | |||
145 | |||
146 | /** | ||
147 | * Linked list of functions to call whenever our HELLO is updated. | ||
148 | */ | ||
149 | struct HelloWaitList | ||
150 | { | ||
151 | |||
152 | /** | ||
153 | * This is a doubly linked list. | ||
154 | */ | ||
155 | struct HelloWaitList *next; | ||
156 | |||
157 | /** | ||
158 | * This is a doubly linked list. | ||
159 | */ | ||
160 | struct HelloWaitList *prev; | ||
161 | |||
162 | /** | ||
163 | * Callback to call once we got our HELLO. | ||
164 | */ | ||
165 | GNUNET_TRANSPORT_HelloUpdateCallback rec; | ||
166 | |||
167 | /** | ||
168 | * Closure for rec. | ||
169 | */ | ||
170 | void *rec_cls; | ||
171 | |||
172 | }; | ||
173 | |||
174 | |||
175 | /** | ||
176 | * Handle for the transport service (includes all of the | ||
177 | * state for the transport service). | ||
178 | */ | ||
179 | struct GNUNET_TRANSPORT_Handle | ||
180 | { | ||
181 | |||
182 | /** | ||
183 | * Closure for the callbacks. | ||
184 | */ | ||
185 | void *cls; | ||
186 | |||
187 | /** | ||
188 | * Function to call for received data. | ||
189 | */ | ||
190 | GNUNET_TRANSPORT_ReceiveCallback rec; | ||
191 | |||
192 | /** | ||
193 | * function to call on connect events | ||
194 | */ | ||
195 | GNUNET_TRANSPORT_NotifyConnect nc_cb; | ||
196 | |||
197 | /** | ||
198 | * function to call on disconnect events | ||
199 | */ | ||
200 | GNUNET_TRANSPORT_NotifyDisconnect nd_cb; | ||
201 | |||
202 | /** | ||
203 | * Head of DLL of control messages. | ||
204 | */ | ||
205 | struct GNUNET_TRANSPORT_TransmitHandle *control_head; | ||
206 | |||
207 | /** | ||
208 | * Tail of DLL of control messages. | ||
209 | */ | ||
210 | struct GNUNET_TRANSPORT_TransmitHandle *control_tail; | ||
211 | |||
212 | /** | ||
213 | * The current HELLO message for this peer. Updated | ||
214 | * whenever transports change their addresses. | ||
215 | */ | ||
216 | struct GNUNET_HELLO_Message *my_hello; | ||
217 | |||
218 | /** | ||
219 | * My client connection to the transport service. | ||
220 | */ | ||
221 | struct GNUNET_CLIENT_Connection *client; | ||
222 | |||
223 | /** | ||
224 | * Handle to our registration with the client for notification. | ||
225 | */ | ||
226 | struct GNUNET_CLIENT_TransmitHandle *cth; | ||
227 | |||
228 | /** | ||
229 | * Linked list of pending requests for our HELLO. | ||
230 | */ | ||
231 | struct HelloWaitList *hwl_head; | ||
232 | |||
233 | /** | ||
234 | * Linked list of pending requests for our HELLO. | ||
235 | */ | ||
236 | struct HelloWaitList *hwl_tail; | ||
237 | |||
238 | /** | ||
239 | * My configuration. | ||
240 | */ | ||
241 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
242 | |||
243 | /** | ||
244 | * Hash map of the current connected neighbours of this peer. | ||
245 | * Maps peer identities to 'struct Neighbour' entries. | ||
246 | */ | ||
247 | struct GNUNET_CONTAINER_MultiHashMap *neighbours; | ||
248 | |||
249 | /** | ||
250 | * Heap sorting peers with pending messages by the timestamps that | ||
251 | * specify when we could next send a message to the respective peer. | ||
252 | * Excludes control messages (which can always go out immediately). | ||
253 | * Maps time stamps to 'struct Neighbour' entries. | ||
254 | */ | ||
255 | struct GNUNET_CONTAINER_Heap *ready_heap; | ||
256 | |||
257 | /** | ||
258 | * Peer identity as assumed by this process, or all zeros. | ||
259 | */ | ||
260 | struct GNUNET_PeerIdentity self; | ||
261 | |||
262 | /** | ||
263 | * ID of the task trying to reconnect to the service. | ||
264 | */ | ||
265 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | ||
266 | |||
267 | /** | ||
268 | * ID of the task trying to trigger transmission for a peer while | ||
269 | * maintaining bandwidth quotas. In use if there are no control | ||
270 | * messages and the smallest entry in the 'ready_heap' has a time | ||
271 | * stamp in the future. | ||
272 | */ | ||
273 | GNUNET_SCHEDULER_TaskIdentifier quota_task; | ||
274 | |||
275 | /** | ||
276 | * Delay until we try to reconnect. | ||
277 | */ | ||
278 | struct GNUNET_TIME_Relative reconnect_delay; | ||
279 | |||
280 | /** | ||
281 | * Should we check that 'self' matches what the service thinks? | ||
282 | * (if GNUNET_NO, then 'self' is all zeros!). | ||
283 | */ | ||
284 | int check_self; | ||
285 | }; | ||
286 | |||
287 | |||
288 | /** | ||
289 | * Schedule the task to send one message, either from the control | ||
290 | * list or the peer message queues to the service. | ||
291 | * | ||
292 | * @param h transport service to schedule a transmission for | ||
293 | */ | ||
294 | static void | ||
295 | schedule_transmission (struct GNUNET_TRANSPORT_Handle *h); | ||
296 | |||
297 | |||
298 | /** | ||
299 | * Function that will schedule the job that will try | ||
300 | * to connect us again to the client. | ||
301 | * | ||
302 | * @param h transport service to reconnect | ||
303 | */ | ||
304 | static void | ||
305 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h); | ||
306 | |||
307 | |||
308 | /** | ||
309 | * Get the neighbour list entry for the given peer | ||
310 | * | ||
311 | * @param h our context | ||
312 | * @param peer peer to look up | ||
313 | * @return NULL if no such peer entry exists | ||
314 | */ | ||
315 | static struct Neighbour * | ||
316 | neighbour_find (struct GNUNET_TRANSPORT_Handle *h, | ||
317 | const struct GNUNET_PeerIdentity *peer) | ||
318 | { | ||
319 | return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey); | ||
320 | } | ||
321 | |||
322 | |||
323 | /** | ||
324 | * Add neighbour to our list | ||
325 | * | ||
326 | * @return NULL if this API is currently disconnecting from the service | ||
327 | */ | ||
328 | static struct Neighbour * | ||
329 | neighbour_add (struct GNUNET_TRANSPORT_Handle *h, | ||
330 | const struct GNUNET_PeerIdentity *pid) | ||
331 | { | ||
332 | struct Neighbour *n; | ||
333 | |||
334 | #if DEBUG_TRANSPORT | ||
335 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
336 | "Creating entry for neighbour `%4s'.\n", | ||
337 | GNUNET_i2s (pid)); | ||
338 | #endif | ||
339 | n = GNUNET_malloc (sizeof (struct Neighbour)); | ||
340 | n->id = *pid; | ||
341 | n->h = h; | ||
342 | n->is_ready = GNUNET_YES; | ||
343 | GNUNET_BANDWIDTH_tracker_init (&n->out_tracker, | ||
344 | GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, | ||
345 | MAX_BANDWIDTH_CARRY_S); | ||
346 | GNUNET_CONTAINER_multihashmap_put (h->neighbours, | ||
347 | &pid->hashPubKey, | ||
348 | n, | ||
349 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
350 | return n; | ||
351 | } | ||
352 | |||
353 | |||
354 | /** | ||
355 | * Iterator over hash map entries, for deleting state of a neighbour. | ||
356 | * | ||
357 | * @param cls the 'struct GNUNET_TRANSPORT_Handle*' | ||
358 | * @param key peer identity | ||
359 | * @param value value in the hash map, the neighbour entry to delete | ||
360 | * @return GNUNET_YES if we should continue to | ||
361 | * iterate, | ||
362 | * GNUNET_NO if not. | ||
363 | */ | ||
364 | static int | ||
365 | neighbour_delete (void *cls, | ||
366 | const GNUNET_HashCode * key, | ||
367 | void *value) | ||
368 | { | ||
369 | struct GNUNET_TRANSPORT_Handle *handle = cls; | ||
370 | struct Neighbour *n = value; | ||
371 | |||
372 | handle->nd_cb (handle->cls, | ||
373 | &n->id); | ||
374 | GNUNET_assert (NULL == n->th); | ||
375 | GNUNET_assert (NULL == n->hn); | ||
376 | GNUNET_CONTAINER_multihashmap_remove (handle->neighbours, | ||
377 | key, | ||
378 | n); | ||
379 | GNUNET_free (n); | ||
380 | return GNUNET_YES; | ||
381 | } | ||
382 | |||
383 | |||
384 | /** | ||
385 | * Function we use for handling incoming messages. | ||
386 | * | ||
387 | * @param cls closure (struct GNUNET_TRANSPORT_Handle *) | ||
388 | * @param msg message received, NULL on timeout or fatal error | ||
389 | */ | ||
390 | static void | ||
391 | demultiplexer (void *cls, | ||
392 | const struct GNUNET_MessageHeader *msg) | ||
393 | { | ||
394 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
395 | const struct DisconnectInfoMessage *dim; | ||
396 | const struct ConnectInfoMessage *cim; | ||
397 | const struct InboundMessage *im; | ||
398 | const struct GNUNET_MessageHeader *imm; | ||
399 | const struct SendOkMessage *okm; | ||
400 | struct HelloWaitList *hwl; | ||
401 | struct HelloWaitList *next_hwl; | ||
402 | struct NeighbourList *n; | ||
403 | struct GNUNET_PeerIdentity me; | ||
404 | uint16_t size; | ||
405 | uint32_t ats_count; | ||
406 | |||
407 | GNUNET_assert (h->client != NULL); | ||
408 | if (msg == NULL) | ||
409 | { | ||
410 | #if DEBUG_TRANSPORT | ||
411 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
412 | "Error receiving from transport service, disconnecting temporarily.\n"); | ||
413 | #endif | ||
414 | disconnect_and_schedule_reconnect (h); | ||
415 | return; | ||
416 | } | ||
417 | GNUNET_CLIENT_receive (h->client, | ||
418 | &demultiplexer, h, | ||
419 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
420 | size = ntohs (msg->size); | ||
421 | switch (ntohs (msg->type)) | ||
422 | { | ||
423 | case GNUNET_MESSAGE_TYPE_HELLO: | ||
424 | if (GNUNET_OK != | ||
425 | GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, | ||
426 | &me)) | ||
427 | { | ||
428 | GNUNET_break (0); | ||
429 | break; | ||
430 | } | ||
431 | #if DEBUG_TRANSPORT | ||
432 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
433 | "Receiving (my own) `%s' message, I am `%4s'.\n", | ||
434 | "HELLO", GNUNET_i2s (&me)); | ||
435 | #endif | ||
436 | GNUNET_free_non_null (h->my_hello); | ||
437 | h->my_hello = NULL; | ||
438 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
439 | { | ||
440 | GNUNET_break (0); | ||
441 | break; | ||
442 | } | ||
443 | h->my_hello = GNUNET_malloc (size); | ||
444 | memcpy (h->my_hello, msg, size); | ||
445 | hwl = h->hwl_head; | ||
446 | while (NULL != hwl) | ||
447 | { | ||
448 | next_hwl = hwl->next; | ||
449 | hwl->rec (hwl->rec_cls, | ||
450 | (const struct GNUNET_MessageHeader *) h->my_hello); | ||
451 | hwl = next_hwl; | ||
452 | } | ||
453 | break; | ||
454 | case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT: | ||
455 | if (size < sizeof (struct ConnectInfoMessage)) | ||
456 | { | ||
457 | GNUNET_break (0); | ||
458 | break; | ||
459 | } | ||
460 | cim = (const struct ConnectInfoMessage *) msg; | ||
461 | ats_count = ntohl (cim->ats_count); | ||
462 | if (size != sizeof (struct ConnectInfoMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) | ||
463 | { | ||
464 | GNUNET_break (0); | ||
465 | break; | ||
466 | } | ||
467 | #if DEBUG_TRANSPORT | ||
468 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
469 | "Receiving `%s' message for `%4s'.\n", | ||
470 | "CONNECT", GNUNET_i2s (&cim->id)); | ||
471 | #endif | ||
472 | n = neighbour_find (h, &cim->id); | ||
473 | if (n != NULL) | ||
474 | { | ||
475 | GNUNET_break (0); | ||
476 | break; | ||
477 | } | ||
478 | n = neighbour_add (h, &cim->id); | ||
479 | if (h->nc_cb != NULL) | ||
480 | h->nc_cb (h->cls, &n->id, | ||
481 | &cim->ats, ats_count); | ||
482 | break; | ||
483 | case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: | ||
484 | if (size != sizeof (struct DisconnectInfoMessage)) | ||
485 | { | ||
486 | GNUNET_break (0); | ||
487 | break; | ||
488 | } | ||
489 | dim = (const struct DisconnectInfoMessage *) msg; | ||
490 | GNUNET_break (ntohl (dim->reserved) == 0); | ||
491 | #if DEBUG_TRANSPORT_DISCONNECT | ||
492 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
493 | "Receiving `%s' message for `%4s'.\n", | ||
494 | "DISCONNECT", | ||
495 | GNUNET_i2s (&dim->peer)); | ||
496 | #endif | ||
497 | n = neighbour_find (h, &dim->peer); | ||
498 | if (n == NULL) | ||
499 | { | ||
500 | GNUNET_break (0); | ||
501 | break; | ||
502 | } | ||
503 | neighbour_delete (h, &dim->peer.hashPubKey, n); | ||
504 | break; | ||
505 | case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: | ||
506 | if (size != sizeof (struct SendOkMessage)) | ||
507 | { | ||
508 | GNUNET_break (0); | ||
509 | break; | ||
510 | } | ||
511 | okm = (const struct SendOkMessage *) msg; | ||
512 | #if DEBUG_TRANSPORT | ||
513 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
514 | "Receiving `%s' message, transmission %s.\n", "SEND_OK", | ||
515 | ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); | ||
516 | #endif | ||
517 | n = neighbour_find (h, &okm->peer); | ||
518 | if (n == NULL) | ||
519 | { | ||
520 | GNUNET_break (0); | ||
521 | break; | ||
522 | } | ||
523 | GNUNET_break (GNUNET_NO == n->is_ready); | ||
524 | n->is_ready = GNUNET_YES; | ||
525 | if ( (n->th != NULL) && | ||
526 | (n->hn == NULL) ) | ||
527 | { | ||
528 | /* we've been waiting for this (congestion, not quota, | ||
529 | caused delayed transmission) */ | ||
530 | n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, | ||
531 | n, 0); | ||
532 | schedule_transmission (h); | ||
533 | } | ||
534 | break; | ||
535 | case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: | ||
536 | #if DEBUG_TRANSPORT | ||
537 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
538 | "Receiving `%s' message.\n", "RECV"); | ||
539 | #endif | ||
540 | if (size < | ||
541 | sizeof (struct InboundMessage) + | ||
542 | sizeof (struct GNUNET_MessageHeader)) | ||
543 | { | ||
544 | GNUNET_break (0); | ||
545 | break; | ||
546 | } | ||
547 | im = (const struct InboundMessage *) msg; | ||
548 | GNUNET_break (0 == ntohl (im->reserved)); | ||
549 | ats_count = ntohl(im->ats_count); | ||
550 | imm = (const struct GNUNET_MessageHeader *) &((&(im->ats))[ats_count+1]); | ||
551 | |||
552 | if (ntohs (imm->size) + sizeof (struct InboundMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) != size) | ||
553 | { | ||
554 | GNUNET_break (0); | ||
555 | break; | ||
556 | } | ||
557 | #if DEBUG_TRANSPORT | ||
558 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
559 | "Received message of type %u from `%4s'.\n", | ||
560 | ntohs (imm->type), GNUNET_i2s (&im->peer)); | ||
561 | #endif | ||
562 | n = neighbour_find (h, &im->peer); | ||
563 | if (n == NULL) | ||
564 | { | ||
565 | GNUNET_break (0); | ||
566 | break; | ||
567 | } | ||
568 | if (h->rec != NULL) | ||
569 | h->rec (h->cls, &im->peer, imm, | ||
570 | &im->ats, ats_count); | ||
571 | break; | ||
572 | default: | ||
573 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
574 | _ | ||
575 | ("Received unexpected message of type %u in %s:%u\n"), | ||
576 | ntohs (msg->type), __FILE__, __LINE__); | ||
577 | GNUNET_break (0); | ||
578 | break; | ||
579 | } | ||
580 | } | ||
581 | |||
582 | |||
583 | /** | ||
584 | * Transmit message(s) to service. | ||
585 | * | ||
586 | * @param cls handle to transport | ||
587 | * @param size number of bytes available in buf | ||
588 | * @param buf where to copy the message | ||
589 | * @return number of bytes copied to buf | ||
590 | */ | ||
591 | static size_t | ||
592 | transport_notify_ready (void *cls, size_t size, void *buf) | ||
593 | { | ||
594 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
595 | size_t ssize; | ||
596 | struct GNUNET_TRANSPORT_TransmitHandle *th; | ||
597 | struct Neighbour *n; | ||
598 | char *cbuf; | ||
599 | size_t ret; | ||
600 | |||
601 | GNUNET_assert (NULL != h->client); | ||
602 | h->cth = NULL; | ||
603 | if (NULL == buf) | ||
604 | { | ||
605 | /* transmission failed */ | ||
606 | disconnect_and_schedule_reconnect (h); | ||
607 | return 0; | ||
608 | } | ||
609 | |||
610 | cbuf = buf; | ||
611 | ret = 0; | ||
612 | /* first send control messages */ | ||
613 | while ( (NULL != (th = h->control_head)) && | ||
614 | (th->notify_size <= size) ) | ||
615 | { | ||
616 | GNUNET_CONTAINER_DLL_remove (h->control_head, | ||
617 | h->control_tail, | ||
618 | th); | ||
619 | nret = th->notify (th->notify_cls, size, &cbuf[ret]); | ||
620 | #if DEBUG_TRANSPORT | ||
621 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
622 | "Added %u bytes of control message at %u\n", | ||
623 | nret, | ||
624 | ret); | ||
625 | #endif | ||
626 | GNUNET_free (th); | ||
627 | ret += nret; | ||
628 | size -= nret; | ||
629 | } | ||
630 | |||
631 | /* then, if possible and no control messages pending, send data messages */ | ||
632 | while ( (NULL == h->control_head) && | ||
633 | (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) ) | ||
634 | { | ||
635 | if (GNUNET_YES != n->is_ready) | ||
636 | { | ||
637 | /* peer not ready, wait for notification! */ | ||
638 | GNUNET_CONTAINER_heap_remove_node (n->hn); | ||
639 | n->hn = NULL; | ||
640 | /* FIXME: hitting transport-level congestion, add | ||
641 | a timeout task for 'th' in this case! */ | ||
642 | continue; | ||
643 | } | ||
644 | th = n->th; | ||
645 | if (th->notify_size + sizeof (struct OutboundMessage) > size) | ||
646 | break; /* does not fit */ | ||
647 | n->th = NULL; | ||
648 | n->is_ready = GNUNET_NO; | ||
649 | GNUNET_assert (size >= sizeof (struct OutboundMessage)); | ||
650 | mret = th->notify (th->notify_cls, | ||
651 | size - sizeof (struct OutboundMessage), | ||
652 | &cbuf[ret + sizeof (struct OutboundMessage)]); | ||
653 | GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); | ||
654 | if (mret != 0) | ||
655 | { | ||
656 | GNUNET_assert (mret + sizeof (struct OutboundMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE); | ||
657 | obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); | ||
658 | obm.header.size = htons (mret + sizeof (struct OutboundMessage)); | ||
659 | obm.priority = htonl (th->priority); | ||
660 | obm.timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout)); | ||
661 | obm.peer = n->id; | ||
662 | memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage)); | ||
663 | ret += (mret + sizeof (struct OutboundMessage)); | ||
664 | size -= (mret + sizeof (struct OutboundMessage)); | ||
665 | GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret); | ||
666 | } | ||
667 | GNUNET_free (th); | ||
668 | } | ||
669 | /* if there are more pending messages, try to schedule those */ | ||
670 | schedule_transmission (h); | ||
671 | #if DEBUG_TRANSPORT | ||
672 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
673 | "Transmitting %u bytes to transport service\n", ret); | ||
674 | #endif | ||
675 | return ret; | ||
676 | } | ||
677 | |||
678 | |||
679 | /** | ||
680 | * Schedule the task to send one message, either from the control | ||
681 | * list or the peer message queues to the service. | ||
682 | * | ||
683 | * @param cls transport service to schedule a transmission for | ||
684 | * @param tc scheduler context | ||
685 | */ | ||
686 | static void | ||
687 | schedule_transmission_task (void *cls, | ||
688 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
689 | { | ||
690 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
691 | size_t size; | ||
692 | struct GNUNET_TRANSPORT_TransmitHandle *th; | ||
693 | struct Neighbour *n; | ||
694 | |||
695 | h->quota_task = GNUNET_SCHEDULER_NO_TASK; | ||
696 | GNUNET_assert (NULL != h->client); | ||
697 | /* destroy all requests that have timed out */ | ||
698 | while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) && | ||
699 | (GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value == 0) ) | ||
700 | { | ||
701 | /* notify client that the request could not be satisfied within | ||
702 | the given time constraints */ | ||
703 | th = n->th; | ||
704 | n->th = NULL; | ||
705 | GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); | ||
706 | n->hn = NULL; | ||
707 | GNUNET_assert (0 == | ||
708 | th->notify (th->notify_cls, 0, NULL)); | ||
709 | GNUNET_free (th); | ||
710 | } | ||
711 | if (NULL != h->cth) | ||
712 | return; | ||
713 | if (NULL != h->control_head) | ||
714 | { | ||
715 | size = h->control_head->notify_size; | ||
716 | } | ||
717 | else | ||
718 | { | ||
719 | n = GNUNET_CONTAINER_heap_peek (h->ready_heap); | ||
720 | if (NULL == n) | ||
721 | return; /* no pending messages */ | ||
722 | size = n->th->notify_size + sizeof (struct OutboundMessage); | ||
723 | } | ||
724 | #if DEBUG_TRANSPORT | ||
725 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
726 | "Calling notify_transmit_ready\n"); | ||
727 | #endif | ||
728 | h->cth = | ||
729 | GNUNET_CLIENT_notify_transmit_ready (h->client, | ||
730 | size, | ||
731 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
732 | GNUNET_NO, | ||
733 | &transport_notify_ready, | ||
734 | h); | ||
735 | GNUNET_assert (NULL != h->cth); | ||
736 | } | ||
737 | |||
738 | |||
739 | /** | ||
740 | * Schedule the task to send one message, either from the control | ||
741 | * list or the peer message queues to the service. | ||
742 | * | ||
743 | * @param h transport service to schedule a transmission for | ||
744 | */ | ||
745 | static void | ||
746 | schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) | ||
747 | { | ||
748 | struct GNUNET_TIME_Relative delay; | ||
749 | struct Neighbour *n; | ||
750 | |||
751 | GNUNET_assert (NULL != h->client); | ||
752 | if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) | ||
753 | { | ||
754 | GNUNET_SCHEDULER_cancel (h->quota_task); | ||
755 | h->quota_task = GNUNET_SCHEDULER_NO_TASK; | ||
756 | } | ||
757 | if (NULL != h->control_head) | ||
758 | delay = GNUNET_TIME_UNIT_ZERO; | ||
759 | else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) | ||
760 | delay = GNUNET_TIME_absolute_get_remaining (n->th->timeout); | ||
761 | else | ||
762 | return; /* no work to be done */ | ||
763 | h->quota_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
764 | &schedule_transmission_task, | ||
765 | h); | ||
766 | } | ||
767 | |||
768 | |||
769 | /** | ||
770 | * Queue control request for transmission to the transport | ||
771 | * service. | ||
772 | * | ||
773 | * @param h handle to the transport service | ||
774 | * @param size number of bytes to be transmitted | ||
775 | * @param notify function to call to get the content | ||
776 | * @param notify_cls closure for notify | ||
777 | */ | ||
778 | static void | ||
779 | schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, | ||
780 | size_t size, | ||
781 | GNUNET_CONNECTION_TransmitReadyNotify notify, | ||
782 | void *notify_cls) | ||
783 | { | ||
784 | struct GNUNET_TRANSPORT_TransmitHandle *th; | ||
785 | |||
786 | #if DEBUG_TRANSPORT | ||
787 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
788 | "Control transmit of %u bytes requested\n", | ||
789 | size); | ||
790 | #endif | ||
791 | th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); | ||
792 | th->notify = notify; | ||
793 | th->notify_cls = notify_cls; | ||
794 | th->notify_size = size; | ||
795 | GNUNET_CONTAINER_DLL_insert (h->control_head, | ||
796 | h->control_tail, | ||
797 | th); | ||
798 | schedule_transmission (h); | ||
799 | } | ||
800 | |||
801 | |||
802 | /** | ||
803 | * Transmit START message to service. | ||
804 | * | ||
805 | * @param cls unused | ||
806 | * @param size number of bytes available in buf | ||
807 | * @param buf where to copy the message | ||
808 | * @return number of bytes copied to buf | ||
809 | */ | ||
810 | static size_t | ||
811 | send_start (void *cls, size_t size, void *buf) | ||
812 | { | ||
813 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
814 | struct StartMessage s; | ||
815 | |||
816 | if (buf == NULL) | ||
817 | { | ||
818 | /* Can only be shutdown, just give up */ | ||
819 | #if DEBUG_TRANSPORT | ||
820 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
821 | "Shutdown while trying to transmit `%s' request.\n", | ||
822 | "START"); | ||
823 | #endif | ||
824 | return 0; | ||
825 | } | ||
826 | #if DEBUG_TRANSPORT | ||
827 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
828 | "Transmitting `%s' request.\n", "START"); | ||
829 | #endif | ||
830 | GNUNET_assert (size >= sizeof (struct StartMessage)); | ||
831 | s.header.size = htons (sizeof (struct StartMessage)); | ||
832 | s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START); | ||
833 | s.do_check = htonl (h->check_self); | ||
834 | s.self = h->self; | ||
835 | memcpy (buf, &s, sizeof (struct StartMessage)); | ||
836 | return sizeof (struct StartMessage); | ||
837 | } | ||
838 | |||
839 | |||
840 | /** | ||
841 | * Try again to connect to transport service. | ||
842 | * | ||
843 | * @param cls the handle to the transport service | ||
844 | * @param tc scheduler context | ||
845 | */ | ||
846 | static void | ||
847 | reconnect (void *cls, | ||
848 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
849 | { | ||
850 | struct GNUNET_TRANSPORT_Handle *h = cls; | ||
851 | struct ControlMessage *pos; | ||
852 | |||
853 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
854 | if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) | ||
855 | { | ||
856 | /* shutdown, just give up */ | ||
857 | return; | ||
858 | } | ||
859 | #if DEBUG_TRANSPORT | ||
860 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
861 | "Connecting to transport service.\n"); | ||
862 | #endif | ||
863 | GNUNET_assert (h->client == NULL); | ||
864 | GNUNET_assert (h->control_head == NULL); | ||
865 | GNUNET_assert (h->control_tail == NULL); | ||
866 | h->client = GNUNET_CLIENT_connect ("transport", h->cfg); | ||
867 | GNUNET_assert (h->client != NULL); | ||
868 | schedule_control_transmit (h, | ||
869 | sizeof (struct StartMessage), | ||
870 | &send_start, h); | ||
871 | GNUNET_CLIENT_receive (h->client, | ||
872 | &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL); | ||
873 | } | ||
874 | |||
875 | |||
876 | /** | ||
877 | * Function that will schedule the job that will try | ||
878 | * to connect us again to the client. | ||
879 | * | ||
880 | * @param h transport service to reconnect | ||
881 | */ | ||
882 | static void | ||
883 | disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) | ||
884 | { | ||
885 | GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); | ||
886 | /* Forget about all neighbours that we used to be connected to */ | ||
887 | GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, | ||
888 | &neighbour_delete, | ||
889 | NULL); | ||
890 | if (NULL != handle->cth) | ||
891 | { | ||
892 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); | ||
893 | handle->cth = NULL; | ||
894 | } | ||
895 | if (NULL != handle->client) | ||
896 | { | ||
897 | GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); | ||
898 | handle->client = NULL; | ||
899 | } | ||
900 | if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) | ||
901 | { | ||
902 | GNUNET_SCHEDULER_cancel (h->quota_task); | ||
903 | h->quota_task = GNUNET_SCHEDULER_NO_TASK; | ||
904 | } | ||
905 | while ( (NULL != (th = handle->control_head))) | ||
906 | { | ||
907 | GNUNET_CONTAINER_DLL_remove (handle->control_head, | ||
908 | handle->control_tail, | ||
909 | cm); | ||
910 | cm->notify (cm->notify_cls, 0, NULL); | ||
911 | GNUNET_free (cm); | ||
912 | } | ||
913 | #if DEBUG_TRANSPORT | ||
914 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
915 | "Scheduling task to reconnect to transport service in %llu ms.\n", | ||
916 | h->reconnect_delay.rel_value); | ||
917 | #endif | ||
918 | h->reconnect_task | ||
919 | = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | ||
920 | &reconnect, h); | ||
921 | if (h->reconnect_delay.rel_value == 0) | ||
922 | { | ||
923 | h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
924 | } | ||
925 | else | ||
926 | { | ||
927 | h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2); | ||
928 | h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, | ||
929 | h->reconnect_delay); | ||
930 | } | ||
931 | } | ||
932 | |||
933 | |||
934 | /** | ||
935 | * Closure for 'send_set_quota'. | ||
936 | */ | ||
937 | struct SetQuotaContext | ||
938 | { | ||
939 | |||
940 | /** | ||
941 | * Identity of the peer impacted by the quota change. | ||
942 | */ | ||
943 | struct GNUNET_PeerIdentity target; | ||
944 | |||
945 | /** | ||
946 | * Quota to transmit. | ||
947 | */ | ||
948 | struct GNUNET_BANDWIDTH_Value32NBO quota_in; | ||
949 | }; | ||
950 | |||
951 | |||
952 | /** | ||
953 | * Send SET_QUOTA message to the service. | ||
954 | * | ||
955 | * @param cls the 'struct SetQuotaContext' | ||
956 | * @param size number of bytes available in buf | ||
957 | * @param buf where to copy the message | ||
958 | * @return number of bytes copied to buf | ||
959 | */ | ||
960 | static size_t | ||
961 | send_set_quota (void *cls, size_t size, void *buf) | ||
962 | { | ||
963 | struct SetQuotaContext *sqc = cls; | ||
964 | struct QuotaSetMessage msg; | ||
965 | |||
966 | if (buf == NULL) | ||
967 | { | ||
968 | GNUNET_free (sqc); | ||
969 | return 0; | ||
970 | } | ||
971 | #if DEBUG_TRANSPORT | ||
972 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
973 | "Transmitting `%s' request with respect to `%4s'.\n", | ||
974 | "SET_QUOTA", | ||
975 | GNUNET_i2s (&sqc->target)); | ||
976 | #endif | ||
977 | GNUNET_assert (size >= sizeof (struct QuotaSetMessage)); | ||
978 | msg.header.size = htons (sizeof (struct QuotaSetMessage)); | ||
979 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); | ||
980 | msg.quota = sqc->quota_in; | ||
981 | msg.peer = sqc->target; | ||
982 | memcpy (buf, &msg, sizeof (msg)); | ||
983 | GNUNET_free (sqc); | ||
984 | return sizeof (struct QuotaSetMessage); | ||
985 | } | ||
986 | |||
987 | |||
988 | /** | ||
989 | * Set the share of incoming bandwidth for the given | ||
990 | * peer to the specified amount. | ||
991 | * | ||
992 | * @param handle connection to transport service | ||
993 | * @param target who's bandwidth quota is being changed | ||
994 | * @param quota_in incoming bandwidth quota in bytes per ms | ||
995 | * @param quota_out outgoing bandwidth quota in bytes per ms | ||
996 | */ | ||
997 | void | ||
998 | GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle, | ||
999 | const struct GNUNET_PeerIdentity *target, | ||
1000 | struct GNUNET_BANDWIDTH_Value32NBO quota_in, | ||
1001 | struct GNUNET_BANDWIDTH_Value32NBO quota_out) | ||
1002 | { | ||
1003 | struct Neighbour *n; | ||
1004 | struct SetQuotaContext *sqc; | ||
1005 | |||
1006 | n = neighbour_find (handle, target); | ||
1007 | if (NULL == n) | ||
1008 | { | ||
1009 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1010 | "Quota changed to %u for peer `%s', but I have no such neighbour!\n", | ||
1011 | (unsigned int) ntohl (quota_out.value__), | ||
1012 | GNUNET_i2s (target)); | ||
1013 | return; | ||
1014 | } | ||
1015 | #if DEBUG_TRANSPORT | ||
1016 | if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__) | ||
1017 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1018 | "Quota changed from %u to %u for peer `%s'\n", | ||
1019 | (unsigned int) n->out_tracker.available_bytes_per_s__, | ||
1020 | (unsigned int) ntohl (quota_out.value__), | ||
1021 | GNUNET_i2s (target)); | ||
1022 | else | ||
1023 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1024 | "Quota remains at %u for peer `%s'\n", | ||
1025 | (unsigned int) n->out_tracker.available_bytes_per_s__, | ||
1026 | GNUNET_i2s (target)); | ||
1027 | #endif | ||
1028 | GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, | ||
1029 | quota_out); | ||
1030 | sqc = GNUNET_malloc (sizeof (struct SetQuotaContext)); | ||
1031 | sqc->target = *target; | ||
1032 | sqc->quota_in = quota_in; | ||
1033 | schedule_control_transmit (handle, | ||
1034 | sizeof (struct QuotaSetMessage), | ||
1035 | &send_set_quota, sqc); | ||
1036 | } | ||
1037 | |||
1038 | |||
1039 | /** | ||
1040 | * Send TRY_CONNECT message to the service. | ||
1041 | * | ||
1042 | * @param cls the 'struct GNUNET_PeerIdentity' | ||
1043 | * @param size number of bytes available in buf | ||
1044 | * @param buf where to copy the message | ||
1045 | * @return number of bytes copied to buf | ||
1046 | */ | ||
1047 | static size_t | ||
1048 | send_try_connect (void *cls, size_t size, void *buf) | ||
1049 | { | ||
1050 | struct GNUNET_PeerIdentity *pid = cls; | ||
1051 | struct TryConnectMessage msg; | ||
1052 | |||
1053 | if (buf == NULL) | ||
1054 | { | ||
1055 | GNUNET_free (pid); | ||
1056 | return 0; | ||
1057 | } | ||
1058 | #if DEBUG_TRANSPORT | ||
1059 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1060 | "Transmitting `%s' request with respect to `%4s'.\n", | ||
1061 | "TRY_CONNECT", | ||
1062 | GNUNET_i2s (&sqc->target)); | ||
1063 | #endif | ||
1064 | GNUNET_assert (size >= sizeof (struct TryConnectMessage)); | ||
1065 | msg.header.size = htons (sizeof (struct TryConnectMessage)); | ||
1066 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT); | ||
1067 | msg.reserved = htonl (0); | ||
1068 | msg.peer = *pid; | ||
1069 | memcpy (buf, &msg, sizeof (msg)); | ||
1070 | GNUNET_free (pid); | ||
1071 | return sizeof (struct TryConnectMessage); | ||
1072 | } | ||
1073 | |||
1074 | |||
1075 | /** | ||
1076 | * Ask the transport service to establish a connection to | ||
1077 | * the given peer. | ||
1078 | * | ||
1079 | * @param handle connection to transport service | ||
1080 | * @param target who we should try to connect to | ||
1081 | */ | ||
1082 | void | ||
1083 | GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle, | ||
1084 | const struct GNUNET_PeerIdentity *target) | ||
1085 | { | ||
1086 | struct GNUNET_PeerIdentity *pid; | ||
1087 | |||
1088 | pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); | ||
1089 | *pid = *target; | ||
1090 | schedule_control_transmit (handle, | ||
1091 | sizeof (struct TryConnectMessage), | ||
1092 | &send_try_connect, pid); | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | /** | ||
1097 | * Send HELLO message to the service. | ||
1098 | * | ||
1099 | * @param cls the HELLO message to send | ||
1100 | * @param size number of bytes available in buf | ||
1101 | * @param buf where to copy the message | ||
1102 | * @return number of bytes copied to buf | ||
1103 | */ | ||
1104 | static size_t | ||
1105 | send_hello (void *cls, size_t size, void *buf) | ||
1106 | { | ||
1107 | struct GNUNET_MessageHeader *msg = cls; | ||
1108 | uint16_t ssize; | ||
1109 | |||
1110 | if (buf == NULL) | ||
1111 | { | ||
1112 | #if DEBUG_TRANSPORT_TIMEOUT | ||
1113 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1114 | "Timeout while trying to transmit `%s' request.\n", | ||
1115 | "HELLO"); | ||
1116 | #endif | ||
1117 | GNUNET_free (msg); | ||
1118 | return 0; | ||
1119 | } | ||
1120 | #if DEBUG_TRANSPORT | ||
1121 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1122 | "Transmitting `%s' request.\n", "HELLO"); | ||
1123 | #endif | ||
1124 | ssize = ntohs (msg->size); | ||
1125 | GNUNET_assert (size >= ssize); | ||
1126 | memcpy (buf, msg, ssize); | ||
1127 | GNUNET_free (msg); | ||
1128 | return ssize; | ||
1129 | } | ||
1130 | |||
1131 | |||
1132 | /** | ||
1133 | * Offer the transport service the HELLO of another peer. Note that | ||
1134 | * the transport service may just ignore this message if the HELLO is | ||
1135 | * malformed or useless due to our local configuration. | ||
1136 | * | ||
1137 | * @param handle connection to transport service | ||
1138 | * @param hello the hello message | ||
1139 | * @param cont continuation to call when HELLO has been sent | ||
1140 | * @param cls closure for continuation | ||
1141 | * | ||
1142 | */ | ||
1143 | void | ||
1144 | GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, | ||
1145 | const struct GNUNET_MessageHeader *hello, | ||
1146 | GNUNET_SCHEDULER_Task cont, | ||
1147 | void *cls) | ||
1148 | { | ||
1149 | uint16_t size; | ||
1150 | struct GNUNET_PeerIdentity peer; | ||
1151 | struct GNUNET_MessageHeader *msg; | ||
1152 | |||
1153 | if (NULL == handle->client) | ||
1154 | return; | ||
1155 | GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); | ||
1156 | size = ntohs (hello->size); | ||
1157 | GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); | ||
1158 | if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) hello, | ||
1159 | &peer)) | ||
1160 | { | ||
1161 | GNUNET_break (0); | ||
1162 | return; | ||
1163 | } | ||
1164 | msg = GNUNET_malloc(size); | ||
1165 | memcpy (msg, hello, size); | ||
1166 | #if DEBUG_TRANSPORT | ||
1167 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1168 | "Offering `%s' message of `%4s' to transport for validation.\n", | ||
1169 | "HELLO", | ||
1170 | GNUNET_i2s (&peer)); | ||
1171 | #endif | ||
1172 | schedule_control_transmit (handle, | ||
1173 | size, | ||
1174 | &send_hello, msg); | ||
1175 | } | ||
1176 | |||
1177 | |||
1178 | /** | ||
1179 | * Obtain the HELLO message for this peer. | ||
1180 | * | ||
1181 | * @param handle connection to transport service | ||
1182 | * @param rec function to call with the HELLO, sender will be our peer | ||
1183 | * identity; message and sender will be NULL on timeout | ||
1184 | * (handshake with transport service pending/failed). | ||
1185 | * cost estimate will be 0. | ||
1186 | * @param rec_cls closure for rec | ||
1187 | */ | ||
1188 | void | ||
1189 | GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, | ||
1190 | GNUNET_TRANSPORT_HelloUpdateCallback rec, | ||
1191 | void *rec_cls) | ||
1192 | { | ||
1193 | struct HelloWaitList *hwl; | ||
1194 | |||
1195 | hwl = GNUNET_malloc (sizeof (struct HelloWaitList)); | ||
1196 | hwl->rec = rec; | ||
1197 | hwl->rec_cls = rec_cls; | ||
1198 | GNUNET_CONTAINER_DLL_insert (handle->hwl_head, | ||
1199 | handle->hwl_tail, | ||
1200 | hwl); | ||
1201 | if (handle->my_hello == NULL) | ||
1202 | return; | ||
1203 | rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello); | ||
1204 | } | ||
1205 | |||
1206 | |||
1207 | /** | ||
1208 | * Stop receiving updates about changes to our HELLO message. | ||
1209 | * | ||
1210 | * @param handle connection to transport service | ||
1211 | * @param rec function previously registered to be called with the HELLOs | ||
1212 | * @param rec_cls closure for rec | ||
1213 | */ | ||
1214 | void | ||
1215 | GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle, | ||
1216 | GNUNET_TRANSPORT_HelloUpdateCallback rec, | ||
1217 | void *rec_cls) | ||
1218 | { | ||
1219 | struct HelloWaitList *pos; | ||
1220 | |||
1221 | pos = handle->hwl_head; | ||
1222 | while (pos != NULL) | ||
1223 | { | ||
1224 | if ( (pos->rec == rec) && | ||
1225 | (pos->rec_cls == rec_cls) ) | ||
1226 | break; | ||
1227 | pos = pos->next; | ||
1228 | } | ||
1229 | GNUNET_break (pos != NULL); | ||
1230 | if (pos == NULL) | ||
1231 | return; | ||
1232 | GNUNET_CONTAINER_DLL_remove (handle->hwl_head, | ||
1233 | handle->hwl_tail, | ||
1234 | pos); | ||
1235 | GNUNET_free (pos); | ||
1236 | } | ||
1237 | |||
1238 | |||
1239 | /** | ||
1240 | * Connect to the transport service. Note that the connection may | ||
1241 | * complete (or fail) asynchronously. | ||
1242 | * | ||
1243 | * @param cfg configuration to use | ||
1244 | * @param self our own identity (API should check that it matches | ||
1245 | * the identity found by transport), or NULL (no check) | ||
1246 | * @param cls closure for the callbacks | ||
1247 | * @param rec receive function to call | ||
1248 | * @param nc function to call on connect events | ||
1249 | * @param nd function to call on disconnect events | ||
1250 | */ | ||
1251 | struct GNUNET_TRANSPORT_Handle * | ||
1252 | GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
1253 | const struct GNUNET_PeerIdentity *self, | ||
1254 | void *cls, | ||
1255 | GNUNET_TRANSPORT_ReceiveCallback rec, | ||
1256 | GNUNET_TRANSPORT_NotifyConnect nc, | ||
1257 | GNUNET_TRANSPORT_NotifyDisconnect nd) | ||
1258 | { | ||
1259 | struct GNUNET_TRANSPORT_Handle *ret; | ||
1260 | |||
1261 | ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle)); | ||
1262 | if (self != NULL) | ||
1263 | { | ||
1264 | ret->self = *self; | ||
1265 | ret->check_self = GNUNET_YES; | ||
1266 | } | ||
1267 | ret->cfg = cfg; | ||
1268 | ret->cls = cls; | ||
1269 | ret->rec = rec; | ||
1270 | ret->nc_cb = nc; | ||
1271 | ret->nd_cb = nd; | ||
1272 | ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
1273 | ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE); | ||
1274 | ret->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
1275 | ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); | ||
1276 | return ret; | ||
1277 | } | ||
1278 | |||
1279 | |||
1280 | /** | ||
1281 | * Disconnect from the transport service. | ||
1282 | * | ||
1283 | * @param handle handle to the service as returned from GNUNET_TRANSPORT_connect | ||
1284 | */ | ||
1285 | void | ||
1286 | GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) | ||
1287 | { | ||
1288 | struct GNUNET_TRANSPORT_TransmitHandle *cm; | ||
1289 | |||
1290 | #if DEBUG_TRANSPORT | ||
1291 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1292 | "Transport disconnect called!\n"); | ||
1293 | #endif | ||
1294 | /* this disconnects all neighbours... */ | ||
1295 | disconnect_and_schedule_reconnect (handle); | ||
1296 | /* and now we stop trying to connect again... */ | ||
1297 | if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | ||
1298 | { | ||
1299 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | ||
1300 | handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
1301 | } | ||
1302 | GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours); | ||
1303 | handle->neighbours = NULL; | ||
1304 | if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK) | ||
1305 | { | ||
1306 | GNUNET_SCHEDULER_cancel (handle->quota_task); | ||
1307 | handle->quota_task = GNUNET_SCHEDULER_NO_TASK; | ||
1308 | } | ||
1309 | GNUNET_free_non_null (handle->my_hello); | ||
1310 | handle->my_hello = NULL; | ||
1311 | GNUNET_assert (handle->hwl_head == NULL); | ||
1312 | GNUNET_assert (handle->hwl_tail == NULL); | ||
1313 | GNUNET_free (handle); | ||
1314 | } | ||
1315 | |||
1316 | |||
1317 | /** | ||
1318 | * Check if we could queue a message of the given size for | ||
1319 | * transmission. The transport service will take both its | ||
1320 | * internal buffers and bandwidth limits imposed by the | ||
1321 | * other peer into consideration when answering this query. | ||
1322 | * | ||
1323 | * @param handle connection to transport service | ||
1324 | * @param target who should receive the message | ||
1325 | * @param size how big is the message we want to transmit? | ||
1326 | * @param priority how important is the message? | ||
1327 | * @param timeout after how long should we give up (and call | ||
1328 | * notify with buf NULL and size 0)? | ||
1329 | * @param notify function to call when we are ready to | ||
1330 | * send such a message | ||
1331 | * @param notify_cls closure for notify | ||
1332 | * @return NULL if someone else is already waiting to be notified | ||
1333 | * non-NULL if the notify callback was queued (can be used to cancel | ||
1334 | * using GNUNET_TRANSPORT_notify_transmit_ready_cancel) | ||
1335 | */ | ||
1336 | struct GNUNET_TRANSPORT_TransmitHandle * | ||
1337 | GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle, | ||
1338 | const struct GNUNET_PeerIdentity *target, | ||
1339 | size_t size, | ||
1340 | uint32_t priority, | ||
1341 | struct GNUNET_TIME_Relative timeout, | ||
1342 | GNUNET_CONNECTION_TransmitReadyNotify notify, | ||
1343 | void *notify_cls) | ||
1344 | { | ||
1345 | struct Neighbour *n; | ||
1346 | struct GNUNET_TRANSPORT_TransmitHandle *th; | ||
1347 | struct GNUNET_TIME_Relative delay; | ||
1348 | |||
1349 | n = neighbour_find (handle, target); | ||
1350 | if (NULL == n) | ||
1351 | { | ||
1352 | /* use GNUNET_TRANSPORT_try_connect first, only use this function | ||
1353 | once a connection has been established */ | ||
1354 | GNUNET_break (0); | ||
1355 | return NULL; | ||
1356 | } | ||
1357 | if (NULL != n->th) | ||
1358 | { | ||
1359 | /* attempt to send two messages at the same time to the same peer */ | ||
1360 | GNUNET_break (0); | ||
1361 | return NULL; | ||
1362 | } | ||
1363 | GNUNET_assert (NULL == n->hn); | ||
1364 | th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); | ||
1365 | th->neighbour = n; | ||
1366 | th->notify = notify; | ||
1367 | th->notify_cls = notify_cls; | ||
1368 | th->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
1369 | th->notify_size = size; | ||
1370 | th->priority = priority; | ||
1371 | n->th = th; | ||
1372 | /* calculate when our transmission should be ready */ | ||
1373 | delay = GNUNET_BANDWIDTH_tracker_get_delay (n->out_tracker, size); | ||
1374 | if (delay.rel_value > timeout.rel_value) | ||
1375 | delay.rel_value = 0; /* notify immediately (with failure) */ | ||
1376 | n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, | ||
1377 | n, | ||
1378 | delay.rel_value); | ||
1379 | schedule_transmission (h); | ||
1380 | return th; | ||
1381 | } | ||
1382 | |||
1383 | |||
1384 | /** | ||
1385 | * Cancel the specified transmission-ready notification. | ||
1386 | * | ||
1387 | * @param th handle returned from GNUNET_TRANSPORT_notify_transmit_ready | ||
1388 | */ | ||
1389 | void | ||
1390 | GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th) | ||
1391 | { | ||
1392 | struct Neighbour *n; | ||
1393 | |||
1394 | GNUNET_assert (NULL == th->next); | ||
1395 | GNUNET_assert (NULL == th->prev); | ||
1396 | n = th->neighbour; | ||
1397 | GNUNET_assert (th == n->th); | ||
1398 | n->th = NULL; | ||
1399 | if (n->hn != NULL) | ||
1400 | { | ||
1401 | GNUNET_CONTAINER_heap_remove_node (n->hn); | ||
1402 | n->hn = NULL; | ||
1403 | } | ||
1404 | GNUNET_free (th); | ||
1405 | } | ||
1406 | |||
1407 | |||
1408 | /* end of transport_api.c */ | ||