diff options
Diffstat (limited to 'src/util/mq.c')
-rw-r--r-- | src/util/mq.c | 481 |
1 files changed, 481 insertions, 0 deletions
diff --git a/src/util/mq.c b/src/util/mq.c new file mode 100644 index 000000000..36cacd30b --- /dev/null +++ b/src/util/mq.c | |||
@@ -0,0 +1,481 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @author Florian Dold | ||
23 | * @file util/mq.c | ||
24 | * @brief general purpose request queue | ||
25 | */ | ||
26 | |||
27 | #include "platform.h" | ||
28 | #include "gnunet_common.h" | ||
29 | #include "gnunet_util_lib.h" | ||
30 | |||
31 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) | ||
32 | |||
33 | |||
34 | |||
35 | struct ServerClientSocketState | ||
36 | { | ||
37 | struct GNUNET_SERVER_Client *client; | ||
38 | struct GNUNET_SERVER_TransmitHandle* th; | ||
39 | }; | ||
40 | |||
41 | |||
42 | struct ClientConnectionState | ||
43 | { | ||
44 | /** | ||
45 | * Did we call receive? | ||
46 | */ | ||
47 | int receive_active; | ||
48 | struct GNUNET_CLIENT_Connection *connection; | ||
49 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
50 | }; | ||
51 | |||
52 | |||
53 | |||
54 | |||
55 | /** | ||
56 | * Call the right callback for a message. | ||
57 | * | ||
58 | * @param mq message queue with the handlers | ||
59 | * @param mh message to dispatch | ||
60 | */ | ||
61 | void | ||
62 | GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) | ||
63 | { | ||
64 | const struct GNUNET_MQ_Handler *handler; | ||
65 | int handled = GNUNET_NO; | ||
66 | |||
67 | handler = mq->handlers; | ||
68 | if (NULL == handler) | ||
69 | return; | ||
70 | for (; NULL != handler->cb; handler++) | ||
71 | { | ||
72 | if (handler->type == ntohs (mh->type)) | ||
73 | { | ||
74 | handler->cb (mq->handlers_cls, mh); | ||
75 | handled = GNUNET_YES; | ||
76 | } | ||
77 | } | ||
78 | |||
79 | if (GNUNET_NO == handled) | ||
80 | LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); | ||
81 | } | ||
82 | |||
83 | |||
84 | void | ||
85 | GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | ||
86 | { | ||
87 | GNUNET_assert (NULL == mqm->parent_queue); | ||
88 | GNUNET_free (mqm); | ||
89 | } | ||
90 | |||
91 | |||
92 | /** | ||
93 | * Send a message with the give message queue. | ||
94 | * May only be called once per message. | ||
95 | * | ||
96 | * @param mq message queue | ||
97 | * @param mqm the message to send. | ||
98 | */ | ||
99 | void | ||
100 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | ||
101 | { | ||
102 | GNUNET_assert (NULL != mq); | ||
103 | mq->send_impl (mq, mqm); | ||
104 | } | ||
105 | |||
106 | |||
107 | struct GNUNET_MQ_Message * | ||
108 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | ||
109 | { | ||
110 | struct GNUNET_MQ_Message *mqm; | ||
111 | |||
112 | mqm = GNUNET_malloc (sizeof *mqm + size); | ||
113 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | ||
114 | mqm->mh->size = htons (size); | ||
115 | mqm->mh->type = htons (type); | ||
116 | if (NULL != mhp) | ||
117 | *mhp = mqm->mh; | ||
118 | return mqm; | ||
119 | } | ||
120 | |||
121 | |||
122 | int | ||
123 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, | ||
124 | const void *data, uint16_t len) | ||
125 | { | ||
126 | size_t new_size; | ||
127 | size_t old_size; | ||
128 | |||
129 | GNUNET_assert (NULL != mqmp); | ||
130 | /* there's no data to append => do nothing */ | ||
131 | if (NULL == data) | ||
132 | return GNUNET_OK; | ||
133 | old_size = ntohs ((*mqmp)->mh->size); | ||
134 | /* message too large to concatenate? */ | ||
135 | if (((uint16_t) (old_size + len)) < len) | ||
136 | return GNUNET_SYSERR; | ||
137 | new_size = old_size + len; | ||
138 | *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); | ||
139 | (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1]; | ||
140 | memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size); | ||
141 | (*mqmp)->mh->size = htons (new_size); | ||
142 | return GNUNET_OK; | ||
143 | } | ||
144 | |||
145 | |||
146 | |||
147 | |||
148 | /*** Transmit a queued message to the session's client. | ||
149 | * | ||
150 | * @param cls consensus session | ||
151 | * @param size number of bytes available in buf | ||
152 | * @param buf where the callee should write the message | ||
153 | * @return number of bytes written to buf | ||
154 | */ | ||
155 | static size_t | ||
156 | transmit_queued (void *cls, size_t size, | ||
157 | void *buf) | ||
158 | { | ||
159 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
160 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | ||
161 | struct ServerClientSocketState *state = mq->impl_state; | ||
162 | size_t msg_size; | ||
163 | |||
164 | GNUNET_assert (NULL != buf); | ||
165 | |||
166 | if (NULL != mqm->sent_cb) | ||
167 | { | ||
168 | mqm->sent_cb (mqm->sent_cls); | ||
169 | } | ||
170 | |||
171 | mq->current_msg = NULL; | ||
172 | GNUNET_assert (NULL != mqm); | ||
173 | msg_size = ntohs (mqm->mh->size); | ||
174 | GNUNET_assert (size >= msg_size); | ||
175 | memcpy (buf, mqm->mh, msg_size); | ||
176 | GNUNET_free (mqm); | ||
177 | state->th = NULL; | ||
178 | |||
179 | if (NULL != mq->msg_head) | ||
180 | { | ||
181 | mq->current_msg = mq->msg_head; | ||
182 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
183 | state->th = | ||
184 | GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, | ||
185 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
186 | &transmit_queued, mq); | ||
187 | } | ||
188 | return msg_size; | ||
189 | } | ||
190 | |||
191 | |||
192 | |||
193 | static void | ||
194 | server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
195 | { | ||
196 | struct ServerClientSocketState *state; | ||
197 | |||
198 | GNUNET_assert (NULL != mq); | ||
199 | state = mq->impl_state; | ||
200 | GNUNET_assert (NULL != state); | ||
201 | GNUNET_SERVER_client_drop (state->client); | ||
202 | GNUNET_free (state); | ||
203 | } | ||
204 | |||
205 | static void | ||
206 | server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | ||
207 | { | ||
208 | struct ServerClientSocketState *state; | ||
209 | int msize; | ||
210 | |||
211 | GNUNET_assert (NULL != mq); | ||
212 | state = mq->impl_state; | ||
213 | GNUNET_assert (NULL != state); | ||
214 | |||
215 | if (NULL != state->th) | ||
216 | { | ||
217 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
218 | return; | ||
219 | } | ||
220 | GNUNET_assert (NULL == mq->msg_head); | ||
221 | GNUNET_assert (NULL == mq->current_msg); | ||
222 | msize = ntohs (mqm->mh->size); | ||
223 | mq->current_msg = mqm; | ||
224 | state->th = | ||
225 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | ||
226 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
227 | &transmit_queued, mq); | ||
228 | } | ||
229 | |||
230 | |||
231 | struct GNUNET_MQ_MessageQueue * | ||
232 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) | ||
233 | { | ||
234 | struct GNUNET_MQ_MessageQueue *mq; | ||
235 | struct ServerClientSocketState *scss; | ||
236 | |||
237 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
238 | scss = GNUNET_new (struct ServerClientSocketState); | ||
239 | mq->impl_state = scss; | ||
240 | scss->client = client; | ||
241 | GNUNET_SERVER_client_keep (client); | ||
242 | mq->send_impl = server_client_send_impl; | ||
243 | mq->destroy_impl = server_client_destroy_impl; | ||
244 | return mq; | ||
245 | } | ||
246 | |||
247 | |||
248 | /** | ||
249 | * Type of a function to call when we receive a message | ||
250 | * from the service. | ||
251 | * | ||
252 | * @param cls closure | ||
253 | * @param msg message received, NULL on timeout or fatal error | ||
254 | */ | ||
255 | static void | ||
256 | handle_client_message (void *cls, | ||
257 | const struct GNUNET_MessageHeader *msg) | ||
258 | { | ||
259 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
260 | struct ClientConnectionState *state; | ||
261 | |||
262 | state = mq->impl_state; | ||
263 | |||
264 | if (NULL == msg) | ||
265 | { | ||
266 | if (NULL == mq->error_handler) | ||
267 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); | ||
268 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
269 | return; | ||
270 | } | ||
271 | |||
272 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | ||
273 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
274 | |||
275 | GNUNET_MQ_dispatch (mq, msg); | ||
276 | } | ||
277 | |||
278 | |||
279 | /** | ||
280 | * Transmit a queued message to the session's client. | ||
281 | * | ||
282 | * @param cls consensus session | ||
283 | * @param size number of bytes available in buf | ||
284 | * @param buf where the callee should write the message | ||
285 | * @return number of bytes written to buf | ||
286 | */ | ||
287 | static size_t | ||
288 | connection_client_transmit_queued (void *cls, size_t size, | ||
289 | void *buf) | ||
290 | { | ||
291 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
292 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | ||
293 | struct ClientConnectionState *state = mq->impl_state; | ||
294 | size_t msg_size; | ||
295 | |||
296 | if (NULL == buf) | ||
297 | { | ||
298 | if (NULL == mq->error_handler) | ||
299 | { | ||
300 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n"); | ||
301 | return 0; | ||
302 | } | ||
303 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
304 | return 0; | ||
305 | } | ||
306 | |||
307 | if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) | ||
308 | { | ||
309 | state->receive_active = GNUNET_YES; | ||
310 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | ||
311 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
312 | } | ||
313 | |||
314 | |||
315 | GNUNET_assert (NULL != mqm); | ||
316 | |||
317 | if (NULL != mqm->sent_cb) | ||
318 | { | ||
319 | mqm->sent_cb (mqm->sent_cls); | ||
320 | } | ||
321 | |||
322 | mq->current_msg = NULL; | ||
323 | GNUNET_assert (NULL != buf); | ||
324 | msg_size = ntohs (mqm->mh->size); | ||
325 | GNUNET_assert (size >= msg_size); | ||
326 | memcpy (buf, mqm->mh, msg_size); | ||
327 | GNUNET_free (mqm); | ||
328 | state->th = NULL; | ||
329 | if (NULL != mq->msg_head) | ||
330 | { | ||
331 | mq->current_msg = mq->msg_head; | ||
332 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
333 | state->th = | ||
334 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), | ||
335 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | ||
336 | &connection_client_transmit_queued, mq); | ||
337 | } | ||
338 | return msg_size; | ||
339 | } | ||
340 | |||
341 | |||
342 | |||
343 | static void | ||
344 | connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | ||
345 | { | ||
346 | GNUNET_free (mq->impl_state); | ||
347 | } | ||
348 | |||
349 | static void | ||
350 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, | ||
351 | struct GNUNET_MQ_Message *mqm) | ||
352 | { | ||
353 | struct ClientConnectionState *state = mq->impl_state; | ||
354 | int msize; | ||
355 | |||
356 | GNUNET_assert (NULL != state); | ||
357 | |||
358 | if (NULL != state->th) | ||
359 | { | ||
360 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
361 | return; | ||
362 | } | ||
363 | GNUNET_assert (NULL == mq->current_msg); | ||
364 | mq->current_msg = mqm; | ||
365 | msize = ntohs (mqm->mh->size); | ||
366 | state->th = | ||
367 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, | ||
368 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | ||
369 | &connection_client_transmit_queued, mq); | ||
370 | } | ||
371 | |||
372 | |||
373 | |||
374 | |||
375 | |||
376 | struct GNUNET_MQ_MessageQueue * | ||
377 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | ||
378 | const struct GNUNET_MQ_Handler *handlers, | ||
379 | void *cls) | ||
380 | { | ||
381 | struct GNUNET_MQ_MessageQueue *mq; | ||
382 | struct ClientConnectionState *state; | ||
383 | |||
384 | GNUNET_assert (NULL != connection); | ||
385 | |||
386 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
387 | mq->handlers = handlers; | ||
388 | mq->handlers_cls = cls; | ||
389 | state = GNUNET_new (struct ClientConnectionState); | ||
390 | state->connection = connection; | ||
391 | mq->impl_state = state; | ||
392 | mq->send_impl = connection_client_send_impl; | ||
393 | mq->destroy_impl = connection_client_destroy_impl; | ||
394 | |||
395 | return mq; | ||
396 | } | ||
397 | |||
398 | |||
399 | void | ||
400 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | ||
401 | const struct GNUNET_MQ_Handler *new_handlers, | ||
402 | void *cls) | ||
403 | { | ||
404 | mq->handlers = new_handlers; | ||
405 | mq->handlers_cls = cls; | ||
406 | } | ||
407 | |||
408 | |||
409 | /** | ||
410 | * Associate the assoc_data in mq with a unique request id. | ||
411 | * | ||
412 | * @param mq message queue, id will be unique for the queue | ||
413 | * @param mqm message to associate | ||
414 | * @param assoc_data to associate | ||
415 | */ | ||
416 | uint32_t | ||
417 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | ||
418 | struct GNUNET_MQ_Message *mqm, | ||
419 | void *assoc_data) | ||
420 | { | ||
421 | uint32_t id; | ||
422 | |||
423 | if (NULL == mq->assoc_map) | ||
424 | { | ||
425 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); | ||
426 | mq->assoc_id = 1; | ||
427 | } | ||
428 | id = mq->assoc_id++; | ||
429 | GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, | ||
430 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
431 | return id; | ||
432 | } | ||
433 | |||
434 | |||
435 | |||
436 | void * | ||
437 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | ||
438 | { | ||
439 | if (NULL == mq->assoc_map) | ||
440 | return NULL; | ||
441 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); | ||
442 | } | ||
443 | |||
444 | |||
445 | void * | ||
446 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | ||
447 | { | ||
448 | void *val; | ||
449 | |||
450 | if (NULL == mq->assoc_map) | ||
451 | return NULL; | ||
452 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); | ||
453 | GNUNET_assert (NULL != val); | ||
454 | GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); | ||
455 | return val; | ||
456 | } | ||
457 | |||
458 | |||
459 | void | ||
460 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | ||
461 | GNUNET_MQ_NotifyCallback cb, | ||
462 | void *cls) | ||
463 | { | ||
464 | mqm->sent_cb = cb; | ||
465 | mqm->sent_cls = cls; | ||
466 | } | ||
467 | |||
468 | |||
469 | void | ||
470 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | ||
471 | { | ||
472 | /* FIXME: destroy all pending messages in the queue */ | ||
473 | |||
474 | if (NULL != mq->destroy_impl) | ||
475 | { | ||
476 | mq->destroy_impl (mq); | ||
477 | } | ||
478 | |||
479 | GNUNET_free (mq); | ||
480 | } | ||
481 | |||