aboutsummaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_unix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/plugin_transport_unix.c')
-rw-r--r--src/transport/plugin_transport_unix.c192
1 files changed, 126 insertions, 66 deletions
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c
index 6b90cf1f4..7dff9587f 100644
--- a/src/transport/plugin_transport_unix.c
+++ b/src/transport/plugin_transport_unix.c
@@ -338,6 +338,39 @@ struct Plugin
338 338
339 339
340/** 340/**
341 * If a session monitor is attached, notify it about the new
342 * session state.
343 *
344 * @param plugin our plugin
345 * @param session session that changed state
346 * @param state new state of the session
347 */
348static void
349notify_session_monitor (struct Plugin *plugin,
350 struct Session *session,
351 enum GNUNET_TRANSPORT_SessionState state)
352{
353 struct GNUNET_TRANSPORT_SessionInfo info;
354
355 if (NULL == plugin->sic)
356 return;
357 memset (&info, 0, sizeof (info));
358 info.state = state;
359 info.is_inbound = GNUNET_SYSERR; /* hard to say */
360 info.num_msg_pending = session->msgs_in_queue;
361 info.num_bytes_pending = session->bytes_in_queue;
362 /* info.receive_delay remains zero as this is not supported by UNIX
363 (cannot selectively not receive from 'some' peer while continuing
364 to receive from others) */
365 info.session_timeout = session->timeout;
366 info.address = session->address;
367 plugin->sic (plugin->sic_cls,
368 session,
369 &info);
370}
371
372
373/**
341 * Function called for a quick conversion of the binary address to 374 * Function called for a quick conversion of the binary address to
342 * a numeric address. Note that the caller must not free the 375 * a numeric address. Note that the caller must not free the
343 * address and that the next call to this function is allowed 376 * address and that the next call to this function is allowed
@@ -404,7 +437,7 @@ unix_address_to_string (void *cls,
404 * to close a session due to a disconnect or failure to 437 * to close a session due to a disconnect or failure to
405 * establish a connection. 438 * establish a connection.
406 * 439 *
407 * @param cls closure with the `struct Plugin` 440 * @param cls closure with the `struct Plugin *`
408 * @param session session to close down 441 * @param session session to close down
409 * @return #GNUNET_OK on success 442 * @return #GNUNET_OK on success
410 */ 443 */
@@ -460,7 +493,11 @@ unix_session_disconnect (void *cls,
460 { 493 {
461 GNUNET_SCHEDULER_cancel (session->timeout_task); 494 GNUNET_SCHEDULER_cancel (session->timeout_task);
462 session->timeout_task = GNUNET_SCHEDULER_NO_TASK; 495 session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
496 session->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
463 } 497 }
498 notify_session_monitor (plugin,
499 session,
500 GNUNET_TRANSPORT_SS_DOWN);
464 GNUNET_HELLO_address_free (session->address); 501 GNUNET_HELLO_address_free (session->address);
465 GNUNET_break (0 == session->bytes_in_queue); 502 GNUNET_break (0 == session->bytes_in_queue);
466 GNUNET_break (0 == session->msgs_in_queue); 503 GNUNET_break (0 == session->msgs_in_queue);
@@ -472,50 +509,56 @@ unix_session_disconnect (void *cls,
472/** 509/**
473 * Session was idle for too long, so disconnect it 510 * Session was idle for too long, so disconnect it
474 * 511 *
475 * @param cls the 'struct Session' to disconnect 512 * @param cls the `struct Session *` to disconnect
476 * @param tc scheduler context 513 * @param tc scheduler context
477 */ 514 */
478static void 515static void
479session_timeout (void *cls, 516session_timeout (void *cls,
480 const struct GNUNET_SCHEDULER_TaskContext *tc) 517 const struct GNUNET_SCHEDULER_TaskContext *tc)
481{ 518{
482 struct Session *s = cls; 519 struct Session *session = cls;
483 struct GNUNET_TIME_Relative left; 520 struct GNUNET_TIME_Relative left;
484 521
485 s->timeout_task = GNUNET_SCHEDULER_NO_TASK; 522 session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
486 left = GNUNET_TIME_absolute_get_remaining (s->timeout); 523 left = GNUNET_TIME_absolute_get_remaining (session->timeout);
487 if (0 != left.rel_value_us) 524 if (0 != left.rel_value_us)
488 { 525 {
489 /* not actually our turn yet */ 526 /* not actually our turn yet, but let's at least update
490 s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, 527 the monitor, it may think we're about to die ... */
491 &session_timeout, 528 notify_session_monitor (session->plugin,
492 s); 529 session,
530 GNUNET_TRANSPORT_SS_UP);
531 session->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
532 &session_timeout,
533 session);
493 return; 534 return;
494 } 535 }
495 LOG (GNUNET_ERROR_TYPE_DEBUG, 536 LOG (GNUNET_ERROR_TYPE_DEBUG,
496 "Session %p was idle for %s, disconnecting\n", 537 "Session %p was idle for %s, disconnecting\n",
497 s, 538 session,
498 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 539 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
499 GNUNET_YES)); 540 GNUNET_YES));
500 unix_session_disconnect (s->plugin, s); 541 unix_session_disconnect (session->plugin, session);
501} 542}
502 543
503 544
504/** 545/**
505 * Increment session timeout due to activity 546 * Increment session timeout due to activity. We do not immediately
547 * notify the monitor here as that might generate excessive
548 * signalling.
506 * 549 *
507 * @param s session for which the timeout should be rescheduled 550 * @param session session for which the timeout should be rescheduled
508 */ 551 */
509static void 552static void
510reschedule_session_timeout (struct Session *s) 553reschedule_session_timeout (struct Session *session)
511{ 554{
512 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); 555 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != session->timeout_task);
513 s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 556 session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
514} 557}
515 558
516 559
517/** 560/**
518 * Convert unix path to a `struct sockaddr_un` 561 * Convert unix path to a `struct sockaddr_un *`
519 * 562 *
520 * @param unixpath path to convert 563 * @param unixpath path to convert
521 * @param sock_len[out] set to the length of the address 564 * @param sock_len[out] set to the length of the address
@@ -576,11 +619,12 @@ lookup_session_it (void *cls,
576 void *value) 619 void *value)
577{ 620{
578 struct LookupCtx *lctx = cls; 621 struct LookupCtx *lctx = cls;
579 struct Session *s = value; 622 struct Session *session = value;
580 623
581 if (0 == GNUNET_HELLO_address_cmp (lctx->address, s->address)) 624 if (0 == GNUNET_HELLO_address_cmp (lctx->address,
625 session->address))
582 { 626 {
583 lctx->res = s; 627 lctx->res = session;
584 return GNUNET_NO; 628 return GNUNET_NO;
585 } 629 }
586 return GNUNET_YES; 630 return GNUNET_YES;
@@ -785,7 +829,7 @@ unix_plugin_get_session (void *cls,
785 const struct GNUNET_HELLO_Address *address) 829 const struct GNUNET_HELLO_Address *address)
786{ 830{
787 struct Plugin *plugin = cls; 831 struct Plugin *plugin = cls;
788 struct Session *s; 832 struct Session *session;
789 struct UnixAddress *ua; 833 struct UnixAddress *ua;
790 char * addrstr; 834 char * addrstr;
791 uint32_t addr_str_len; 835 uint32_t addr_str_len;
@@ -825,41 +869,43 @@ unix_plugin_get_session (void *cls,
825 } 869 }
826 870
827 /* Check if a session for this address already exists */ 871 /* Check if a session for this address already exists */
828 if (NULL != (s = lookup_session (plugin, 872 if (NULL != (session = lookup_session (plugin,
829 address))) 873 address)))
830 { 874 {
831 LOG (GNUNET_ERROR_TYPE_DEBUG, 875 LOG (GNUNET_ERROR_TYPE_DEBUG,
832 "Found existing session %p for address `%s'\n", 876 "Found existing session %p for address `%s'\n",
833 s, 877 session,
834 unix_address_to_string (NULL, 878 unix_address_to_string (NULL,
835 address->address, 879 address->address,
836 address->address_length)); 880 address->address_length));
837 return s; 881 return session;
838 } 882 }
839 883
840 /* create a new session */ 884 /* create a new session */
841 s = GNUNET_new (struct Session); 885 session = GNUNET_new (struct Session);
842 s->target = address->peer; 886 session->target = address->peer;
843 s->address = GNUNET_HELLO_address_copy (address); 887 session->address = GNUNET_HELLO_address_copy (address);
844 s->plugin = plugin; 888 session->plugin = plugin;
845 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); 889 session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
846 s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 890 &session_timeout,
847 &session_timeout, 891 session);
848 s);
849 LOG (GNUNET_ERROR_TYPE_DEBUG, 892 LOG (GNUNET_ERROR_TYPE_DEBUG,
850 "Creating a new session %p for address `%s'\n", 893 "Creating a new session %p for address `%s'\n",
851 s, 894 session,
852 unix_address_to_string (NULL, 895 unix_address_to_string (NULL,
853 address->address, 896 address->address,
854 address->address_length)); 897 address->address_length));
855 (void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map, 898 (void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map,
856 &address->peer, s, 899 &address->peer, session,
857 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 900 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
858 GNUNET_STATISTICS_set (plugin->env->stats, 901 GNUNET_STATISTICS_set (plugin->env->stats,
859 "# UNIX sessions active", 902 "# UNIX sessions active",
860 GNUNET_CONTAINER_multipeermap_size (plugin->session_map), 903 GNUNET_CONTAINER_multipeermap_size (plugin->session_map),
861 GNUNET_NO); 904 GNUNET_NO);
862 return s; 905 notify_session_monitor (plugin,
906 session,
907 GNUNET_TRANSPORT_SS_UP);
908 return session;
863} 909}
864 910
865 911
@@ -901,11 +947,12 @@ unix_plugin_update_session_timeout (void *cls,
901 * @param ua_len length of the address @a ua 947 * @param ua_len length of the address @a ua
902 */ 948 */
903static void 949static void
904unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, 950unix_demultiplexer (struct Plugin *plugin,
951 struct GNUNET_PeerIdentity *sender,
905 const struct GNUNET_MessageHeader *currhdr, 952 const struct GNUNET_MessageHeader *currhdr,
906 const struct UnixAddress *ua, size_t ua_len) 953 const struct UnixAddress *ua, size_t ua_len)
907{ 954{
908 struct Session *s = NULL; 955 struct Session *session;
909 struct GNUNET_HELLO_Address *address; 956 struct GNUNET_HELLO_Address *address;
910 957
911 GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED); 958 GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED);
@@ -923,26 +970,31 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
923 PLUGIN_NAME, 970 PLUGIN_NAME,
924 ua, ua_len, 971 ua, ua_len,
925 GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ 972 GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */
926 s = lookup_session (plugin, address); 973 session = lookup_session (plugin, address);
927 if (NULL == s) 974 if (NULL == session)
928 { 975 {
929 s = unix_plugin_get_session (plugin, address); 976 session = unix_plugin_get_session (plugin, address);
930 /* Notify transport and ATS about new inbound session */ 977 /* Notify transport and ATS about new inbound session */
931 plugin->env->session_start (NULL, 978 plugin->env->session_start (NULL,
932 s->address, 979 session->address,
933 s, 980 session,
934 &plugin->ats_network, 1); 981 &plugin->ats_network, 1);
982 notify_session_monitor (plugin,
983 session,
984 GNUNET_TRANSPORT_SS_UP);
985 }
986 else
987 {
988 reschedule_session_timeout (session);
935 } 989 }
936 GNUNET_HELLO_address_free (address); 990 GNUNET_HELLO_address_free (address);
937 reschedule_session_timeout (s);
938
939 plugin->env->receive (plugin->env->cls, 991 plugin->env->receive (plugin->env->cls,
940 s->address, 992 session->address,
941 s, 993 session,
942 currhdr); 994 currhdr);
943 plugin->env->update_address_metrics (plugin->env->cls, 995 plugin->env->update_address_metrics (plugin->env->cls,
944 s->address, 996 session->address,
945 s, 997 session,
946 &plugin->ats_network, 1); 998 &plugin->ats_network, 1);
947} 999}
948 1000
@@ -1018,7 +1070,9 @@ unix_plugin_do_read (struct Plugin *plugin)
1018 return; 1070 return;
1019 } 1071 }
1020 msgbuf = (char *) &msg[1]; 1072 msgbuf = (char *) &msg[1];
1021 memcpy (&sender, &msg->sender, sizeof (struct GNUNET_PeerIdentity)); 1073 memcpy (&sender,
1074 &msg->sender,
1075 sizeof (struct GNUNET_PeerIdentity));
1022 offset = 0; 1076 offset = 0;
1023 tsize = csize - sizeof (struct UNIXMessage); 1077 tsize = csize - sizeof (struct UNIXMessage);
1024 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) 1078 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
@@ -1049,12 +1103,15 @@ unix_plugin_do_write (struct Plugin *plugin)
1049 ssize_t sent = 0; 1103 ssize_t sent = 0;
1050 struct UNIXMessageWrapper *msgw; 1104 struct UNIXMessageWrapper *msgw;
1051 struct Session *session; 1105 struct Session *session;
1106 int did_delete;
1052 1107
1108 did_delete = GNUNET_NO;
1053 while (NULL != (msgw = plugin->msg_head)) 1109 while (NULL != (msgw = plugin->msg_head))
1054 { 1110 {
1055 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) 1111 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0)
1056 break; /* Message is ready for sending */ 1112 break; /* Message is ready for sending */
1057 /* Message has a timeout */ 1113 /* Message has a timeout */
1114 did_delete = GNUNET_YES;
1058 LOG (GNUNET_ERROR_TYPE_DEBUG, 1115 LOG (GNUNET_ERROR_TYPE_DEBUG,
1059 "Timeout for message with %u bytes \n", 1116 "Timeout for message with %u bytes \n",
1060 (unsigned int) msgw->msgsize); 1117 (unsigned int) msgw->msgsize);
@@ -1085,7 +1142,13 @@ unix_plugin_do_write (struct Plugin *plugin)
1085 GNUNET_free (msgw); 1142 GNUNET_free (msgw);
1086 } 1143 }
1087 if (NULL == msgw) 1144 if (NULL == msgw)
1145 {
1146 if (GNUNET_YES == did_delete)
1147 notify_session_monitor (plugin,
1148 session,
1149 GNUNET_TRANSPORT_SS_UP);
1088 return; /* Nothing to send at the moment */ 1150 return; /* Nothing to send at the moment */
1151 }
1089 1152
1090 sent = unix_real_send (plugin, 1153 sent = unix_real_send (plugin,
1091 plugin->unix_sock.desc, 1154 plugin->unix_sock.desc,
@@ -1098,12 +1161,14 @@ unix_plugin_do_write (struct Plugin *plugin)
1098 msgw->session->address->address_length, 1161 msgw->session->address->address_length,
1099 msgw->payload, 1162 msgw->payload,
1100 msgw->cont, msgw->cont_cls); 1163 msgw->cont, msgw->cont_cls);
1101
1102 if (RETRY == sent) 1164 if (RETRY == sent)
1103 { 1165 {
1104 GNUNET_STATISTICS_update (plugin->env->stats, 1166 GNUNET_STATISTICS_update (plugin->env->stats,
1105 "# UNIX retry attempts", 1167 "# UNIX retry attempts",
1106 1, GNUNET_NO); 1168 1, GNUNET_NO);
1169 notify_session_monitor (plugin,
1170 session,
1171 GNUNET_TRANSPORT_SS_UP);
1107 return; 1172 return;
1108 } 1173 }
1109 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, 1174 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
@@ -1118,6 +1183,9 @@ unix_plugin_do_write (struct Plugin *plugin)
1118 GNUNET_STATISTICS_set (plugin->env->stats, 1183 GNUNET_STATISTICS_set (plugin->env->stats,
1119 "# bytes currently in UNIX buffers", 1184 "# bytes currently in UNIX buffers",
1120 plugin->bytes_in_queue, GNUNET_NO); 1185 plugin->bytes_in_queue, GNUNET_NO);
1186 notify_session_monitor (plugin,
1187 session,
1188 GNUNET_TRANSPORT_SS_UP);
1121 if (GNUNET_SYSERR == sent) 1189 if (GNUNET_SYSERR == sent)
1122 { 1190 {
1123 /* failed and no retry */ 1191 /* failed and no retry */
@@ -1293,6 +1361,9 @@ unix_plugin_send (void *cls,
1293 "# bytes currently in UNIX buffers", 1361 "# bytes currently in UNIX buffers",
1294 plugin->bytes_in_queue, 1362 plugin->bytes_in_queue,
1295 GNUNET_NO); 1363 GNUNET_NO);
1364 notify_session_monitor (plugin,
1365 session,
1366 GNUNET_TRANSPORT_SS_UP);
1296 if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task) 1367 if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task)
1297 plugin->write_task = 1368 plugin->write_task =
1298 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, 1369 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1633,21 +1704,10 @@ send_session_info_iter (void *cls,
1633{ 1704{
1634 struct Plugin *plugin = cls; 1705 struct Plugin *plugin = cls;
1635 struct Session *session = value; 1706 struct Session *session = value;
1636 struct GNUNET_TRANSPORT_SessionInfo info;
1637 1707
1638 memset (&info, 0, sizeof (info)); 1708 notify_session_monitor (plugin,
1639 info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */ 1709 session,
1640 info.is_inbound = GNUNET_SYSERR; /* hard to say */ 1710 GNUNET_TRANSPORT_SS_UP);
1641 info.num_msg_pending = session->msgs_in_queue;
1642 info.num_bytes_pending = session->bytes_in_queue;
1643 /* info.receive_delay remains zero as this is not supported by UNIX
1644 (cannot selectively not receive from 'some' peer while continuing
1645 to receive from others) */
1646 info.session_timeout = session->timeout;
1647 info.address = session->address;
1648 plugin->sic (plugin->sic_cls,
1649 session,
1650 &info);
1651 return GNUNET_OK; 1711 return GNUNET_OK;
1652} 1712}
1653 1713