aboutsummaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/plugin_transport_tcp.c')
-rw-r--r--src/transport/plugin_transport_tcp.c1782
1 files changed, 1782 insertions, 0 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
new file mode 100644
index 000000000..c87056e71
--- /dev/null
+++ b/src/transport/plugin_transport_tcp.c
@@ -0,0 +1,1782 @@
1/*
2 This file is part of GNUnet
3 (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file transport/plugin_transport_tcp.c
23 * @brief Implementation of the TCP transport service
24 * @author Christian Grothoff
25 */
26
27#include "platform.h"
28#include "gnunet_hello_lib.h"
29#include "gnunet_network_lib.h"
30#include "gnunet_os_lib.h"
31#include "gnunet_peerinfo_service.h"
32#include "gnunet_protocols.h"
33#include "gnunet_resolver_service.h"
34#include "gnunet_server_lib.h"
35#include "gnunet_service_lib.h"
36#include "gnunet_statistics_service.h"
37#include "gnunet_transport_service.h"
38#include "plugin_transport.h"
39#include "transport.h"
40
41#define DEBUG_TCP GNUNET_NO
42
43/**
44 * After how long do we expire an address that we
45 * learned from another peer if it is not reconfirmed
46 * by anyone?
47 */
48#define LEARNED_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 6)
49
50/**
51 * How long until we give up on transmitting the welcome message?
52 */
53#define WELCOME_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
54
55/**
56 * How long until we give up on transmitting the welcome message?
57 */
58#define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
59
60/**
61 * For how many messages back to we keep transmission times?
62 */
63#define ACK_LOG_SIZE 32
64
65/**
66 * Initial handshake message for a session. This header
67 * is followed by the address that the other peer used to
68 * connect to us (so that we may learn it) or the address
69 * that the other peer got from the accept call.
70 */
71struct WelcomeMessage
72{
73 struct GNUNET_MessageHeader header;
74
75 /**
76 * Identity of the node connecting (TCP client)
77 */
78 struct GNUNET_PeerIdentity clientIdentity;
79
80};
81
82
83/**
84 * Encapsulation for normal TCP traffic.
85 */
86struct DataMessage
87{
88 struct GNUNET_MessageHeader header;
89
90 /**
91 * For alignment.
92 */
93 uint32_t reserved GNUNET_PACKED;
94
95 /**
96 * Number of the last message that was received from the other peer.
97 */
98 uint64_t ack_in GNUNET_PACKED;
99
100 /**
101 * Number of this outgoing message.
102 */
103 uint64_t ack_out GNUNET_PACKED;
104
105 /**
106 * How long was sending this ack delayed by the other peer
107 * (estimate). The receiver of this message can use the delay
108 * between sending his message number 'ack' and receiving this ack
109 * minus the delay as an estimate of the round-trip time.
110 */
111 struct GNUNET_TIME_RelativeNBO delay;
112
113};
114
115
116/**
117 * Encapsulation of all of the state of the plugin.
118 */
119struct Plugin;
120
121
122/**
123 * Information kept for each message that is yet to
124 * be transmitted.
125 */
126struct PendingMessage
127{
128
129 /**
130 * This is a linked list.
131 */
132 struct PendingMessage *next;
133
134 /**
135 * The pending message, pointer to the end
136 * of this struct, do not free!
137 */
138 struct GNUNET_MessageHeader *msg;
139
140
141 /**
142 * Continuation function to call once the message
143 * has been sent. Can be NULL if there is no
144 * continuation to call.
145 */
146 GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
147
148 /**
149 * Closure for transmit_cont.
150 */
151 void *transmit_cont_cls;
152
153 /**
154 * Timeout value for the pending message.
155 */
156 struct GNUNET_TIME_Absolute timeout;
157
158 /**
159 * GNUNET_YES if this is a welcome message;
160 * otherwise this should be a DATA message.
161 */
162 int is_welcome;
163
164};
165
166
167/**
168 * Session handle for TCP connections.
169 */
170struct Session
171{
172
173 /**
174 * Stored in a linked list.
175 */
176 struct Session *next;
177
178 /**
179 * Pointer to the global plugin struct.
180 */
181 struct Plugin *plugin;
182
183 /**
184 * The client (used to identify this connection)
185 */
186 struct GNUNET_SERVER_Client *client;
187
188 /**
189 * gnunet-service-transport context for this connection.
190 */
191 struct ReadyList *service_context;
192
193 /**
194 * Messages currently pending for transmission
195 * to this peer, if any.
196 */
197 struct PendingMessage *pending_messages;
198
199 /**
200 * Handle for pending transmission request.
201 */
202 struct GNUNET_NETWORK_TransmitHandle *transmit_handle;
203
204 /**
205 * To whom are we talking to (set to our identity
206 * if we are still waiting for the welcome message)
207 */
208 struct GNUNET_PeerIdentity target;
209
210 /**
211 * At what time did we reset last_received last?
212 */
213 struct GNUNET_TIME_Absolute last_quota_update;
214
215 /**
216 * Address of the other peer if WE initiated the connection
217 * (and hence can be sure what it is), otherwise NULL.
218 */
219 void *connect_addr;
220
221 /**
222 * How many bytes have we received since the "last_quota_update"
223 * timestamp?
224 */
225 uint64_t last_received;
226
227 /**
228 * Our current latency estimate (in ms).
229 */
230 double latency_estimate;
231
232 /**
233 * Time when we generated the last ACK_LOG_SIZE acks.
234 * (the "last" refers to the "out_msg_counter" here)
235 */
236 struct GNUNET_TIME_Absolute gen_time[ACK_LOG_SIZE];
237
238 /**
239 * Our current sequence number.
240 */
241 uint64_t out_msg_counter;
242
243 /**
244 * Highest received incoming sequence number.
245 */
246 uint64_t max_in_msg_counter;
247
248 /**
249 * Number of bytes per ms that this peer is allowed
250 * to send to us.
251 */
252 uint32_t quota_in;
253
254 /**
255 * Length of connect_addr, can be 0.
256 */
257 size_t connect_alen;
258
259 /**
260 * Are we still expecting the welcome message? (GNUNET_YES/GNUNET_NO)
261 */
262 int expecting_welcome;
263
264 /**
265 * Are we still trying to connect?
266 */
267 int still_connecting;
268
269};
270
271
272/**
273 * Encapsulation of all of the state of the plugin.
274 */
275struct Plugin
276{
277 /**
278 * Our environment.
279 */
280 struct GNUNET_TRANSPORT_PluginEnvironment *env;
281
282 /**
283 * The listen socket.
284 */
285 struct GNUNET_NETWORK_SocketHandle *lsock;
286
287 /**
288 * List of open TCP sessions.
289 */
290 struct Session *sessions;
291
292 /**
293 * Handle for the statistics service.
294 */
295 struct GNUNET_STATISTICS_Handle *statistics;
296
297 /**
298 * Handle to the network service.
299 */
300 struct GNUNET_SERVICE_Context *service;
301
302 /**
303 * Handle to the server for this service.
304 */
305 struct GNUNET_SERVER_Handle *server;
306
307 /**
308 * Copy of the handler array where the closures are
309 * set to this struct's instance.
310 */
311 struct GNUNET_SERVER_MessageHandler *handlers;
312
313 /**
314 * ID of task used to update our addresses when one expires.
315 */
316 GNUNET_SCHEDULER_TaskIdentifier address_update_task;
317
318 /**
319 * Port that we are actually listening on.
320 */
321 uint16_t open_port;
322
323 /**
324 * Port that the user said we would have visible to the
325 * rest of the world.
326 */
327 uint16_t adv_port;
328
329};
330
331
332/**
333 * Find the session handle for the given peer.
334 */
335static struct Session *
336find_session_by_target (struct Plugin *plugin,
337 const struct GNUNET_PeerIdentity *target)
338{
339 struct Session *ret;
340
341 ret = plugin->sessions;
342 while ((ret != NULL) &&
343 (0 != memcmp (target,
344 &ret->target, sizeof (struct GNUNET_PeerIdentity))))
345 ret = ret->next;
346 return ret;
347}
348
349
350/**
351 * Find the session handle for the given peer.
352 */
353static struct Session *
354find_session_by_client (struct Plugin *plugin,
355 const struct GNUNET_SERVER_Client *client)
356{
357 struct Session *ret;
358
359 ret = plugin->sessions;
360 while ((ret != NULL) && (client != ret->client))
361 ret = ret->next;
362 return ret;
363}
364
365
366/**
367 * Create a welcome message.
368 */
369static struct PendingMessage *
370create_welcome (size_t addrlen, const void *addr, struct Plugin *plugin)
371{
372 struct PendingMessage *pm;
373 struct WelcomeMessage *welcome;
374
375 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
376 sizeof (struct WelcomeMessage) + addrlen);
377 pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
378 welcome = (struct WelcomeMessage *) &pm[1];
379 welcome->header.size = htons (sizeof (struct WelcomeMessage) + addrlen);
380 welcome->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME);
381 GNUNET_CRYPTO_hash (plugin->env->my_public_key,
382 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
383 &welcome->clientIdentity.hashPubKey);
384 memcpy (&welcome[1], addr, addrlen);
385 pm->timeout = GNUNET_TIME_relative_to_absolute (WELCOME_TIMEOUT);
386 pm->is_welcome = GNUNET_YES;
387 return pm;
388}
389
390
391/**
392 * Create a new session using the specified address
393 * for the welcome message.
394 *
395 * @param plugin us
396 * @param target peer to connect to
397 * @param client client to use
398 * @param addrlen IPv4 or IPv6
399 * @param addr either struct sockaddr_in or struct sockaddr_in6
400 * @return NULL connection failed / invalid address
401 */
402static struct Session *
403create_session (struct Plugin *plugin,
404 const struct GNUNET_PeerIdentity *target,
405 struct GNUNET_SERVER_Client *client,
406 const void *addr, size_t addrlen)
407{
408 struct Session *ret;
409
410 ret = GNUNET_malloc (sizeof (struct Session));
411 ret->plugin = plugin;
412 ret->next = plugin->sessions;
413 plugin->sessions = ret;
414 ret->client = client;
415 ret->target = *target;
416 ret->last_quota_update = GNUNET_TIME_absolute_get ();
417 ret->quota_in = plugin->env->default_quota_in;
418 ret->expecting_welcome = GNUNET_YES;
419 ret->pending_messages = create_welcome (addrlen, addr, plugin);
420 return ret;
421}
422
423
424/**
425 * Create a new session connecting to the specified
426 * target at the specified address.
427 *
428 * @param plugin us
429 * @param target peer to connect to
430 * @param addrlen IPv4 or IPv6
431 * @param addr either struct sockaddr_in or struct sockaddr_in6
432 * @return NULL connection failed / invalid address
433 */
434static struct Session *
435connect_and_create_session (struct Plugin *plugin,
436 const struct GNUNET_PeerIdentity *target,
437 const void *addr, size_t addrlen)
438{
439 struct GNUNET_SERVER_Client *client;
440 struct GNUNET_NETWORK_SocketHandle *conn;
441 struct Session *session;
442 int af;
443 char buf[INET6_ADDRSTRLEN];
444 uint16_t port;
445
446 session = plugin->sessions;
447 while (session != NULL)
448 {
449 if ((0 == memcmp (target,
450 &session->target,
451 sizeof (struct GNUNET_PeerIdentity))) &&
452 (session->connect_alen == addrlen) &&
453 (0 == memcmp (session->connect_addr, addr, addrlen)))
454 return session; /* already exists! */
455 session = session->next;
456 }
457
458 if (addrlen == sizeof (struct sockaddr_in))
459 {
460 af = AF_INET;
461 inet_ntop (af,
462 &((struct sockaddr_in *) addr)->sin_addr, buf, sizeof (buf));
463 port = ntohs (((struct sockaddr_in *) addr)->sin_port);
464 }
465 else if (addrlen == sizeof (struct sockaddr_in6))
466 {
467 af = AF_INET6;
468 inet_ntop (af,
469 &((struct sockaddr_in6 *) addr)->sin6_addr,
470 buf, sizeof (buf));
471 port = ntohs (((struct sockaddr_in6 *) addr)->sin6_port);
472 }
473 else
474 {
475 GNUNET_break_op (0);
476 return NULL; /* invalid address */
477 }
478 conn = GNUNET_NETWORK_socket_create_from_sockaddr (plugin->env->sched,
479 af,
480 addr,
481 addrlen,
482 GNUNET_SERVER_MAX_MESSAGE_SIZE);
483 if (conn == NULL)
484 {
485#if DEBUG_TCP
486 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
487 "tcp",
488 "Failed to create connection to peer at `%s:%u'.\n",
489 buf, port);
490#endif
491 return NULL;
492 }
493 client = GNUNET_SERVER_connect_socket (plugin->server, conn);
494 GNUNET_assert (client != NULL);
495 session = create_session (plugin, target, client, addr, addrlen);
496 session->connect_alen = addrlen;
497 session->connect_addr = GNUNET_malloc (addrlen);
498 memcpy (session->connect_addr, addr, addrlen);
499#if DEBUG_TCP
500 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
501 "tcp",
502 "Creating new session %p with `%s:%u' based on `%s' request.\n",
503 session, buf, port, "send_to");
504#endif
505 return session;
506}
507
508
509/**
510 * If we have pending messages, ask the server to
511 * transmit them (schedule the respective tasks, etc.)
512 *
513 * @param session for which session should we do this
514 */
515static void process_pending_messages (struct Session *session);
516
517
518/**
519 * Function called to notify a client about the socket
520 * begin ready to queue more data. "buf" will be
521 * NULL and "size" zero if the socket was closed for
522 * writing in the meantime.
523 *
524 * @param cls closure
525 * @param size number of bytes available in buf
526 * @param buf where the callee should write the message
527 * @return number of bytes written to buf
528 */
529static size_t
530do_transmit (void *cls, size_t size, void *buf)
531{
532 struct Session *session = cls;
533 struct PendingMessage *pm;
534 char *cbuf;
535 uint16_t msize;
536 size_t ret;
537 struct DataMessage *dm;
538
539 session->transmit_handle = NULL;
540 if (buf == NULL)
541 {
542#if DEBUG_TCP
543 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
544 "tcp", "Timeout trying to transmit\n");
545#endif
546 /* timeout */
547 while (NULL != (pm = session->pending_messages))
548 {
549 session->pending_messages = pm->next;
550 if (pm->transmit_cont != NULL)
551 pm->transmit_cont (pm->transmit_cont_cls,
552 session->service_context,
553 &session->target, GNUNET_SYSERR);
554 GNUNET_free (pm);
555 }
556 return 0;
557 }
558 ret = 0;
559 cbuf = buf;
560 while (NULL != (pm = session->pending_messages))
561 {
562 if (pm->is_welcome)
563 {
564 if (size < (msize = htons (pm->msg->size)))
565 break;
566 memcpy (cbuf, pm->msg, msize);
567 cbuf += msize;
568 ret += msize;
569 size -= msize;
570 }
571 else
572 {
573 if (size <
574 sizeof (struct DataMessage) + (msize = htons (pm->msg->size)))
575 break;
576 dm = (struct DataMessage *) cbuf;
577 dm->header.size = htons (sizeof (struct DataMessage) + msize);
578 dm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA);
579 dm->ack_out = GNUNET_htonll (++session->out_msg_counter);
580 dm->ack_in = GNUNET_htonll (session->max_in_msg_counter);
581 cbuf += sizeof (struct DataMessage);
582 ret += sizeof (struct DataMessage);
583 size -= sizeof (struct DataMessage);
584 memcpy (cbuf, pm->msg, msize);
585 cbuf += msize;
586 ret += msize;
587 size -= msize;
588 }
589 session->pending_messages = pm->next;
590 if (pm->transmit_cont != NULL)
591 pm->transmit_cont (pm->transmit_cont_cls,
592 session->service_context,
593 &session->target, GNUNET_OK);
594 GNUNET_free (pm);
595 session->gen_time[session->out_msg_counter % ACK_LOG_SIZE]
596 = GNUNET_TIME_absolute_get ();
597 }
598 process_pending_messages (session);
599#if DEBUG_TCP || 1
600 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
601 "tcp", "Transmitting %u bytes\n", ret);
602#endif
603 return ret;
604}
605
606
607/**
608 * If we have pending messages, ask the server to
609 * transmit them (schedule the respective tasks, etc.)
610 *
611 * @param session for which session should we do this
612 */
613static void
614process_pending_messages (struct Session *session)
615{
616 GNUNET_assert (session->client != NULL);
617 if (session->pending_messages == NULL)
618 return;
619 if (session->transmit_handle != NULL)
620 return;
621 session->transmit_handle
622 = GNUNET_SERVER_notify_transmit_ready (session->client,
623 htons (session->pending_messages->
624 msg->size) +
625 (session->pending_messages->
626 is_welcome ? 0 : sizeof (struct
627 DataMessage)),
628 GNUNET_TIME_absolute_get_remaining
629 (session->pending_messages[0].
630 timeout), &do_transmit, session);
631}
632
633
634/**
635 * Function that can be used by the transport service to transmit
636 * a message using the plugin using a fresh connection (even if
637 * we already have a connection to this peer, this function is
638 * required to establish a new one).
639 *
640 * @param cls closure
641 * @param target who should receive this message
642 * @param msg1 first message to transmit
643 * @param msg2 second message to transmit (can be NULL)
644 * @param timeout how long should we try to transmit these?
645 * @param addrlen length of the address
646 * @param addr the address
647 * @return session if the transmission has been scheduled
648 * NULL if the address format is invalid
649 */
650static void *
651tcp_plugin_send_to (void *cls,
652 const struct GNUNET_PeerIdentity *target,
653 const struct GNUNET_MessageHeader *msg1,
654 const struct GNUNET_MessageHeader *msg2,
655 struct GNUNET_TIME_Relative timeout,
656 const void *addr, size_t addrlen)
657{
658 struct Plugin *plugin = cls;
659 struct Session *session;
660 struct PendingMessage *pl;
661 struct PendingMessage *pm;
662
663 session = connect_and_create_session (plugin, target, addr, addrlen);
664 if (session == NULL)
665 {
666#if DEBUG_TCP
667 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
668 "tcp", "Failed to create fresh session.\n");
669#endif
670 return NULL;
671 }
672 pl = NULL;
673 if (msg2 != NULL)
674 {
675 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
676 ntohs (msg2->size));
677 pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
678 memcpy (pm->msg, msg2, ntohs (msg2->size));
679 pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
680 pm->is_welcome = GNUNET_NO;
681 pl = pm;
682 }
683 if (msg1 != NULL)
684 {
685 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
686 ntohs (msg1->size));
687 pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
688 memcpy (pm->msg, msg1, ntohs (msg1->size));
689 pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
690 pm->is_welcome = GNUNET_NO;
691 pm->next = pl;
692 pl = pm;
693 }
694 /* append */
695 if (session->pending_messages != NULL)
696 {
697 pm = session->pending_messages;
698 while (pm->next != NULL)
699 pm = pm->next;
700 pm->next = pl;
701 }
702 else
703 {
704 session->pending_messages = pl;
705 }
706 process_pending_messages (session);
707 return session;
708}
709
710
711/**
712 * Functions with this signature are called whenever we need
713 * to close a session due to a disconnect or failure to
714 * establish a connection.
715 *
716 * @param session session to close down
717 */
718static void
719disconnect_session (struct Session *session)
720{
721 struct Session *prev;
722 struct Session *pos;
723 struct PendingMessage *pm;
724
725#if DEBUG_TCP
726 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
727 "tcp",
728 "Disconnecting from other peer (session %p).\n", session);
729#endif
730 /* remove from session list */
731 prev = NULL;
732 pos = session->plugin->sessions;
733 while (pos != session)
734 {
735 prev = pos;
736 pos = pos->next;
737 }
738 if (prev == NULL)
739 session->plugin->sessions = session->next;
740 else
741 prev->next = session->next;
742 /* clean up state */
743 if (session->client != NULL)
744 {
745#if DEBUG_TCP
746 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
747 "Disconnecting from client address %p\n", session->client);
748#endif
749 GNUNET_SERVER_client_drop (session->client);
750 session->client = NULL;
751 }
752 if (session->transmit_handle != NULL)
753 {
754 GNUNET_NETWORK_notify_transmit_ready_cancel (session->transmit_handle);
755 session->transmit_handle = NULL;
756 }
757 while (NULL != (pm = session->pending_messages))
758 {
759 session->pending_messages = pm->next;
760 if (NULL != pm->transmit_cont)
761 pm->transmit_cont (pm->transmit_cont_cls,
762 session->service_context,
763 &session->target, GNUNET_SYSERR);
764 GNUNET_free (pm);
765 }
766 /* notify transport service about disconnect */
767 session->plugin->env->receive (session->plugin->env->cls,
768 session,
769 session->service_context,
770 GNUNET_TIME_UNIT_ZERO,
771 &session->target, NULL);
772 GNUNET_free_non_null (session->connect_addr);
773 GNUNET_free (session);
774}
775
776
777/**
778 * Iterator callback to go over all addresses. If we get
779 * a TCP address, increment the counter
780 *
781 * @param cls closure, points to the counter
782 * @param tname name of the transport
783 * @param expiration expiration time
784 * @param addr the address
785 * @param addrlen length of the address
786 * @return GNUNET_OK to keep the address,
787 * GNUNET_NO to delete it from the HELLO
788 * GNUNET_SYSERR to stop iterating (but keep current address)
789 */
790static int
791count_tcp_addresses (void *cls,
792 const char *tname,
793 struct GNUNET_TIME_Absolute expiration,
794 const void *addr, size_t addrlen)
795{
796 unsigned int *counter = cls;
797
798 if (0 != strcmp (tname, "tcp"))
799 return GNUNET_OK; /* not one of ours */
800 (*counter)++;
801 return GNUNET_OK; /* failed to connect */
802}
803
804
805struct ConnectContext
806{
807 struct Plugin *plugin;
808
809 struct GNUNET_NETWORK_SocketHandle *sa;
810
811 struct PendingMessage *welcome;
812
813 unsigned int pos;
814};
815
816
817/**
818 * Iterator callback to go over all addresses. If we get
819 * the "pos" TCP address, try to connect to it.
820 *
821 * @param cls closure
822 * @param tname name of the transport
823 * @param expiration expiration time
824 * @param addrlen length of the address
825 * @param addr the address
826 * @return GNUNET_OK to keep the address,
827 * GNUNET_NO to delete it from the HELLO
828 * GNUNET_SYSERR to stop iterating (but keep current address)
829 */
830static int
831try_connect_to_address (void *cls,
832 const char *tname,
833 struct GNUNET_TIME_Absolute expiration,
834 const void *addr, size_t addrlen)
835{
836 struct ConnectContext *cc = cls;
837 int af;
838
839 if (0 != strcmp (tname, "tcp"))
840 return GNUNET_OK; /* not one of ours */
841 if (sizeof (struct sockaddr_in) == addrlen)
842 af = AF_INET;
843 else if (sizeof (struct sockaddr_in6) == addrlen)
844 af = AF_INET6;
845 else
846 {
847 /* not a valid address */
848 GNUNET_break (0);
849 return GNUNET_NO;
850 }
851 if (0 == cc->pos--)
852 {
853 cc->welcome = create_welcome (addrlen, addr, cc->plugin);
854 cc->sa =
855 GNUNET_NETWORK_socket_create_from_sockaddr (cc->plugin->env->sched,
856 af, addr, addrlen,
857 GNUNET_SERVER_MAX_MESSAGE_SIZE);
858#if DEBUG_TCP
859 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
860 "tcp", "Connected to other peer.\n");
861#endif
862 return GNUNET_SYSERR;
863 }
864 return GNUNET_OK; /* failed to connect */
865}
866
867
868/**
869 * Type of an iterator over the hosts. Note that each
870 * host will be called with each available protocol.
871 *
872 * @param cls closure
873 * @param peer id of the peer, NULL for last call
874 * @param hello hello message for the peer (can be NULL)
875 * @param trust amount of trust we have in the peer
876 */
877static void
878session_try_connect (void *cls,
879 const struct GNUNET_PeerIdentity *peer,
880 const struct GNUNET_HELLO_Message *hello, uint32_t trust)
881{
882 struct Session *session = cls;
883 unsigned int count;
884 struct ConnectContext cctx;
885 struct PendingMessage *pm;
886
887 if (peer == NULL)
888 {
889 /* last call, destroy session if we are still not
890 connected */
891 if (session->still_connecting == GNUNET_NO)
892 {
893#if DEBUG_TCP
894 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
895 "tcp",
896 "Connected to other peer, now processing messages.\n");
897#endif
898 process_pending_messages (session);
899 }
900 else
901 {
902#if DEBUG_TCP
903 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
904 "tcp",
905 "Failed to connect to other peer, now closing session.\n");
906#endif
907 disconnect_session (session);
908 }
909 return;
910 }
911 if ((hello == NULL) || (session->client != NULL))
912 {
913 GNUNET_break (0); /* should this ever happen!? */
914 return;
915 }
916 count = 0;
917 GNUNET_HELLO_iterate_addresses (hello,
918 GNUNET_NO, &count_tcp_addresses, &count);
919 if (count == 0)
920 {
921#if DEBUG_TCP
922 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
923 "tcp",
924 "Asked to connect, but have no addresses to try.\n");
925#endif
926 return;
927 }
928 cctx.plugin = session->plugin;
929 cctx.sa = NULL;
930 cctx.welcome = NULL;
931 cctx.pos = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
932 GNUNET_HELLO_iterate_addresses (hello,
933 GNUNET_NO, &try_connect_to_address, &cctx);
934 if (cctx.sa == NULL)
935 {
936#if DEBUG_TCP
937 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
938 "tcp",
939 "Asked to connect, but all addresses failed.\n");
940#endif
941 GNUNET_free_non_null (cctx.welcome);
942 return;
943 }
944 session->client = GNUNET_SERVER_connect_socket (session->plugin->server,
945 cctx.sa);
946#if DEBUG_TCP
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948 "Connected getting client address %p\n", session->client);
949#endif
950 if (session->client == NULL)
951 {
952 GNUNET_break (0); /* how could this happen? */
953 GNUNET_free_non_null (cctx.welcome);
954 return;
955 }
956 pm = cctx.welcome;
957 /* prepend (!) */
958 pm->next = session->pending_messages;
959 session->pending_messages = pm;
960 session->still_connecting = GNUNET_NO;
961#if DEBUG_TCP
962 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
963 "tcp",
964 "Connected to other peer, now sending `%s' message.\n",
965 "WELCOME");
966#endif
967}
968
969
970/**
971 * Function that can be used by the transport service to transmit
972 * a message using the plugin.
973 *
974 * @param cls closure
975 * @param plugin_context value we were asked to pass to this plugin
976 * to respond to the given peer (use is optional,
977 * but may speed up processing), can be NULL
978 * @param service_context value passed to the transport-service
979 * to identify the neighbour
980 * @param target who should receive this message
981 * @param msg the message to transmit
982 * @param cont continuation to call once the message has
983 * been transmitted (or if the transport is ready
984 * for the next transmission call; or if the
985 * peer disconnected...)
986 * @param cont_cls closure for cont
987 * @return plugin_context that should be used next time for
988 * sending messages to the specified peer
989 */
990static void *
991tcp_plugin_send (void *cls,
992 void *plugin_context,
993 struct ReadyList *service_context,
994 const struct GNUNET_PeerIdentity *target,
995 const struct GNUNET_MessageHeader *msg,
996 struct GNUNET_TIME_Relative timeout,
997 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
998{
999 struct Plugin *plugin = cls;
1000 struct Session *session = plugin_context;
1001 struct PendingMessage *pm;
1002 struct PendingMessage *pme;
1003
1004 if (session == NULL)
1005 session = find_session_by_target (plugin, target);
1006 pm = GNUNET_malloc (sizeof (struct PendingMessage) + ntohs (msg->size));
1007 pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
1008 memcpy (pm->msg, msg, ntohs (msg->size));
1009 pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1010 pm->transmit_cont = cont;
1011 pm->transmit_cont_cls = cont_cls;
1012 if (session == NULL)
1013 {
1014 session = GNUNET_malloc (sizeof (struct Session));
1015#if DEBUG_TCP
1016 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1017 "tcp",
1018 "Asked to transmit, creating fresh session %p.\n",
1019 session);
1020#endif
1021 session->next = plugin->sessions;
1022 plugin->sessions = session;
1023 session->plugin = plugin;
1024 session->target = *target;
1025 session->last_quota_update = GNUNET_TIME_absolute_get ();
1026 session->quota_in = plugin->env->default_quota_in;
1027 session->expecting_welcome = GNUNET_YES;
1028 session->still_connecting = GNUNET_YES;
1029 session->pending_messages = pm;
1030 GNUNET_PEERINFO_for_all (plugin->env->cfg,
1031 plugin->env->sched,
1032 target,
1033 0, timeout, &session_try_connect, session);
1034 return session;
1035 }
1036 GNUNET_assert (session != NULL);
1037 GNUNET_assert (session->still_connecting == GNUNET_NO);
1038 /* append pm to pending_messages list */
1039 pme = session->pending_messages;
1040 if (pme == NULL)
1041 {
1042 session->pending_messages = pm;
1043 }
1044 else
1045 {
1046 while (NULL != pme->next)
1047 pme = pme->next;
1048 pme->next = pm;
1049 }
1050#if DEBUG_TCP
1051 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1052 "tcp", "Asked to transmit, added message to list.\n");
1053#endif
1054 process_pending_messages (session);
1055 return session;
1056}
1057
1058
1059
1060/**
1061 * Function that can be called to force a disconnect from the
1062 * specified neighbour. This should also cancel all previously
1063 * scheduled transmissions. Obviously the transmission may have been
1064 * partially completed already, which is OK. The plugin is supposed
1065 * to close the connection (if applicable) and no longer call the
1066 * transmit continuation(s).
1067 *
1068 * Finally, plugin MUST NOT call the services's receive function to
1069 * notify the service that the connection to the specified target was
1070 * closed after a getting this call.
1071 *
1072 * @param cls closure
1073 * @param plugin_context value we were asked to pass to this plugin
1074 * to respond to the given peer (use is optional,
1075 * but may speed up processing), can be NULL (if
1076 * NULL was returned from the transmit function)
1077 * @param service_context must correspond to the service context
1078 * of the corresponding Transmit call; the plugin should
1079 * not cancel a send call made with a different service
1080 * context pointer! Never NULL.
1081 * @param target peer for which the last transmission is
1082 * to be cancelled
1083 */
1084static void
1085tcp_plugin_cancel (void *cls,
1086 void *plugin_context,
1087 struct ReadyList *service_context,
1088 const struct GNUNET_PeerIdentity *target)
1089{
1090 struct Plugin *plugin = cls;
1091 struct PendingMessage *pm;
1092 struct Session *session;
1093 struct Session *next;
1094
1095 session = plugin->sessions;
1096 while (session != NULL)
1097 {
1098 next = session->next;
1099 if (0 == memcmp (target,
1100 &session->target, sizeof (struct GNUNET_PeerIdentity)))
1101 {
1102 pm = session->pending_messages;
1103 while (pm != NULL)
1104 {
1105 pm->transmit_cont = NULL;
1106 pm->transmit_cont_cls = NULL;
1107 pm = pm->next;
1108 }
1109 session->service_context = NULL;
1110 GNUNET_SERVER_client_disconnect (session->client);
1111 /* rest of the clean-up of the session will be done as part of
1112 disconnect_notify which should be triggered any time now */
1113 }
1114 session = next;
1115 }
1116}
1117
1118
1119struct PrettyPrinterContext
1120{
1121 GNUNET_TRANSPORT_AddressStringCallback asc;
1122 void *asc_cls;
1123 uint16_t port;
1124};
1125
1126
1127/**
1128 * Append our port and forward the result.
1129 */
1130static void
1131append_port (void *cls, const char *hostname)
1132{
1133 struct PrettyPrinterContext *ppc = cls;
1134 char *ret;
1135
1136 if (hostname == NULL)
1137 {
1138 ppc->asc (ppc->asc_cls, NULL);
1139 GNUNET_free (ppc);
1140 return;
1141 }
1142 GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
1143 ppc->asc (ppc->asc_cls, ret);
1144 GNUNET_free (ret);
1145}
1146
1147
1148/**
1149 * Convert the transports address to a nice, human-readable
1150 * format.
1151 *
1152 * @param cls closure
1153 * @param name name of the transport that generated the address
1154 * @param addr one of the addresses of the host, NULL for the last address
1155 * the specific address format depends on the transport
1156 * @param addrlen length of the address
1157 * @param numeric should (IP) addresses be displayed in numeric form?
1158 * @param timeout after how long should we give up?
1159 * @param asc function to call on each string
1160 * @param asc_cls closure for asc
1161 */
1162static void
1163tcp_plugin_address_pretty_printer (void *cls,
1164 const char *type,
1165 const void *addr,
1166 size_t addrlen,
1167 int numeric,
1168 struct GNUNET_TIME_Relative timeout,
1169 GNUNET_TRANSPORT_AddressStringCallback asc,
1170 void *asc_cls)
1171{
1172 struct Plugin *plugin = cls;
1173 const struct sockaddr_in *v4;
1174 const struct sockaddr_in6 *v6;
1175 struct PrettyPrinterContext *ppc;
1176
1177 if ((addrlen != sizeof (struct sockaddr_in)) &&
1178 (addrlen != sizeof (struct sockaddr_in6)))
1179 {
1180 /* invalid address */
1181 GNUNET_break_op (0);
1182 asc (asc_cls, NULL);
1183 return;
1184 }
1185 ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
1186 ppc->asc = asc;
1187 ppc->asc_cls = asc_cls;
1188 if (addrlen == sizeof (struct sockaddr_in))
1189 {
1190 v4 = (const struct sockaddr_in *) addr;
1191 ppc->port = ntohs (v4->sin_port);
1192 }
1193 else
1194 {
1195 v6 = (const struct sockaddr_in6 *) addr;
1196 ppc->port = ntohs (v6->sin6_port);
1197
1198 }
1199 GNUNET_RESOLVER_hostname_get (plugin->env->sched,
1200 plugin->env->cfg,
1201 addr,
1202 addrlen,
1203 !numeric, timeout, &append_port, ppc);
1204}
1205
1206
1207/**
1208 * Update the last-received and bandwidth quota values
1209 * for this session.
1210 *
1211 * @param session session to update
1212 * @param force set to GNUNET_YES if we should update even
1213 * though the minimum refresh time has not yet expired
1214 */
1215static void
1216update_quota (struct Session *session, int force)
1217{
1218 struct GNUNET_TIME_Absolute now;
1219 unsigned long long delta;
1220 unsigned long long total_allowed;
1221 unsigned long long total_remaining;
1222
1223 now = GNUNET_TIME_absolute_get ();
1224 delta = now.value - session->last_quota_update.value;
1225 if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
1226 return; /* too early, not enough data */
1227
1228 total_allowed = session->quota_in * delta;
1229 if (total_allowed > session->last_received)
1230 {
1231 /* got less than acceptable */
1232 total_remaining = total_allowed - session->last_received;
1233 session->last_received = 0;
1234 delta = total_remaining / session->quota_in; /* bonus seconds */
1235 if (delta > MAX_BANDWIDTH_CARRY)
1236 delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */
1237 }
1238 else
1239 {
1240 /* got more than acceptable */
1241 total_remaining = 0;
1242 session->last_received -= total_allowed;
1243 delta = 0;
1244 }
1245 session->last_quota_update.value = now.value - delta;
1246}
1247
1248
1249/**
1250 * Set a quota for receiving data from the given peer; this is a
1251 * per-transport limit. The transport should limit its read/select
1252 * calls to stay below the quota (in terms of incoming data).
1253 *
1254 * @param cls closure
1255 * @param peer the peer for whom the quota is given
1256 * @param quota_in quota for receiving/sending data in bytes per ms
1257 */
1258static void
1259tcp_plugin_set_receive_quota (void *cls,
1260 const struct GNUNET_PeerIdentity *target,
1261 uint32_t quota_in)
1262{
1263 struct Plugin *plugin = cls;
1264 struct Session *session;
1265
1266 session = find_session_by_target (plugin, target);
1267 if (session->quota_in != quota_in)
1268 {
1269 update_quota (session, GNUNET_YES);
1270 if (session->quota_in > quota_in)
1271 session->last_quota_update = GNUNET_TIME_absolute_get ();
1272 session->quota_in = quota_in;
1273 }
1274}
1275
1276
1277/**
1278 * Check if the given port is plausible (must be either
1279 * our listen port or our advertised port). If it is
1280 * neither, we return one of these two ports at random.
1281 *
1282 * @return either in_port or a more plausible port
1283 */
1284static uint16_t
1285check_port (struct Plugin *plugin, uint16_t in_port)
1286{
1287 if ((in_port == plugin->adv_port) || (in_port == plugin->open_port))
1288 return in_port;
1289 return (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1290 2) == 0)
1291 ? plugin->open_port : plugin->adv_port;
1292}
1293
1294
1295/**
1296 * Another peer has suggested an address for this
1297 * peer and transport plugin. Check that this could be a valid
1298 * address. If so, consider adding it to the list
1299 * of addresses.
1300 *
1301 * @param cls closure
1302 * @param addr pointer to the address
1303 * @param addrlen length of addr
1304 * @return GNUNET_OK if this is a plausible address for this peer
1305 * and transport
1306 */
1307static int
1308tcp_plugin_address_suggested (void *cls, const void *addr, size_t addrlen)
1309{
1310 struct Plugin *plugin = cls;
1311 char buf[sizeof (struct sockaddr_in6)];
1312 struct sockaddr_in *v4;
1313 struct sockaddr_in6 *v6;
1314 char dst[INET6_ADDRSTRLEN];
1315 uint16_t port;
1316
1317 if ((addrlen != sizeof (struct sockaddr_in)) &&
1318 (addrlen != sizeof (struct sockaddr_in6)))
1319 {
1320 GNUNET_break_op (0);
1321 return GNUNET_SYSERR;
1322 }
1323 memcpy (buf, addr, sizeof (struct sockaddr_in6));
1324 if (addrlen == sizeof (struct sockaddr_in))
1325 {
1326 v4 = (struct sockaddr_in *) buf;
1327 v4->sin_port = htons (check_port (plugin, ntohs (v4->sin_port)));
1328 inet_ntop (AF_INET, &v4->sin_addr, dst, sizeof (dst));
1329 port = ntohs (v4->sin_port);
1330 }
1331 else
1332 {
1333 v6 = (struct sockaddr_in6 *) buf;
1334 v6->sin6_port = htons (check_port (plugin, ntohs (v6->sin6_port)));
1335 inet_ntop (AF_INET6, &v6->sin6_addr, dst, sizeof (dst));
1336 port = ntohs (v6->sin6_port);
1337 }
1338#if DEBUG_TCP
1339 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1340 "tcp",
1341 "Informing transport service about my address `%s:%u'.\n",
1342 dst, port);
1343#endif
1344 plugin->env->notify_address (plugin->env->cls,
1345 "tcp",
1346 buf, addrlen, LEARNED_ADDRESS_EXPIRATION);
1347 return GNUNET_OK;
1348}
1349
1350
1351/**
1352 * We've received a welcome from this peer via TCP.
1353 * Possibly create a fresh client record and send back
1354 * our welcome.
1355 *
1356 * @param cls closure
1357 * @param server the server handling the message
1358 * @param client identification of the client
1359 * @param message the actual message
1360 */
1361static void
1362handle_tcp_welcome (void *cls,
1363 struct GNUNET_SERVER_Handle *server,
1364 struct GNUNET_SERVER_Client *client,
1365 const struct GNUNET_MessageHeader *message)
1366{
1367 struct Plugin *plugin = cls;
1368 struct Session *session_c;
1369 const struct WelcomeMessage *wm;
1370 uint16_t msize;
1371 uint32_t addrlen;
1372 size_t alen;
1373 void *vaddr;
1374 const struct sockaddr *addr;
1375
1376#if DEBUG_TCP
1377 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1378 "tcp",
1379 "Received `%s' message from %p.\n", "WELCOME", client);
1380#endif
1381 msize = ntohs (message->size);
1382 if (msize < sizeof (struct WelcomeMessage))
1383 {
1384 GNUNET_break_op (0);
1385 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1386 return;
1387 }
1388 wm = (const struct WelcomeMessage *) message;
1389 session_c = find_session_by_client (plugin, client);
1390 if (session_c == NULL)
1391 {
1392 vaddr = NULL;
1393 GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
1394 GNUNET_SERVER_client_keep (client);
1395 session_c = create_session (plugin,
1396 &wm->clientIdentity, client, vaddr, alen);
1397#if DEBUG_TCP
1398 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1399 "tcp",
1400 "Creating new session %p for incoming `%s' message.\n",
1401 session_c, "WELCOME");
1402#endif
1403 GNUNET_free_non_null (vaddr);
1404 process_pending_messages (session_c);
1405 }
1406 session_c->expecting_welcome = GNUNET_NO;
1407 if (0 < (addrlen = msize - sizeof (struct WelcomeMessage)))
1408 {
1409 addr = (const struct sockaddr *) &wm[1];
1410 tcp_plugin_address_suggested (plugin, addr, addrlen);
1411 }
1412 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1413}
1414
1415
1416/**
1417 * Calculate how long we should delay reading from the TCP socket to
1418 * ensure that we stay within our bandwidth limits (push back).
1419 *
1420 * @param session for which client should this be calculated
1421 */
1422static struct GNUNET_TIME_Relative
1423calculate_throttle_delay (struct Session *session)
1424{
1425 struct GNUNET_TIME_Relative ret;
1426 struct GNUNET_TIME_Absolute now;
1427 uint64_t del;
1428 uint64_t avail;
1429 uint64_t excess;
1430
1431 now = GNUNET_TIME_absolute_get ();
1432 del = now.value - session->last_quota_update.value;
1433 if (del > MAX_BANDWIDTH_CARRY)
1434 {
1435 update_quota (session, GNUNET_YES);
1436 del = now.value - session->last_quota_update.value;
1437 GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
1438 }
1439 if (session->quota_in == 0)
1440 session->quota_in = 1; /* avoid divison by zero */
1441 avail = del * session->quota_in;
1442 if (avail > session->last_received)
1443 return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
1444 excess = session->last_received - avail;
1445 ret.value = excess / session->quota_in;
1446 return ret;
1447}
1448
1449
1450/**
1451 * Task to signal the server that we can continue
1452 * receiving from the TCP client now.
1453 */
1454static void
1455delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1456{
1457 struct Session *session = cls;
1458 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1459}
1460
1461
1462/**
1463 * We've received data for this peer via TCP. Unbox,
1464 * compute latency and forward.
1465 *
1466 * @param cls closure
1467 * @param server the server handling the message
1468 * @param client identification of the client
1469 * @param message the actual message
1470 */
1471static void
1472handle_tcp_data (void *cls,
1473 struct GNUNET_SERVER_Handle *server,
1474 struct GNUNET_SERVER_Client *client,
1475 const struct GNUNET_MessageHeader *message)
1476{
1477 struct Plugin *plugin = cls;
1478 struct Session *session;
1479 const struct DataMessage *dm;
1480 uint16_t msize;
1481 const struct GNUNET_MessageHeader *msg;
1482 struct GNUNET_TIME_Relative latency;
1483 struct GNUNET_TIME_Absolute ttime;
1484 struct GNUNET_TIME_Absolute now;
1485 struct GNUNET_TIME_Relative delay;
1486 uint64_t ack_in;
1487
1488#if DEBUG_TCP
1489 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1490 "tcp", "Receiving data from other peer.\n");
1491#endif
1492 msize = ntohs (message->size);
1493 if ((msize <
1494 sizeof (struct DataMessage) + sizeof (struct GNUNET_MessageHeader)))
1495 {
1496 GNUNET_break_op (0);
1497 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1498 return;
1499 }
1500 session = find_session_by_client (plugin, client);
1501 if ((NULL == session) || (GNUNET_YES == session->expecting_welcome))
1502 {
1503 GNUNET_break_op (0);
1504 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1505 return;
1506 }
1507 dm = (const struct DataMessage *) message;
1508 session->max_in_msg_counter = GNUNET_MAX (session->max_in_msg_counter,
1509 GNUNET_ntohll (dm->ack_out));
1510 msg = (const struct GNUNET_MessageHeader *) &dm[1];
1511 if (msize != sizeof (struct DataMessage) + ntohs (msg->size))
1512 {
1513 GNUNET_break_op (0);
1514 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1515 return;
1516 }
1517 /* estimate latency */
1518 ack_in = GNUNET_ntohll (dm->ack_in);
1519 if ((ack_in <= session->out_msg_counter) &&
1520 (session->out_msg_counter - ack_in < ACK_LOG_SIZE))
1521 {
1522 delay = GNUNET_TIME_relative_ntoh (dm->delay);
1523 ttime = session->gen_time[ack_in % ACK_LOG_SIZE];
1524 now = GNUNET_TIME_absolute_get ();
1525 if (delay.value > now.value - ttime.value)
1526 delay.value = 0; /* not plausible */
1527 /* update (round-trip) latency using ageing; we
1528 use 7:1 so that we can reasonably quickly react
1529 to changes, but not so fast that latency is largely
1530 jitter... */
1531 session->latency_estimate
1532 = ((7 * session->latency_estimate) +
1533 (now.value - ttime.value - delay.value)) / 8;
1534 }
1535 latency.value = (uint64_t) session->latency_estimate;
1536 /* deliver on */
1537#if DEBUG_TCP
1538 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1539 "tcp",
1540 "Forwarding data of type %u to transport service.\n",
1541 ntohs (msg->type));
1542#endif
1543 session->service_context
1544 = plugin->env->receive (plugin->env->cls,
1545 session,
1546 session->service_context,
1547 latency, &session->target, msg);
1548 /* update bandwidth used */
1549 session->last_received += msize;
1550 update_quota (session, GNUNET_NO);
1551
1552 delay = calculate_throttle_delay (session);
1553 if (delay.value == 0)
1554 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1555 else
1556 GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
1557 GNUNET_NO,
1558 GNUNET_SCHEDULER_PRIORITY_HIGH,
1559 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1560 delay, &delayed_done, session);
1561}
1562
1563
1564/**
1565 * Handlers for the various TCP messages.
1566 */
1567static struct GNUNET_SERVER_MessageHandler my_handlers[] = {
1568 {&handle_tcp_welcome, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME, 0},
1569 {&handle_tcp_data, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA, 0},
1570 {NULL, NULL, 0, 0}
1571};
1572
1573
1574static void
1575create_tcp_handlers (struct Plugin *plugin)
1576{
1577 unsigned int i;
1578 plugin->handlers = GNUNET_malloc (sizeof (my_handlers));
1579 memcpy (plugin->handlers, my_handlers, sizeof (my_handlers));
1580 for (i = 0;
1581 i <
1582 sizeof (my_handlers) / sizeof (struct GNUNET_SERVER_MessageHandler);
1583 i++)
1584 plugin->handlers[i].callback_cls = plugin;
1585 GNUNET_SERVER_add_handlers (plugin->server, plugin->handlers);
1586}
1587
1588
1589/**
1590 * Functions with this signature are called whenever a peer
1591 * is disconnected on the network level.
1592 *
1593 * @param cls closure
1594 * @param client identification of the client
1595 */
1596static void
1597disconnect_notify (void *cls, struct GNUNET_SERVER_Client *client)
1598{
1599 struct Plugin *plugin = cls;
1600 struct Session *session;
1601
1602#if DEBUG_TCP
1603 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1604 "tcp",
1605 "Notified about network-level disconnect of client %p.\n",
1606 client);
1607#endif
1608 session = find_session_by_client (plugin, client);
1609 if (session == NULL)
1610 return; /* unknown, nothing to do */
1611#if DEBUG_TCP
1612 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1613 "tcp", "Will now destroy session %p.\n", session);
1614#endif
1615 disconnect_session (session);
1616}
1617
1618
1619/**
1620 * Add the IP of our network interface to the list of
1621 * our external IP addresses.
1622 */
1623static int
1624process_interfaces (void *cls,
1625 const char *name,
1626 int isDefault,
1627 const struct sockaddr *addr, socklen_t addrlen)
1628{
1629 struct Plugin *plugin = cls;
1630 char dst[INET6_ADDRSTRLEN];
1631 int af;
1632 struct sockaddr_in *v4;
1633 struct sockaddr_in6 *v6;
1634
1635 af = addr->sa_family;
1636 if (af == AF_INET)
1637 {
1638 v4 = (struct sockaddr_in *) addr;
1639 inet_ntop (AF_INET, &v4->sin_addr, dst, sizeof (dst));
1640 v4->sin_port = htons (plugin->adv_port);
1641 }
1642 else
1643 {
1644 GNUNET_assert (af == AF_INET6);
1645 v6 = (struct sockaddr_in6 *) addr;
1646 inet_ntop (AF_INET6, &v6->sin6_addr, dst, sizeof (dst));
1647 v6->sin6_port = htons (plugin->adv_port);
1648 }
1649 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO |
1650 GNUNET_ERROR_TYPE_BULK,
1651 "tcp", _("Found address `%s' (%s)\n"), dst, name);
1652 plugin->env->notify_address (plugin->env->cls,
1653 "tcp",
1654 addr, addrlen, GNUNET_TIME_UNIT_FOREVER_REL);
1655 return GNUNET_OK;
1656}
1657
1658
1659/**
1660 * Function called by the resolver for each address obtained from DNS
1661 * for our own hostname. Add the addresses to the list of our
1662 * external IP addresses.
1663 *
1664 * @param cls closure
1665 * @param addr one of the addresses of the host, NULL for the last address
1666 * @param addrlen length of the address
1667 */
1668static void
1669process_hostname_ips (void *cls,
1670 const struct sockaddr *addr, socklen_t addrlen)
1671{
1672 struct Plugin *plugin = cls;
1673
1674 if (addr == NULL)
1675 return;
1676 plugin->env->notify_address (plugin->env->cls,
1677 "tcp",
1678 addr, addrlen, GNUNET_TIME_UNIT_FOREVER_REL);
1679}
1680
1681
1682/**
1683 * Entry point for the plugin.
1684 */
1685void *
1686libgnunet_plugin_transport_tcp_init (void *cls)
1687{
1688 struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
1689 struct GNUNET_TRANSPORT_PluginFunctions *api;
1690 struct Plugin *plugin;
1691 struct GNUNET_SERVICE_Context *service;
1692 unsigned long long aport;
1693 unsigned long long bport;
1694
1695 service = GNUNET_SERVICE_start ("tcp", env->sched, env->cfg);
1696 if (service == NULL)
1697 {
1698 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1699 "tcp",
1700 _
1701 ("Failed to start service for `%s' transport plugin.\n"),
1702 "tcp");
1703 return NULL;
1704 }
1705 aport = 0;
1706 if ((GNUNET_OK !=
1707 GNUNET_CONFIGURATION_get_value_number (env->cfg,
1708 "tcp",
1709 "PORT",
1710 &bport)) ||
1711 (bport > 65535) ||
1712 ((GNUNET_OK ==
1713 GNUNET_CONFIGURATION_get_value_number (env->cfg,
1714 "tcp",
1715 "ADVERTISED-PORT",
1716 &aport)) && (aport > 65535)))
1717 {
1718 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
1719 "tcp",
1720 _
1721 ("Require valid port number for service `%s' in configuration!\n"),
1722 "tcp");
1723 GNUNET_SERVICE_stop (service);
1724 return NULL;
1725 }
1726 if (aport == 0)
1727 aport = bport;
1728 plugin = GNUNET_malloc (sizeof (struct Plugin));
1729 plugin->open_port = bport;
1730 plugin->adv_port = aport;
1731 plugin->env = env;
1732 plugin->lsock = NULL;
1733 plugin->statistics = NULL;
1734 api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
1735 api->cls = plugin;
1736 api->send_to = &tcp_plugin_send_to;
1737 api->send = &tcp_plugin_send;
1738 api->cancel = &tcp_plugin_cancel;
1739 api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
1740 api->set_receive_quota = &tcp_plugin_set_receive_quota;
1741 api->address_suggested = &tcp_plugin_address_suggested;
1742 api->cost_estimate = 42; /* TODO: ATS */
1743 plugin->service = service;
1744 plugin->server = GNUNET_SERVICE_get_server (service);
1745 create_tcp_handlers (plugin);
1746 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1747 "tcp", _("TCP transport listening on port %u\n"), bport);
1748 if (aport != bport)
1749 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1750 "tcp",
1751 _
1752 ("TCP transport advertises itself as being on port %u\n"),
1753 aport);
1754 GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify,
1755 plugin);
1756 GNUNET_OS_network_interfaces_list (&process_interfaces, plugin);
1757 GNUNET_RESOLVER_hostname_resolve (env->sched,
1758 env->cfg,
1759 AF_UNSPEC,
1760 HOSTNAME_RESOLVE_TIMEOUT,
1761 &process_hostname_ips, plugin);
1762 return api;
1763}
1764
1765
1766/**
1767 * Exit point from the plugin.
1768 */
1769void *
1770libgnunet_plugin_transport_tcp_done (void *cls)
1771{
1772 struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
1773 struct Plugin *plugin = api->cls;
1774
1775 GNUNET_SERVICE_stop (plugin->service);
1776 GNUNET_free (plugin->handlers);
1777 GNUNET_free (plugin);
1778 GNUNET_free (api);
1779 return NULL;
1780}
1781
1782/* end of plugin_transport_tcp.c */