diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-06 22:52:03 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-06 22:52:03 +0000 |
commit | 6f2641e920c612c60facb0a63eac6ac65f74351b (patch) | |
tree | 7284f4aa83a815c3d99f1348fb2cb1b436e61fb1 /src/transport/transport_api_monitor_plugins.c | |
parent | 5c862712d9c389fced47fde9abdc9458da319aef (diff) | |
download | gnunet-6f2641e920c612c60facb0a63eac6ac65f74351b.tar.gz gnunet-6f2641e920c612c60facb0a63eac6ac65f74351b.zip |
converting monitor plugin functionality to MQ
Diffstat (limited to 'src/transport/transport_api_monitor_plugins.c')
-rw-r--r-- | src/transport/transport_api_monitor_plugins.c | 230 |
1 files changed, 126 insertions, 104 deletions
diff --git a/src/transport/transport_api_monitor_plugins.c b/src/transport/transport_api_monitor_plugins.c index eef4a0830..01ec2074a 100644 --- a/src/transport/transport_api_monitor_plugins.c +++ b/src/transport/transport_api_monitor_plugins.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2014 GNUnet e.V. | 3 | Copyright (C) 2014, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -41,7 +41,7 @@ struct GNUNET_TRANSPORT_PluginMonitor | |||
41 | /** | 41 | /** |
42 | * Connection to the service. | 42 | * Connection to the service. |
43 | */ | 43 | */ |
44 | struct GNUNET_CLIENT_Connection *client; | 44 | struct GNUNET_MQ_Handle *mq; |
45 | 45 | ||
46 | /** | 46 | /** |
47 | * Our configuration. | 47 | * Our configuration. |
@@ -72,7 +72,7 @@ struct GNUNET_TRANSPORT_PluginMonitor | |||
72 | /** | 72 | /** |
73 | * Task ID for reconnect. | 73 | * Task ID for reconnect. |
74 | */ | 74 | */ |
75 | struct GNUNET_SCHEDULER_Task * reconnect_task; | 75 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
76 | 76 | ||
77 | }; | 77 | }; |
78 | 78 | ||
@@ -95,39 +95,6 @@ struct GNUNET_TRANSPORT_PluginSession | |||
95 | }; | 95 | }; |
96 | 96 | ||
97 | 97 | ||
98 | /** | ||
99 | * Function called with responses from the service. | ||
100 | * | ||
101 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` | ||
102 | * @param msg NULL on timeout or error, otherwise presumably a | ||
103 | * message with the human-readable address | ||
104 | */ | ||
105 | static void | ||
106 | response_processor (void *cls, | ||
107 | const struct GNUNET_MessageHeader *msg); | ||
108 | |||
109 | |||
110 | /** | ||
111 | * Send our subscription request to the service. | ||
112 | * | ||
113 | * @param pal_ctx our context | ||
114 | */ | ||
115 | static void | ||
116 | send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm) | ||
117 | { | ||
118 | struct GNUNET_MessageHeader msg; | ||
119 | |||
120 | msg.size = htons (sizeof (struct GNUNET_MessageHeader)); | ||
121 | msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START); | ||
122 | GNUNET_assert (GNUNET_OK == | ||
123 | GNUNET_CLIENT_transmit_and_get_response (pm->client, | ||
124 | &msg, | ||
125 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
126 | GNUNET_YES, | ||
127 | &response_processor, | ||
128 | pm)); | ||
129 | } | ||
130 | |||
131 | 98 | ||
132 | /** | 99 | /** |
133 | * Task run to re-establish the connection. | 100 | * Task run to re-establish the connection. |
@@ -135,15 +102,7 @@ send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm) | |||
135 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` | 102 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` |
136 | */ | 103 | */ |
137 | static void | 104 | static void |
138 | do_plugin_connect (void *cls) | 105 | do_plugin_connect (void *cls); |
139 | { | ||
140 | struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; | ||
141 | |||
142 | pm->reconnect_task = NULL; | ||
143 | pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg); | ||
144 | GNUNET_assert (NULL != pm->client); | ||
145 | send_plugin_mon_request (pm); | ||
146 | } | ||
147 | 106 | ||
148 | 107 | ||
149 | /** | 108 | /** |
@@ -184,8 +143,8 @@ free_entry (void *cls, | |||
184 | static void | 143 | static void |
185 | reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm) | 144 | reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm) |
186 | { | 145 | { |
187 | GNUNET_CLIENT_disconnect (pm->client); | 146 | GNUNET_MQ_destroy (pm->mq); |
188 | pm->client = NULL; | 147 | pm->mq = NULL; |
189 | GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, | 148 | GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, |
190 | &free_entry, | 149 | &free_entry, |
191 | pm); | 150 | pm); |
@@ -257,15 +216,44 @@ locate_by_id (void *cls, | |||
257 | * Function called with responses from the service. | 216 | * Function called with responses from the service. |
258 | * | 217 | * |
259 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` | 218 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` |
260 | * @param msg NULL on timeout or error, otherwise presumably a | 219 | * @paramm tpmm message with event data |
261 | * message with the human-readable address | 220 | * @return #GNUNET_Ok if message is well-formed |
221 | */ | ||
222 | static int | ||
223 | check_event (void *cls, | ||
224 | const struct TransportPluginMonitorMessage *tpmm) | ||
225 | { | ||
226 | const char *pname; | ||
227 | size_t pname_len; | ||
228 | size_t paddr_len; | ||
229 | |||
230 | pname = (const char *) &tpmm[1]; | ||
231 | pname_len = ntohs (tpmm->plugin_name_len); | ||
232 | paddr_len = ntohs (tpmm->plugin_address_len); | ||
233 | if ( (pname_len + | ||
234 | paddr_len + | ||
235 | sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) || | ||
236 | ( (0 != pname_len) && | ||
237 | ('\0' != pname[pname_len - 1]) ) ) | ||
238 | { | ||
239 | GNUNET_break (0); | ||
240 | return GNUNET_SYSERR; | ||
241 | } | ||
242 | return GNUNET_OK; | ||
243 | } | ||
244 | |||
245 | |||
246 | /** | ||
247 | * Function called with responses from the service. | ||
248 | * | ||
249 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` | ||
250 | * @paramm tpmm message with event data | ||
262 | */ | 251 | */ |
263 | static void | 252 | static void |
264 | response_processor (void *cls, | 253 | handle_event (void *cls, |
265 | const struct GNUNET_MessageHeader *msg) | 254 | const struct TransportPluginMonitorMessage *tpmm) |
266 | { | 255 | { |
267 | struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; | 256 | struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; |
268 | const struct TransportPluginMonitorMessage *tpmm; | ||
269 | struct GNUNET_TRANSPORT_PluginSession *ps; | 257 | struct GNUNET_TRANSPORT_PluginSession *ps; |
270 | const char *pname; | 258 | const char *pname; |
271 | const void *paddr; | 259 | const void *paddr; |
@@ -276,47 +264,9 @@ response_processor (void *cls, | |||
276 | struct GNUNET_HELLO_Address addr; | 264 | struct GNUNET_HELLO_Address addr; |
277 | struct SearchContext rv; | 265 | struct SearchContext rv; |
278 | 266 | ||
279 | if (NULL == msg) | ||
280 | { | ||
281 | reconnect_plugin_ctx (pm); | ||
282 | return; | ||
283 | } | ||
284 | if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) && | ||
285 | (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) ) | ||
286 | { | ||
287 | /* we are in sync */ | ||
288 | pm->cb (pm->cb_cls, | ||
289 | NULL, | ||
290 | NULL, | ||
291 | NULL); | ||
292 | GNUNET_CLIENT_receive (pm->client, | ||
293 | &response_processor, | ||
294 | pm, | ||
295 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
296 | return; | ||
297 | } | ||
298 | |||
299 | if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) || | ||
300 | (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) ) | ||
301 | { | ||
302 | GNUNET_break (0); | ||
303 | reconnect_plugin_ctx (pm); | ||
304 | return; | ||
305 | } | ||
306 | tpmm = (const struct TransportPluginMonitorMessage *) msg; | ||
307 | pname = (const char *) &tpmm[1]; | 267 | pname = (const char *) &tpmm[1]; |
308 | pname_len = ntohs (tpmm->plugin_name_len); | 268 | pname_len = ntohs (tpmm->plugin_name_len); |
309 | paddr_len = ntohs (tpmm->plugin_address_len); | 269 | paddr_len = ntohs (tpmm->plugin_address_len); |
310 | if ( (pname_len + | ||
311 | paddr_len + | ||
312 | sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) || | ||
313 | ( (0 != pname_len) && | ||
314 | ('\0' != pname[pname_len - 1]) ) ) | ||
315 | { | ||
316 | GNUNET_break (0); | ||
317 | reconnect_plugin_ctx (pm); | ||
318 | return; | ||
319 | } | ||
320 | paddr = &pname[pname_len]; | 270 | paddr = &pname[pname_len]; |
321 | ps = NULL; | 271 | ps = NULL; |
322 | ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state); | 272 | ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state); |
@@ -372,10 +322,83 @@ response_processor (void *cls, | |||
372 | ps)); | 322 | ps)); |
373 | GNUNET_free (ps); | 323 | GNUNET_free (ps); |
374 | } | 324 | } |
375 | GNUNET_CLIENT_receive (pm->client, | 325 | } |
376 | &response_processor, | 326 | |
377 | pm, | 327 | |
378 | GNUNET_TIME_UNIT_FOREVER_REL); | 328 | /** |
329 | * Function called with sync responses from the service. | ||
330 | * | ||
331 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` | ||
332 | * @param msg message from the service | ||
333 | */ | ||
334 | static void | ||
335 | handle_sync (void *cls, | ||
336 | const struct GNUNET_MessageHeader *msg) | ||
337 | { | ||
338 | struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; | ||
339 | |||
340 | /* we are in sync, notify callback */ | ||
341 | pm->cb (pm->cb_cls, | ||
342 | NULL, | ||
343 | NULL, | ||
344 | NULL); | ||
345 | } | ||
346 | |||
347 | |||
348 | /** | ||
349 | * Generic error handler, called with the appropriate | ||
350 | * error code and the same closure specified at the creation of | ||
351 | * the message queue. | ||
352 | * Not every message queue implementation supports an error handler. | ||
353 | * | ||
354 | * @param cls closure with the `struct GNUNET_NSE_Handle *` | ||
355 | * @param error error code | ||
356 | */ | ||
357 | static void | ||
358 | mq_error_handler (void *cls, | ||
359 | enum GNUNET_MQ_Error error) | ||
360 | { | ||
361 | struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; | ||
362 | |||
363 | reconnect_plugin_ctx (pm); | ||
364 | } | ||
365 | |||
366 | |||
367 | /** | ||
368 | * Task run to re-establish the connection. | ||
369 | * | ||
370 | * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *` | ||
371 | */ | ||
372 | static void | ||
373 | do_plugin_connect (void *cls) | ||
374 | { | ||
375 | GNUNET_MQ_hd_var_size (event, | ||
376 | GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT, | ||
377 | struct TransportPluginMonitorMessage); | ||
378 | GNUNET_MQ_hd_fixed_size (sync, | ||
379 | GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC, | ||
380 | struct GNUNET_MessageHeader); | ||
381 | struct GNUNET_TRANSPORT_PluginMonitor *pm = cls; | ||
382 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
383 | make_event_handler (pm), | ||
384 | make_sync_handler (pm), | ||
385 | GNUNET_MQ_handler_end () | ||
386 | }; | ||
387 | struct GNUNET_MessageHeader *msg; | ||
388 | struct GNUNET_MQ_Envelope *env; | ||
389 | |||
390 | pm->reconnect_task = NULL; | ||
391 | pm->mq = GNUNET_CLIENT_connecT (pm->cfg, | ||
392 | "transport", | ||
393 | handlers, | ||
394 | &mq_error_handler, | ||
395 | pm); | ||
396 | if (NULL == pm->mq) | ||
397 | return; | ||
398 | env = GNUNET_MQ_msg (msg, | ||
399 | GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START); | ||
400 | GNUNET_MQ_send (pm->mq, | ||
401 | env); | ||
379 | } | 402 | } |
380 | 403 | ||
381 | 404 | ||
@@ -394,19 +417,18 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
394 | void *cb_cls) | 417 | void *cb_cls) |
395 | { | 418 | { |
396 | struct GNUNET_TRANSPORT_PluginMonitor *pm; | 419 | struct GNUNET_TRANSPORT_PluginMonitor *pm; |
397 | struct GNUNET_CLIENT_Connection *client; | ||
398 | 420 | ||
399 | client = GNUNET_CLIENT_connect ("transport", | ||
400 | cfg); | ||
401 | if (NULL == client) | ||
402 | return NULL; | ||
403 | pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor); | 421 | pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor); |
404 | pm->cb = cb; | 422 | pm->cb = cb; |
405 | pm->cb_cls = cb_cls; | 423 | pm->cb_cls = cb_cls; |
406 | pm->cfg = cfg; | 424 | pm->cfg = cfg; |
407 | pm->client = client; | 425 | do_plugin_connect (pm); |
426 | if (NULL == pm->mq) | ||
427 | { | ||
428 | GNUNET_free (pm); | ||
429 | return NULL; | ||
430 | } | ||
408 | pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128); | 431 | pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128); |
409 | send_plugin_mon_request (pm); | ||
410 | return pm; | 432 | return pm; |
411 | } | 433 | } |
412 | 434 | ||
@@ -422,10 +444,10 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
422 | void | 444 | void |
423 | GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm) | 445 | GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm) |
424 | { | 446 | { |
425 | if (NULL != pm->client) | 447 | if (NULL != pm->mq) |
426 | { | 448 | { |
427 | GNUNET_CLIENT_disconnect (pm->client); | 449 | GNUNET_MQ_destroy (pm->mq); |
428 | pm->client = NULL; | 450 | pm->mq = NULL; |
429 | } | 451 | } |
430 | if (NULL != pm->reconnect_task) | 452 | if (NULL != pm->reconnect_task) |
431 | { | 453 | { |