aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c1333
1 files changed, 0 insertions, 1333 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
deleted file mode 100644
index 9b59cd338..000000000
--- a/src/util/mq.c
+++ /dev/null
@@ -1,1333 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @author Florian Dold
23 * @file util/mq.c
24 * @brief general purpose request queue
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28
29#define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30
31
32struct GNUNET_MQ_Envelope
33{
34 /**
35 * Messages are stored in a linked list.
36 * Each queue has its own list of envelopes.
37 */
38 struct GNUNET_MQ_Envelope *next;
39
40 /**
41 * Messages are stored in a linked list
42 * Each queue has its own list of envelopes.
43 */
44 struct GNUNET_MQ_Envelope *prev;
45
46 /**
47 * Actual allocated message header.
48 * The GNUNET_MQ_Envelope header is allocated at
49 * the end of the message.
50 */
51 struct GNUNET_MessageHeader *mh;
52
53 /**
54 * Queue the message is queued in, NULL if message is not queued.
55 */
56 struct GNUNET_MQ_Handle *parent_queue;
57
58 /**
59 * Called after the message was sent irrevocably.
60 */
61 GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63 /**
64 * Closure for @e send_cb
65 */
66 void *sent_cls;
67
68 /**
69 * Flags that were set for this envelope by
70 * #GNUNET_MQ_env_set_options(). Only valid if
71 * @e have_custom_options is set.
72 */
73 enum GNUNET_MQ_PriorityPreferences priority;
74
75 /**
76 * Did the application call #GNUNET_MQ_env_set_options()?
77 */
78 int have_custom_options;
79};
80
81
82/**
83 * Handle to a message queue.
84 */
85struct GNUNET_MQ_Handle
86{
87 /**
88 * Handlers array, or NULL if the queue should not receive messages
89 */
90 struct GNUNET_MQ_MessageHandler *handlers;
91
92 /**
93 * Actual implementation of message sending,
94 * called when a message is added
95 */
96 GNUNET_MQ_SendImpl send_impl;
97
98 /**
99 * Implementation-dependent queue destruction function
100 */
101 GNUNET_MQ_DestroyImpl destroy_impl;
102
103 /**
104 * Implementation-dependent send cancel function
105 */
106 GNUNET_MQ_CancelImpl cancel_impl;
107
108 /**
109 * Implementation-specific state
110 */
111 void *impl_state;
112
113 /**
114 * Callback will be called when an error occurs.
115 */
116 GNUNET_MQ_ErrorHandler error_handler;
117
118 /**
119 * Closure for the error handler.
120 */
121 void *error_handler_cls;
122
123 /**
124 * Task to asynchronously run #impl_send_continue().
125 */
126 struct GNUNET_SCHEDULER_Task *send_task;
127
128 /**
129 * Linked list of messages pending to be sent
130 */
131 struct GNUNET_MQ_Envelope *envelope_head;
132
133 /**
134 * Linked list of messages pending to be sent
135 */
136 struct GNUNET_MQ_Envelope *envelope_tail;
137
138 /**
139 * Message that is currently scheduled to be
140 * sent. Not the head of the message queue, as the implementation
141 * needs to know if sending has been already scheduled or not.
142 */
143 struct GNUNET_MQ_Envelope *current_envelope;
144
145 /**
146 * Map of associations, lazily allocated
147 */
148 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
149
150 /**
151 * Functions to call on queue destruction; kept in a DLL.
152 */
153 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
154
155 /**
156 * Functions to call on queue destruction; kept in a DLL.
157 */
158 struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
159
160 /**
161 * Flags that were set for this queue by
162 * #GNUNET_MQ_set_options(). Default is 0.
163 */
164 enum GNUNET_MQ_PriorityPreferences priority;
165
166 /**
167 * Next id that should be used for the @e assoc_map,
168 * initialized lazily to a random value together with
169 * @e assoc_map
170 */
171 uint32_t assoc_id;
172
173 /**
174 * Number of entries we have in the envelope-DLL.
175 */
176 unsigned int queue_length;
177
178 /**
179 * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
180 * FIXME: is this dead?
181 */
182 int evacuate_called;
183
184 /**
185 * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
186 */
187 int in_flight;
188};
189
190
191/**
192 * Call the message message handler that was registered
193 * for the type of the given message in the given message queue.
194 *
195 * This function is intended to be used for the implementation
196 * of message queues.
197 *
198 * @param mq message queue with the handlers
199 * @param mh message to dispatch
200 */
201void
202GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
203 const struct GNUNET_MessageHeader *mh)
204{
205 int ret;
206
207 ret = GNUNET_MQ_handle_message (mq->handlers, mh);
208 if (GNUNET_SYSERR == ret)
209 {
210 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
211 return;
212 }
213}
214
215
216/**
217 * Call the message message handler that was registered
218 * for the type of the given message in the given @a handlers list.
219 *
220 * This function is intended to be used for the implementation
221 * of message queues.
222 *
223 * @param handlers a set of handlers
224 * @param mh message to dispatch
225 * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
226 * #GNUNET_SYSERR if message was rejected by check function
227 */
228int
229GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
230 const struct GNUNET_MessageHeader *mh)
231{
232 const struct GNUNET_MQ_MessageHandler *handler;
233 int handled = GNUNET_NO;
234 uint16_t msize = ntohs (mh->size);
235 uint16_t mtype = ntohs (mh->type);
236
237 LOG (GNUNET_ERROR_TYPE_DEBUG,
238 "Received message of type %u and size %u\n",
239 mtype,
240 msize);
241
242 if (NULL == handlers)
243 goto done;
244 for (handler = handlers; NULL != handler->cb; handler++)
245 {
246 if (handler->type == mtype)
247 {
248 handled = GNUNET_YES;
249 if ((handler->expected_size > msize) ||
250 ((handler->expected_size != msize) && (NULL == handler->mv)))
251 {
252 /* Too small, or not an exact size and
253 no 'mv' handler to check rest */
254 LOG (GNUNET_ERROR_TYPE_ERROR,
255 "Received malformed message of type %u\n",
256 (unsigned int) handler->type);
257 return GNUNET_SYSERR;
258 }
259 if ((NULL == handler->mv) ||
260 (GNUNET_OK == handler->mv (handler->cls, mh)))
261 {
262 /* message well-formed, pass to handler */
263 handler->cb (handler->cls, mh);
264 }
265 else
266 {
267 /* Message rejected by check routine */
268 LOG (GNUNET_ERROR_TYPE_ERROR,
269 "Received malformed message of type %u\n",
270 (unsigned int) handler->type);
271 return GNUNET_SYSERR;
272 }
273 break;
274 }
275 }
276 done:
277 if (GNUNET_NO == handled)
278 {
279 LOG (GNUNET_ERROR_TYPE_INFO,
280 "No handler for message of type %u and size %u\n",
281 mtype,
282 msize);
283 return GNUNET_NO;
284 }
285 return GNUNET_OK;
286}
287
288
289/**
290 * Call the error handler of a message queue with the given
291 * error code. If there is no error handler, log a warning.
292 *
293 * This function is intended to be used by the implementation
294 * of message queues.
295 *
296 * @param mq message queue
297 * @param error the error type
298 */
299void
300GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
301 enum GNUNET_MQ_Error error)
302{
303 if (NULL == mq->error_handler)
304 {
305 LOG (GNUNET_ERROR_TYPE_WARNING,
306 "Got error %d, but no handler installed\n",
307 (int) error);
308 return;
309 }
310 mq->error_handler (mq->error_handler_cls,
311 error);
312}
313
314
315/**
316 * Discard the message queue message, free all
317 * allocated resources. Must be called in the event
318 * that a message is created but should not actually be sent.
319 *
320 * @param mqm the message to discard
321 */
322void
323GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
324{
325 GNUNET_assert (NULL == ev->parent_queue);
326 GNUNET_free (ev);
327}
328
329
330/**
331 * Obtain the current length of the message queue.
332 *
333 * @param mq queue to inspect
334 * @return number of queued, non-transmitted messages
335 */
336unsigned int
337GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
338{
339 if (GNUNET_YES != mq->in_flight)
340 {
341 return mq->queue_length;
342 }
343 return mq->queue_length - 1;
344}
345
346
347/**
348 * Send a message with the given message queue.
349 * May only be called once per message.
350 *
351 * @param mq message queue
352 * @param ev the envelope with the message to send.
353 */
354void
355GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
356 struct GNUNET_MQ_Envelope *ev)
357{
358 if (NULL == mq)
359 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
360 "mq is NUll when sending message of type %u\n",
361 (unsigned int) ntohs (ev->mh->type));
362 GNUNET_assert (NULL != mq);
363 GNUNET_assert (NULL == ev->parent_queue);
364
365 mq->queue_length++;
366 if (mq->queue_length >= 10000000)
367 {
368 /* This would seem like a bug... */
369 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
370 "MQ with %u entries extended by message of type %u (FC broken?)\n",
371 (unsigned int) mq->queue_length,
372 (unsigned int) ntohs (ev->mh->type));
373 }
374 ev->parent_queue = mq;
375 /* is the implementation busy? queue it! */
376 if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
377 {
378 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
379 mq->envelope_tail,
380 ev);
381 return;
382 }
383 GNUNET_assert (NULL == mq->envelope_head);
384 mq->current_envelope = ev;
385
386 LOG (GNUNET_ERROR_TYPE_DEBUG,
387 "sending message of type %u, queue empty (MQ: %p)\n",
388 ntohs (ev->mh->type),
389 mq);
390
391 mq->send_impl (mq,
392 ev->mh,
393 mq->impl_state);
394}
395
396
397/**
398 * Remove the first envelope that has not yet been sent from the message
399 * queue and return it.
400 *
401 * @param mq queue to remove envelope from
402 * @return NULL if queue is empty (or has no envelope that is not under transmission)
403 */
404struct GNUNET_MQ_Envelope *
405GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
406{
407 struct GNUNET_MQ_Envelope *env;
408
409 env = mq->envelope_head;
410 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
411 mq->queue_length--;
412 env->parent_queue = NULL;
413 return env;
414}
415
416
417/**
418 * Function to copy an envelope. The envelope must not yet
419 * be in any queue or have any options or callbacks set.
420 *
421 * @param env envelope to copy
422 * @return copy of @a env
423 */
424struct GNUNET_MQ_Envelope *
425GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
426{
427 GNUNET_assert (NULL == env->next);
428 GNUNET_assert (NULL == env->parent_queue);
429 GNUNET_assert (NULL == env->sent_cb);
430 GNUNET_assert (GNUNET_NO == env->have_custom_options);
431 return GNUNET_MQ_msg_copy (env->mh);
432}
433
434
435/**
436 * Send a copy of a message with the given message queue.
437 * Can be called repeatedly on the same envelope.
438 *
439 * @param mq message queue
440 * @param ev the envelope with the message to send.
441 */
442void
443GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
444 const struct GNUNET_MQ_Envelope *ev)
445{
446 struct GNUNET_MQ_Envelope *env;
447 uint16_t msize;
448
449 msize = ntohs (ev->mh->size);
450 env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
451 env->mh = (struct GNUNET_MessageHeader *) &env[1];
452 env->sent_cb = ev->sent_cb;
453 env->sent_cls = ev->sent_cls;
454 GNUNET_memcpy (&env[1], ev->mh, msize);
455 GNUNET_MQ_send (mq, env);
456}
457
458
459/**
460 * Task run to call the send implementation for the next queued
461 * message, if any. Only useful for implementing message queues,
462 * results in undefined behavior if not used carefully.
463 *
464 * @param cls message queue to send the next message with
465 */
466static void
467impl_send_continue (void *cls)
468{
469 struct GNUNET_MQ_Handle *mq = cls;
470
471 mq->send_task = NULL;
472 /* call is only valid if we're actually currently sending
473 * a message */
474 if (NULL == mq->envelope_head)
475 return;
476 mq->current_envelope = mq->envelope_head;
477 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
478 mq->envelope_tail,
479 mq->current_envelope);
480
481 LOG (GNUNET_ERROR_TYPE_DEBUG,
482 "sending message of type %u from queue\n",
483 ntohs (mq->current_envelope->mh->type));
484
485 mq->send_impl (mq,
486 mq->current_envelope->mh,
487 mq->impl_state);
488}
489
490
491/**
492 * Call the send implementation for the next queued message, if any.
493 * Only useful for implementing message queues, results in undefined
494 * behavior if not used carefully.
495 *
496 * @param mq message queue to send the next message with
497 */
498void
499GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
500{
501 struct GNUNET_MQ_Envelope *current_envelope;
502 GNUNET_SCHEDULER_TaskCallback cb;
503
504 GNUNET_assert (0 < mq->queue_length);
505 mq->queue_length--;
506 mq->in_flight = GNUNET_NO;
507 current_envelope = mq->current_envelope;
508 current_envelope->parent_queue = NULL;
509 mq->current_envelope = NULL;
510 GNUNET_assert (NULL == mq->send_task);
511 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
512 if (NULL != (cb = current_envelope->sent_cb))
513 {
514 current_envelope->sent_cb = NULL;
515 cb (current_envelope->sent_cls);
516 }
517 GNUNET_free (current_envelope);
518}
519
520
521/**
522 * Call the send notification for the current message, but do not
523 * try to send the next message until #GNUNET_MQ_impl_send_continue
524 * is called.
525 *
526 * Only useful for implementing message queues, results in undefined
527 * behavior if not used carefully.
528 *
529 * @param mq message queue to send the next message with
530 */
531void
532GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
533{
534 struct GNUNET_MQ_Envelope *current_envelope;
535 GNUNET_SCHEDULER_TaskCallback cb;
536
537 mq->in_flight = GNUNET_YES;
538 /* call is only valid if we're actually currently sending
539 * a message */
540 current_envelope = mq->current_envelope;
541 GNUNET_assert (NULL != current_envelope);
542 /* can't call cancel from now on anymore */
543 current_envelope->parent_queue = NULL;
544 if (NULL != (cb = current_envelope->sent_cb))
545 {
546 current_envelope->sent_cb = NULL;
547 cb (current_envelope->sent_cls);
548 }
549}
550
551
552/**
553 * Create a message queue for the specified handlers.
554 *
555 * @param send function the implements sending messages
556 * @param destroy function that implements destroying the queue
557 * @param cancel function that implements canceling a message
558 * @param impl_state for the queue, passed to 'send' and 'destroy'
559 * @param handlers array of message handlers
560 * @param error_handler handler for read and write errors
561 * @param error_handler_cls closure for @a error_handler
562 * @return a new message queue
563 */
564struct GNUNET_MQ_Handle *
565GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
566 GNUNET_MQ_DestroyImpl destroy,
567 GNUNET_MQ_CancelImpl cancel,
568 void *impl_state,
569 const struct GNUNET_MQ_MessageHandler *handlers,
570 GNUNET_MQ_ErrorHandler error_handler,
571 void *error_handler_cls)
572{
573 struct GNUNET_MQ_Handle *mq;
574
575 mq = GNUNET_new (struct GNUNET_MQ_Handle);
576 mq->send_impl = send;
577 mq->destroy_impl = destroy;
578 mq->cancel_impl = cancel;
579 mq->handlers = GNUNET_MQ_copy_handlers (handlers);
580 mq->error_handler = error_handler;
581 mq->error_handler_cls = error_handler_cls;
582 mq->impl_state = impl_state;
583
584 return mq;
585}
586
587
588/**
589 * Change the closure argument in all of the `handlers` of the
590 * @a mq.
591 *
592 * @param mq to modify
593 * @param handlers_cls new closure to use
594 */
595void
596GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
597{
598 if (NULL == mq->handlers)
599 return;
600 for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
601 mq->handlers[i].cls = handlers_cls;
602}
603
604
605/**
606 * Get the message that should currently be sent.
607 * Fails if there is no current message.
608 * Only useful for implementing message queues,
609 * results in undefined behavior if not used carefully.
610 *
611 * @param mq message queue with the current message
612 * @return message to send, never NULL
613 */
614const struct GNUNET_MessageHeader *
615GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
616{
617 GNUNET_assert (NULL != mq->current_envelope);
618 GNUNET_assert (NULL != mq->current_envelope->mh);
619 return mq->current_envelope->mh;
620}
621
622
623/**
624 * Get the implementation state associated with the
625 * message queue.
626 *
627 * While the GNUNET_MQ_Impl* callbacks receive the
628 * implementation state, continuations that are scheduled
629 * by the implementation function often only have one closure
630 * argument, with this function it is possible to get at the
631 * implementation state when only passing the GNUNET_MQ_Handle
632 * as closure.
633 *
634 * @param mq message queue with the current message
635 * @return message to send, never NULL
636 */
637void *
638GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
639{
640 return mq->impl_state;
641}
642
643
644struct GNUNET_MQ_Envelope *
645GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
646{
647 struct GNUNET_MQ_Envelope *ev;
648
649 ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
650 ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
651 ev->mh->size = htons (size);
652 ev->mh->type = htons (type);
653 if (NULL != mhp)
654 *mhp = ev->mh;
655 return ev;
656}
657
658
659/**
660 * Create a new envelope by copying an existing message.
661 *
662 * @param hdr header of the message to copy
663 * @return envelope containing @a hdr
664 */
665struct GNUNET_MQ_Envelope *
666GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
667{
668 struct GNUNET_MQ_Envelope *mqm;
669 uint16_t size = ntohs (hdr->size);
670
671 mqm = GNUNET_malloc (sizeof(*mqm) + size);
672 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
673 GNUNET_memcpy (mqm->mh, hdr, size);
674 return mqm;
675}
676
677
678/**
679 * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
680 *
681 * @param mhp pointer to the message header pointer that will be changed to allocate at
682 * the newly allocated space for the message.
683 * @param base_size size of the data before the nested message
684 * @param type type of the message in the envelope
685 * @param nested_mh the message to append to the message after base_size
686 */
687struct GNUNET_MQ_Envelope *
688GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
689 uint16_t base_size,
690 uint16_t type,
691 const struct GNUNET_MessageHeader *nested_mh)
692{
693 struct GNUNET_MQ_Envelope *mqm;
694 uint16_t size;
695
696 if (NULL == nested_mh)
697 return GNUNET_MQ_msg_ (mhp, base_size, type);
698
699 size = base_size + ntohs (nested_mh->size);
700
701 /* check for uint16_t overflow */
702 if (size < base_size)
703 return NULL;
704
705 mqm = GNUNET_MQ_msg_ (mhp, size, type);
706 GNUNET_memcpy ((char *) mqm->mh + base_size,
707 nested_mh,
708 ntohs (nested_mh->size));
709
710 return mqm;
711}
712
713
714/**
715 * Associate the assoc_data in mq with a unique request id.
716 *
717 * @param mq message queue, id will be unique for the queue
718 * @param assoc_data to associate
719 */
720uint32_t
721GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
722{
723 uint32_t id;
724
725 if (NULL == mq->assoc_map)
726 {
727 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
728 mq->assoc_id = 1;
729 }
730 id = mq->assoc_id++;
731 GNUNET_assert (GNUNET_OK ==
732 GNUNET_CONTAINER_multihashmap32_put (
733 mq->assoc_map,
734 id,
735 assoc_data,
736 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
737 return id;
738}
739
740
741/**
742 * Get the data associated with a @a request_id in a queue
743 *
744 * @param mq the message queue with the association
745 * @param request_id the request id we are interested in
746 * @return the associated data
747 */
748void *
749GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
750{
751 if (NULL == mq->assoc_map)
752 return NULL;
753 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
754}
755
756
757/**
758 * Remove the association for a @a request_id
759 *
760 * @param mq the message queue with the association
761 * @param request_id the request id we want to remove
762 * @return the associated data
763 */
764void *
765GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
766{
767 void *val;
768
769 if (NULL == mq->assoc_map)
770 return NULL;
771 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
772 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
773 return val;
774}
775
776
777/**
778 * Call a callback once the envelope has been sent, that is,
779 * sending it can not be canceled anymore.
780 * There can be only one notify sent callback per envelope.
781 *
782 * @param ev message to call the notify callback for
783 * @param cb the notify callback
784 * @param cb_cls closure for the callback
785 */
786void
787GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
788 GNUNET_SCHEDULER_TaskCallback cb,
789 void *cb_cls)
790{
791 /* allow setting *OR* clearing callback */
792 GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
793 ev->sent_cb = cb;
794 ev->sent_cls = cb_cls;
795}
796
797
798/**
799 * Handle we return for callbacks registered to be
800 * notified when #GNUNET_MQ_destroy() is called on a queue.
801 */
802struct GNUNET_MQ_DestroyNotificationHandle
803{
804 /**
805 * Kept in a DLL.
806 */
807 struct GNUNET_MQ_DestroyNotificationHandle *prev;
808
809 /**
810 * Kept in a DLL.
811 */
812 struct GNUNET_MQ_DestroyNotificationHandle *next;
813
814 /**
815 * Queue to notify about.
816 */
817 struct GNUNET_MQ_Handle *mq;
818
819 /**
820 * Function to call.
821 */
822 GNUNET_SCHEDULER_TaskCallback cb;
823
824 /**
825 * Closure for @e cb.
826 */
827 void *cb_cls;
828};
829
830
831/**
832 * Destroy the message queue.
833 *
834 * @param mq message queue to destroy
835 */
836void
837GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
838{
839 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
840
841 if (NULL != mq->destroy_impl)
842 {
843 mq->destroy_impl (mq, mq->impl_state);
844 }
845 if (NULL != mq->send_task)
846 {
847 GNUNET_SCHEDULER_cancel (mq->send_task);
848 mq->send_task = NULL;
849 }
850 while (NULL != mq->envelope_head)
851 {
852 struct GNUNET_MQ_Envelope *ev;
853
854 ev = mq->envelope_head;
855 ev->parent_queue = NULL;
856 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
857 GNUNET_assert (0 < mq->queue_length);
858 mq->queue_length--;
859 LOG (GNUNET_ERROR_TYPE_DEBUG,
860 "MQ destroy drops message of type %u\n",
861 ntohs (ev->mh->type));
862 GNUNET_MQ_discard (ev);
863 }
864 if (NULL != mq->current_envelope)
865 {
866 /* we can only discard envelopes that
867 * are not queued! */
868 mq->current_envelope->parent_queue = NULL;
869 LOG (GNUNET_ERROR_TYPE_DEBUG,
870 "MQ destroy drops current message of type %u\n",
871 ntohs (mq->current_envelope->mh->type));
872 GNUNET_MQ_discard (mq->current_envelope);
873 mq->current_envelope = NULL;
874 GNUNET_assert (0 < mq->queue_length);
875 mq->queue_length--;
876 }
877 GNUNET_assert (0 == mq->queue_length);
878 while (NULL != (dnh = mq->dnh_head))
879 {
880 dnh->cb (dnh->cb_cls);
881 GNUNET_MQ_destroy_notify_cancel (dnh);
882 }
883 if (NULL != mq->assoc_map)
884 {
885 GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
886 mq->assoc_map = NULL;
887 }
888 GNUNET_free (mq->handlers);
889 GNUNET_free (mq);
890}
891
892
893const struct GNUNET_MessageHeader *
894GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
895 uint16_t base_size)
896{
897 uint16_t whole_size;
898 uint16_t nested_size;
899 const struct GNUNET_MessageHeader *nested_msg;
900
901 whole_size = ntohs (mh->size);
902 GNUNET_assert (whole_size >= base_size);
903 nested_size = whole_size - base_size;
904 if (0 == nested_size)
905 return NULL;
906 if (nested_size < sizeof(struct GNUNET_MessageHeader))
907 {
908 GNUNET_break_op (0);
909 return NULL;
910 }
911 nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
912 if (ntohs (nested_msg->size) != nested_size)
913 {
914 GNUNET_break_op (0);
915 return NULL;
916 }
917 return nested_msg;
918}
919
920
921/**
922 * Cancel sending the message. Message must have been sent with
923 * #GNUNET_MQ_send before. May not be called after the notify sent
924 * callback has been called
925 *
926 * @param ev queued envelope to cancel
927 */
928void
929GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
930{
931 struct GNUNET_MQ_Handle *mq = ev->parent_queue;
932
933 GNUNET_assert (NULL != mq);
934 GNUNET_assert (NULL != mq->cancel_impl);
935
936 mq->evacuate_called = GNUNET_NO;
937
938 if (mq->current_envelope == ev)
939 {
940 /* complex case, we already started with transmitting
941 the message using the callbacks. */
942 GNUNET_assert (GNUNET_NO == mq->in_flight);
943 GNUNET_assert (0 < mq->queue_length);
944 mq->queue_length--;
945 mq->cancel_impl (mq,
946 mq->impl_state);
947 /* continue sending the next message, if any */
948 mq->current_envelope = mq->envelope_head;
949 if (NULL != mq->current_envelope)
950 {
951 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
952 mq->envelope_tail,
953 mq->current_envelope);
954
955 LOG (GNUNET_ERROR_TYPE_DEBUG,
956 "sending canceled message of type %u queue\n",
957 ntohs (ev->mh->type));
958 mq->send_impl (mq,
959 mq->current_envelope->mh,
960 mq->impl_state);
961 }
962 }
963 else
964 {
965 /* simple case, message is still waiting in the queue */
966 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
967 mq->envelope_tail,
968 ev);
969 GNUNET_assert (0 < mq->queue_length);
970 mq->queue_length--;
971 }
972
973 if (GNUNET_YES != mq->evacuate_called)
974 {
975 ev->parent_queue = NULL;
976 ev->mh = NULL;
977 /* also frees ev */
978 GNUNET_free (ev);
979 }
980}
981
982
983/**
984 * Function to obtain the current envelope
985 * from within #GNUNET_MQ_SendImpl implementations.
986 *
987 * @param mq message queue to interrogate
988 * @return the current envelope
989 */
990struct GNUNET_MQ_Envelope *
991GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
992{
993 return mq->current_envelope;
994}
995
996
997/**
998 * Function to obtain the last envelope in the queue.
999 *
1000 * @param mq message queue to interrogate
1001 * @return the last envelope in the queue
1002 */
1003struct GNUNET_MQ_Envelope *
1004GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1005{
1006 if (NULL != mq->envelope_tail)
1007 return mq->envelope_tail;
1008
1009 return mq->current_envelope;
1010}
1011
1012
1013/**
1014 * Set application-specific preferences for this envelope.
1015 * Overrides the options set for the queue with
1016 * #GNUNET_MQ_set_options() for this message only.
1017 *
1018 * @param env message to set options for
1019 * @param pp priorities and preferences to apply
1020 */
1021void
1022GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1023 enum GNUNET_MQ_PriorityPreferences pp)
1024{
1025 env->priority = pp;
1026 env->have_custom_options = GNUNET_YES;
1027}
1028
1029
1030/**
1031 * Get application-specific options for this envelope.
1032 *
1033 * @param env message to set options for
1034 * @return priorities and preferences to apply for @a env
1035 */
1036enum GNUNET_MQ_PriorityPreferences
1037GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
1038{
1039 struct GNUNET_MQ_Handle *mq = env->parent_queue;
1040
1041 if (GNUNET_YES == env->have_custom_options)
1042 return env->priority;
1043 if (NULL == mq)
1044 return 0;
1045 return mq->priority;
1046}
1047
1048
1049/**
1050 * Combine performance preferences set for different
1051 * envelopes that are being combined into one larger envelope.
1052 *
1053 * @param p1 one set of preferences
1054 * @param p2 second set of preferences
1055 * @return combined priority and preferences to use
1056 */
1057enum GNUNET_MQ_PriorityPreferences
1058GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
1059 enum GNUNET_MQ_PriorityPreferences p2)
1060{
1061 enum GNUNET_MQ_PriorityPreferences ret;
1062
1063 ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1064 ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1065 ret |=
1066 ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1067 ret |=
1068 ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1069 ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1070 ret |=
1071 ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
1072 return ret;
1073}
1074
1075
1076/**
1077 * Set application-specific default options for this queue.
1078 *
1079 * @param mq message queue to set options for
1080 * @param pp priorities and preferences to apply
1081 */
1082void
1083GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1084 enum GNUNET_MQ_PriorityPreferences pp)
1085{
1086 mq->priority = pp;
1087}
1088
1089
1090/**
1091 * Obtain message contained in envelope.
1092 *
1093 * @param env the envelope
1094 * @return message contained in the envelope
1095 */
1096const struct GNUNET_MessageHeader *
1097GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1098{
1099 return env->mh;
1100}
1101
1102
1103/**
1104 * Return next envelope in queue.
1105 *
1106 * @param env a queued envelope
1107 * @return next one, or NULL
1108 */
1109const struct GNUNET_MQ_Envelope *
1110GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1111{
1112 return env->next;
1113}
1114
1115
1116/**
1117 * Register function to be called whenever @a mq is being
1118 * destroyed.
1119 *
1120 * @param mq message queue to watch
1121 * @param cb function to call on @a mq destruction
1122 * @param cb_cls closure for @a cb
1123 * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1124 */
1125struct GNUNET_MQ_DestroyNotificationHandle *
1126GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1127 GNUNET_SCHEDULER_TaskCallback cb,
1128 void *cb_cls)
1129{
1130 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1131
1132 dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1133 dnh->mq = mq;
1134 dnh->cb = cb;
1135 dnh->cb_cls = cb_cls;
1136 GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
1137 return dnh;
1138}
1139
1140
1141/**
1142 * Cancel registration from #GNUNET_MQ_destroy_notify().
1143 *
1144 * @param dnh handle for registration to cancel
1145 */
1146void
1147GNUNET_MQ_destroy_notify_cancel (struct
1148 GNUNET_MQ_DestroyNotificationHandle *dnh)
1149{
1150 struct GNUNET_MQ_Handle *mq = dnh->mq;
1151
1152 GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
1153 GNUNET_free (dnh);
1154}
1155
1156
1157/**
1158 * Insert @a env into the envelope DLL starting at @a env_head
1159 * Note that @a env must not be in any MQ while this function
1160 * is used with DLLs defined outside of the MQ module. This
1161 * is just in case some application needs to also manage a
1162 * FIFO of envelopes independent of MQ itself and wants to
1163 * re-use the pointers internal to @a env. Use with caution.
1164 *
1165 * @param[in|out] env_head of envelope DLL
1166 * @param[in|out] env_tail tail of envelope DLL
1167 * @param[in|out] env element to insert at the tail
1168 */
1169void
1170GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
1171 struct GNUNET_MQ_Envelope **env_tail,
1172 struct GNUNET_MQ_Envelope *env)
1173{
1174 GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
1175}
1176
1177
1178/**
1179 * Insert @a env into the envelope DLL starting at @a env_head
1180 * Note that @a env must not be in any MQ while this function
1181 * is used with DLLs defined outside of the MQ module. This
1182 * is just in case some application needs to also manage a
1183 * FIFO of envelopes independent of MQ itself and wants to
1184 * re-use the pointers internal to @a env. Use with caution.
1185 *
1186 * @param[in|out] env_head of envelope DLL
1187 * @param[in|out] env_tail tail of envelope DLL
1188 * @param[in|out] env element to insert at the tail
1189 */
1190void
1191GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1192 struct GNUNET_MQ_Envelope **env_tail,
1193 struct GNUNET_MQ_Envelope *env)
1194{
1195 GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1196}
1197
1198
1199/**
1200 * Remove @a env from the envelope DLL starting at @a env_head.
1201 * Note that @a env must not be in any MQ while this function
1202 * is used with DLLs defined outside of the MQ module. This
1203 * is just in case some application needs to also manage a
1204 * FIFO of envelopes independent of MQ itself and wants to
1205 * re-use the pointers internal to @a env. Use with caution.
1206 *
1207 * @param[in|out] env_head of envelope DLL
1208 * @param[in|out] env_tail tail of envelope DLL
1209 * @param[in|out] env element to remove from the DLL
1210 */
1211void
1212GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1213 struct GNUNET_MQ_Envelope **env_tail,
1214 struct GNUNET_MQ_Envelope *env)
1215{
1216 GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1217}
1218
1219
1220/**
1221 * Copy an array of handlers.
1222 *
1223 * Useful if the array has been declared in local memory and needs to be
1224 * persisted for future use.
1225 *
1226 * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1227 * @return A newly allocated array of handlers.
1228 * Needs to be freed with #GNUNET_free.
1229 */
1230struct GNUNET_MQ_MessageHandler *
1231GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1232{
1233 struct GNUNET_MQ_MessageHandler *copy;
1234 unsigned int count;
1235
1236 if (NULL == handlers)
1237 return NULL;
1238
1239 count = GNUNET_MQ_count_handlers (handlers);
1240 copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1241 GNUNET_memcpy (copy,
1242 handlers,
1243 count * sizeof(struct GNUNET_MQ_MessageHandler));
1244 return copy;
1245}
1246
1247
1248/**
1249 * Copy an array of handlers, appending AGPL handler.
1250 *
1251 * Useful if the array has been declared in local memory and needs to be
1252 * persisted for future use.
1253 *
1254 * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1255 * @param agpl_handler function to call for AGPL handling
1256 * @param agpl_cls closure for @a agpl_handler
1257 * @return A newly allocated array of handlers.
1258 * Needs to be freed with #GNUNET_free.
1259 */
1260struct GNUNET_MQ_MessageHandler *
1261GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1262 GNUNET_MQ_MessageCallback agpl_handler,
1263 void *agpl_cls)
1264{
1265 struct GNUNET_MQ_MessageHandler *copy;
1266 unsigned int count;
1267
1268 if (NULL == handlers)
1269 return NULL;
1270 count = GNUNET_MQ_count_handlers (handlers);
1271 copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1272 GNUNET_memcpy (copy,
1273 handlers,
1274 count * sizeof(struct GNUNET_MQ_MessageHandler));
1275 copy[count].mv = NULL;
1276 copy[count].cb = agpl_handler;
1277 copy[count].cls = agpl_cls;
1278 copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1279 copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1280 return copy;
1281}
1282
1283
1284/**
1285 * Count the handlers in a handler array.
1286 *
1287 * @param handlers Array of handlers to be counted.
1288 * @return The number of handlers in the array.
1289 */
1290unsigned int
1291GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1292{
1293 unsigned int i;
1294
1295 if (NULL == handlers)
1296 return 0;
1297
1298 for (i = 0; NULL != handlers[i].cb; i++)
1299 ;
1300
1301 return i;
1302}
1303
1304
1305/**
1306 * Convert an `enum GNUNET_MQ_PreferenceType` to a string
1307 *
1308 * @param type the preference type
1309 * @return a string or NULL if invalid
1310 */
1311const char *
1312GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1313{
1314 switch (type)
1315 {
1316 case GNUNET_MQ_PREFERENCE_NONE:
1317 return "NONE";
1318
1319 case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1320 return "BANDWIDTH";
1321
1322 case GNUNET_MQ_PREFERENCE_LATENCY:
1323 return "LATENCY";
1324
1325 case GNUNET_MQ_PREFERENCE_RELIABILITY:
1326 return "RELIABILITY";
1327 }
1328 ;
1329 return NULL;
1330}
1331
1332
1333/* end of mq.c */