From 0c89b2a16eae49cb23635f6d6c0f13da070c5c66 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 7 Nov 2014 16:33:42 +0000 Subject: implementing plugin session monitoring API (#3452) --- src/transport/Makefile.am | 3 +- src/transport/gnunet-service-transport_clients.c | 27 +- src/transport/plugin_transport_http_client.c | 20 +- src/transport/plugin_transport_http_server.c | 19 +- src/transport/plugin_transport_tcp.c | 123 ++++--- src/transport/plugin_transport_udp.c | 27 +- src/transport/plugin_transport_unix.c | 21 +- src/transport/plugin_transport_wlan.c | 10 +- src/transport/transport.h | 5 + src/transport/transport_api_monitor_peers.c | 6 +- src/transport/transport_api_monitor_plugins.c | 434 +++++++++++++++++++++++ 11 files changed, 611 insertions(+), 84 deletions(-) create mode 100644 src/transport/transport_api_monitor_plugins.c (limited to 'src/transport') diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am index 83e820012..6866ea0ee 100644 --- a/src/transport/Makefile.am +++ b/src/transport/Makefile.am @@ -95,7 +95,7 @@ endif if LINUX install-exec-hook: $(top_srcdir)/src/transport/install-wlan-helper.sh $(libexecdir) $(SUDO_BINARY) || true -if HAVE_LIBBLUETOOTH +if HAVE_LIBBLUETOOTH $(top_srcdir)/src/transport/install-bluetooth-helper.sh $(libexecdir) $(SUDO_BINARY) || true endif else @@ -164,6 +164,7 @@ libgnunettransport_la_SOURCES = \ transport_api_blacklist.c \ transport_api_address_to_string.c \ transport_api_monitor_peers.c \ + transport_api_monitor_plugins.c \ transport_api_monitor_validation.c libgnunettransport_la_LIBADD = \ $(top_builddir)/src/hello/libgnunethello.la \ diff --git a/src/transport/gnunet-service-transport_clients.c b/src/transport/gnunet-service-transport_clients.c index b67d432c5..df73affc2 100644 --- a/src/transport/gnunet-service-transport_clients.c +++ b/src/transport/gnunet-service-transport_clients.c @@ -117,6 +117,7 @@ struct TransportClient int send_payload; }; + /** * Context for address to string operations */ @@ -138,6 +139,7 @@ struct AddressToStringContext struct GNUNET_SERVER_TransmitContext* tc; }; + /** * Client monitoring changes of active addresses of our neighbours. */ @@ -225,6 +227,7 @@ static struct GNUNET_SERVER_NotificationContext *val_nc; */ static struct GNUNET_SERVER_NotificationContext *plugin_nc; + /** * Find the internal handle associated with the given client handle * @@ -306,6 +309,7 @@ setup_peer_monitoring_client (struct GNUNET_SERVER_Client *client, mc->client = client; mc->peer = *peer; GNUNET_CONTAINER_DLL_insert (peer_monitoring_clients_head, peer_monitoring_clients_tail, mc); + GNUNET_SERVER_client_mark_monitor (client); GNUNET_SERVER_notification_context_add (peer_nc, client); if (0 != memcmp (peer, &all_zeros, sizeof (struct GNUNET_PeerIdentity))) @@ -635,7 +639,7 @@ clients_handle_hello (void *cls, struct GNUNET_SERVER_Client *client, /** - * Closure for 'handle_send_transmit_continuation' + * Closure for #handle_send_transmit_continuation() */ struct SendTransmitContinuationContext { @@ -1068,6 +1072,7 @@ compose_address_iterate_response_message (const struct GNUNET_PeerIdentity *peer return msg; } + /** * Compose #PeerIterateResponseMessage using the given peer and address. * @@ -1077,7 +1082,7 @@ compose_address_iterate_response_message (const struct GNUNET_PeerIdentity *peer */ static struct ValidationIterateResponseMessage * compose_validation_iterate_response_message (const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Address *address) + const struct GNUNET_HELLO_Address *address) { struct ValidationIterateResponseMessage *msg; size_t size; @@ -1113,12 +1118,26 @@ compose_validation_iterate_response_message (const struct GNUNET_PeerIdentity *p return msg; } + +/** + * Context for #send_validation_information() and + * #send_peer_information(). + */ struct IterationContext { + /** + * Context to use for the transmission. + */ struct GNUNET_SERVER_TransmitContext *tc; + /** + * Which peers do we care about? + */ struct GNUNET_PeerIdentity id; + /** + * #GNUNET_YES if @e id should be ignored because we want all peers. + */ int all; }; @@ -1245,6 +1264,7 @@ clients_handle_monitor_peers (void *cls, struct GNUNET_SERVER_Client *client, return; } GNUNET_SERVER_disable_receive_done_warning (client); + GNUNET_SERVER_client_mark_monitor (client); pc.tc = tc = GNUNET_SERVER_transmit_context_create (client); /* Send initial list */ @@ -1318,6 +1338,7 @@ clients_handle_monitor_validation (void *cls, return; } GNUNET_SERVER_disable_receive_done_warning (client); + GNUNET_SERVER_client_mark_monitor (client); pc.tc = tc = GNUNET_SERVER_transmit_context_create (client); /* Send initial list */ @@ -1396,6 +1417,7 @@ plugin_session_info_cb (void *cls, msg->timeout = GNUNET_TIME_absolute_hton (info->session_timeout); msg->delay = GNUNET_TIME_absolute_hton (info->receive_delay); msg->peer = info->address->peer; + msg->session_id = (uint64_t) (intptr_t) session; msg->plugin_name_len = htons (slen); msg->plugin_address_len = htons (alen); name = (char *) &msg[1]; @@ -1421,6 +1443,7 @@ clients_handle_monitor_plugins (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + GNUNET_SERVER_client_mark_monitor (client); GNUNET_SERVER_disable_receive_done_warning (client); if (0 == GNUNET_SERVER_notification_context_get_size (plugin_nc)) GST_plugins_monitor_subscribe (&plugin_session_info_cb, NULL); diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index 615639a6e..adb83c719 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -519,7 +519,7 @@ client_delete_session (struct Session *s) GNUNET_assert (0 == s->bytes_in_queue); notify_session_monitor (plugin, s, - GNUNET_TRANSPORT_SS_DOWN); + GNUNET_TRANSPORT_SS_DONE); if (NULL != s->msg_tk) { GNUNET_SERVER_mst_destroy (s->msg_tk); @@ -778,7 +778,7 @@ http_client_plugin_send (void *cls, GNUNET_free (stat_txt); notify_session_monitor (plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); if (H_TMP_DISCONNECTING == s->put.state) { /* PUT request is currently getting disconnected */ @@ -1078,7 +1078,7 @@ client_send_cb (void *stream, } notify_session_monitor (plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); GNUNET_asprintf (&stat_txt, "# bytes currently in %s_client buffers", plugin->protocol); @@ -1741,7 +1741,7 @@ client_session_timeout (void *cls, the monitor, it may think we're about to die ... */ notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, &client_session_timeout, s); @@ -1860,7 +1860,12 @@ http_client_plugin_get_session (void *cls, client_delete_session (s); return NULL; } - notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_UP); /* or handshake? */ + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_INIT); + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_UP); /* or handshake? */ return s; } @@ -2162,7 +2167,10 @@ send_session_info_iter (void *cls, notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_INIT); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); /* FIXME: or handshake? */ return GNUNET_OK; } diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index c7a1bfe7a..47b0bda7f 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c @@ -598,11 +598,13 @@ server_delete_session (struct Session *s) } notify_session_monitor (plugin, s, - GNUNET_TRANSPORT_SS_DOWN); + GNUNET_TRANSPORT_SS_DONE); if (GNUNET_YES == s->known_to_service) + { plugin->env->session_end (plugin->env->cls, s->address, s); + } if (NULL != s->msg_tk) { GNUNET_SERVER_mst_destroy (s->msg_tk); @@ -1459,6 +1461,9 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, &s->target, s, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_INIT); notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_HANDSHAKE); @@ -1512,7 +1517,13 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, (NULL != s->server_recv) ) { s->known_to_service = GNUNET_YES; - plugin->env->session_start (NULL, s->address ,s, NULL, 0); + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_UP); + plugin->env->session_start (NULL, + s->address, + s, + NULL, 0); } if ( (NULL == s->server_recv) || @@ -1592,7 +1603,7 @@ server_send_callback (void *cls, GNUNET_free (msg); notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); } } if (0 < bytes_read) @@ -3298,7 +3309,7 @@ send_session_info_iter (void *cls, notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_INIT); return GNUNET_OK; } diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 310ac3483..4c2691e09 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -885,7 +885,7 @@ tcp_plugin_disconnect_session (void *cls, GNUNET_assert (0 == session->bytes_in_queue); notify_session_monitor (session->plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_DONE); if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK) { @@ -942,7 +942,7 @@ session_timeout (void *cls, the monitor, it may think we're about to die ... */ notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, &session_timeout, s); @@ -1030,6 +1030,9 @@ create_session (struct Plugin *plugin, session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session); + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_INIT); if (GNUNET_YES != is_nat) { GNUNET_STATISTICS_update (plugin->env->stats, @@ -1144,7 +1147,7 @@ do_transmit (void *cls, size_t size, void *buf) if (0 < ret) notify_session_monitor (session->plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); return 0; } /* copy all pending messages that would fit */ @@ -1177,7 +1180,7 @@ do_transmit (void *cls, size_t size, void *buf) } notify_session_monitor (session->plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); /* schedule 'continuation' before callbacks so that callbacks that * cancel everything don't cause us to use a session that no longer * exists... */ @@ -1310,7 +1313,7 @@ tcp_plugin_send (void *cls, pm); notify_session_monitor (session->plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); session->msgs_in_queue++; session->bytes_in_queue += pm->message_size; process_pending_messages (session); @@ -1410,12 +1413,12 @@ nat_connect_timeout (void *cls, struct Session *session = cls; session->nat_connection_timeout = GNUNET_SCHEDULER_NO_TASK; - LOG(GNUNET_ERROR_TYPE_DEBUG, - "NAT WAIT connection to `%4s' at `%s' could not be established, removing session\n", - GNUNET_i2s (&session->target), - tcp_plugin_address_to_string (NULL, - session->address->address, - session->address->address_length)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "NAT WAIT connection to `%4s' at `%s' could not be established, removing session\n", + GNUNET_i2s (&session->target), + tcp_plugin_address_to_string (NULL, + session->address->address, + session->address->address_length)); tcp_plugin_disconnect_session (session->plugin, session); } @@ -1632,24 +1635,30 @@ tcp_plugin_get_session (void *cls, &address->peer))) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Found valid IPv4 NAT address (creating session)!\n"); - session = create_session (plugin, address, NULL, GNUNET_YES); - session->ats_address_network_type = (enum GNUNET_ATS_Network_Type) ntohl ( - ats.value); - GNUNET_break( - session->ats_address_network_type != GNUNET_ATS_NET_UNSPECIFIED); + "Found valid IPv4 NAT address (creating session)!\n"); + session = create_session (plugin, + address, + NULL, + GNUNET_YES); + session->ats_address_network_type = (enum GNUNET_ATS_Network_Type) ntohl (ats.value); + GNUNET_break (session->ats_address_network_type != GNUNET_ATS_NET_UNSPECIFIED); session->nat_connection_timeout = GNUNET_SCHEDULER_add_delayed (NAT_TIMEOUT, - &nat_connect_timeout, session); - GNUNET_assert(session != NULL); - GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (plugin->nat_wait_conns, - &session->target, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Created NAT WAIT connection to `%4s' at `%s'\n", - GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs)); + &nat_connect_timeout, + session); + GNUNET_assert(GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (plugin->nat_wait_conns, + &session->target, + session, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Created NAT WAIT connection to `%4s' at `%s'\n", + GNUNET_i2s (&session->target), + GNUNET_a2s (sb, sbs)); if (GNUNET_OK == GNUNET_NAT_run_client (plugin->nat, &a4)) + { return session; + } else { LOG(GNUNET_ERROR_TYPE_DEBUG, @@ -1714,24 +1723,28 @@ tcp_plugin_get_session (void *cls, if (plugin->cur_connections == plugin->max_connections) GNUNET_SERVER_suspend (plugin->server); /* Maximum number of connections rechead */ - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Asked to transmit to `%4s', creating fresh session using address `%s'.\n", - GNUNET_i2s (&address->peer), GNUNET_a2s (sb, sbs)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to transmit to `%4s', creating fresh session using address `%s'.\n", + GNUNET_i2s (&address->peer), GNUNET_a2s (sb, sbs)); - session = create_session (plugin, address, + session = create_session (plugin, + address, GNUNET_SERVER_connect_socket (plugin->server, sa), GNUNET_NO); - session->ats_address_network_type = (enum GNUNET_ATS_Network_Type) ntohl ( - ats.value); - GNUNET_break(session->ats_address_network_type != GNUNET_ATS_NET_UNSPECIFIED); + session->ats_address_network_type = (enum GNUNET_ATS_Network_Type) ntohl (ats.value); + GNUNET_break (session->ats_address_network_type != GNUNET_ATS_NET_UNSPECIFIED); GNUNET_SERVER_client_set_user_context(session->client, session); - GNUNET_CONTAINER_multipeermap_put (plugin->sessionmap, &session->target, - session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Creating new session for `%s' address `%s' session %p\n", - GNUNET_i2s (&address->peer), - tcp_plugin_address_to_string(NULL, address->address, address->address_length), - session); + GNUNET_CONTAINER_multipeermap_put (plugin->sessionmap, + &session->target, + session, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new session for `%s' address `%s' session %p\n", + GNUNET_i2s (&address->peer), + tcp_plugin_address_to_string (NULL, + address->address, + address->address_length), + session); /* Send TCP Welcome */ process_pending_messages (session); @@ -2259,8 +2272,9 @@ handle_tcp_welcome (void *cls, { if (GNUNET_OK == GNUNET_SERVER_client_get_address (client, &vaddr, &alen)) { - LOG(GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p for peer `%s'\n", - session, GNUNET_a2s (vaddr, alen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Found existing session %p for peer `%s'\n", + session, GNUNET_a2s (vaddr, alen)); GNUNET_free(vaddr); } } @@ -2305,21 +2319,27 @@ handle_tcp_welcome (void *cls, session = create_session (plugin, address, client, GNUNET_NO); GNUNET_HELLO_address_free (address); ats = plugin->env->get_address_type (plugin->env->cls, vaddr, alen); - session->ats_address_network_type = (enum GNUNET_ATS_Network_Type) ntohl ( - ats.value); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Creating new%s session %p for peer `%s' client %p \n", + session->ats_address_network_type = (enum GNUNET_ATS_Network_Type) ntohl (ats.value); + LOG(GNUNET_ERROR_TYPE_DEBUG, + "Creating new%s session %p for peer `%s' client %p \n", GNUNET_HELLO_address_check_option (session->address, - GNUNET_HELLO_ADDRESS_INFO_INBOUND) ? " inbound" : "", session, + GNUNET_HELLO_ADDRESS_INFO_INBOUND) + ? " inbound" : "", + session, tcp_plugin_address_to_string(NULL, (void *) session->address->address, - session->address->address_length), + session->address->address_length), client); GNUNET_free(vaddr); GNUNET_SERVER_client_set_user_context(session->client, session); - GNUNET_CONTAINER_multipeermap_put (plugin->sessionmap, &session->target, - session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - + GNUNET_CONTAINER_multipeermap_put (plugin->sessionmap, + &session->target, + session, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); /* Notify transport and ATS about new session */ plugin->env->session_start (NULL, session->address, session, &ats, 1); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); } else { @@ -2612,6 +2632,11 @@ send_session_info_iter (void *cls, struct Plugin *plugin = cls; struct Session *session = value; + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); + /* FIXME: cannot tell if this is up or not from current + session state... */ notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP); diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index bb223263f..37de3c06e 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -1270,7 +1270,7 @@ fragmented_message_done (struct UDP_FragmentationContext *fc, } notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); /* Destroy fragmentation context */ GNUNET_FRAGMENT_context_destroy (fc->frag, &s->last_expected_msg_delay, @@ -1385,7 +1385,7 @@ udp_disconnect_session (void *cls, } notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_DOWN); + GNUNET_TRANSPORT_SS_DONE); plugin->env->session_end (plugin->env->cls, s->address, s); @@ -1508,7 +1508,7 @@ session_timeout (void *cls, the monitor, it may think we're about to die ... */ notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, &session_timeout, s); @@ -2154,7 +2154,7 @@ udp_plugin_send (void *cls, } notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); schedule_select (plugin); return udpmlen; } @@ -2335,8 +2335,14 @@ process_udp_message (struct Plugin *plugin, { s = udp_plugin_create_session (plugin, address); plugin->env->session_start (NULL, address, s, NULL, 0); + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_INIT); + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_UP); } - GNUNET_free(address); + GNUNET_free (address); /* iterate over all embedded messages */ si.session = s; @@ -2446,7 +2452,7 @@ ack_proc (void *cls, enqueue (rc->plugin, udpw); notify_session_monitor (s->plugin, s, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); schedule_select (rc->plugin); } @@ -2845,7 +2851,7 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, if (GNUNET_YES == removed) notify_session_monitor (session->plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); return udpw; } @@ -2955,7 +2961,7 @@ udp_select_send (struct Plugin *plugin, dequeue (plugin, udpw); notify_session_monitor (plugin, udpw->session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); GNUNET_free (udpw); return GNUNET_SYSERR; } @@ -2994,7 +3000,7 @@ udp_select_send (struct Plugin *plugin, dequeue (plugin, udpw); notify_session_monitor (plugin, udpw->session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); GNUNET_free(udpw); return sent; } @@ -3309,6 +3315,9 @@ send_session_info_iter (void *cls, struct Plugin *plugin = cls; struct Session *session = value; + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP); diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 8a0aa72a0..a753db5ca 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c @@ -490,7 +490,7 @@ unix_plugin_session_disconnect (void *cls, } notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_DOWN); + GNUNET_TRANSPORT_SS_DONE); GNUNET_HELLO_address_free (session->address); GNUNET_break (0 == session->bytes_in_queue); GNUNET_break (0 == session->msgs_in_queue); @@ -520,7 +520,7 @@ session_timeout (void *cls, the monitor, it may think we're about to die ... */ notify_session_monitor (session->plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); session->timeout_task = GNUNET_SCHEDULER_add_delayed (left, &session_timeout, session); @@ -896,6 +896,9 @@ unix_plugin_get_session (void *cls, "# UNIX sessions active", GNUNET_CONTAINER_multipeermap_size (plugin->session_map), GNUNET_NO); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP); @@ -973,9 +976,6 @@ unix_demultiplexer (struct Plugin *plugin, session->address, session, &plugin->ats_network, 1); - notify_session_monitor (plugin, - session, - GNUNET_TRANSPORT_SS_UP); } else { @@ -1141,7 +1141,7 @@ unix_plugin_do_write (struct Plugin *plugin) if (GNUNET_YES == did_delete) notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); return; /* Nothing to send at the moment */ } @@ -1163,7 +1163,7 @@ unix_plugin_do_write (struct Plugin *plugin) 1, GNUNET_NO); notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); return; } GNUNET_CONTAINER_DLL_remove (plugin->msg_head, @@ -1180,7 +1180,7 @@ unix_plugin_do_write (struct Plugin *plugin) plugin->bytes_in_queue, GNUNET_NO); notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); if (GNUNET_SYSERR == sent) { /* failed and no retry */ @@ -1358,7 +1358,7 @@ unix_plugin_send (void *cls, GNUNET_NO); notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_UP); + GNUNET_TRANSPORT_SS_UPDATE); if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task) plugin->write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, @@ -1701,6 +1701,9 @@ send_session_info_iter (void *cls, struct Plugin *plugin = cls; struct Session *session = value; + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP); diff --git a/src/transport/plugin_transport_wlan.c b/src/transport/plugin_transport_wlan.c index 912ff9a12..ab9fa4a74 100644 --- a/src/transport/plugin_transport_wlan.c +++ b/src/transport/plugin_transport_wlan.c @@ -773,7 +773,7 @@ wlan_plugin_disconnect_session (void *cls, session); notify_session_monitor (plugin, session, - GNUNET_TRANSPORT_SS_DOWN); + GNUNET_TRANSPORT_SS_DONE); GNUNET_CONTAINER_DLL_remove (endpoint->sessions_head, endpoint->sessions_tail, session); @@ -888,6 +888,9 @@ create_session (struct MacEndpoint *endpoint, session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session); + notify_session_monitor (endpoint->plugin, + session, + GNUNET_TRANSPORT_SS_INIT); notify_session_monitor (endpoint->plugin, session, GNUNET_TRANSPORT_SS_UP); @@ -2046,9 +2049,14 @@ wlan_plugin_setup_monitor (void *cls, { for (mac = plugin->mac_head; NULL != mac; mac = mac->next) for (session = mac->sessions_head; NULL != session; session = session->next) + { + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP); + } sic (sic_cls, NULL, NULL); } } diff --git a/src/transport/transport.h b/src/transport/transport.h index 57e7d1b00..6fea9201c 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -643,6 +643,11 @@ struct TransportPluginMonitorMessage */ struct GNUNET_PeerIdentity peer; + /** + * Unique identifier for the session. + */ + uint64_t session_id; + /** * Length of the plugin name in bytes, including 0-termination. */ diff --git a/src/transport/transport_api_monitor_peers.c b/src/transport/transport_api_monitor_peers.c index f7b50d6d4..f1dab26e2 100644 --- a/src/transport/transport_api_monitor_peers.c +++ b/src/transport/transport_api_monitor_peers.c @@ -167,7 +167,7 @@ GNUNET_TRANSPORT_ps2s (enum GNUNET_TRANSPORT_PeerState state) /** * Function called with responses from the service. * - * @param cls our `struct GNUNET_TRANSPORT_PeerAddressLookupContext *` + * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` * @param msg NULL on timeout or error, otherwise presumably a * message with the human-readable address */ @@ -203,7 +203,7 @@ send_peer_mon_request (struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx) /** * Task run to re-establish the connection. * - * @param cls our `struct GNUNET_TRANSPORT_PeerAddressLookupContext *` + * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` * @param tc scheduler context, unused */ static void @@ -423,7 +423,7 @@ GNUNET_TRANSPORT_monitor_peers (const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_CLIENT_Connection *client; client = GNUNET_CLIENT_connect ("transport", cfg); - if (client == NULL) + if (NULL == client) return NULL; if (GNUNET_YES != one_shot) timeout = GNUNET_TIME_UNIT_FOREVER_REL; diff --git a/src/transport/transport_api_monitor_plugins.c b/src/transport/transport_api_monitor_plugins.c new file mode 100644 index 000000000..290092c22 --- /dev/null +++ b/src/transport/transport_api_monitor_plugins.c @@ -0,0 +1,434 @@ +/* + This file is part of GNUnet. + (C) 2014 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file transport/transport_api_monitor_plugins.c + * @brief montoring api for transport plugin session status + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_arm_service.h" +#include "gnunet_hello_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_transport_service.h" +#include "transport.h" + + +/** + * Handle for a plugin session state monitor. + */ +struct GNUNET_TRANSPORT_PluginMonitor +{ + + /** + * Connection to the service. + */ + struct GNUNET_CLIENT_Connection *client; + + /** + * Our configuration. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Callback to call. + */ + GNUNET_TRANSPORT_SessionMonitorCallback cb; + + /** + * Closure for @e cb + */ + void *cb_cls; + + /** + * Map of session_ids (reduced to 32-bits) to + * `struct GNUNET_TRANSPORT_PluginSession` objects. + */ + struct GNUNET_CONTAINER_MultiHashMap32 *sessions; + + /** + * Backoff for reconnect. + */ + struct GNUNET_TIME_Relative backoff; + + /** + * Task ID for reconnect. + */ + GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + +}; + + +/** + * Abstract representation of a plugin's session. + * Corresponds to the `struct Session` within the TRANSPORT service. + */ +struct GNUNET_TRANSPORT_PluginSession +{ + /** + * Unique session identifier. + */ + uint64_t session_id; + + /** + * Location for the client to store "data". + */ + void *client_ctx; +}; + + +/** + * Function called with responses from the service. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` + * @param msg NULL on timeout or error, otherwise presumably a + * message with the human-readable address + */ +static void +response_processor (void *cls, + const struct GNUNET_MessageHeader *msg); + + +/** + * Send our subscription request to the service. + * + * @param pal_ctx our context + */ +static void +send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm) +{ + struct GNUNET_MessageHeader msg; + + msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START); + GNUNET_assert (GNUNET_OK == + GNUNET_CLIENT_transmit_and_get_response (pm->client, + &msg, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, + &response_processor, + pm)); +} + + +/** + * Task run to re-establish the connection. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` + * @param tc scheduler context, unused + */ +static void +do_plugin_connect (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; + + pm->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg); + GNUNET_assert (NULL != pm->client); + send_plugin_mon_request (pm); +} + + +/** + * Free the session entry and notify the callback about its demise. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor` + * @param key key of the session in the map + * @param value the session to free + * @return #GNUNET_OK (continue to iterate) + */ +static int +free_entry (void *cls, + uint32_t key, + void *value) +{ + struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; + struct GNUNET_TRANSPORT_PluginSession *ps = value; + + pm->cb (pm->cb_cls, + ps, + &ps->client_ctx, + NULL); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (pm->sessions, + key, + ps)); + GNUNET_break (NULL == ps->client_ctx); + GNUNET_free (ps); + return GNUNET_OK; +} + + +/** + * We got disconnected, remove all existing entries from + * the map and notify client. + * + * @param pm montitor that got disconnected + */ +static void +clear_map (struct GNUNET_TRANSPORT_PluginMonitor *pm) +{ + GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, + &free_entry, + pm); +} + + +/** + * Cut the existing connection and reconnect. + * + * @param pm our context + */ +static void +reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm) +{ + GNUNET_CLIENT_disconnect (pm->client); + pm->client = NULL; + clear_map (pm); + pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff); + pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff, + &do_plugin_connect, + pm); +} + + +/** + * Convert 64-bit session ID to 32-bit index for hash map. + * + * @param id 64-bit session ID + * @return 32-bit hash map index + */ +static uint32_t +wrap_id (uint64_t id) +{ + return ((uint32_t) id) ^ ((uint32_t) (id >> 32)); +} + + +/** + * Context for #locate_by_id(). + */ +struct SearchContext +{ + + /** + * Result. + */ + struct GNUNET_TRANSPORT_PluginSession *ps; + + /** + * ID to locate. + */ + uint64_t session_id; + +}; + + +/** + * Locate a session entry. + * + * @param cls our `struct SearchContext` + * @param key key of the session in the map + * @param value a session + * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found) + */ +static int +locate_by_id (void *cls, + uint32_t key, + void *value) +{ + struct SearchContext *sc = cls; + struct GNUNET_TRANSPORT_PluginSession *ps = value; + + if (sc->session_id == ps->session_id) + { + sc->ps = ps; + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Function called with responses from the service. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` + * @param msg NULL on timeout or error, otherwise presumably a + * message with the human-readable address + */ +static void +response_processor (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; + const struct TransportPluginMonitorMessage *tpmm; + struct GNUNET_TRANSPORT_PluginSession *ps; + const char *pname; + const void *paddr; + enum GNUNET_TRANSPORT_SessionState ss; + size_t pname_len; + size_t paddr_len; + struct GNUNET_TRANSPORT_SessionInfo info; + struct GNUNET_HELLO_Address addr; + struct SearchContext rv; + + if (NULL == msg) + { + reconnect_plugin_ctx (pm); + return; + } + if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) || + (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) ) + { + GNUNET_break (0); + reconnect_plugin_ctx (pm); + return; + } + tpmm = (const struct TransportPluginMonitorMessage *) msg; + pname = (const char *) &tpmm[1]; + pname_len = ntohs (tpmm->plugin_name_len); + paddr_len = ntohs (tpmm->plugin_address_len); + if ( (pname_len + + paddr_len + + sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) || + ( (0 != pname_len) && + ('\0' != pname[pname_len - 1]) ) ) + { + GNUNET_break (0); + reconnect_plugin_ctx (pm); + return; + } + paddr = &pname[pname_len]; + ps = NULL; + ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state); + if (GNUNET_TRANSPORT_SS_INIT == ss) + { + ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession); + ps->session_id = tpmm->session_id; + (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions, + wrap_id (tpmm->session_id), + ps, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + } + else + { + rv.session_id = tpmm->session_id; + rv.ps = NULL; + (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions, + wrap_id (tpmm->session_id), + &locate_by_id, + &rv); + ps = rv.ps; + if (NULL == ps) + { + GNUNET_break (0); + reconnect_plugin_ctx (pm); + return; + } + } + info.state = ss; + info.is_inbound = (int16_t) ntohs (tpmm->is_inbound); + info.num_msg_pending = ntohl (tpmm->msgs_pending); + info.num_bytes_pending = ntohl (tpmm->bytes_pending); + info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay); + info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout); + info.address = &addr; + addr.peer = tpmm->peer; + addr.address = (0 == paddr_len) ? NULL : paddr; + addr.address_length = paddr_len; + addr.transport_name = (0 == pname_len) ? NULL : pname; + addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE; + pm->cb (pm->cb_cls, + ps, + &ps->client_ctx, + &info); + + if (GNUNET_TRANSPORT_SS_DONE == ss) + { + GNUNET_break (NULL == ps->client_ctx); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (pm->sessions, + wrap_id (tpmm->session_id), + ps)); + GNUNET_free (ps); + } +} + + +/** + * Install a plugin session state monitor callback. The callback + * will be notified whenever the session changes. + * + * @param cfg configuration to use + * @param cb callback to invoke on events + * @param cb_cls closure for @a cb + * @return NULL on error, otherwise handle for cancellation + */ +struct GNUNET_TRANSPORT_PluginMonitor * +GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg, + GNUNET_TRANSPORT_SessionMonitorCallback cb, + void *cb_cls) +{ + struct GNUNET_TRANSPORT_PluginMonitor *pm; + struct GNUNET_CLIENT_Connection *client; + + client = GNUNET_CLIENT_connect ("transport", cfg); + if (NULL == client) + return NULL; + pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor); + pm->cb = cb; + pm->cb_cls = cb_cls; + pm->cfg = cfg; + pm->client = client; + send_plugin_mon_request (pm); + return pm; +} + + +/** + * Cancel monitoring the plugin session state. The callback will + * be called once for each session that is up with the information + * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up; + * this is just to enable client-side cleanup). + * + * @param pm handle of the request that is to be cancelled + */ +void +GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm) +{ + if (NULL != pm->client) + { + GNUNET_CLIENT_disconnect (pm->client); + pm->client = NULL; + } + if (GNUNET_SCHEDULER_NO_TASK != pm->reconnect_task) + { + GNUNET_SCHEDULER_cancel (pm->reconnect_task); + pm->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + } + clear_map (pm); + GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions); + GNUNET_free (pm); +} + + +/* end of transport_api_monitor_plugins.c */ -- cgit v1.2.3