aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-01-20 20:55:47 +0000
committerChristian Grothoff <christian@grothoff.org>2010-01-20 20:55:47 +0000
commitc3a70847fec2f50c4e98449292c15a123cfcbb16 (patch)
tree6bc3b5495bca6a4ec02af3fe4a20f50c9e74e962 /src
parentd30fe66aa5ecc0308d9f4e4e24342de4046c946e (diff)
downloadgnunet-c3a70847fec2f50c4e98449292c15a123cfcbb16.tar.gz
gnunet-c3a70847fec2f50c4e98449292c15a123cfcbb16.zip
TCP plugin that might actually implement the new API done
Diffstat (limited to 'src')
-rw-r--r--src/transport/plugin_transport_tcp.c658
1 files changed, 134 insertions, 524 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
index 9f180d14f..8b9a2b695 100644
--- a/src/transport/plugin_transport_tcp.c
+++ b/src/transport/plugin_transport_tcp.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors) 3 (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -28,7 +28,6 @@
28#include "gnunet_hello_lib.h" 28#include "gnunet_hello_lib.h"
29#include "gnunet_connection_lib.h" 29#include "gnunet_connection_lib.h"
30#include "gnunet_os_lib.h" 30#include "gnunet_os_lib.h"
31#include "gnunet_peerinfo_service.h"
32#include "gnunet_protocols.h" 31#include "gnunet_protocols.h"
33#include "gnunet_resolver_service.h" 32#include "gnunet_resolver_service.h"
34#include "gnunet_server_lib.h" 33#include "gnunet_server_lib.h"
@@ -42,13 +41,6 @@
42#define DEBUG_TCP GNUNET_NO 41#define DEBUG_TCP GNUNET_NO
43 42
44/** 43/**
45 * After how long do we expire an address that we
46 * learned from another peer if it is not reconfirmed
47 * by anyone?
48 */
49#define LEARNED_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 6)
50
51/**
52 * How long until we give up on transmitting the welcome message? 44 * How long until we give up on transmitting the welcome message?
53 */ 45 */
54#define WELCOME_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 46#define WELCOME_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
@@ -58,94 +50,9 @@
58 */ 50 */
59#define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 51#define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
60 52
61/**
62 * For how many messages back to we keep transmission times?
63 */
64#define ACK_LOG_SIZE 32
65
66
67
68/**
69 * Message used to ask a peer to validate receipt (to check an address
70 * from a HELLO). Followed by the address used. Note that the
71 * recipients response does not affirm that he has this address,
72 * only that he got the challenge message.
73 */
74struct ValidationChallengeMessage
75{
76
77 /**
78 * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PING
79 */
80 struct GNUNET_MessageHeader header;
81
82 /**
83 * Random challenge number (in network byte order).
84 */
85 uint32_t challenge GNUNET_PACKED;
86
87 /**
88 * Who is the intended recipient?
89 */
90 struct GNUNET_PeerIdentity target;
91
92};
93
94 53
95/** 54/**
96 * Message used to validate a HELLO. The challenge is included in the 55 * Initial handshake message for a session.
97 * confirmation to make matching of replies to requests possible. The
98 * signature signs the original challenge number, our public key, the
99 * sender's address (so that the sender can check that the address we
100 * saw is plausible for him and possibly detect a MiM attack) and a
101 * timestamp (to limit replay).<p>
102 *
103 * This message is followed by the address of the
104 * client that we are observing (which is part of what
105 * is being signed).
106 */
107struct ValidationChallengeResponse
108{
109
110 /**
111 * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PONG
112 */
113 struct GNUNET_MessageHeader header;
114
115 /**
116 * For padding, always zero.
117 */
118 uint32_t reserved GNUNET_PACKED;
119
120 /**
121 * Signature.
122 */
123 struct GNUNET_CRYPTO_RsaSignature signature;
124
125 /**
126 * What are we signing and why?
127 */
128 struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
129
130 /**
131 * Random challenge number (in network byte order).
132 */
133 uint32_t challenge GNUNET_PACKED;
134
135 /**
136 * Who signed this message?
137 */
138 struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer;
139
140};
141
142
143
144/**
145 * Initial handshake message for a session. This header
146 * is followed by the address that the other peer used to
147 * connect to us (so that we may learn it) or the address
148 * that the other peer got from the accept call.
149 */ 56 */
150struct WelcomeMessage 57struct WelcomeMessage
151{ 58{
@@ -160,39 +67,6 @@ struct WelcomeMessage
160 67
161 68
162/** 69/**
163 * Encapsulation for normal TCP traffic.
164 */
165struct DataMessage
166{
167 struct GNUNET_MessageHeader header;
168
169 /**
170 * For alignment.
171 */
172 uint32_t reserved GNUNET_PACKED;
173
174 /**
175 * Number of the last message that was received from the other peer.
176 */
177 uint64_t ack_in GNUNET_PACKED;
178
179 /**
180 * Number of this outgoing message.
181 */
182 uint64_t ack_out GNUNET_PACKED;
183
184 /**
185 * How long was sending this ack delayed by the other peer
186 * (estimate). The receiver of this message can use the delay
187 * between sending his message number 'ack' and receiving this ack
188 * minus the delay as an estimate of the round-trip time.
189 */
190 struct GNUNET_TIME_RelativeNBO delay;
191
192};
193
194
195/**
196 * Encapsulation of all of the state of the plugin. 70 * Encapsulation of all of the state of the plugin.
197 */ 71 */
198struct Plugin; 72struct Plugin;
@@ -214,7 +88,7 @@ struct PendingMessage
214 * The pending message, pointer to the end 88 * The pending message, pointer to the end
215 * of this struct, do not free! 89 * of this struct, do not free!
216 */ 90 */
217 struct GNUNET_MessageHeader *msg; 91 const struct GNUNET_MessageHeader *msg;
218 92
219 /** 93 /**
220 * Continuation function to call once the message 94 * Continuation function to call once the message
@@ -233,12 +107,6 @@ struct PendingMessage
233 */ 107 */
234 struct GNUNET_TIME_Absolute timeout; 108 struct GNUNET_TIME_Absolute timeout;
235 109
236 /**
237 * GNUNET_YES if this is a welcome message;
238 * otherwise this should be a DATA message.
239 */
240 int is_welcome;
241
242}; 110};
243 111
244 112
@@ -286,14 +154,8 @@ struct Session
286 struct GNUNET_TIME_Absolute last_quota_update; 154 struct GNUNET_TIME_Absolute last_quota_update;
287 155
288 /** 156 /**
289 * Context for our iteration to find HELLOs for this peer. NULL 157 * Address of the other peer (either based on our 'connect'
290 * after iteration has completed. 158 * call or on our 'accept' call).
291 */
292 struct GNUNET_PEERINFO_IteratorContext *ic;
293
294 /**
295 * Address of the other peer if WE initiated the connection
296 * (and hence can be sure what it is), otherwise NULL.
297 */ 159 */
298 void *connect_addr; 160 void *connect_addr;
299 161
@@ -304,34 +166,13 @@ struct Session
304 uint64_t last_received; 166 uint64_t last_received;
305 167
306 /** 168 /**
307 * Our current latency estimate (in ms).
308 */
309 double latency_estimate;
310
311 /**
312 * Time when we generated the last ACK_LOG_SIZE acks.
313 * (the "last" refers to the "out_msg_counter" here)
314 */
315 struct GNUNET_TIME_Absolute gen_time[ACK_LOG_SIZE];
316
317 /**
318 * Our current sequence number.
319 */
320 uint64_t out_msg_counter;
321
322 /**
323 * Highest received incoming sequence number.
324 */
325 uint64_t max_in_msg_counter;
326
327 /**
328 * Number of bytes per ms that this peer is allowed 169 * Number of bytes per ms that this peer is allowed
329 * to send to us. 170 * to send to us.
330 */ 171 */
331 uint32_t quota_in; 172 uint32_t quota_in;
332 173
333 /** 174 /**
334 * Length of connect_addr, can be 0. 175 * Length of connect_addr.
335 */ 176 */
336 size_t connect_alen; 177 size_t connect_alen;
337 178
@@ -420,10 +261,10 @@ find_session_by_target (struct Plugin *plugin,
420 struct Session *ret; 261 struct Session *ret;
421 262
422 ret = plugin->sessions; 263 ret = plugin->sessions;
423 while ((ret != NULL) && 264 while ( (ret != NULL) &&
424 ((GNUNET_SYSERR == ret->expecting_welcome) || 265 ((GNUNET_SYSERR == ret->expecting_welcome) ||
425 (0 != memcmp (target, 266 (0 != memcmp (target,
426 &ret->target, sizeof (struct GNUNET_PeerIdentity))))) 267 &ret->target, sizeof (struct GNUNET_PeerIdentity)))))
427 ret = ret->next; 268 ret = ret->next;
428 return ret; 269 return ret;
429} 270}
@@ -462,7 +303,6 @@ create_welcome (struct Plugin *plugin)
462 welcome->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME); 303 welcome->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME);
463 welcome->clientIdentity = *plugin->env->my_identity; 304 welcome->clientIdentity = *plugin->env->my_identity;
464 pm->timeout = GNUNET_TIME_relative_to_absolute (WELCOME_TIMEOUT); 305 pm->timeout = GNUNET_TIME_relative_to_absolute (WELCOME_TIMEOUT);
465 pm->is_welcome = GNUNET_YES;
466 return pm; 306 return pm;
467} 307}
468 308
@@ -491,7 +331,7 @@ create_session (struct Plugin *plugin,
491 ret->last_quota_update = GNUNET_TIME_absolute_get (); 331 ret->last_quota_update = GNUNET_TIME_absolute_get ();
492 ret->quota_in = plugin->env->default_quota_in; 332 ret->quota_in = plugin->env->default_quota_in;
493 ret->expecting_welcome = GNUNET_YES; 333 ret->expecting_welcome = GNUNET_YES;
494 ret->pending_messages = create_welcome (plugin); 334 ret->pending_messages = create_welcome (plugin);
495 return ret; 335 return ret;
496} 336}
497 337
@@ -524,7 +364,6 @@ do_transmit (void *cls, size_t size, void *buf)
524 char *cbuf; 364 char *cbuf;
525 uint16_t msize; 365 uint16_t msize;
526 size_t ret; 366 size_t ret;
527 struct DataMessage *dm;
528 367
529 session->transmit_handle = NULL; 368 session->transmit_handle = NULL;
530 if (buf == NULL) 369 if (buf == NULL)
@@ -557,40 +396,17 @@ do_transmit (void *cls, size_t size, void *buf)
557 cbuf = buf; 396 cbuf = buf;
558 while (NULL != (pm = session->pending_messages)) 397 while (NULL != (pm = session->pending_messages))
559 { 398 {
560 if (pm->is_welcome) 399 if (size < (msize = ntohs (pm->msg->size)))
561 { 400 break;
562 if (size < (msize = ntohs (pm->msg->size))) 401 memcpy (cbuf, pm->msg, msize);
563 break; 402 cbuf += msize;
564 memcpy (cbuf, pm->msg, msize); 403 ret += msize;
565 cbuf += msize; 404 size -= msize;
566 ret += msize;
567 size -= msize;
568 }
569 else
570 {
571 if (size <
572 sizeof (struct DataMessage) + (msize = ntohs (pm->msg->size)))
573 break;
574 dm = (struct DataMessage *) cbuf;
575 dm->header.size = htons (sizeof (struct DataMessage) + msize);
576 dm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA);
577 dm->ack_out = GNUNET_htonll (++session->out_msg_counter);
578 dm->ack_in = GNUNET_htonll (session->max_in_msg_counter);
579 cbuf += sizeof (struct DataMessage);
580 ret += sizeof (struct DataMessage);
581 size -= sizeof (struct DataMessage);
582 memcpy (cbuf, pm->msg, msize);
583 cbuf += msize;
584 ret += msize;
585 size -= msize;
586 }
587 session->pending_messages = pm->next; 405 session->pending_messages = pm->next;
588 if (pm->transmit_cont != NULL) 406 if (pm->transmit_cont != NULL)
589 pm->transmit_cont (pm->transmit_cont_cls, 407 pm->transmit_cont (pm->transmit_cont_cls,
590 &session->target, GNUNET_OK); 408 &session->target, GNUNET_OK);
591 GNUNET_free (pm); 409 GNUNET_free (pm);
592 session->gen_time[session->out_msg_counter % ACK_LOG_SIZE]
593 = GNUNET_TIME_absolute_get ();
594 } 410 }
595 process_pending_messages (session); 411 process_pending_messages (session);
596#if DEBUG_TCP 412#if DEBUG_TCP
@@ -610,22 +426,17 @@ do_transmit (void *cls, size_t size, void *buf)
610static void 426static void
611process_pending_messages (struct Session *session) 427process_pending_messages (struct Session *session)
612{ 428{
429 struct PendingMessage *pm;
613 GNUNET_assert (session->client != NULL); 430 GNUNET_assert (session->client != NULL);
614 if (session->pending_messages == NULL)
615 return;
616 if (session->transmit_handle != NULL) 431 if (session->transmit_handle != NULL)
617 return; 432 return;
433 if (NULL == (pm = session->pending_messages))
434 return;
618 session->transmit_handle 435 session->transmit_handle
619 = GNUNET_SERVER_notify_transmit_ready (session->client, 436 = GNUNET_SERVER_notify_transmit_ready (session->client,
620 ntohs (session-> 437 ntohs (pm->msg->size),
621 pending_messages->msg->
622 size) +
623 (session->
624 pending_messages->is_welcome ? 0 :
625 sizeof (struct DataMessage)),
626 GNUNET_TIME_absolute_get_remaining 438 GNUNET_TIME_absolute_get_remaining
627 (session-> 439 (pm->timeout),
628 pending_messages[0].timeout),
629 &do_transmit, session); 440 &do_transmit, session);
630} 441}
631 442
@@ -666,11 +477,6 @@ disconnect_session (struct Session *session)
666 else 477 else
667 prev->next = session->next; 478 prev->next = session->next;
668 /* clean up state */ 479 /* clean up state */
669 if (session->ic != NULL)
670 {
671 GNUNET_PEERINFO_iterate_cancel (session->ic);
672 session->ic = NULL;
673 }
674 if (session->transmit_handle != NULL) 480 if (session->transmit_handle != NULL)
675 { 481 {
676 GNUNET_CONNECTION_notify_transmit_ready_cancel 482 GNUNET_CONNECTION_notify_transmit_ready_cancel
@@ -708,7 +514,8 @@ disconnect_session (struct Session *session)
708 notify transport service about disconnect */ 514 notify transport service about disconnect */
709 session->plugin->env->receive (session->plugin->env->cls, 515 session->plugin->env->receive (session->plugin->env->cls,
710 1, 516 1,
711 NULL, 0, /* FIXME: address! */ 517 session->connect_addr,
518 session->connect_alen,
712 &session->target, NULL); 519 &session->target, NULL);
713 } 520 }
714 if (session->client != NULL) 521 if (session->client != NULL)
@@ -722,205 +529,6 @@ disconnect_session (struct Session *session)
722 529
723 530
724/** 531/**
725 * Iterator callback to go over all addresses. If we get
726 * a TCP address, increment the counter
727 *
728 * @param cls closure, points to the counter
729 * @param tname name of the transport
730 * @param expiration expiration time
731 * @param addr the address
732 * @param addrlen length of the address
733 * @return GNUNET_OK to keep the address,
734 * GNUNET_NO to delete it from the HELLO
735 * GNUNET_SYSERR to stop iterating (but keep current address)
736 */
737static int
738count_tcp_addresses (void *cls,
739 const char *tname,
740 struct GNUNET_TIME_Absolute expiration,
741 const void *addr, size_t addrlen)
742{
743 unsigned int *counter = cls;
744
745 if (0 != strcmp (tname, "tcp"))
746 return GNUNET_OK; /* not one of ours */
747 (*counter)++;
748 return GNUNET_OK; /* failed to connect */
749}
750
751
752struct ConnectContext
753{
754 struct Plugin *plugin;
755
756 struct GNUNET_CONNECTION_Handle *sa;
757
758 struct PendingMessage *welcome;
759
760 unsigned int pos;
761};
762
763
764/**
765 * Iterator callback to go over all addresses. If we get
766 * the "pos" TCP address, try to connect to it.
767 *
768 * @param cls closure
769 * @param tname name of the transport
770 * @param expiration expiration time
771 * @param addrlen length of the address
772 * @param addr the address
773 * @return GNUNET_OK to keep the address,
774 * GNUNET_NO to delete it from the HELLO
775 * GNUNET_SYSERR to stop iterating (but keep current address)
776 */
777static int
778try_connect_to_address (void *cls,
779 const char *tname,
780 struct GNUNET_TIME_Absolute expiration,
781 const void *addr, size_t addrlen)
782{
783 struct ConnectContext *cc = cls;
784 int af;
785
786 if (0 != strcmp (tname, "tcp"))
787 return GNUNET_OK; /* not one of ours */
788 if (sizeof (struct sockaddr_in) == addrlen)
789 af = AF_INET;
790 else if (sizeof (struct sockaddr_in6) == addrlen)
791 af = AF_INET6;
792 else
793 {
794 /* not a valid address */
795 GNUNET_break (0);
796 return GNUNET_NO;
797 }
798 if (0 == cc->pos--)
799 {
800 cc->welcome = create_welcome (cc->plugin);
801 cc->sa =
802 GNUNET_CONNECTION_create_from_sockaddr (cc->plugin->env->sched,
803 af, addr, addrlen,
804 GNUNET_SERVER_MAX_MESSAGE_SIZE);
805#if DEBUG_TCP
806 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
807 "tcp",
808 "Connecting using address %s.\n",
809 GNUNET_a2s (addr, addrlen));
810#endif
811 return GNUNET_SYSERR;
812 }
813 return GNUNET_OK; /* failed to connect */
814}
815
816
817/**
818 * Type of an iterator over the hosts. Note that each
819 * host will be called with each available protocol.
820 *
821 * @param cls closure
822 * @param peer id of the peer, NULL for last call
823 * @param hello hello message for the peer (can be NULL)
824 * @param trust amount of trust we have in the peer
825 */
826static void
827session_try_connect (void *cls,
828 const struct GNUNET_PeerIdentity *peer,
829 const struct GNUNET_HELLO_Message *hello, uint32_t trust)
830{
831 struct Session *session = cls;
832 unsigned int count;
833 struct ConnectContext cctx;
834 struct PendingMessage *pm;
835
836 if (peer == NULL)
837 {
838 session->ic = NULL;
839 /* last call, destroy session if we are still not
840 connected */
841 if (session->client != NULL)
842 {
843#if DEBUG_TCP
844 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
845 "tcp",
846 "Now connected to `%4s', now processing messages.\n",
847 GNUNET_i2s (&session->target));
848#endif
849 process_pending_messages (session);
850 }
851 else
852 {
853#if DEBUG_TCP
854 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
855 "tcp",
856 "Failed to connect to `%4s' (no working `%s'), closing session.\n",
857 GNUNET_i2s (&session->target), "HELLO");
858#endif
859 disconnect_session (session);
860 }
861 return;
862 }
863 if ((hello == NULL) || (session->client != NULL))
864 {
865 GNUNET_break (0); /* should this ever happen!? */
866 return;
867 }
868 count = 0;
869 GNUNET_HELLO_iterate_addresses (hello,
870 GNUNET_NO, &count_tcp_addresses, &count);
871 if (count == 0)
872 {
873#if DEBUG_TCP
874 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
875 "tcp",
876 "Asked to connect to `%4s', but have no addresses to try.\n",
877 GNUNET_i2s (&session->target));
878#endif
879 return;
880 }
881 cctx.plugin = session->plugin;
882 cctx.sa = NULL;
883 cctx.welcome = NULL;
884 cctx.pos = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
885 GNUNET_HELLO_iterate_addresses (hello,
886 GNUNET_NO, &try_connect_to_address, &cctx);
887 if (cctx.sa == NULL)
888 {
889#if DEBUG_TCP
890 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
891 "tcp",
892 "Asked to connect, but all addresses failed.\n");
893#endif
894 GNUNET_free_non_null (cctx.welcome);
895 return;
896 }
897 session->client = GNUNET_SERVER_connect_socket (session->plugin->server,
898 cctx.sa);
899#if DEBUG_TCP
900 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901 "Connected to `%4s' for session %p\n",
902 GNUNET_i2s (&session->target), session->client);
903#endif
904 if (session->client == NULL)
905 {
906 GNUNET_break (0); /* how could this happen? */
907 GNUNET_free_non_null (cctx.welcome);
908 return;
909 }
910 pm = cctx.welcome;
911 /* prepend (!) */
912 pm->next = session->pending_messages;
913 session->pending_messages = pm;
914#if DEBUG_TCP
915 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
916 "tcp",
917 "Connected to `%4s', now sending `%s' message.\n",
918 GNUNET_i2s (&session->target), "WELCOME");
919#endif
920}
921
922
923/**
924 * Function that can be used by the transport service to transmit 532 * Function that can be used by the transport service to transmit
925 * a message using the plugin. Note that in the case of a 533 * a message using the plugin. Note that in the case of a
926 * peer disconnecting, the continuation MUST be called 534 * peer disconnecting, the continuation MUST be called
@@ -967,43 +575,82 @@ tcp_plugin_send (void *cls,
967 struct Session *session; 575 struct Session *session;
968 struct PendingMessage *pm; 576 struct PendingMessage *pm;
969 struct PendingMessage *pme; 577 struct PendingMessage *pme;
578 struct GNUNET_CONNECTION_Handle *sa;
579 int af;
970 uint16_t mlen; 580 uint16_t mlen;
971 581
972 /* FIXME: support 'force_address' */
973 mlen = ntohs (msg->size); 582 mlen = ntohs (msg->size);
974 session = find_session_by_target (plugin, target); 583 session = find_session_by_target (plugin, target);
975 pm = GNUNET_malloc (sizeof (struct PendingMessage) + ntohs (msg->size)); 584 if ( (GNUNET_YES == force_address) &&
976 pm->msg = (struct GNUNET_MessageHeader *) &pm[1]; 585 ( (session->connect_alen != addrlen) ||
977 memcpy (pm->msg, msg, ntohs (msg->size)); 586 (0 != memcmp (session->connect_addr,
978 pm->timeout = GNUNET_TIME_relative_to_absolute (timeout); 587 addr,
979 pm->transmit_cont = cont; 588 addrlen)) ) )
980 pm->transmit_cont_cls = cont_cls; 589 session = NULL; /* ignore existing session */
590 if ( (session == NULL) &&
591 (addr == NULL) )
592 {
593#if DEBUG_TCP
594 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
595 "tcp",
596 "Asked to transmit to `%4s' without address and I have no existing connection (failing).\n",
597 GNUNET_i2s (target));
598#endif
599 return -1;
600 }
981 if (session == NULL) 601 if (session == NULL)
982 { 602 {
983 session = GNUNET_malloc (sizeof (struct Session)); 603 if (sizeof (struct sockaddr_in) == addrlen)
604 af = AF_INET;
605 else if (sizeof (struct sockaddr_in6) == addrlen)
606 af = AF_INET6;
607 else
608 {
609 GNUNET_break_op (0);
610 return -1;
611 }
612 sa = GNUNET_CONNECTION_create_from_sockaddr (plugin->env->sched,
613 af, addr, addrlen,
614 GNUNET_SERVER_MAX_MESSAGE_SIZE);
615 if (sa == NULL)
616 {
617#if DEBUG_TCP
618 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
619 "tcp",
620 "Failed to create connection to `%4s' at `%s'\n",
621 GNUNET_i2s (target),
622 GNUNET_a2s (addr, addrlen));
623#endif
624 return -1;
625 }
626
984#if DEBUG_TCP 627#if DEBUG_TCP
985 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 628 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
986 "tcp", 629 "tcp",
987 "Asked to transmit, creating fresh session %p.\n", 630 "Asked to transmit to `%4s', creating fresh session.\n",
988 session); 631 GNUNET_i2s (target));
989#endif 632#endif
990 session->next = plugin->sessions; 633 session = create_session (plugin,
991 plugin->sessions = session; 634 target,
992 session->plugin = plugin; 635 GNUNET_SERVER_connect_socket (session->plugin->server,
993 session->target = *target; 636 sa));
994 session->last_quota_update = GNUNET_TIME_absolute_get (); 637 session->connect_addr = GNUNET_malloc (addrlen);
995 session->quota_in = plugin->env->default_quota_in; 638 memcpy (session->connect_addr,
996 session->expecting_welcome = GNUNET_YES; 639 addr,
997 session->pending_messages = pm; 640 addrlen);
998 session->ic = GNUNET_PEERINFO_iterate (plugin->env->cfg, 641 session->connect_alen = addrlen;
999 plugin->env->sched,
1000 target,
1001 0, timeout, &session_try_connect,
1002 session);
1003 return mlen + sizeof (struct GNUNET_MessageHeader);
1004 } 642 }
1005 GNUNET_assert (session != NULL); 643 GNUNET_assert (session != NULL);
1006 GNUNET_assert (session->client != NULL); 644 GNUNET_assert (session->client != NULL);
645
646 /* create new message entry */
647 pm = GNUNET_malloc (mlen + sizeof (struct PendingMessage));
648 memcpy (&pm[1], msg, mlen);
649 pm->msg = (const struct GNUNET_MessageHeader*) &pm[1];
650 pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
651 pm->transmit_cont = cont;
652 pm->transmit_cont_cls = cont_cls;
653
1007 /* append pm to pending_messages list */ 654 /* append pm to pending_messages list */
1008 pme = session->pending_messages; 655 pme = session->pending_messages;
1009 if (pme == NULL) 656 if (pme == NULL)
@@ -1012,20 +659,24 @@ tcp_plugin_send (void *cls,
1012 } 659 }
1013 else 660 else
1014 { 661 {
662 /* FIXME: this could be done faster by keeping
663 track of the tail of the list... */
1015 while (NULL != pme->next) 664 while (NULL != pme->next)
1016 pme = pme->next; 665 pme = pme->next;
1017 pme->next = pm; 666 pme->next = pm;
1018 } 667 }
1019#if DEBUG_TCP 668#if DEBUG_TCP
1020 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 669 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1021 "tcp", "Asked to transmit, added message to list.\n"); 670 "tcp",
671 "Asked to transmit %u bytes to `%s', added message to list.\n",
672 mlen,
673 GNUNET_i2s (target));
1022#endif 674#endif
1023 process_pending_messages (session); 675 process_pending_messages (session);
1024 return mlen + sizeof (struct GNUNET_MessageHeader); 676 return mlen;
1025} 677}
1026 678
1027 679
1028
1029/** 680/**
1030 * Function that can be called to force a disconnect from the 681 * Function that can be called to force a disconnect from the
1031 * specified neighbour. This should also cancel all previously 682 * specified neighbour. This should also cancel all previously
@@ -1049,33 +700,30 @@ tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1049 struct Session *session; 700 struct Session *session;
1050 struct PendingMessage *pm; 701 struct PendingMessage *pm;
1051 702
1052 session = find_session_by_target (plugin, target);
1053 if (session == NULL)
1054 {
1055 GNUNET_break (0);
1056 return;
1057 }
1058#if DEBUG_TCP 703#if DEBUG_TCP
1059 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 704 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1060 "tcp", 705 "tcp",
1061 "Asked to cancel session with `%4s'\n", 706 "Asked to cancel session with `%4s'\n",
1062 GNUNET_i2s (target)); 707 GNUNET_i2s (target));
1063#endif 708#endif
1064 pm = session->pending_messages; 709 while (NULL != (session = find_session_by_target (plugin, target)))
1065 while (pm != NULL)
1066 {
1067 pm->transmit_cont = NULL;
1068 pm->transmit_cont_cls = NULL;
1069 pm = pm->next;
1070 }
1071 if (session->client != NULL)
1072 { 710 {
1073 GNUNET_SERVER_client_drop (session->client); 711 pm = session->pending_messages;
1074 session->client = NULL; 712 while (pm != NULL)
713 {
714 pm->transmit_cont = NULL;
715 pm->transmit_cont_cls = NULL;
716 pm = pm->next;
717 }
718 if (session->client != NULL)
719 {
720 GNUNET_SERVER_client_drop (session->client);
721 session->client = NULL;
722 }
723 /* rest of the clean-up of the session will be done as part of
724 disconnect_notify which should be triggered any time now
725 (or which may be triggering this call in the first place) */
1075 } 726 }
1076 /* rest of the clean-up of the session will be done as part of
1077 disconnect_notify which should be triggered any time now
1078 (or which may be triggering this call in the first place) */
1079} 727}
1080 728
1081 729
@@ -1316,7 +964,7 @@ handle_tcp_welcome (void *cls,
1316{ 964{
1317 struct Plugin *plugin = cls; 965 struct Plugin *plugin = cls;
1318 const struct WelcomeMessage *wm = (const struct WelcomeMessage *) message; 966 const struct WelcomeMessage *wm = (const struct WelcomeMessage *) message;
1319 struct Session *session_c; 967 struct Session *session;
1320 size_t alen; 968 size_t alen;
1321 void *vaddr; 969 void *vaddr;
1322 970
@@ -1326,31 +974,33 @@ handle_tcp_welcome (void *cls,
1326 "Received `%s' message from `%4s/%p'.\n", "WELCOME", 974 "Received `%s' message from `%4s/%p'.\n", "WELCOME",
1327 GNUNET_i2s (&wm->clientIdentity), client); 975 GNUNET_i2s (&wm->clientIdentity), client);
1328#endif 976#endif
1329 session_c = find_session_by_client (plugin, client); 977 session = find_session_by_client (plugin, client);
1330 if (session_c == NULL) 978 if (session == NULL)
1331 { 979 {
1332 vaddr = NULL;
1333 GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
1334 /* FIXME: keep vaddr / alen! */
1335 GNUNET_SERVER_client_keep (client); 980 GNUNET_SERVER_client_keep (client);
1336 session_c = create_session (plugin, 981 session = create_session (plugin,
1337 &wm->clientIdentity, client); 982 &wm->clientIdentity, client);
983 if (GNUNET_OK ==
984 GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
985 {
986 session->connect_addr = vaddr;
987 session->connect_alen = alen;
988 }
1338#if DEBUG_TCP 989#if DEBUG_TCP
1339 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 990 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1340 "tcp", 991 "tcp",
1341 "Creating new session %p for incoming `%s' message.\n", 992 "Creating new session %p for incoming `%s' message.\n",
1342 session_c, "WELCOME"); 993 session_c, "WELCOME");
1343#endif 994#endif
1344 GNUNET_free_non_null (vaddr); 995 process_pending_messages (session);
1345 process_pending_messages (session_c);
1346 } 996 }
1347 if (session_c->expecting_welcome != GNUNET_YES) 997 if (session->expecting_welcome != GNUNET_YES)
1348 { 998 {
1349 GNUNET_break_op (0); 999 GNUNET_break_op (0);
1350 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1000 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1351 return; 1001 return;
1352 } 1002 }
1353 session_c->expecting_welcome = GNUNET_NO; 1003 session->expecting_welcome = GNUNET_NO;
1354 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1004 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1355} 1005}
1356 1006
@@ -1416,25 +1066,12 @@ handle_tcp_data (void *cls,
1416{ 1066{
1417 struct Plugin *plugin = cls; 1067 struct Plugin *plugin = cls;
1418 struct Session *session; 1068 struct Session *session;
1419 const struct DataMessage *dm;
1420 uint16_t msize; 1069 uint16_t msize;
1421 const struct GNUNET_MessageHeader *msg;
1422 struct GNUNET_TIME_Relative latency;
1423 struct GNUNET_TIME_Absolute ttime;
1424 struct GNUNET_TIME_Absolute now;
1425 struct GNUNET_TIME_Relative delay; 1070 struct GNUNET_TIME_Relative delay;
1426 uint64_t ack_in;
1427 1071
1428 msize = ntohs (message->size); 1072 msize = ntohs (message->size);
1429 if ((msize <
1430 sizeof (struct DataMessage) + sizeof (struct GNUNET_MessageHeader)))
1431 {
1432 GNUNET_break_op (0);
1433 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1434 return;
1435 }
1436 session = find_session_by_client (plugin, client); 1073 session = find_session_by_client (plugin, client);
1437 if ((NULL == session) || (GNUNET_NO != session->expecting_welcome)) 1074 if ( (NULL == session) || (GNUNET_NO != session->expecting_welcome))
1438 { 1075 {
1439 GNUNET_break_op (0); 1076 GNUNET_break_op (0);
1440 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1077 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -1445,49 +1082,20 @@ handle_tcp_data (void *cls,
1445 "tcp", "Receiving %u bytes from `%4s'.\n", 1082 "tcp", "Receiving %u bytes from `%4s'.\n",
1446 msize, GNUNET_i2s (&session->target)); 1083 msize, GNUNET_i2s (&session->target));
1447#endif 1084#endif
1448 dm = (const struct DataMessage *) message;
1449 session->max_in_msg_counter = GNUNET_MAX (session->max_in_msg_counter,
1450 GNUNET_ntohll (dm->ack_out));
1451 msg = (const struct GNUNET_MessageHeader *) &dm[1];
1452 if (msize != sizeof (struct DataMessage) + ntohs (msg->size))
1453 {
1454 GNUNET_break_op (0);
1455 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1456 return;
1457 }
1458 /* estimate latency */
1459 ack_in = GNUNET_ntohll (dm->ack_in);
1460 if ((ack_in <= session->out_msg_counter) &&
1461 (session->out_msg_counter - ack_in < ACK_LOG_SIZE))
1462 {
1463 delay = GNUNET_TIME_relative_ntoh (dm->delay);
1464 ttime = session->gen_time[ack_in % ACK_LOG_SIZE];
1465 now = GNUNET_TIME_absolute_get ();
1466 if (delay.value > now.value - ttime.value)
1467 delay.value = 0; /* not plausible */
1468 /* update (round-trip) latency using ageing; we
1469 use 7:1 so that we can reasonably quickly react
1470 to changes, but not so fast that latency is largely
1471 jitter... */
1472 session->latency_estimate
1473 = ((7 * session->latency_estimate) +
1474 (now.value - ttime.value - delay.value)) / 8;
1475 }
1476 latency.value = (uint64_t) session->latency_estimate;
1477 /* deliver on */
1478#if DEBUG_TCP 1085#if DEBUG_TCP
1479 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 1086 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1480 "tcp", 1087 "tcp",
1481 "Forwarding data of type %u to transport service.\n", 1088 "Forwarding %u bytes of data of type %u to transport service.\n",
1482 ntohs (msg->type)); 1089 (unsigned int) msize,
1090 (unsigned int) ntohs (msg->type));
1483#endif 1091#endif
1484 plugin->env->receive (plugin->env->cls, 1, 1092 plugin->env->receive (plugin->env->cls, 1,
1485 NULL, 0, /* FIXME: sender IP! */ 1093 session->connect_addr,
1486 &session->target, msg); 1094 session->connect_alen,
1095 &session->target, message);
1487 /* update bandwidth used */ 1096 /* update bandwidth used */
1488 session->last_received += msize; 1097 session->last_received += msize;
1489 update_quota (session, GNUNET_NO); 1098 update_quota (session, GNUNET_NO);
1490
1491 delay = calculate_throttle_delay (session); 1099 delay = calculate_throttle_delay (session);
1492 if (delay.value == 0) 1100 if (delay.value == 0)
1493 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1101 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1686,6 +1294,8 @@ libgnunet_plugin_transport_tcp_init (void *cls)
1686 aport); 1294 aport);
1687 GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify, 1295 GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify,
1688 plugin); 1296 plugin);
1297 /* FIXME: do the two calls below periodically again and
1298 not just once (since the info we get might change...) */
1689 GNUNET_OS_network_interfaces_list (&process_interfaces, plugin); 1299 GNUNET_OS_network_interfaces_list (&process_interfaces, plugin);
1690 plugin->hostname_dns = GNUNET_RESOLVER_hostname_resolve (env->sched, 1300 plugin->hostname_dns = GNUNET_RESOLVER_hostname_resolve (env->sched,
1691 env->cfg, 1301 env->cfg,