aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c481
1 files changed, 481 insertions, 0 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
new file mode 100644
index 000000000..36cacd30b
--- /dev/null
+++ b/src/util/mq.c
@@ -0,0 +1,481 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @author Florian Dold
23 * @file util/mq.c
24 * @brief general purpose request queue
25 */
26
27#include "platform.h"
28#include "gnunet_common.h"
29#include "gnunet_util_lib.h"
30
31#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
32
33
34
35struct ServerClientSocketState
36{
37 struct GNUNET_SERVER_Client *client;
38 struct GNUNET_SERVER_TransmitHandle* th;
39};
40
41
42struct ClientConnectionState
43{
44 /**
45 * Did we call receive?
46 */
47 int receive_active;
48 struct GNUNET_CLIENT_Connection *connection;
49 struct GNUNET_CLIENT_TransmitHandle *th;
50};
51
52
53
54
55/**
56 * Call the right callback for a message.
57 *
58 * @param mq message queue with the handlers
59 * @param mh message to dispatch
60 */
61void
62GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
63{
64 const struct GNUNET_MQ_Handler *handler;
65 int handled = GNUNET_NO;
66
67 handler = mq->handlers;
68 if (NULL == handler)
69 return;
70 for (; NULL != handler->cb; handler++)
71 {
72 if (handler->type == ntohs (mh->type))
73 {
74 handler->cb (mq->handlers_cls, mh);
75 handled = GNUNET_YES;
76 }
77 }
78
79 if (GNUNET_NO == handled)
80 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
81}
82
83
84void
85GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
86{
87 GNUNET_assert (NULL == mqm->parent_queue);
88 GNUNET_free (mqm);
89}
90
91
92/**
93 * Send a message with the give message queue.
94 * May only be called once per message.
95 *
96 * @param mq message queue
97 * @param mqm the message to send.
98 */
99void
100GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
101{
102 GNUNET_assert (NULL != mq);
103 mq->send_impl (mq, mqm);
104}
105
106
107struct GNUNET_MQ_Message *
108GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
109{
110 struct GNUNET_MQ_Message *mqm;
111
112 mqm = GNUNET_malloc (sizeof *mqm + size);
113 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
114 mqm->mh->size = htons (size);
115 mqm->mh->type = htons (type);
116 if (NULL != mhp)
117 *mhp = mqm->mh;
118 return mqm;
119}
120
121
122int
123GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
124 const void *data, uint16_t len)
125{
126 size_t new_size;
127 size_t old_size;
128
129 GNUNET_assert (NULL != mqmp);
130 /* there's no data to append => do nothing */
131 if (NULL == data)
132 return GNUNET_OK;
133 old_size = ntohs ((*mqmp)->mh->size);
134 /* message too large to concatenate? */
135 if (((uint16_t) (old_size + len)) < len)
136 return GNUNET_SYSERR;
137 new_size = old_size + len;
138 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
139 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
140 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
141 (*mqmp)->mh->size = htons (new_size);
142 return GNUNET_OK;
143}
144
145
146
147
148/*** Transmit a queued message to the session's client.
149 *
150 * @param cls consensus session
151 * @param size number of bytes available in buf
152 * @param buf where the callee should write the message
153 * @return number of bytes written to buf
154 */
155static size_t
156transmit_queued (void *cls, size_t size,
157 void *buf)
158{
159 struct GNUNET_MQ_MessageQueue *mq = cls;
160 struct GNUNET_MQ_Message *mqm = mq->current_msg;
161 struct ServerClientSocketState *state = mq->impl_state;
162 size_t msg_size;
163
164 GNUNET_assert (NULL != buf);
165
166 if (NULL != mqm->sent_cb)
167 {
168 mqm->sent_cb (mqm->sent_cls);
169 }
170
171 mq->current_msg = NULL;
172 GNUNET_assert (NULL != mqm);
173 msg_size = ntohs (mqm->mh->size);
174 GNUNET_assert (size >= msg_size);
175 memcpy (buf, mqm->mh, msg_size);
176 GNUNET_free (mqm);
177 state->th = NULL;
178
179 if (NULL != mq->msg_head)
180 {
181 mq->current_msg = mq->msg_head;
182 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
183 state->th =
184 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
185 GNUNET_TIME_UNIT_FOREVER_REL,
186 &transmit_queued, mq);
187 }
188 return msg_size;
189}
190
191
192
193static void
194server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
195{
196 struct ServerClientSocketState *state;
197
198 GNUNET_assert (NULL != mq);
199 state = mq->impl_state;
200 GNUNET_assert (NULL != state);
201 GNUNET_SERVER_client_drop (state->client);
202 GNUNET_free (state);
203}
204
205static void
206server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
207{
208 struct ServerClientSocketState *state;
209 int msize;
210
211 GNUNET_assert (NULL != mq);
212 state = mq->impl_state;
213 GNUNET_assert (NULL != state);
214
215 if (NULL != state->th)
216 {
217 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
218 return;
219 }
220 GNUNET_assert (NULL == mq->msg_head);
221 GNUNET_assert (NULL == mq->current_msg);
222 msize = ntohs (mqm->mh->size);
223 mq->current_msg = mqm;
224 state->th =
225 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
226 GNUNET_TIME_UNIT_FOREVER_REL,
227 &transmit_queued, mq);
228}
229
230
231struct GNUNET_MQ_MessageQueue *
232GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
233{
234 struct GNUNET_MQ_MessageQueue *mq;
235 struct ServerClientSocketState *scss;
236
237 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
238 scss = GNUNET_new (struct ServerClientSocketState);
239 mq->impl_state = scss;
240 scss->client = client;
241 GNUNET_SERVER_client_keep (client);
242 mq->send_impl = server_client_send_impl;
243 mq->destroy_impl = server_client_destroy_impl;
244 return mq;
245}
246
247
248/**
249 * Type of a function to call when we receive a message
250 * from the service.
251 *
252 * @param cls closure
253 * @param msg message received, NULL on timeout or fatal error
254 */
255static void
256handle_client_message (void *cls,
257 const struct GNUNET_MessageHeader *msg)
258{
259 struct GNUNET_MQ_MessageQueue *mq = cls;
260 struct ClientConnectionState *state;
261
262 state = mq->impl_state;
263
264 if (NULL == msg)
265 {
266 if (NULL == mq->error_handler)
267 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
268 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
269 return;
270 }
271
272 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
273 GNUNET_TIME_UNIT_FOREVER_REL);
274
275 GNUNET_MQ_dispatch (mq, msg);
276}
277
278
279/**
280 * Transmit a queued message to the session's client.
281 *
282 * @param cls consensus session
283 * @param size number of bytes available in buf
284 * @param buf where the callee should write the message
285 * @return number of bytes written to buf
286 */
287static size_t
288connection_client_transmit_queued (void *cls, size_t size,
289 void *buf)
290{
291 struct GNUNET_MQ_MessageQueue *mq = cls;
292 struct GNUNET_MQ_Message *mqm = mq->current_msg;
293 struct ClientConnectionState *state = mq->impl_state;
294 size_t msg_size;
295
296 if (NULL == buf)
297 {
298 if (NULL == mq->error_handler)
299 {
300 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
301 return 0;
302 }
303 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
304 return 0;
305 }
306
307 if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
308 {
309 state->receive_active = GNUNET_YES;
310 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
311 GNUNET_TIME_UNIT_FOREVER_REL);
312 }
313
314
315 GNUNET_assert (NULL != mqm);
316
317 if (NULL != mqm->sent_cb)
318 {
319 mqm->sent_cb (mqm->sent_cls);
320 }
321
322 mq->current_msg = NULL;
323 GNUNET_assert (NULL != buf);
324 msg_size = ntohs (mqm->mh->size);
325 GNUNET_assert (size >= msg_size);
326 memcpy (buf, mqm->mh, msg_size);
327 GNUNET_free (mqm);
328 state->th = NULL;
329 if (NULL != mq->msg_head)
330 {
331 mq->current_msg = mq->msg_head;
332 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
333 state->th =
334 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
335 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
336 &connection_client_transmit_queued, mq);
337 }
338 return msg_size;
339}
340
341
342
343static void
344connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
345{
346 GNUNET_free (mq->impl_state);
347}
348
349static void
350connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
351 struct GNUNET_MQ_Message *mqm)
352{
353 struct ClientConnectionState *state = mq->impl_state;
354 int msize;
355
356 GNUNET_assert (NULL != state);
357
358 if (NULL != state->th)
359 {
360 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
361 return;
362 }
363 GNUNET_assert (NULL == mq->current_msg);
364 mq->current_msg = mqm;
365 msize = ntohs (mqm->mh->size);
366 state->th =
367 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
368 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
369 &connection_client_transmit_queued, mq);
370}
371
372
373
374
375
376struct GNUNET_MQ_MessageQueue *
377GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
378 const struct GNUNET_MQ_Handler *handlers,
379 void *cls)
380{
381 struct GNUNET_MQ_MessageQueue *mq;
382 struct ClientConnectionState *state;
383
384 GNUNET_assert (NULL != connection);
385
386 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
387 mq->handlers = handlers;
388 mq->handlers_cls = cls;
389 state = GNUNET_new (struct ClientConnectionState);
390 state->connection = connection;
391 mq->impl_state = state;
392 mq->send_impl = connection_client_send_impl;
393 mq->destroy_impl = connection_client_destroy_impl;
394
395 return mq;
396}
397
398
399void
400GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
401 const struct GNUNET_MQ_Handler *new_handlers,
402 void *cls)
403{
404 mq->handlers = new_handlers;
405 mq->handlers_cls = cls;
406}
407
408
409/**
410 * Associate the assoc_data in mq with a unique request id.
411 *
412 * @param mq message queue, id will be unique for the queue
413 * @param mqm message to associate
414 * @param assoc_data to associate
415 */
416uint32_t
417GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
418 struct GNUNET_MQ_Message *mqm,
419 void *assoc_data)
420{
421 uint32_t id;
422
423 if (NULL == mq->assoc_map)
424 {
425 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
426 mq->assoc_id = 1;
427 }
428 id = mq->assoc_id++;
429 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
430 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
431 return id;
432}
433
434
435
436void *
437GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
438{
439 if (NULL == mq->assoc_map)
440 return NULL;
441 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
442}
443
444
445void *
446GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
447{
448 void *val;
449
450 if (NULL == mq->assoc_map)
451 return NULL;
452 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
453 GNUNET_assert (NULL != val);
454 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
455 return val;
456}
457
458
459void
460GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
461 GNUNET_MQ_NotifyCallback cb,
462 void *cls)
463{
464 mqm->sent_cb = cb;
465 mqm->sent_cls = cls;
466}
467
468
469void
470GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
471{
472 /* FIXME: destroy all pending messages in the queue */
473
474 if (NULL != mq->destroy_impl)
475 {
476 mq->destroy_impl (mq);
477 }
478
479 GNUNET_free (mq);
480}
481