diff options
-rw-r--r-- | src/transport/plugin_transport_unix.c | 192 |
1 files changed, 126 insertions, 66 deletions
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 6b90cf1f4..7dff9587f 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c | |||
@@ -338,6 +338,39 @@ struct Plugin | |||
338 | 338 | ||
339 | 339 | ||
340 | /** | 340 | /** |
341 | * If a session monitor is attached, notify it about the new | ||
342 | * session state. | ||
343 | * | ||
344 | * @param plugin our plugin | ||
345 | * @param session session that changed state | ||
346 | * @param state new state of the session | ||
347 | */ | ||
348 | static void | ||
349 | notify_session_monitor (struct Plugin *plugin, | ||
350 | struct Session *session, | ||
351 | enum GNUNET_TRANSPORT_SessionState state) | ||
352 | { | ||
353 | struct GNUNET_TRANSPORT_SessionInfo info; | ||
354 | |||
355 | if (NULL == plugin->sic) | ||
356 | return; | ||
357 | memset (&info, 0, sizeof (info)); | ||
358 | info.state = state; | ||
359 | info.is_inbound = GNUNET_SYSERR; /* hard to say */ | ||
360 | info.num_msg_pending = session->msgs_in_queue; | ||
361 | info.num_bytes_pending = session->bytes_in_queue; | ||
362 | /* info.receive_delay remains zero as this is not supported by UNIX | ||
363 | (cannot selectively not receive from 'some' peer while continuing | ||
364 | to receive from others) */ | ||
365 | info.session_timeout = session->timeout; | ||
366 | info.address = session->address; | ||
367 | plugin->sic (plugin->sic_cls, | ||
368 | session, | ||
369 | &info); | ||
370 | } | ||
371 | |||
372 | |||
373 | /** | ||
341 | * Function called for a quick conversion of the binary address to | 374 | * Function called for a quick conversion of the binary address to |
342 | * a numeric address. Note that the caller must not free the | 375 | * a numeric address. Note that the caller must not free the |
343 | * address and that the next call to this function is allowed | 376 | * address and that the next call to this function is allowed |
@@ -404,7 +437,7 @@ unix_address_to_string (void *cls, | |||
404 | * to close a session due to a disconnect or failure to | 437 | * to close a session due to a disconnect or failure to |
405 | * establish a connection. | 438 | * establish a connection. |
406 | * | 439 | * |
407 | * @param cls closure with the `struct Plugin` | 440 | * @param cls closure with the `struct Plugin *` |
408 | * @param session session to close down | 441 | * @param session session to close down |
409 | * @return #GNUNET_OK on success | 442 | * @return #GNUNET_OK on success |
410 | */ | 443 | */ |
@@ -460,7 +493,11 @@ unix_session_disconnect (void *cls, | |||
460 | { | 493 | { |
461 | GNUNET_SCHEDULER_cancel (session->timeout_task); | 494 | GNUNET_SCHEDULER_cancel (session->timeout_task); |
462 | session->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 495 | session->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
496 | session->timeout = GNUNET_TIME_UNIT_ZERO_ABS; | ||
463 | } | 497 | } |
498 | notify_session_monitor (plugin, | ||
499 | session, | ||
500 | GNUNET_TRANSPORT_SS_DOWN); | ||
464 | GNUNET_HELLO_address_free (session->address); | 501 | GNUNET_HELLO_address_free (session->address); |
465 | GNUNET_break (0 == session->bytes_in_queue); | 502 | GNUNET_break (0 == session->bytes_in_queue); |
466 | GNUNET_break (0 == session->msgs_in_queue); | 503 | GNUNET_break (0 == session->msgs_in_queue); |
@@ -472,50 +509,56 @@ unix_session_disconnect (void *cls, | |||
472 | /** | 509 | /** |
473 | * Session was idle for too long, so disconnect it | 510 | * Session was idle for too long, so disconnect it |
474 | * | 511 | * |
475 | * @param cls the 'struct Session' to disconnect | 512 | * @param cls the `struct Session *` to disconnect |
476 | * @param tc scheduler context | 513 | * @param tc scheduler context |
477 | */ | 514 | */ |
478 | static void | 515 | static void |
479 | session_timeout (void *cls, | 516 | session_timeout (void *cls, |
480 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 517 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
481 | { | 518 | { |
482 | struct Session *s = cls; | 519 | struct Session *session = cls; |
483 | struct GNUNET_TIME_Relative left; | 520 | struct GNUNET_TIME_Relative left; |
484 | 521 | ||
485 | s->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 522 | session->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
486 | left = GNUNET_TIME_absolute_get_remaining (s->timeout); | 523 | left = GNUNET_TIME_absolute_get_remaining (session->timeout); |
487 | if (0 != left.rel_value_us) | 524 | if (0 != left.rel_value_us) |
488 | { | 525 | { |
489 | /* not actually our turn yet */ | 526 | /* not actually our turn yet, but let's at least update |
490 | s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, | 527 | the monitor, it may think we're about to die ... */ |
491 | &session_timeout, | 528 | notify_session_monitor (session->plugin, |
492 | s); | 529 | session, |
530 | GNUNET_TRANSPORT_SS_UP); | ||
531 | session->timeout_task = GNUNET_SCHEDULER_add_delayed (left, | ||
532 | &session_timeout, | ||
533 | session); | ||
493 | return; | 534 | return; |
494 | } | 535 | } |
495 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 536 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
496 | "Session %p was idle for %s, disconnecting\n", | 537 | "Session %p was idle for %s, disconnecting\n", |
497 | s, | 538 | session, |
498 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 539 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
499 | GNUNET_YES)); | 540 | GNUNET_YES)); |
500 | unix_session_disconnect (s->plugin, s); | 541 | unix_session_disconnect (session->plugin, session); |
501 | } | 542 | } |
502 | 543 | ||
503 | 544 | ||
504 | /** | 545 | /** |
505 | * Increment session timeout due to activity | 546 | * Increment session timeout due to activity. We do not immediately |
547 | * notify the monitor here as that might generate excessive | ||
548 | * signalling. | ||
506 | * | 549 | * |
507 | * @param s session for which the timeout should be rescheduled | 550 | * @param session session for which the timeout should be rescheduled |
508 | */ | 551 | */ |
509 | static void | 552 | static void |
510 | reschedule_session_timeout (struct Session *s) | 553 | reschedule_session_timeout (struct Session *session) |
511 | { | 554 | { |
512 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); | 555 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != session->timeout_task); |
513 | s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | 556 | session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); |
514 | } | 557 | } |
515 | 558 | ||
516 | 559 | ||
517 | /** | 560 | /** |
518 | * Convert unix path to a `struct sockaddr_un` | 561 | * Convert unix path to a `struct sockaddr_un *` |
519 | * | 562 | * |
520 | * @param unixpath path to convert | 563 | * @param unixpath path to convert |
521 | * @param sock_len[out] set to the length of the address | 564 | * @param sock_len[out] set to the length of the address |
@@ -576,11 +619,12 @@ lookup_session_it (void *cls, | |||
576 | void *value) | 619 | void *value) |
577 | { | 620 | { |
578 | struct LookupCtx *lctx = cls; | 621 | struct LookupCtx *lctx = cls; |
579 | struct Session *s = value; | 622 | struct Session *session = value; |
580 | 623 | ||
581 | if (0 == GNUNET_HELLO_address_cmp (lctx->address, s->address)) | 624 | if (0 == GNUNET_HELLO_address_cmp (lctx->address, |
625 | session->address)) | ||
582 | { | 626 | { |
583 | lctx->res = s; | 627 | lctx->res = session; |
584 | return GNUNET_NO; | 628 | return GNUNET_NO; |
585 | } | 629 | } |
586 | return GNUNET_YES; | 630 | return GNUNET_YES; |
@@ -785,7 +829,7 @@ unix_plugin_get_session (void *cls, | |||
785 | const struct GNUNET_HELLO_Address *address) | 829 | const struct GNUNET_HELLO_Address *address) |
786 | { | 830 | { |
787 | struct Plugin *plugin = cls; | 831 | struct Plugin *plugin = cls; |
788 | struct Session *s; | 832 | struct Session *session; |
789 | struct UnixAddress *ua; | 833 | struct UnixAddress *ua; |
790 | char * addrstr; | 834 | char * addrstr; |
791 | uint32_t addr_str_len; | 835 | uint32_t addr_str_len; |
@@ -825,41 +869,43 @@ unix_plugin_get_session (void *cls, | |||
825 | } | 869 | } |
826 | 870 | ||
827 | /* Check if a session for this address already exists */ | 871 | /* Check if a session for this address already exists */ |
828 | if (NULL != (s = lookup_session (plugin, | 872 | if (NULL != (session = lookup_session (plugin, |
829 | address))) | 873 | address))) |
830 | { | 874 | { |
831 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 875 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
832 | "Found existing session %p for address `%s'\n", | 876 | "Found existing session %p for address `%s'\n", |
833 | s, | 877 | session, |
834 | unix_address_to_string (NULL, | 878 | unix_address_to_string (NULL, |
835 | address->address, | 879 | address->address, |
836 | address->address_length)); | 880 | address->address_length)); |
837 | return s; | 881 | return session; |
838 | } | 882 | } |
839 | 883 | ||
840 | /* create a new session */ | 884 | /* create a new session */ |
841 | s = GNUNET_new (struct Session); | 885 | session = GNUNET_new (struct Session); |
842 | s->target = address->peer; | 886 | session->target = address->peer; |
843 | s->address = GNUNET_HELLO_address_copy (address); | 887 | session->address = GNUNET_HELLO_address_copy (address); |
844 | s->plugin = plugin; | 888 | session->plugin = plugin; |
845 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); | 889 | session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
846 | s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 890 | &session_timeout, |
847 | &session_timeout, | 891 | session); |
848 | s); | ||
849 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 892 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
850 | "Creating a new session %p for address `%s'\n", | 893 | "Creating a new session %p for address `%s'\n", |
851 | s, | 894 | session, |
852 | unix_address_to_string (NULL, | 895 | unix_address_to_string (NULL, |
853 | address->address, | 896 | address->address, |
854 | address->address_length)); | 897 | address->address_length)); |
855 | (void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map, | 898 | (void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map, |
856 | &address->peer, s, | 899 | &address->peer, session, |
857 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 900 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
858 | GNUNET_STATISTICS_set (plugin->env->stats, | 901 | GNUNET_STATISTICS_set (plugin->env->stats, |
859 | "# UNIX sessions active", | 902 | "# UNIX sessions active", |
860 | GNUNET_CONTAINER_multipeermap_size (plugin->session_map), | 903 | GNUNET_CONTAINER_multipeermap_size (plugin->session_map), |
861 | GNUNET_NO); | 904 | GNUNET_NO); |
862 | return s; | 905 | notify_session_monitor (plugin, |
906 | session, | ||
907 | GNUNET_TRANSPORT_SS_UP); | ||
908 | return session; | ||
863 | } | 909 | } |
864 | 910 | ||
865 | 911 | ||
@@ -901,11 +947,12 @@ unix_plugin_update_session_timeout (void *cls, | |||
901 | * @param ua_len length of the address @a ua | 947 | * @param ua_len length of the address @a ua |
902 | */ | 948 | */ |
903 | static void | 949 | static void |
904 | unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, | 950 | unix_demultiplexer (struct Plugin *plugin, |
951 | struct GNUNET_PeerIdentity *sender, | ||
905 | const struct GNUNET_MessageHeader *currhdr, | 952 | const struct GNUNET_MessageHeader *currhdr, |
906 | const struct UnixAddress *ua, size_t ua_len) | 953 | const struct UnixAddress *ua, size_t ua_len) |
907 | { | 954 | { |
908 | struct Session *s = NULL; | 955 | struct Session *session; |
909 | struct GNUNET_HELLO_Address *address; | 956 | struct GNUNET_HELLO_Address *address; |
910 | 957 | ||
911 | GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED); | 958 | GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED); |
@@ -923,26 +970,31 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, | |||
923 | PLUGIN_NAME, | 970 | PLUGIN_NAME, |
924 | ua, ua_len, | 971 | ua, ua_len, |
925 | GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ | 972 | GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ |
926 | s = lookup_session (plugin, address); | 973 | session = lookup_session (plugin, address); |
927 | if (NULL == s) | 974 | if (NULL == session) |
928 | { | 975 | { |
929 | s = unix_plugin_get_session (plugin, address); | 976 | session = unix_plugin_get_session (plugin, address); |
930 | /* Notify transport and ATS about new inbound session */ | 977 | /* Notify transport and ATS about new inbound session */ |
931 | plugin->env->session_start (NULL, | 978 | plugin->env->session_start (NULL, |
932 | s->address, | 979 | session->address, |
933 | s, | 980 | session, |
934 | &plugin->ats_network, 1); | 981 | &plugin->ats_network, 1); |
982 | notify_session_monitor (plugin, | ||
983 | session, | ||
984 | GNUNET_TRANSPORT_SS_UP); | ||
985 | } | ||
986 | else | ||
987 | { | ||
988 | reschedule_session_timeout (session); | ||
935 | } | 989 | } |
936 | GNUNET_HELLO_address_free (address); | 990 | GNUNET_HELLO_address_free (address); |
937 | reschedule_session_timeout (s); | ||
938 | |||
939 | plugin->env->receive (plugin->env->cls, | 991 | plugin->env->receive (plugin->env->cls, |
940 | s->address, | 992 | session->address, |
941 | s, | 993 | session, |
942 | currhdr); | 994 | currhdr); |
943 | plugin->env->update_address_metrics (plugin->env->cls, | 995 | plugin->env->update_address_metrics (plugin->env->cls, |
944 | s->address, | 996 | session->address, |
945 | s, | 997 | session, |
946 | &plugin->ats_network, 1); | 998 | &plugin->ats_network, 1); |
947 | } | 999 | } |
948 | 1000 | ||
@@ -1018,7 +1070,9 @@ unix_plugin_do_read (struct Plugin *plugin) | |||
1018 | return; | 1070 | return; |
1019 | } | 1071 | } |
1020 | msgbuf = (char *) &msg[1]; | 1072 | msgbuf = (char *) &msg[1]; |
1021 | memcpy (&sender, &msg->sender, sizeof (struct GNUNET_PeerIdentity)); | 1073 | memcpy (&sender, |
1074 | &msg->sender, | ||
1075 | sizeof (struct GNUNET_PeerIdentity)); | ||
1022 | offset = 0; | 1076 | offset = 0; |
1023 | tsize = csize - sizeof (struct UNIXMessage); | 1077 | tsize = csize - sizeof (struct UNIXMessage); |
1024 | while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) | 1078 | while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) |
@@ -1049,12 +1103,15 @@ unix_plugin_do_write (struct Plugin *plugin) | |||
1049 | ssize_t sent = 0; | 1103 | ssize_t sent = 0; |
1050 | struct UNIXMessageWrapper *msgw; | 1104 | struct UNIXMessageWrapper *msgw; |
1051 | struct Session *session; | 1105 | struct Session *session; |
1106 | int did_delete; | ||
1052 | 1107 | ||
1108 | did_delete = GNUNET_NO; | ||
1053 | while (NULL != (msgw = plugin->msg_head)) | 1109 | while (NULL != (msgw = plugin->msg_head)) |
1054 | { | 1110 | { |
1055 | if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) | 1111 | if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) |
1056 | break; /* Message is ready for sending */ | 1112 | break; /* Message is ready for sending */ |
1057 | /* Message has a timeout */ | 1113 | /* Message has a timeout */ |
1114 | did_delete = GNUNET_YES; | ||
1058 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1115 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1059 | "Timeout for message with %u bytes \n", | 1116 | "Timeout for message with %u bytes \n", |
1060 | (unsigned int) msgw->msgsize); | 1117 | (unsigned int) msgw->msgsize); |
@@ -1085,7 +1142,13 @@ unix_plugin_do_write (struct Plugin *plugin) | |||
1085 | GNUNET_free (msgw); | 1142 | GNUNET_free (msgw); |
1086 | } | 1143 | } |
1087 | if (NULL == msgw) | 1144 | if (NULL == msgw) |
1145 | { | ||
1146 | if (GNUNET_YES == did_delete) | ||
1147 | notify_session_monitor (plugin, | ||
1148 | session, | ||
1149 | GNUNET_TRANSPORT_SS_UP); | ||
1088 | return; /* Nothing to send at the moment */ | 1150 | return; /* Nothing to send at the moment */ |
1151 | } | ||
1089 | 1152 | ||
1090 | sent = unix_real_send (plugin, | 1153 | sent = unix_real_send (plugin, |
1091 | plugin->unix_sock.desc, | 1154 | plugin->unix_sock.desc, |
@@ -1098,12 +1161,14 @@ unix_plugin_do_write (struct Plugin *plugin) | |||
1098 | msgw->session->address->address_length, | 1161 | msgw->session->address->address_length, |
1099 | msgw->payload, | 1162 | msgw->payload, |
1100 | msgw->cont, msgw->cont_cls); | 1163 | msgw->cont, msgw->cont_cls); |
1101 | |||
1102 | if (RETRY == sent) | 1164 | if (RETRY == sent) |
1103 | { | 1165 | { |
1104 | GNUNET_STATISTICS_update (plugin->env->stats, | 1166 | GNUNET_STATISTICS_update (plugin->env->stats, |
1105 | "# UNIX retry attempts", | 1167 | "# UNIX retry attempts", |
1106 | 1, GNUNET_NO); | 1168 | 1, GNUNET_NO); |
1169 | notify_session_monitor (plugin, | ||
1170 | session, | ||
1171 | GNUNET_TRANSPORT_SS_UP); | ||
1107 | return; | 1172 | return; |
1108 | } | 1173 | } |
1109 | GNUNET_CONTAINER_DLL_remove (plugin->msg_head, | 1174 | GNUNET_CONTAINER_DLL_remove (plugin->msg_head, |
@@ -1118,6 +1183,9 @@ unix_plugin_do_write (struct Plugin *plugin) | |||
1118 | GNUNET_STATISTICS_set (plugin->env->stats, | 1183 | GNUNET_STATISTICS_set (plugin->env->stats, |
1119 | "# bytes currently in UNIX buffers", | 1184 | "# bytes currently in UNIX buffers", |
1120 | plugin->bytes_in_queue, GNUNET_NO); | 1185 | plugin->bytes_in_queue, GNUNET_NO); |
1186 | notify_session_monitor (plugin, | ||
1187 | session, | ||
1188 | GNUNET_TRANSPORT_SS_UP); | ||
1121 | if (GNUNET_SYSERR == sent) | 1189 | if (GNUNET_SYSERR == sent) |
1122 | { | 1190 | { |
1123 | /* failed and no retry */ | 1191 | /* failed and no retry */ |
@@ -1293,6 +1361,9 @@ unix_plugin_send (void *cls, | |||
1293 | "# bytes currently in UNIX buffers", | 1361 | "# bytes currently in UNIX buffers", |
1294 | plugin->bytes_in_queue, | 1362 | plugin->bytes_in_queue, |
1295 | GNUNET_NO); | 1363 | GNUNET_NO); |
1364 | notify_session_monitor (plugin, | ||
1365 | session, | ||
1366 | GNUNET_TRANSPORT_SS_UP); | ||
1296 | if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task) | 1367 | if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task) |
1297 | plugin->write_task = | 1368 | plugin->write_task = |
1298 | GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | 1369 | GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, |
@@ -1633,21 +1704,10 @@ send_session_info_iter (void *cls, | |||
1633 | { | 1704 | { |
1634 | struct Plugin *plugin = cls; | 1705 | struct Plugin *plugin = cls; |
1635 | struct Session *session = value; | 1706 | struct Session *session = value; |
1636 | struct GNUNET_TRANSPORT_SessionInfo info; | ||
1637 | 1707 | ||
1638 | memset (&info, 0, sizeof (info)); | 1708 | notify_session_monitor (plugin, |
1639 | info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */ | 1709 | session, |
1640 | info.is_inbound = GNUNET_SYSERR; /* hard to say */ | 1710 | GNUNET_TRANSPORT_SS_UP); |
1641 | info.num_msg_pending = session->msgs_in_queue; | ||
1642 | info.num_bytes_pending = session->bytes_in_queue; | ||
1643 | /* info.receive_delay remains zero as this is not supported by UNIX | ||
1644 | (cannot selectively not receive from 'some' peer while continuing | ||
1645 | to receive from others) */ | ||
1646 | info.session_timeout = session->timeout; | ||
1647 | info.address = session->address; | ||
1648 | plugin->sic (plugin->sic_cls, | ||
1649 | session, | ||
1650 | &info); | ||
1651 | return GNUNET_OK; | 1711 | return GNUNET_OK; |
1652 | } | 1712 | } |
1653 | 1713 | ||