aboutsummaryrefslogtreecommitdiff
path: root/src/arm
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-22 07:19:52 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-22 07:19:52 +0000
commiteed68e90e8564b578fd92ef130d305465cecf936 (patch)
tree0e2bc52e369179487bde1c047165c94d93669b36 /src/arm
parentdd9ed7931e52705b216c346127108520c5e4460b (diff)
downloadgnunet-eed68e90e8564b578fd92ef130d305465cecf936.tar.gz
gnunet-eed68e90e8564b578fd92ef130d305465cecf936.zip
convert monitor API to use MQ
Diffstat (limited to 'src/arm')
-rw-r--r--src/arm/arm_monitor_api.c282
-rw-r--r--src/arm/gnunet-service-arm.c13
2 files changed, 111 insertions, 184 deletions
diff --git a/src/arm/arm_monitor_api.c b/src/arm/arm_monitor_api.c
index 19a2f4eb9..6d4129928 100644
--- a/src/arm/arm_monitor_api.c
+++ b/src/arm/arm_monitor_api.c
@@ -42,22 +42,17 @@ struct GNUNET_ARM_MonitorHandle
42 /** 42 /**
43 * Our control connection to the ARM service. 43 * Our control connection to the ARM service.
44 */ 44 */
45 struct GNUNET_CLIENT_Connection *monitor; 45 struct GNUNET_MQ_Handle *mq;
46 46
47 /** 47 /**
48 * The configuration that we are using. 48 * The configuration that we are using.
49 */ 49 */
50 struct GNUNET_CONFIGURATION_Handle *cfg; 50 const struct GNUNET_CONFIGURATION_Handle *cfg;
51
52 /**
53 * Handle for our current transmission request.
54 */
55 struct GNUNET_CLIENT_TransmitHandle *cth;
56 51
57 /** 52 /**
58 * ID of the reconnect task (if any). 53 * ID of the reconnect task (if any).
59 */ 54 */
60 struct GNUNET_SCHEDULER_Task * reconnect_task; 55 struct GNUNET_SCHEDULER_Task *reconnect_task;
61 56
62 /** 57 /**
63 * Current delay we use for re-trying to connect to core. 58 * Current delay we use for re-trying to connect to core.
@@ -65,32 +60,24 @@ struct GNUNET_ARM_MonitorHandle
65 struct GNUNET_TIME_Relative retry_backoff; 60 struct GNUNET_TIME_Relative retry_backoff;
66 61
67 /** 62 /**
68 * Are we currently disconnected and hence unable to send?
69 */
70 unsigned char currently_down;
71
72 /**
73 * Callback to invoke on status updates. 63 * Callback to invoke on status updates.
74 */ 64 */
75 GNUNET_ARM_ServiceStatusCallback service_status; 65 GNUNET_ARM_ServiceStatusCallback service_status;
76 66
77 /** 67 /**
78 * Closure for service_status. 68 * Closure for @e service_status.
79 */ 69 */
80 void *cls; 70 void *service_status_cls;
81 71
82 /**
83 * ID of a task to run if we fail to get a reply to the init message in time.
84 */
85 struct GNUNET_SCHEDULER_Task * init_timeout_task_id;
86}; 72};
87 73
88 74
89static void 75/**
90monitor_notify_handler (void *cls, 76 * Connect to the ARM service for monitoring.
91 const struct GNUNET_MessageHeader *msg); 77 *
92 78 * @param h handle to connect
93 79 * @return #GNUNET_OK on success
80 */
94static int 81static int
95reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h); 82reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h);
96 83
@@ -98,7 +85,7 @@ reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h);
98/** 85/**
99 * Task scheduled to try to re-connect to arm. 86 * Task scheduled to try to re-connect to arm.
100 * 87 *
101 * @param cls the 'struct GNUNET_ARM_MonitorHandle' 88 * @param cls the `struct GNUNET_ARM_MonitorHandle`
102 */ 89 */
103static void 90static void
104reconnect_arm_monitor_task (void *cls) 91reconnect_arm_monitor_task (void *cls)
@@ -108,7 +95,7 @@ reconnect_arm_monitor_task (void *cls)
108 h->reconnect_task = NULL; 95 h->reconnect_task = NULL;
109 LOG (GNUNET_ERROR_TYPE_DEBUG, 96 LOG (GNUNET_ERROR_TYPE_DEBUG,
110 "Connecting to ARM service for monitoring after delay\n"); 97 "Connecting to ARM service for monitoring after delay\n");
111 reconnect_arm_monitor (h); 98 GNUNET_break (GNUNET_OK == reconnect_arm_monitor (h));
112} 99}
113 100
114 101
@@ -121,118 +108,123 @@ reconnect_arm_monitor_task (void *cls)
121static void 108static void
122reconnect_arm_monitor_later (struct GNUNET_ARM_MonitorHandle *h) 109reconnect_arm_monitor_later (struct GNUNET_ARM_MonitorHandle *h)
123{ 110{
124 if (NULL != h->cth) 111 if (NULL != h->mq)
125 { 112 {
126 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); 113 GNUNET_MQ_destroy (h->mq);
127 h->cth = NULL; 114 h->mq = NULL;
128 } 115 }
116 GNUNET_assert (NULL == h->reconnect_task);
117 h->reconnect_task
118 = GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
119 &reconnect_arm_monitor_task, h);
120 h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
121}
129 122
130 if (NULL != h->monitor)
131 {
132 GNUNET_CLIENT_disconnect (h->monitor);
133 h->monitor = NULL;
134 }
135 123
136 if (NULL != h->init_timeout_task_id) 124/**
125 * Check notification messages received from ARM is well-formed.
126 *
127 * @param cls our `struct GNUNET_ARM_MonitorHandle`
128 * @param msg the message received from the arm service
129 * @return #GNUNET_OK if the message is well-formed
130 */
131static int
132check_monitor_notify (void *cls,
133 const struct GNUNET_ARM_StatusMessage *res)
134{
135 size_t sl = ntohs (res->header.size) - sizeof (struct GNUNET_ARM_StatusMessage);
136 const char *name = (const char *) &res[1];
137
138 if ( (0 == sl) ||
139 ('\0' != name[sl-1]) )
137 { 140 {
138 GNUNET_SCHEDULER_cancel (h->init_timeout_task_id); 141 GNUNET_break (0);
139 h->init_timeout_task_id = NULL; 142 return GNUNET_SYSERR;
140 } 143 }
141 144 return GNUNET_OK;
142 GNUNET_assert (NULL == h->reconnect_task);
143 h->reconnect_task =
144 GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_arm_monitor_task, h);
145
146 h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
147} 145}
148 146
149 147
150/** 148/**
151 * Init message timed out. Disconnect and try again. 149 * Handler for notification messages received from ARM.
152 * 150 *
153 * @param cls arm monitor handle 151 * @param cls our `struct GNUNET_ARM_MonitorHandle`
152 * @param msg the message received from the arm service
154 */ 153 */
155static void 154static void
156init_timeout_task (void *cls) 155handle_monitor_notify (void *cls,
156 const struct GNUNET_ARM_StatusMessage *res)
157{ 157{
158 struct GNUNET_ARM_MonitorHandle *h = cls; 158 struct GNUNET_ARM_MonitorHandle *h = cls;
159 enum GNUNET_ARM_ServiceStatus status;
159 160
161 status = (enum GNUNET_ARM_ServiceStatus) ntohl (res->status);
160 LOG (GNUNET_ERROR_TYPE_DEBUG, 162 LOG (GNUNET_ERROR_TYPE_DEBUG,
161 "Init message timed out\n"); 163 "Received notification from ARM for service `%s' with status %d\n",
162 h->init_timeout_task_id = NULL; 164 (const char *) &res[1],
163 reconnect_arm_monitor_later (h); 165 (int) status);
166 if (NULL != h->service_status)
167 h->service_status (h->service_status_cls,
168 (const char *) &res[1],
169 status);
164} 170}
165 171
166 172
167/** 173/**
168 * Transmit the monitoring initialization message to the arm service. 174 * Generic error handler, called with the appropriate error code and
175 * the same closure specified at the creation of the message queue.
176 * Not every message queue implementation supports an error handler.
169 * 177 *
170 * @param cls closure with the 'struct GNUNET_ARM_MonitorHandle' 178 * @param cls closure with the `struct GNUNET_ARM_MonitorHandle *`
171 * @param size number of bytes available in buf 179 * @param error error code
172 * @param buf where the callee should write the message
173 * @return number of bytes written to buf
174 */ 180 */
175static size_t 181static void
176transmit_monitoring_init_message (void *cls, size_t size, void *buf) 182mq_error_handler (void *cls,
183 enum GNUNET_MQ_Error error)
177{ 184{
178 struct GNUNET_ARM_MonitorHandle *h = cls; 185 struct GNUNET_ARM_MonitorHandle *h = cls;
179 struct GNUNET_MessageHeader *msg;
180 uint16_t msize;
181 186
182 GNUNET_assert (NULL == h->reconnect_task); 187 reconnect_arm_monitor_later (h);
183 GNUNET_assert (NULL == h->init_timeout_task_id);
184 h->cth = NULL;
185 if (NULL == buf)
186 {
187 LOG (GNUNET_ERROR_TYPE_DEBUG,
188 "Transmission failed, initiating reconnect\n");
189 reconnect_arm_monitor_later (h);
190 return 0;
191 }
192 msize = sizeof (struct GNUNET_MessageHeader);
193 if (size < msize)
194 {
195 LOG (GNUNET_ERROR_TYPE_DEBUG,
196 "Request is too big (%u < %u), not sending it\n", size, msize);
197 h->cth = GNUNET_CLIENT_notify_transmit_ready (h->monitor, msize,
198 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
199 transmit_monitoring_init_message, h);
200 return 0;
201 }
202
203 msg = buf;
204 msg->size = htons (msize);
205 msg->type = htons (GNUNET_MESSAGE_TYPE_ARM_MONITOR);
206 LOG (GNUNET_ERROR_TYPE_DEBUG,
207 "Transmitting ARM monitoring init message with %u bytes to arm.\n",
208 (unsigned int) msize);
209
210 h->init_timeout_task_id = GNUNET_SCHEDULER_add_delayed (
211 INIT_TIMEOUT, init_timeout_task, h);
212 GNUNET_CLIENT_receive (h->monitor, &monitor_notify_handler, h,
213 GNUNET_TIME_UNIT_FOREVER_REL);
214 return msize;
215} 188}
216 189
217 190
191/**
192 * Connect to the ARM service for monitoring.
193 *
194 * @param h handle to connect
195 * @return #GNUNET_OK on success
196 */
218static int 197static int
219reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h) 198reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h)
220{ 199{
221 GNUNET_assert (NULL == h->monitor); 200 GNUNET_MQ_hd_var_size (monitor_notify,
222 h->monitor = GNUNET_CLIENT_connect ("arm", h->cfg); 201 GNUNET_MESSAGE_TYPE_ARM_STATUS,
223 if (NULL == h->monitor) 202 struct GNUNET_ARM_StatusMessage);
203 struct GNUNET_MQ_MessageHandler handlers[] = {
204 make_monitor_notify_handler (h),
205 GNUNET_MQ_handler_end ()
206 };
207 struct GNUNET_MessageHeader *msg;
208 struct GNUNET_MQ_Envelope *env;
209
210 GNUNET_assert (NULL == h->mq);
211 h->mq = GNUNET_CLIENT_connecT (h->cfg,
212 "arm",
213 handlers,
214 &mq_error_handler,
215 h);
216 if (NULL == h->mq)
224 { 217 {
225 LOG (GNUNET_ERROR_TYPE_DEBUG,
226 "arm_api, GNUNET_CLIENT_connect returned NULL\n");
227 if (NULL != h->service_status) 218 if (NULL != h->service_status)
228 h->service_status (h->cls, NULL, GNUNET_ARM_SERVICE_STOPPED); 219 h->service_status (h->service_status_cls,
220 NULL,
221 GNUNET_ARM_SERVICE_STOPPED);
229 return GNUNET_SYSERR; 222 return GNUNET_SYSERR;
230 } 223 }
231 LOG (GNUNET_ERROR_TYPE_DEBUG, 224 env = GNUNET_MQ_msg (msg,
232 "arm_api, GNUNET_CLIENT_connect returned non-NULL\n"); 225 GNUNET_MESSAGE_TYPE_ARM_MONITOR);
233 h->cth = GNUNET_CLIENT_notify_transmit_ready (h->monitor, 226 GNUNET_MQ_send (h->mq,
234 sizeof (struct GNUNET_MessageHeader), GNUNET_TIME_UNIT_FOREVER_REL, 227 env);
235 GNUNET_NO, &transmit_monitoring_init_message, h);
236 return GNUNET_OK; 228 return GNUNET_OK;
237} 229}
238 230
@@ -245,22 +237,20 @@ reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h)
245 * the ARM service may internally use a different 237 * the ARM service may internally use a different
246 * configuration to determine how to start the service). 238 * configuration to determine how to start the service).
247 * @param cont callback to invoke on status updates 239 * @param cont callback to invoke on status updates
248 * @param cont_cls closure 240 * @param cont_cls closure for @a cont
249 * @return context to use for further ARM monitor operations, NULL on error. 241 * @return context to use for further ARM monitor operations, NULL on error.
250 */ 242 */
251struct GNUNET_ARM_MonitorHandle * 243struct GNUNET_ARM_MonitorHandle *
252GNUNET_ARM_monitor (const struct GNUNET_CONFIGURATION_Handle *cfg, 244GNUNET_ARM_monitor (const struct GNUNET_CONFIGURATION_Handle *cfg,
253 GNUNET_ARM_ServiceStatusCallback cont, void *cont_cls) 245 GNUNET_ARM_ServiceStatusCallback cont,
246 void *cont_cls)
254{ 247{
255 struct GNUNET_ARM_MonitorHandle *h; 248 struct GNUNET_ARM_MonitorHandle *h;
256 249
257 h = GNUNET_new (struct GNUNET_ARM_MonitorHandle); 250 h = GNUNET_new (struct GNUNET_ARM_MonitorHandle);
258 h->cfg = GNUNET_CONFIGURATION_dup (cfg); 251 h->cfg = cfg;
259 h->currently_down = GNUNET_YES;
260 h->reconnect_task = NULL;
261 h->init_timeout_task_id = NULL;
262 h->service_status = cont; 252 h->service_status = cont;
263 h->cls = cont_cls; 253 h->service_status_cls = cont_cls;
264 if (GNUNET_OK != reconnect_arm_monitor (h)) 254 if (GNUNET_OK != reconnect_arm_monitor (h))
265 { 255 {
266 GNUNET_free (h); 256 GNUNET_free (h);
@@ -278,86 +268,18 @@ GNUNET_ARM_monitor (const struct GNUNET_CONFIGURATION_Handle *cfg,
278void 268void
279GNUNET_ARM_monitor_disconnect_and_free (struct GNUNET_ARM_MonitorHandle *h) 269GNUNET_ARM_monitor_disconnect_and_free (struct GNUNET_ARM_MonitorHandle *h)
280{ 270{
281 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from ARM service\n"); 271 if (NULL != h->mq)
282 if (NULL != h->cth)
283 { 272 {
284 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); 273 GNUNET_MQ_destroy (h->mq);
285 h->cth = NULL; 274 h->mq = NULL;
286 }
287 if (NULL != h->init_timeout_task_id)
288 {
289 GNUNET_SCHEDULER_cancel (h->init_timeout_task_id);
290 h->init_timeout_task_id = NULL;
291 }
292 if (NULL != h->monitor)
293 {
294 GNUNET_CLIENT_disconnect (h->monitor);
295 h->monitor = NULL;
296 } 275 }
297 if (NULL != h->reconnect_task) 276 if (NULL != h->reconnect_task)
298 { 277 {
299 GNUNET_SCHEDULER_cancel (h->reconnect_task); 278 GNUNET_SCHEDULER_cancel (h->reconnect_task);
300 h->reconnect_task = NULL; 279 h->reconnect_task = NULL;
301 } 280 }
302 GNUNET_CONFIGURATION_destroy (h->cfg);
303 GNUNET_free (h); 281 GNUNET_free (h);
304} 282}
305 283
306 284
307/**
308 * Handler for notification messages received from ARM.
309 *
310 * @param cls our "struct GNUNET_ARM_MonitorHandle"
311 * @param msg the message received from the arm service
312 */
313static void
314monitor_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
315{
316 struct GNUNET_ARM_MonitorHandle *h = cls;
317 uint16_t msize;
318 const struct GNUNET_ARM_StatusMessage *res;
319 enum GNUNET_ARM_ServiceStatus status;
320
321 if (NULL == msg)
322 {
323 LOG (GNUNET_ERROR_TYPE_INFO,
324 _("Monitoring client was disconnected from arm service, trying to reconnect.\n"));
325 reconnect_arm_monitor_later (h);
326 return;
327 }
328 msize = ntohs (msg->size);
329 LOG (GNUNET_ERROR_TYPE_DEBUG,
330 "Processing message of type %u and size %u from arm service\n",
331 ntohs (msg->type), msize);
332 switch (ntohs (msg->type))
333 {
334 case GNUNET_MESSAGE_TYPE_ARM_STATUS:
335 if (msize <= sizeof (struct GNUNET_ARM_StatusMessage))
336 {
337 GNUNET_break (0);
338 reconnect_arm_monitor_later (h);
339 return;
340 }
341 if (NULL != h->init_timeout_task_id)
342 {
343 GNUNET_SCHEDULER_cancel (h->init_timeout_task_id);
344 h->init_timeout_task_id = NULL;
345 }
346 res = (const struct GNUNET_ARM_StatusMessage *) msg;
347 LOG (GNUNET_ERROR_TYPE_DEBUG,
348 "Received response from ARM for service `%s': %u\n",
349 (const char *) &res[1], ntohs (msg->type));
350 status = (enum GNUNET_ARM_ServiceStatus) ntohl (res->status);
351 GNUNET_CLIENT_receive (h->monitor, &monitor_notify_handler, h,
352 GNUNET_TIME_UNIT_FOREVER_REL);
353 if (NULL != h->service_status)
354 h->service_status (h->cls, (const char *) &res[1], status);
355 break;
356 default:
357 reconnect_arm_monitor_later (h);
358 return;
359 }
360}
361
362
363/* end of arm_api.c */ 285/* end of arm_api.c */
diff --git a/src/arm/gnunet-service-arm.c b/src/arm/gnunet-service-arm.c
index 0ccffa27b..8bc6e9e07 100644
--- a/src/arm/gnunet-service-arm.c
+++ b/src/arm/gnunet-service-arm.c
@@ -989,7 +989,8 @@ handle_stop (void *cls,
989 * @param message the actual message 989 * @param message the actual message
990 */ 990 */
991static void 991static void
992handle_list (void *cls, struct GNUNET_SERVER_Client *client, 992handle_list (void *cls,
993 struct GNUNET_SERVER_Client *client,
993 const struct GNUNET_MessageHeader *message) 994 const struct GNUNET_MessageHeader *message)
994{ 995{
995 struct GNUNET_ARM_ListResultMessage *msg; 996 struct GNUNET_ARM_ListResultMessage *msg;
@@ -1595,7 +1596,8 @@ setup_service (void *cls,
1595 * @param client identification of the client 1596 * @param client identification of the client
1596 */ 1597 */
1597static void 1598static void
1598handle_client_connecting (void *cls, struct GNUNET_SERVER_Client *client) 1599handle_client_connecting (void *cls,
1600 struct GNUNET_SERVER_Client *client)
1599{ 1601{
1600 /* All clients are considered to be of the "monitor" kind 1602 /* All clients are considered to be of the "monitor" kind
1601 * (that is, they don't affect ARM shutdown). 1603 * (that is, they don't affect ARM shutdown).
@@ -1615,9 +1617,12 @@ handle_client_connecting (void *cls, struct GNUNET_SERVER_Client *client)
1615 * #GNUNET_SYSERR to close it (signal serious error) 1617 * #GNUNET_SYSERR to close it (signal serious error)
1616 */ 1618 */
1617static void 1619static void
1618handle_monitor (void *cls, struct GNUNET_SERVER_Client *client, 1620handle_monitor (void *cls,
1619 const struct GNUNET_MessageHeader *message) 1621 struct GNUNET_SERVER_Client *client,
1622 const struct GNUNET_MessageHeader *message)
1620{ 1623{
1624 /* FIXME: might want to start by letting monitor know about
1625 services that are already running */
1621 /* Removal is handled by the server implementation, internally. */ 1626 /* Removal is handled by the server implementation, internally. */
1622 if ((NULL != client) && (NULL != notifier)) 1627 if ((NULL != client) && (NULL != notifier))
1623 { 1628 {