summaryrefslogtreecommitdiff
path: root/src/transport/transport_api_monitor_plugins.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-06 22:52:03 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-06 22:52:03 +0000
commit6f2641e920c612c60facb0a63eac6ac65f74351b (patch)
tree7284f4aa83a815c3d99f1348fb2cb1b436e61fb1 /src/transport/transport_api_monitor_plugins.c
parent5c862712d9c389fced47fde9abdc9458da319aef (diff)
converting monitor plugin functionality to MQ
Diffstat (limited to 'src/transport/transport_api_monitor_plugins.c')
-rw-r--r--src/transport/transport_api_monitor_plugins.c230
1 files changed, 126 insertions, 104 deletions
diff --git a/src/transport/transport_api_monitor_plugins.c b/src/transport/transport_api_monitor_plugins.c
index eef4a0830..01ec2074a 100644
--- a/src/transport/transport_api_monitor_plugins.c
+++ b/src/transport/transport_api_monitor_plugins.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2014 GNUnet e.V.
+ Copyright (C) 2014, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -41,7 +41,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
/**
* Connection to the service.
*/
- struct GNUNET_CLIENT_Connection *client;
+ struct GNUNET_MQ_Handle *mq;
/**
* Our configuration.
@@ -72,7 +72,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
/**
* Task ID for reconnect.
*/
- struct GNUNET_SCHEDULER_Task * reconnect_task;
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
};
@@ -95,39 +95,6 @@ struct GNUNET_TRANSPORT_PluginSession
};
-/**
- * Function called with responses from the service.
- *
- * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- * message with the human-readable address
- */
-static void
-response_processor (void *cls,
- const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Send our subscription request to the service.
- *
- * @param pal_ctx our context
- */
-static void
-send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
-{
- struct GNUNET_MessageHeader msg;
-
- msg.size = htons (sizeof (struct GNUNET_MessageHeader));
- msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CLIENT_transmit_and_get_response (pm->client,
- &msg,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &response_processor,
- pm));
-}
-
/**
* Task run to re-establish the connection.
@@ -135,15 +102,7 @@ send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
* @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
*/
static void
-do_plugin_connect (void *cls)
-{
- struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
-
- pm->reconnect_task = NULL;
- pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
- GNUNET_assert (NULL != pm->client);
- send_plugin_mon_request (pm);
-}
+do_plugin_connect (void *cls);
/**
@@ -184,8 +143,8 @@ free_entry (void *cls,
static void
reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
{
- GNUNET_CLIENT_disconnect (pm->client);
- pm->client = NULL;
+ GNUNET_MQ_destroy (pm->mq);
+ pm->mq = NULL;
GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
&free_entry,
pm);
@@ -257,15 +216,44 @@ locate_by_id (void *cls,
* Function called with responses from the service.
*
* @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- * message with the human-readable address
+ * @paramm tpmm message with event data
+ * @return #GNUNET_Ok if message is well-formed
+ */
+static int
+check_event (void *cls,
+ const struct TransportPluginMonitorMessage *tpmm)
+{
+ const char *pname;
+ size_t pname_len;
+ size_t paddr_len;
+
+ pname = (const char *) &tpmm[1];
+ pname_len = ntohs (tpmm->plugin_name_len);
+ paddr_len = ntohs (tpmm->plugin_address_len);
+ if ( (pname_len +
+ paddr_len +
+ sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
+ ( (0 != pname_len) &&
+ ('\0' != pname[pname_len - 1]) ) )
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function called with responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @paramm tpmm message with event data
*/
static void
-response_processor (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_event (void *cls,
+ const struct TransportPluginMonitorMessage *tpmm)
{
struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
- const struct TransportPluginMonitorMessage *tpmm;
struct GNUNET_TRANSPORT_PluginSession *ps;
const char *pname;
const void *paddr;
@@ -276,47 +264,9 @@ response_processor (void *cls,
struct GNUNET_HELLO_Address addr;
struct SearchContext rv;
- if (NULL == msg)
- {
- reconnect_plugin_ctx (pm);
- return;
- }
- if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
- (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
- {
- /* we are in sync */
- pm->cb (pm->cb_cls,
- NULL,
- NULL,
- NULL);
- GNUNET_CLIENT_receive (pm->client,
- &response_processor,
- pm,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return;
- }
-
- if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
- (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
- {
- GNUNET_break (0);
- reconnect_plugin_ctx (pm);
- return;
- }
- tpmm = (const struct TransportPluginMonitorMessage *) msg;
pname = (const char *) &tpmm[1];
pname_len = ntohs (tpmm->plugin_name_len);
paddr_len = ntohs (tpmm->plugin_address_len);
- if ( (pname_len +
- paddr_len +
- sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
- ( (0 != pname_len) &&
- ('\0' != pname[pname_len - 1]) ) )
- {
- GNUNET_break (0);
- reconnect_plugin_ctx (pm);
- return;
- }
paddr = &pname[pname_len];
ps = NULL;
ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
@@ -372,10 +322,83 @@ response_processor (void *cls,
ps));
GNUNET_free (ps);
}
- GNUNET_CLIENT_receive (pm->client,
- &response_processor,
- pm,
- GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Function called with sync responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @param msg message from the service
+ */
+static void
+handle_sync (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+ /* we are in sync, notify callback */
+ pm->cb (pm->cb_cls,
+ NULL,
+ NULL,
+ NULL);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_NSE_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+ reconnect_plugin_ctx (pm);
+}
+
+
+/**
+ * Task run to re-establish the connection.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ */
+static void
+do_plugin_connect (void *cls)
+{
+ GNUNET_MQ_hd_var_size (event,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
+ struct TransportPluginMonitorMessage);
+ GNUNET_MQ_hd_fixed_size (sync,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
+ struct GNUNET_MessageHeader);
+ struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_event_handler (pm),
+ make_sync_handler (pm),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MessageHeader *msg;
+ struct GNUNET_MQ_Envelope *env;
+
+ pm->reconnect_task = NULL;
+ pm->mq = GNUNET_CLIENT_connecT (pm->cfg,
+ "transport",
+ handlers,
+ &mq_error_handler,
+ pm);
+ if (NULL == pm->mq)
+ return;
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
+ GNUNET_MQ_send (pm->mq,
+ env);
}
@@ -394,19 +417,18 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
void *cb_cls)
{
struct GNUNET_TRANSPORT_PluginMonitor *pm;
- struct GNUNET_CLIENT_Connection *client;
- client = GNUNET_CLIENT_connect ("transport",
- cfg);
- if (NULL == client)
- return NULL;
pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
pm->cb = cb;
pm->cb_cls = cb_cls;
pm->cfg = cfg;
- pm->client = client;
+ do_plugin_connect (pm);
+ if (NULL == pm->mq)
+ {
+ GNUNET_free (pm);
+ return NULL;
+ }
pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
- send_plugin_mon_request (pm);
return pm;
}
@@ -422,10 +444,10 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
void
GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
{
- if (NULL != pm->client)
+ if (NULL != pm->mq)
{
- GNUNET_CLIENT_disconnect (pm->client);
- pm->client = NULL;
+ GNUNET_MQ_destroy (pm->mq);
+ pm->mq = NULL;
}
if (NULL != pm->reconnect_task)
{