aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-06 22:52:03 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-06 22:52:03 +0000
commit6f2641e920c612c60facb0a63eac6ac65f74351b (patch)
tree7284f4aa83a815c3d99f1348fb2cb1b436e61fb1
parent5c862712d9c389fced47fde9abdc9458da319aef (diff)
downloadgnunet-6f2641e920c612c60facb0a63eac6ac65f74351b.tar.gz
gnunet-6f2641e920c612c60facb0a63eac6ac65f74351b.zip
converting monitor plugin functionality to MQ
-rw-r--r--src/transport/transport_api_monitor_plugins.c230
1 files changed, 126 insertions, 104 deletions
diff --git a/src/transport/transport_api_monitor_plugins.c b/src/transport/transport_api_monitor_plugins.c
index eef4a0830..01ec2074a 100644
--- a/src/transport/transport_api_monitor_plugins.c
+++ b/src/transport/transport_api_monitor_plugins.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2014 GNUnet e.V. 3 Copyright (C) 2014, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -41,7 +41,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
41 /** 41 /**
42 * Connection to the service. 42 * Connection to the service.
43 */ 43 */
44 struct GNUNET_CLIENT_Connection *client; 44 struct GNUNET_MQ_Handle *mq;
45 45
46 /** 46 /**
47 * Our configuration. 47 * Our configuration.
@@ -72,7 +72,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
72 /** 72 /**
73 * Task ID for reconnect. 73 * Task ID for reconnect.
74 */ 74 */
75 struct GNUNET_SCHEDULER_Task * reconnect_task; 75 struct GNUNET_SCHEDULER_Task *reconnect_task;
76 76
77}; 77};
78 78
@@ -95,39 +95,6 @@ struct GNUNET_TRANSPORT_PluginSession
95}; 95};
96 96
97 97
98/**
99 * Function called with responses from the service.
100 *
101 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
102 * @param msg NULL on timeout or error, otherwise presumably a
103 * message with the human-readable address
104 */
105static void
106response_processor (void *cls,
107 const struct GNUNET_MessageHeader *msg);
108
109
110/**
111 * Send our subscription request to the service.
112 *
113 * @param pal_ctx our context
114 */
115static void
116send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
117{
118 struct GNUNET_MessageHeader msg;
119
120 msg.size = htons (sizeof (struct GNUNET_MessageHeader));
121 msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
122 GNUNET_assert (GNUNET_OK ==
123 GNUNET_CLIENT_transmit_and_get_response (pm->client,
124 &msg,
125 GNUNET_TIME_UNIT_FOREVER_REL,
126 GNUNET_YES,
127 &response_processor,
128 pm));
129}
130
131 98
132/** 99/**
133 * Task run to re-establish the connection. 100 * Task run to re-establish the connection.
@@ -135,15 +102,7 @@ send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
135 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` 102 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
136 */ 103 */
137static void 104static void
138do_plugin_connect (void *cls) 105do_plugin_connect (void *cls);
139{
140 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
141
142 pm->reconnect_task = NULL;
143 pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
144 GNUNET_assert (NULL != pm->client);
145 send_plugin_mon_request (pm);
146}
147 106
148 107
149/** 108/**
@@ -184,8 +143,8 @@ free_entry (void *cls,
184static void 143static void
185reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm) 144reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
186{ 145{
187 GNUNET_CLIENT_disconnect (pm->client); 146 GNUNET_MQ_destroy (pm->mq);
188 pm->client = NULL; 147 pm->mq = NULL;
189 GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, 148 GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
190 &free_entry, 149 &free_entry,
191 pm); 150 pm);
@@ -257,15 +216,44 @@ locate_by_id (void *cls,
257 * Function called with responses from the service. 216 * Function called with responses from the service.
258 * 217 *
259 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` 218 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
260 * @param msg NULL on timeout or error, otherwise presumably a 219 * @paramm tpmm message with event data
261 * message with the human-readable address 220 * @return #GNUNET_Ok if message is well-formed
221 */
222static int
223check_event (void *cls,
224 const struct TransportPluginMonitorMessage *tpmm)
225{
226 const char *pname;
227 size_t pname_len;
228 size_t paddr_len;
229
230 pname = (const char *) &tpmm[1];
231 pname_len = ntohs (tpmm->plugin_name_len);
232 paddr_len = ntohs (tpmm->plugin_address_len);
233 if ( (pname_len +
234 paddr_len +
235 sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
236 ( (0 != pname_len) &&
237 ('\0' != pname[pname_len - 1]) ) )
238 {
239 GNUNET_break (0);
240 return GNUNET_SYSERR;
241 }
242 return GNUNET_OK;
243}
244
245
246/**
247 * Function called with responses from the service.
248 *
249 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
250 * @paramm tpmm message with event data
262 */ 251 */
263static void 252static void
264response_processor (void *cls, 253handle_event (void *cls,
265 const struct GNUNET_MessageHeader *msg) 254 const struct TransportPluginMonitorMessage *tpmm)
266{ 255{
267 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; 256 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
268 const struct TransportPluginMonitorMessage *tpmm;
269 struct GNUNET_TRANSPORT_PluginSession *ps; 257 struct GNUNET_TRANSPORT_PluginSession *ps;
270 const char *pname; 258 const char *pname;
271 const void *paddr; 259 const void *paddr;
@@ -276,47 +264,9 @@ response_processor (void *cls,
276 struct GNUNET_HELLO_Address addr; 264 struct GNUNET_HELLO_Address addr;
277 struct SearchContext rv; 265 struct SearchContext rv;
278 266
279 if (NULL == msg)
280 {
281 reconnect_plugin_ctx (pm);
282 return;
283 }
284 if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
285 (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
286 {
287 /* we are in sync */
288 pm->cb (pm->cb_cls,
289 NULL,
290 NULL,
291 NULL);
292 GNUNET_CLIENT_receive (pm->client,
293 &response_processor,
294 pm,
295 GNUNET_TIME_UNIT_FOREVER_REL);
296 return;
297 }
298
299 if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
300 (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
301 {
302 GNUNET_break (0);
303 reconnect_plugin_ctx (pm);
304 return;
305 }
306 tpmm = (const struct TransportPluginMonitorMessage *) msg;
307 pname = (const char *) &tpmm[1]; 267 pname = (const char *) &tpmm[1];
308 pname_len = ntohs (tpmm->plugin_name_len); 268 pname_len = ntohs (tpmm->plugin_name_len);
309 paddr_len = ntohs (tpmm->plugin_address_len); 269 paddr_len = ntohs (tpmm->plugin_address_len);
310 if ( (pname_len +
311 paddr_len +
312 sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
313 ( (0 != pname_len) &&
314 ('\0' != pname[pname_len - 1]) ) )
315 {
316 GNUNET_break (0);
317 reconnect_plugin_ctx (pm);
318 return;
319 }
320 paddr = &pname[pname_len]; 270 paddr = &pname[pname_len];
321 ps = NULL; 271 ps = NULL;
322 ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state); 272 ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
@@ -372,10 +322,83 @@ response_processor (void *cls,
372 ps)); 322 ps));
373 GNUNET_free (ps); 323 GNUNET_free (ps);
374 } 324 }
375 GNUNET_CLIENT_receive (pm->client, 325}
376 &response_processor, 326
377 pm, 327
378 GNUNET_TIME_UNIT_FOREVER_REL); 328/**
329 * Function called with sync responses from the service.
330 *
331 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
332 * @param msg message from the service
333 */
334static void
335handle_sync (void *cls,
336 const struct GNUNET_MessageHeader *msg)
337{
338 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
339
340 /* we are in sync, notify callback */
341 pm->cb (pm->cb_cls,
342 NULL,
343 NULL,
344 NULL);
345}
346
347
348/**
349 * Generic error handler, called with the appropriate
350 * error code and the same closure specified at the creation of
351 * the message queue.
352 * Not every message queue implementation supports an error handler.
353 *
354 * @param cls closure with the `struct GNUNET_NSE_Handle *`
355 * @param error error code
356 */
357static void
358mq_error_handler (void *cls,
359 enum GNUNET_MQ_Error error)
360{
361 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
362
363 reconnect_plugin_ctx (pm);
364}
365
366
367/**
368 * Task run to re-establish the connection.
369 *
370 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
371 */
372static void
373do_plugin_connect (void *cls)
374{
375 GNUNET_MQ_hd_var_size (event,
376 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
377 struct TransportPluginMonitorMessage);
378 GNUNET_MQ_hd_fixed_size (sync,
379 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
380 struct GNUNET_MessageHeader);
381 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
382 struct GNUNET_MQ_MessageHandler handlers[] = {
383 make_event_handler (pm),
384 make_sync_handler (pm),
385 GNUNET_MQ_handler_end ()
386 };
387 struct GNUNET_MessageHeader *msg;
388 struct GNUNET_MQ_Envelope *env;
389
390 pm->reconnect_task = NULL;
391 pm->mq = GNUNET_CLIENT_connecT (pm->cfg,
392 "transport",
393 handlers,
394 &mq_error_handler,
395 pm);
396 if (NULL == pm->mq)
397 return;
398 env = GNUNET_MQ_msg (msg,
399 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
400 GNUNET_MQ_send (pm->mq,
401 env);
379} 402}
380 403
381 404
@@ -394,19 +417,18 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
394 void *cb_cls) 417 void *cb_cls)
395{ 418{
396 struct GNUNET_TRANSPORT_PluginMonitor *pm; 419 struct GNUNET_TRANSPORT_PluginMonitor *pm;
397 struct GNUNET_CLIENT_Connection *client;
398 420
399 client = GNUNET_CLIENT_connect ("transport",
400 cfg);
401 if (NULL == client)
402 return NULL;
403 pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor); 421 pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
404 pm->cb = cb; 422 pm->cb = cb;
405 pm->cb_cls = cb_cls; 423 pm->cb_cls = cb_cls;
406 pm->cfg = cfg; 424 pm->cfg = cfg;
407 pm->client = client; 425 do_plugin_connect (pm);
426 if (NULL == pm->mq)
427 {
428 GNUNET_free (pm);
429 return NULL;
430 }
408 pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128); 431 pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
409 send_plugin_mon_request (pm);
410 return pm; 432 return pm;
411} 433}
412 434
@@ -422,10 +444,10 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
422void 444void
423GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm) 445GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
424{ 446{
425 if (NULL != pm->client) 447 if (NULL != pm->mq)
426 { 448 {
427 GNUNET_CLIENT_disconnect (pm->client); 449 GNUNET_MQ_destroy (pm->mq);
428 pm->client = NULL; 450 pm->mq = NULL;
429 } 451 }
430 if (NULL != pm->reconnect_task) 452 if (NULL != pm->reconnect_task)
431 { 453 {