aboutsummaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_tcp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-03-19 13:22:43 +0000
committerChristian Grothoff <christian@grothoff.org>2010-03-19 13:22:43 +0000
commitc0bb6897499d729673abd2f7d3ff1027c95b93fc (patch)
tree105014cc2a11ce2f490215115cd39d1417728161 /src/transport/plugin_transport_tcp.c
parente98b0a89bb7a9b28a916cb8aa5cd7649c66e7b52 (diff)
downloadgnunet-c0bb6897499d729673abd2f7d3ff1027c95b93fc.tar.gz
gnunet-c0bb6897499d729673abd2f7d3ff1027c95b93fc.zip
fixing disconnect handling, making TCP plugin ready for bi-di use
Diffstat (limited to 'src/transport/plugin_transport_tcp.c')
-rw-r--r--src/transport/plugin_transport_tcp.c280
1 files changed, 172 insertions, 108 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
index 6786b9c34..afba750f2 100644
--- a/src/transport/plugin_transport_tcp.c
+++ b/src/transport/plugin_transport_tcp.c
@@ -17,13 +17,11 @@
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20
21/** 20/**
22 * @file transport/plugin_transport_tcp.c 21 * @file transport/plugin_transport_tcp.c
23 * @brief Implementation of the TCP transport service 22 * @brief Implementation of the TCP transport service
24 * @author Christian Grothoff 23 * @author Christian Grothoff
25 */ 24 */
26
27#include "platform.h" 25#include "platform.h"
28#include "gnunet_hello_lib.h" 26#include "gnunet_hello_lib.h"
29#include "gnunet_connection_lib.h" 27#include "gnunet_connection_lib.h"
@@ -185,6 +183,11 @@ struct Session
185 */ 183 */
186 int expecting_welcome; 184 int expecting_welcome;
187 185
186 /**
187 * Was this a connection that was inbound (we accepted)? (GNUNET_YES/GNUNET_NO)
188 */
189 int inbound;
190
188}; 191};
189 192
190 193
@@ -347,12 +350,17 @@ static size_t
347do_transmit (void *cls, size_t size, void *buf) 350do_transmit (void *cls, size_t size, void *buf)
348{ 351{
349 struct Session *session = cls; 352 struct Session *session = cls;
350 struct PendingMessage *pm; 353 struct GNUNET_PeerIdentity pid;
354 struct Plugin *plugin;
355 struct PendingMessage *pos;
356 struct PendingMessage *hd;
357 struct PendingMessage *tl;
358 struct GNUNET_TIME_Absolute now;
351 char *cbuf; 359 char *cbuf;
352
353 size_t ret; 360 size_t ret;
354 361
355 session->transmit_handle = NULL; 362 session->transmit_handle = NULL;
363 plugin = session->plugin;
356 if (buf == NULL) 364 if (buf == NULL)
357 { 365 {
358#if DEBUG_TCP 366#if DEBUG_TCP
@@ -361,63 +369,97 @@ do_transmit (void *cls, size_t size, void *buf)
361 "Timeout trying to transmit to peer `%4s', discarding message queue.\n", 369 "Timeout trying to transmit to peer `%4s', discarding message queue.\n",
362 GNUNET_i2s (&session->target)); 370 GNUNET_i2s (&session->target));
363#endif 371#endif
364 /* timeout */ 372 /* timeout; cancel all messages that have already expired */
365 while (NULL != (pm = session->pending_messages_head)) 373 hd = NULL;
366 { 374 tl = NULL;
375 ret = 0;
376 now = GNUNET_TIME_absolute_get ();
377 while ( (NULL != (pos = session->pending_messages_head)) &&
378 (pos->timeout.value <= now.value) )
379 {
367 GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, 380 GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
368 session->pending_messages_tail, 381 session->pending_messages_tail,
369 pm); 382 pos);
370#if DEBUG_TCP 383#if DEBUG_TCP
371 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 384 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
372 "tcp", 385 "tcp",
373 "Failed to transmit %u byte message to `%4s'.\n", 386 "Failed to transmit %u byte message to `%4s'.\n",
374 pm->message_size, 387 pos->message_size,
375 GNUNET_i2s (&session->target)); 388 GNUNET_i2s (&session->target));
376#endif 389#endif
377 GNUNET_STATISTICS_update (session->plugin->env->stats, 390 ret += pos->message_size;
378 gettext_noop ("# bytes currently in TCP buffers"), 391 GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
379 - (int64_t) pm->message_size,
380 GNUNET_NO);
381 GNUNET_STATISTICS_update (session->plugin->env->stats,
382 gettext_noop ("# bytes discarded by TCP (timeout)"),
383 pm->message_size,
384 GNUNET_NO);
385 if (pm->transmit_cont != NULL)
386 pm->transmit_cont (pm->transmit_cont_cls,
387 &session->target, GNUNET_SYSERR);
388 GNUNET_free (pm);
389 } 392 }
393 /* do this call before callbacks (so that if callbacks destroy
394 session, they have a chance to cancel actions done by this
395 call) */
396 process_pending_messages (session);
397 pid = session->target;
398 /* no do callbacks and do not use session again since
399 the callbacks may abort the session */
400 while (NULL != (pos = hd))
401 {
402 GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
403 if (pos->transmit_cont != NULL)
404 pos->transmit_cont (pos->transmit_cont_cls,
405 &pid, GNUNET_SYSERR);
406 GNUNET_free (pos);
407 }
408 GNUNET_STATISTICS_update (plugin->env->stats,
409 gettext_noop ("# bytes currently in TCP buffers"),
410 - (int64_t) ret,
411 GNUNET_NO);
412 GNUNET_STATISTICS_update (plugin->env->stats,
413 gettext_noop ("# bytes discarded by TCP (timeout)"),
414 ret,
415 GNUNET_NO);
390 return 0; 416 return 0;
391 } 417 }
418 /* copy all pending messages that would fit */
392 ret = 0; 419 ret = 0;
393 cbuf = buf; 420 cbuf = buf;
394 while (NULL != (pm = session->pending_messages_head)) 421 hd = NULL;
422 tl = NULL;
423 while (NULL != (pos = session->pending_messages_head))
395 { 424 {
396 if (size < pm->message_size) 425 if (ret + pos->message_size > size)
397 break; 426 break;
398 memcpy (cbuf, pm->msg, pm->message_size);
399 cbuf += pm->message_size;
400 ret += pm->message_size;
401 size -= pm->message_size;
402 GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, 427 GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
403 session->pending_messages_tail, 428 session->pending_messages_tail,
404 pm); 429 pos);
405 GNUNET_STATISTICS_update (session->plugin->env->stats, 430 GNUNET_assert (size >= pos->message_size);
406 gettext_noop ("# bytes currently in TCP buffers"), 431 memcpy (cbuf, pos->msg, pos->message_size);
407 - (int64_t) pm->message_size, 432 cbuf += pos->message_size;
408 GNUNET_NO); 433 ret += pos->message_size;
409 if (pm->transmit_cont != NULL) 434 size -= pos->message_size;
410 pm->transmit_cont (pm->transmit_cont_cls, 435 GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
411 &session->target, GNUNET_OK); 436 }
412 GNUNET_free (pm); 437 /* schedule 'continuation' before callbacks so that callbacks that
438 cancel everything don't cause us to use a session that no longer
439 exists... */
440 process_pending_messages (session);
441 pid = session->target;
442 /* we'll now call callbacks that may cancel the session; hence
443 we should not use 'session' after this point */
444 while (NULL != (pos = hd))
445 {
446 GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
447 if (pos->transmit_cont != NULL)
448 pos->transmit_cont (pos->transmit_cont_cls,
449 &pid, GNUNET_OK);
450 GNUNET_free (pos);
413 } 451 }
414 if (session->client != NULL) 452 GNUNET_assert (hd == NULL);
415 process_pending_messages (session); 453 GNUNET_assert (tl == NULL);
416#if DEBUG_TCP > 1 454#if DEBUG_TCP > 1
417 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 455 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
418 "tcp", "Transmitting %u bytes\n", ret); 456 "tcp", "Transmitting %u bytes\n", ret);
419#endif 457#endif
420 GNUNET_STATISTICS_update (session->plugin->env->stats, 458 GNUNET_STATISTICS_update (plugin->env->stats,
459 gettext_noop ("# bytes currently in TCP buffers"),
460 - (int64_t) ret,
461 GNUNET_NO);
462 GNUNET_STATISTICS_update (plugin->env->stats,
421 gettext_noop ("# bytes transmitted via TCP"), 463 gettext_noop ("# bytes transmitted via TCP"),
422 ret, 464 ret,
423 GNUNET_NO); 465 GNUNET_NO);
@@ -435,6 +477,7 @@ static void
435process_pending_messages (struct Session *session) 477process_pending_messages (struct Session *session)
436{ 478{
437 struct PendingMessage *pm; 479 struct PendingMessage *pm;
480
438 GNUNET_assert (session->client != NULL); 481 GNUNET_assert (session->client != NULL);
439 if (session->transmit_handle != NULL) 482 if (session->transmit_handle != NULL)
440 return; 483 return;
@@ -518,36 +561,15 @@ disconnect_session (struct Session *session)
518 &session->target, GNUNET_SYSERR); 561 &session->target, GNUNET_SYSERR);
519 GNUNET_free (pm); 562 GNUNET_free (pm);
520 } 563 }
521 if (GNUNET_NO == session->expecting_welcome) 564 GNUNET_break (session->client != NULL);
522 {
523#if DEBUG_TCP
524 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
525 "tcp",
526 "Notifying transport service about loss of connection with `%4s'.\n",
527 GNUNET_i2s (&session->target));
528#endif
529 /* Data session that actually went past the initial handshake;
530 transport service may know about this one, so we need to
531 notify transport service about disconnect */
532 // FIXME: we should have a very clear connect-disconnect
533 // protocol with gnunet-service-transport!
534 // FIXME: but this is not possible for all plugins, so what gives?
535 }
536 if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK) 565 if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
537 { 566 {
538 GNUNET_SCHEDULER_cancel (session->plugin->env->sched, 567 GNUNET_SCHEDULER_cancel (session->plugin->env->sched,
539 session->receive_delay_task); 568 session->receive_delay_task);
540 if (session->client != NULL) 569 GNUNET_SERVER_receive_done (session->client,
541 { 570 GNUNET_SYSERR);
542 GNUNET_SERVER_receive_done (session->client,
543 GNUNET_SYSERR);
544 }
545 } 571 }
546 if (session->client != NULL) 572 GNUNET_SERVER_client_drop (session->client);
547 {
548 GNUNET_SERVER_client_drop (session->client);
549 session->client = NULL;
550 }
551 GNUNET_STATISTICS_update (session->plugin->env->stats, 573 GNUNET_STATISTICS_update (session->plugin->env->stats,
552 gettext_noop ("# TCP sessions active"), 574 gettext_noop ("# TCP sessions active"),
553 -1, 575 -1,
@@ -579,8 +601,10 @@ disconnect_session (struct Session *session)
579 * is "on its own" (i.e. re-use existing TCP connection)) 601 * is "on its own" (i.e. re-use existing TCP connection))
580 * @param addrlen length of the address in bytes 602 * @param addrlen length of the address in bytes
581 * @param force_address GNUNET_YES if the plugin MUST use the given address, 603 * @param force_address GNUNET_YES if the plugin MUST use the given address,
582 * otherwise the plugin may use other addresses or 604 * GNUNET_NO means the plugin may use any other address and
583 * existing connections (if available) 605 * GNUNET_SYSERR means that only reliable existing
606 * bi-directional connections should be used (regardless
607 * of address)
584 * @param cont continuation to call once the message has 608 * @param cont continuation to call once the message has
585 * been transmitted (or if the transport is ready 609 * been transmitted (or if the transport is ready
586 * for the next transmission call; or if the 610 * for the next transmission call; or if the
@@ -604,6 +628,7 @@ tcp_plugin_send (void *cls,
604{ 628{
605 struct Plugin *plugin = cls; 629 struct Plugin *plugin = cls;
606 struct Session *session; 630 struct Session *session;
631 struct Session *next;
607 struct PendingMessage *pm; 632 struct PendingMessage *pm;
608 struct GNUNET_CONNECTION_Handle *sa; 633 struct GNUNET_CONNECTION_Handle *sa;
609 int af; 634 int af;
@@ -612,22 +637,43 @@ tcp_plugin_send (void *cls,
612 gettext_noop ("# bytes TCP was asked to transmit"), 637 gettext_noop ("# bytes TCP was asked to transmit"),
613 msgbuf_size, 638 msgbuf_size,
614 GNUNET_NO); 639 GNUNET_NO);
615 session = plugin->sessions;
616 /* FIXME: we could do this a cheaper with a hash table 640 /* FIXME: we could do this a cheaper with a hash table
617 where we could restrict the iteration to entries that match 641 where we could restrict the iteration to entries that match
618 the target peer... */ 642 the target peer... */
619 while ( (session != NULL) && 643 next = plugin->sessions;
620 ( (session->client == NULL) || 644 while (NULL != (session = next))
621 (0 != memcmp (target, 645 {
622 &session->target, 646 next = session->next;
623 sizeof (struct GNUNET_PeerIdentity))) || 647 if (session->client == NULL)
624 ( (GNUNET_YES == force_address) && 648 continue;
625 (addr != NULL) && 649 if (0 != memcmp (target,
626 ( (addrlen != session->connect_alen) || 650 &session->target,
627 (0 != memcmp (session->connect_addr, 651 sizeof (struct GNUNET_PeerIdentity)))
628 addr, 652 continue;
629 addrlen)) ) ) ) ) 653 if (GNUNET_SYSERR == force_address)
630 session = session->next; 654 {
655 if (session->expecting_welcome == GNUNET_NO)
656 break; /* established and reliable (TCP!) */
657 else
658 continue; /* not established */
659 }
660 if (GNUNET_NO == force_address)
661 break;
662 GNUNET_break (GNUNET_YES == force_address);
663 if (addr == NULL)
664 {
665 GNUNET_break (0);
666 break;
667 }
668 if (session->inbound == GNUNET_YES)
669 continue;
670 if (addrlen != session->connect_alen)
671 continue;
672 if (0 == memcmp (session->connect_addr,
673 addr,
674 addrlen))
675 break;
676 }
631 if ( (session == NULL) && 677 if ( (session == NULL) &&
632 (addr == NULL) ) 678 (addr == NULL) )
633 { 679 {
@@ -646,9 +692,13 @@ tcp_plugin_send (void *cls,
646 if (session == NULL) 692 if (session == NULL)
647 { 693 {
648 if (sizeof (struct sockaddr_in) == addrlen) 694 if (sizeof (struct sockaddr_in) == addrlen)
649 af = AF_INET; 695 {
696 af = AF_INET;
697 }
650 else if (sizeof (struct sockaddr_in6) == addrlen) 698 else if (sizeof (struct sockaddr_in6) == addrlen)
651 af = AF_INET6; 699 {
700 af = AF_INET6;
701 }
652 else 702 else
653 { 703 {
654 GNUNET_break_op (0); 704 GNUNET_break_op (0);
@@ -672,7 +722,6 @@ tcp_plugin_send (void *cls,
672 GNUNET_NO); 722 GNUNET_NO);
673 return -1; 723 return -1;
674 } 724 }
675
676#if DEBUG_TCP 725#if DEBUG_TCP
677 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 726 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
678 "tcp", 727 "tcp",
@@ -692,7 +741,6 @@ tcp_plugin_send (void *cls,
692 } 741 }
693 GNUNET_assert (session != NULL); 742 GNUNET_assert (session != NULL);
694 GNUNET_assert (session->client != NULL); 743 GNUNET_assert (session->client != NULL);
695
696 GNUNET_STATISTICS_update (plugin->env->stats, 744 GNUNET_STATISTICS_update (plugin->env->stats,
697 gettext_noop ("# bytes currently in TCP buffers"), 745 gettext_noop ("# bytes currently in TCP buffers"),
698 msgbuf_size, 746 msgbuf_size,
@@ -740,10 +788,12 @@ tcp_plugin_send (void *cls,
740 * to be cancelled 788 * to be cancelled
741 */ 789 */
742static void 790static void
743tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) 791tcp_plugin_disconnect (void *cls,
792 const struct GNUNET_PeerIdentity *target)
744{ 793{
745 struct Plugin *plugin = cls; 794 struct Plugin *plugin = cls;
746 struct Session *session; 795 struct Session *session;
796 struct Session *next;
747 struct PendingMessage *pm; 797 struct PendingMessage *pm;
748 798
749#if DEBUG_TCP 799#if DEBUG_TCP
@@ -752,44 +802,53 @@ tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
752 "Asked to cancel session with `%4s'\n", 802 "Asked to cancel session with `%4s'\n",
753 GNUNET_i2s (target)); 803 GNUNET_i2s (target));
754#endif 804#endif
755 session = plugin->sessions; 805 next = plugin->sessions;
756 while (NULL != session) 806 while (NULL != (session = next))
757 { 807 {
758 if (0 == memcmp (target, 808 next = session->next;
809 if (0 != memcmp (target,
759 &session->target, 810 &session->target,
760 sizeof (struct GNUNET_PeerIdentity))) 811 sizeof (struct GNUNET_PeerIdentity)))
812 continue;
813 pm = session->pending_messages_head;
814 while (pm != NULL)
761 { 815 {
762 pm = session->pending_messages_head; 816 pm->transmit_cont = NULL;
763 while (pm != NULL) 817 pm->transmit_cont_cls = NULL;
764 { 818 pm = pm->next;
765 pm->transmit_cont = NULL;
766 pm->transmit_cont_cls = NULL;
767 pm = pm->next;
768 }
769 if (session->client != NULL)
770 {
771 GNUNET_SERVER_client_drop (session->client);
772 session->client = NULL;
773 }
774 /* rest of the clean-up of the session will be done as part of
775 disconnect_notify which should be triggered any time now
776 (or which may be triggering this call in the first place) */
777 } 819 }
778 session = session->next; 820 disconnect_session (session);
779 } 821 }
780} 822}
781 823
782 824
825/**
826 * Context for address to string conversion.
827 */
783struct PrettyPrinterContext 828struct PrettyPrinterContext
784{ 829{
830 /**
831 * Function to call with the result.
832 */
785 GNUNET_TRANSPORT_AddressStringCallback asc; 833 GNUNET_TRANSPORT_AddressStringCallback asc;
834
835 /**
836 * Clsoure for 'asc'.
837 */
786 void *asc_cls; 838 void *asc_cls;
839
840 /**
841 * Port to add after the IP address.
842 */
787 uint16_t port; 843 uint16_t port;
788}; 844};
789 845
790 846
791/** 847/**
792 * Append our port and forward the result. 848 * Append our port and forward the result.
849 *
850 * @param cls the 'struct PrettyPrinterContext*'
851 * @param hostname hostname part of the address
793 */ 852 */
794static void 853static void
795append_port (void *cls, const char *hostname) 854append_port (void *cls, const char *hostname)
@@ -873,6 +932,8 @@ tcp_plugin_address_pretty_printer (void *cls,
873 * our listen port or our advertised port). If it is 932 * our listen port or our advertised port). If it is
874 * neither, we return one of these two ports at random. 933 * neither, we return one of these two ports at random.
875 * 934 *
935 * @param plugin global variables
936 * @param in_port port number to check
876 * @return either in_port or a more plausible port 937 * @return either in_port or a more plausible port
877 */ 938 */
878static uint16_t 939static uint16_t
@@ -890,7 +951,7 @@ check_port (struct Plugin *plugin, uint16_t in_port)
890 * Another peer has suggested an address for this peer and transport 951 * Another peer has suggested an address for this peer and transport
891 * plugin. Check that this could be a valid address. 952 * plugin. Check that this could be a valid address.
892 * 953 *
893 * @param cls closure 954 * @param cls closure, our 'struct Plugin*'
894 * @param addr pointer to the address 955 * @param addr pointer to the address
895 * @param addrlen length of addr 956 * @param addrlen length of addr
896 * @return GNUNET_OK if this is a plausible address for this peer 957 * @return GNUNET_OK if this is a plausible address for this peer
@@ -967,6 +1028,7 @@ handle_tcp_welcome (void *cls,
967 GNUNET_SERVER_client_keep (client); 1028 GNUNET_SERVER_client_keep (client);
968 session = create_session (plugin, 1029 session = create_session (plugin,
969 &wm->clientIdentity, client); 1030 &wm->clientIdentity, client);
1031 session->inbound = GNUNET_YES;
970 if (GNUNET_OK == 1032 if (GNUNET_OK ==
971 GNUNET_SERVER_client_get_address (client, &vaddr, &alen)) 1033 GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
972 { 1034 {
@@ -1010,6 +1072,9 @@ handle_tcp_welcome (void *cls,
1010/** 1072/**
1011 * Task to signal the server that we can continue 1073 * Task to signal the server that we can continue
1012 * receiving from the TCP client now. 1074 * receiving from the TCP client now.
1075 *
1076 * @param cls the 'struct Session*'
1077 * @param tc task context (unused)
1013 */ 1078 */
1014static void 1079static void
1015delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1080delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -1056,7 +1121,6 @@ handle_tcp_data (void *cls,
1056 } 1121 }
1057 if ( (NULL == session) || (GNUNET_NO != session->expecting_welcome)) 1122 if ( (NULL == session) || (GNUNET_NO != session->expecting_welcome))
1058 { 1123 {
1059 GNUNET_break_op (0);
1060 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1124 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1061 return; 1125 return;
1062 } 1126 }
@@ -1264,10 +1328,10 @@ libgnunet_plugin_transport_tcp_init (void *cls)
1264 if (aport != bport) 1328 if (aport != bport)
1265 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, 1329 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1266 "tcp", 1330 "tcp",
1267 _ 1331 _("TCP transport advertises itself as being on port %llu\n"),
1268 ("TCP transport advertises itself as being on port %llu\n"),
1269 aport); 1332 aport);
1270 GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify, 1333 GNUNET_SERVER_disconnect_notify (plugin->server,
1334 &disconnect_notify,
1271 plugin); 1335 plugin);
1272 /* FIXME: do the two calls below periodically again and 1336 /* FIXME: do the two calls below periodically again and
1273 not just once (since the info we get might change...) */ 1337 not just once (since the info we get might change...) */