From 6f2641e920c612c60facb0a63eac6ac65f74351b Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 6 Jul 2016 22:52:03 +0000 Subject: converting monitor plugin functionality to MQ --- src/transport/transport_api_monitor_plugins.c | 230 ++++++++++++++------------ 1 file changed, 126 insertions(+), 104 deletions(-) (limited to 'src/transport/transport_api_monitor_plugins.c') diff --git a/src/transport/transport_api_monitor_plugins.c b/src/transport/transport_api_monitor_plugins.c index eef4a0830..01ec2074a 100644 --- a/src/transport/transport_api_monitor_plugins.c +++ b/src/transport/transport_api_monitor_plugins.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2014 GNUnet e.V. + Copyright (C) 2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -41,7 +41,7 @@ struct GNUNET_TRANSPORT_PluginMonitor /** * Connection to the service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Our configuration. @@ -72,7 +72,7 @@ struct GNUNET_TRANSPORT_PluginMonitor /** * Task ID for reconnect. */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; }; @@ -95,39 +95,6 @@ struct GNUNET_TRANSPORT_PluginSession }; -/** - * Function called with responses from the service. - * - * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` - * @param msg NULL on timeout or error, otherwise presumably a - * message with the human-readable address - */ -static void -response_processor (void *cls, - const struct GNUNET_MessageHeader *msg); - - -/** - * Send our subscription request to the service. - * - * @param pal_ctx our context - */ -static void -send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm) -{ - struct GNUNET_MessageHeader msg; - - msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START); - GNUNET_assert (GNUNET_OK == - GNUNET_CLIENT_transmit_and_get_response (pm->client, - &msg, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &response_processor, - pm)); -} - /** * Task run to re-establish the connection. @@ -135,15 +102,7 @@ send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm) * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` */ static void -do_plugin_connect (void *cls) -{ - struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; - - pm->reconnect_task = NULL; - pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg); - GNUNET_assert (NULL != pm->client); - send_plugin_mon_request (pm); -} +do_plugin_connect (void *cls); /** @@ -184,8 +143,8 @@ free_entry (void *cls, static void reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm) { - GNUNET_CLIENT_disconnect (pm->client); - pm->client = NULL; + GNUNET_MQ_destroy (pm->mq); + pm->mq = NULL; GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, &free_entry, pm); @@ -257,15 +216,44 @@ locate_by_id (void *cls, * Function called with responses from the service. * * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` - * @param msg NULL on timeout or error, otherwise presumably a - * message with the human-readable address + * @paramm tpmm message with event data + * @return #GNUNET_Ok if message is well-formed + */ +static int +check_event (void *cls, + const struct TransportPluginMonitorMessage *tpmm) +{ + const char *pname; + size_t pname_len; + size_t paddr_len; + + pname = (const char *) &tpmm[1]; + pname_len = ntohs (tpmm->plugin_name_len); + paddr_len = ntohs (tpmm->plugin_address_len); + if ( (pname_len + + paddr_len + + sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) || + ( (0 != pname_len) && + ('\0' != pname[pname_len - 1]) ) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Function called with responses from the service. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` + * @paramm tpmm message with event data */ static void -response_processor (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_event (void *cls, + const struct TransportPluginMonitorMessage *tpmm) { struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; - const struct TransportPluginMonitorMessage *tpmm; struct GNUNET_TRANSPORT_PluginSession *ps; const char *pname; const void *paddr; @@ -276,47 +264,9 @@ response_processor (void *cls, struct GNUNET_HELLO_Address addr; struct SearchContext rv; - if (NULL == msg) - { - reconnect_plugin_ctx (pm); - return; - } - if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) && - (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) ) - { - /* we are in sync */ - pm->cb (pm->cb_cls, - NULL, - NULL, - NULL); - GNUNET_CLIENT_receive (pm->client, - &response_processor, - pm, - GNUNET_TIME_UNIT_FOREVER_REL); - return; - } - - if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) || - (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) ) - { - GNUNET_break (0); - reconnect_plugin_ctx (pm); - return; - } - tpmm = (const struct TransportPluginMonitorMessage *) msg; pname = (const char *) &tpmm[1]; pname_len = ntohs (tpmm->plugin_name_len); paddr_len = ntohs (tpmm->plugin_address_len); - if ( (pname_len + - paddr_len + - sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) || - ( (0 != pname_len) && - ('\0' != pname[pname_len - 1]) ) ) - { - GNUNET_break (0); - reconnect_plugin_ctx (pm); - return; - } paddr = &pname[pname_len]; ps = NULL; ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state); @@ -372,10 +322,83 @@ response_processor (void *cls, ps)); GNUNET_free (ps); } - GNUNET_CLIENT_receive (pm->client, - &response_processor, - pm, - GNUNET_TIME_UNIT_FOREVER_REL); +} + + +/** + * Function called with sync responses from the service. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` + * @param msg message from the service + */ +static void +handle_sync (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; + + /* we are in sync, notify callback */ + pm->cb (pm->cb_cls, + NULL, + NULL, + NULL); +} + + +/** + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_NSE_Handle *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; + + reconnect_plugin_ctx (pm); +} + + +/** + * Task run to re-establish the connection. + * + * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` + */ +static void +do_plugin_connect (void *cls) +{ + GNUNET_MQ_hd_var_size (event, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT, + struct TransportPluginMonitorMessage); + GNUNET_MQ_hd_fixed_size (sync, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC, + struct GNUNET_MessageHeader); + struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_event_handler (pm), + make_sync_handler (pm), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MessageHeader *msg; + struct GNUNET_MQ_Envelope *env; + + pm->reconnect_task = NULL; + pm->mq = GNUNET_CLIENT_connecT (pm->cfg, + "transport", + handlers, + &mq_error_handler, + pm); + if (NULL == pm->mq) + return; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START); + GNUNET_MQ_send (pm->mq, + env); } @@ -394,19 +417,18 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cb_cls) { struct GNUNET_TRANSPORT_PluginMonitor *pm; - struct GNUNET_CLIENT_Connection *client; - client = GNUNET_CLIENT_connect ("transport", - cfg); - if (NULL == client) - return NULL; pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor); pm->cb = cb; pm->cb_cls = cb_cls; pm->cfg = cfg; - pm->client = client; + do_plugin_connect (pm); + if (NULL == pm->mq) + { + GNUNET_free (pm); + return NULL; + } pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128); - send_plugin_mon_request (pm); return pm; } @@ -422,10 +444,10 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm) { - if (NULL != pm->client) + if (NULL != pm->mq) { - GNUNET_CLIENT_disconnect (pm->client); - pm->client = NULL; + GNUNET_MQ_destroy (pm->mq); + pm->mq = NULL; } if (NULL != pm->reconnect_task) { -- cgit v1.2.3