aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c1335
1 files changed, 0 insertions, 1335 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
deleted file mode 100644
index b09837459..000000000
--- a/src/util/mq.c
+++ /dev/null
@@ -1,1335 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @author Florian Dold
23 * @file util/mq.c
24 * @brief general purpose request queue
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28
29#define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30
31
32struct GNUNET_MQ_Envelope
33{
34 /**
35 * Messages are stored in a linked list.
36 * Each queue has its own list of envelopes.
37 */
38 struct GNUNET_MQ_Envelope *next;
39
40 /**
41 * Messages are stored in a linked list
42 * Each queue has its own list of envelopes.
43 */
44 struct GNUNET_MQ_Envelope *prev;
45
46 /**
47 * Actual allocated message header.
48 * The GNUNET_MQ_Envelope header is allocated at
49 * the end of the message.
50 */
51 struct GNUNET_MessageHeader *mh;
52
53 /**
54 * Queue the message is queued in, NULL if message is not queued.
55 */
56 struct GNUNET_MQ_Handle *parent_queue;
57
58 /**
59 * Called after the message was sent irrevocably.
60 */
61 GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63 /**
64 * Closure for @e send_cb
65 */
66 void *sent_cls;
67
68 /**
69 * Flags that were set for this envelope by
70 * #GNUNET_MQ_env_set_options(). Only valid if
71 * @e have_custom_options is set.
72 */
73 enum GNUNET_MQ_PriorityPreferences priority;
74
75 /**
76 * Did the application call #GNUNET_MQ_env_set_options()?
77 */
78 int have_custom_options;
79};
80
81
82/**
83 * Handle to a message queue.
84 */
85struct GNUNET_MQ_Handle
86{
87 /**
88 * Handlers array, or NULL if the queue should not receive messages
89 */
90 struct GNUNET_MQ_MessageHandler *handlers;
91
92 /**
93 * Actual implementation of message sending,
94 * called when a message is added
95 */
96 GNUNET_MQ_SendImpl send_impl;
97
98 /**
99 * Implementation-dependent queue destruction function
100 */
101 GNUNET_MQ_DestroyImpl destroy_impl;
102
103 /**
104 * Implementation-dependent send cancel function
105 */
106 GNUNET_MQ_CancelImpl cancel_impl;
107
108 /**
109 * Implementation-specific state
110 */
111 void *impl_state;
112
113 /**
114 * Callback will be called when an error occurs.
115 */
116 GNUNET_MQ_ErrorHandler error_handler;
117
118 /**
119 * Closure for the error handler.
120 */
121 void *error_handler_cls;
122
123 /**
124 * Task to asynchronously run #impl_send_continue().
125 */
126 struct GNUNET_SCHEDULER_Task *send_task;
127
128 /**
129 * Linked list of messages pending to be sent
130 */
131 struct GNUNET_MQ_Envelope *envelope_head;
132
133 /**
134 * Linked list of messages pending to be sent
135 */
136 struct GNUNET_MQ_Envelope *envelope_tail;
137
138 /**
139 * Message that is currently scheduled to be
140 * sent. Not the head of the message queue, as the implementation
141 * needs to know if sending has been already scheduled or not.
142 */
143 struct GNUNET_MQ_Envelope *current_envelope;
144
145 /**
146 * Map of associations, lazily allocated
147 */
148 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
149
150 /**
151 * Functions to call on queue destruction; kept in a DLL.
152 */
153 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
154
155 /**
156 * Functions to call on queue destruction; kept in a DLL.
157 */
158 struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
159
160 /**
161 * Flags that were set for this queue by
162 * #GNUNET_MQ_set_options(). Default is 0.
163 */
164 enum GNUNET_MQ_PriorityPreferences priority;
165
166 /**
167 * Next id that should be used for the @e assoc_map,
168 * initialized lazily to a random value together with
169 * @e assoc_map
170 */
171 uint32_t assoc_id;
172
173 /**
174 * Number of entries we have in the envelope-DLL.
175 */
176 unsigned int queue_length;
177
178 /**
179 * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
180 * FIXME: is this dead?
181 */
182 int evacuate_called;
183
184 /**
185 * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
186 */
187 int in_flight;
188};
189
190
191/**
192 * Call the message message handler that was registered
193 * for the type of the given message in the given message queue.
194 *
195 * This function is intended to be used for the implementation
196 * of message queues.
197 *
198 * @param mq message queue with the handlers
199 * @param mh message to dispatch
200 */
201void
202GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
203 const struct GNUNET_MessageHeader *mh)
204{
205 int ret;
206
207 ret = GNUNET_MQ_handle_message (mq->handlers, mh);
208 if (GNUNET_SYSERR == ret)
209 {
210 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
211 return;
212 }
213}
214
215
216/**
217 * Call the message message handler that was registered
218 * for the type of the given message in the given @a handlers list.
219 *
220 * This function is intended to be used for the implementation
221 * of message queues.
222 *
223 * @param handlers a set of handlers
224 * @param mh message to dispatch
225 * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
226 * #GNUNET_SYSERR if message was rejected by check function
227 */
228int
229GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
230 const struct GNUNET_MessageHeader *mh)
231{
232 const struct GNUNET_MQ_MessageHandler *handler;
233 int handled = GNUNET_NO;
234 uint16_t msize = ntohs (mh->size);
235 uint16_t mtype = ntohs (mh->type);
236
237 LOG (GNUNET_ERROR_TYPE_DEBUG,
238 "Received message of type %u and size %u\n",
239 mtype,
240 msize);
241
242 if (NULL == handlers)
243 goto done;
244 for (handler = handlers; NULL != handler->cb; handler++)
245 {
246 if (handler->type == mtype)
247 {
248 handled = GNUNET_YES;
249 if ((handler->expected_size > msize) ||
250 ((handler->expected_size != msize) && (NULL == handler->mv)))
251 {
252 /* Too small, or not an exact size and
253 no 'mv' handler to check rest */
254 LOG (GNUNET_ERROR_TYPE_ERROR,
255 "Received malformed message of type %u\n",
256 (unsigned int) handler->type);
257 return GNUNET_SYSERR;
258 }
259 if ((NULL == handler->mv) ||
260 (GNUNET_OK == handler->mv (handler->cls, mh)))
261 {
262 /* message well-formed, pass to handler */
263 handler->cb (handler->cls, mh);
264 }
265 else
266 {
267 /* Message rejected by check routine */
268 LOG (GNUNET_ERROR_TYPE_ERROR,
269 "Received malformed message of type %u\n",
270 (unsigned int) handler->type);
271 return GNUNET_SYSERR;
272 }
273 break;
274 }
275 }
276done:
277 if (GNUNET_NO == handled)
278 {
279 LOG (GNUNET_ERROR_TYPE_INFO,
280 "No handler for message of type %u and size %u\n",
281 mtype,
282 msize);
283 return GNUNET_NO;
284 }
285 return GNUNET_OK;
286}
287
288
289/**
290 * Call the error handler of a message queue with the given
291 * error code. If there is no error handler, log a warning.
292 *
293 * This function is intended to be used by the implementation
294 * of message queues.
295 *
296 * @param mq message queue
297 * @param error the error type
298 */
299void
300GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
301 enum GNUNET_MQ_Error error)
302{
303 if (NULL == mq->error_handler)
304 {
305 LOG (GNUNET_ERROR_TYPE_WARNING,
306 "Got error %d, but no handler installed\n",
307 (int) error);
308 return;
309 }
310 mq->error_handler (mq->error_handler_cls,
311 error);
312}
313
314
315/**
316 * Discard the message queue message, free all
317 * allocated resources. Must be called in the event
318 * that a message is created but should not actually be sent.
319 *
320 * @param mqm the message to discard
321 */
322void
323GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
324{
325 GNUNET_assert (NULL == ev->parent_queue);
326 GNUNET_free (ev);
327}
328
329
330/**
331 * Obtain the current length of the message queue.
332 *
333 * @param mq queue to inspect
334 * @return number of queued, non-transmitted messages
335 */
336unsigned int
337GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
338{
339 if (GNUNET_YES != mq->in_flight)
340 {
341 return mq->queue_length;
342 }
343 return mq->queue_length - 1;
344}
345
346
347/**
348 * Send a message with the given message queue.
349 * May only be called once per message.
350 *
351 * @param mq message queue
352 * @param ev the envelope with the message to send.
353 */
354void
355GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
356 struct GNUNET_MQ_Envelope *ev)
357{
358 if (NULL == mq)
359 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
360 "mq is NUll when sending message of type %u\n",
361 (unsigned int) ntohs (ev->mh->type));
362 GNUNET_assert (NULL != mq);
363 GNUNET_assert (NULL == ev->parent_queue);
364
365 mq->queue_length++;
366 if (mq->queue_length >= 10000000)
367 {
368 /* This would seem like a bug... */
369 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
370 "MQ with %u entries extended by message of type %u (FC broken?)\n",
371 (unsigned int) mq->queue_length,
372 (unsigned int) ntohs (ev->mh->type));
373 }
374 ev->parent_queue = mq;
375 /* is the implementation busy? queue it! */
376 if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
377 {
378 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
379 mq->envelope_tail,
380 ev);
381 return;
382 }
383 GNUNET_assert (NULL == mq->envelope_head);
384 mq->current_envelope = ev;
385
386 LOG (GNUNET_ERROR_TYPE_DEBUG,
387 "sending message of type %u, queue empty (MQ: %p)\n",
388 ntohs (ev->mh->type),
389 mq);
390
391 mq->send_impl (mq,
392 ev->mh,
393 mq->impl_state);
394}
395
396
397/**
398 * Remove the first envelope that has not yet been sent from the message
399 * queue and return it.
400 *
401 * @param mq queue to remove envelope from
402 * @return NULL if queue is empty (or has no envelope that is not under transmission)
403 */
404struct GNUNET_MQ_Envelope *
405GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
406{
407 struct GNUNET_MQ_Envelope *env;
408
409 env = mq->envelope_head;
410 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
411 mq->queue_length--;
412 env->parent_queue = NULL;
413 return env;
414}
415
416
417/**
418 * Function to copy an envelope. The envelope must not yet
419 * be in any queue or have any options or callbacks set.
420 *
421 * @param env envelope to copy
422 * @return copy of @a env
423 */
424struct GNUNET_MQ_Envelope *
425GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
426{
427 GNUNET_assert (NULL == env->next);
428 GNUNET_assert (NULL == env->parent_queue);
429 GNUNET_assert (NULL == env->sent_cb);
430 GNUNET_assert (GNUNET_NO == env->have_custom_options);
431 return GNUNET_MQ_msg_copy (env->mh);
432}
433
434
435/**
436 * Send a copy of a message with the given message queue.
437 * Can be called repeatedly on the same envelope.
438 *
439 * @param mq message queue
440 * @param ev the envelope with the message to send.
441 */
442void
443GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
444 const struct GNUNET_MQ_Envelope *ev)
445{
446 struct GNUNET_MQ_Envelope *env;
447 uint16_t msize;
448
449 msize = ntohs (ev->mh->size);
450 env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
451 env->mh = (struct GNUNET_MessageHeader *) &env[1];
452 env->sent_cb = ev->sent_cb;
453 env->sent_cls = ev->sent_cls;
454 GNUNET_memcpy (&env[1], ev->mh, msize);
455 GNUNET_MQ_send (mq, env);
456}
457
458
459/**
460 * Task run to call the send implementation for the next queued
461 * message, if any. Only useful for implementing message queues,
462 * results in undefined behavior if not used carefully.
463 *
464 * @param cls message queue to send the next message with
465 */
466static void
467impl_send_continue (void *cls)
468{
469 struct GNUNET_MQ_Handle *mq = cls;
470
471 mq->send_task = NULL;
472 /* call is only valid if we're actually currently sending
473 * a message */
474 if (NULL == mq->envelope_head)
475 return;
476 mq->current_envelope = mq->envelope_head;
477 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
478 mq->envelope_tail,
479 mq->current_envelope);
480
481 LOG (GNUNET_ERROR_TYPE_DEBUG,
482 "sending message of type %u from queue\n",
483 ntohs (mq->current_envelope->mh->type));
484
485 mq->send_impl (mq,
486 mq->current_envelope->mh,
487 mq->impl_state);
488}
489
490
491/**
492 * Call the send implementation for the next queued message, if any.
493 * Only useful for implementing message queues, results in undefined
494 * behavior if not used carefully.
495 *
496 * @param mq message queue to send the next message with
497 */
498void
499GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
500{
501 struct GNUNET_MQ_Envelope *current_envelope;
502 GNUNET_SCHEDULER_TaskCallback cb;
503
504 GNUNET_assert (0 < mq->queue_length);
505 mq->queue_length--;
506 mq->in_flight = GNUNET_NO;
507 current_envelope = mq->current_envelope;
508 current_envelope->parent_queue = NULL;
509 mq->current_envelope = NULL;
510 GNUNET_assert (NULL == mq->send_task);
511 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
512 if (NULL != (cb = current_envelope->sent_cb))
513 {
514 current_envelope->sent_cb = NULL;
515 cb (current_envelope->sent_cls);
516 }
517 GNUNET_free (current_envelope);
518}
519
520
521/**
522 * Call the send notification for the current message, but do not
523 * try to send the next message until #GNUNET_MQ_impl_send_continue
524 * is called.
525 *
526 * Only useful for implementing message queues, results in undefined
527 * behavior if not used carefully.
528 *
529 * @param mq message queue to send the next message with
530 */
531void
532GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
533{
534 struct GNUNET_MQ_Envelope *current_envelope;
535 GNUNET_SCHEDULER_TaskCallback cb;
536
537 mq->in_flight = GNUNET_YES;
538 /* call is only valid if we're actually currently sending
539 * a message */
540 current_envelope = mq->current_envelope;
541 GNUNET_assert (NULL != current_envelope);
542 /* can't call cancel from now on anymore */
543 current_envelope->parent_queue = NULL;
544 if (NULL != (cb = current_envelope->sent_cb))
545 {
546 current_envelope->sent_cb = NULL;
547 cb (current_envelope->sent_cls);
548 }
549}
550
551
552/**
553 * Create a message queue for the specified handlers.
554 *
555 * @param send function the implements sending messages
556 * @param destroy function that implements destroying the queue
557 * @param cancel function that implements canceling a message
558 * @param impl_state for the queue, passed to 'send' and 'destroy'
559 * @param handlers array of message handlers
560 * @param error_handler handler for read and write errors
561 * @param error_handler_cls closure for @a error_handler
562 * @return a new message queue
563 */
564struct GNUNET_MQ_Handle *
565GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
566 GNUNET_MQ_DestroyImpl destroy,
567 GNUNET_MQ_CancelImpl cancel,
568 void *impl_state,
569 const struct GNUNET_MQ_MessageHandler *handlers,
570 GNUNET_MQ_ErrorHandler error_handler,
571 void *error_handler_cls)
572{
573 struct GNUNET_MQ_Handle *mq;
574
575 mq = GNUNET_new (struct GNUNET_MQ_Handle);
576 mq->send_impl = send;
577 mq->destroy_impl = destroy;
578 mq->cancel_impl = cancel;
579 mq->handlers = GNUNET_MQ_copy_handlers (handlers);
580 mq->error_handler = error_handler;
581 mq->error_handler_cls = error_handler_cls;
582 mq->impl_state = impl_state;
583
584 return mq;
585}
586
587
588/**
589 * Change the closure argument in all of the `handlers` of the
590 * @a mq.
591 *
592 * @param mq to modify
593 * @param handlers_cls new closure to use
594 */
595void
596GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
597{
598 if (NULL == mq->handlers)
599 return;
600 for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
601 mq->handlers[i].cls = handlers_cls;
602}
603
604
605/**
606 * Get the message that should currently be sent.
607 * Fails if there is no current message.
608 * Only useful for implementing message queues,
609 * results in undefined behavior if not used carefully.
610 *
611 * @param mq message queue with the current message
612 * @return message to send, never NULL
613 */
614const struct GNUNET_MessageHeader *
615GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
616{
617 GNUNET_assert (NULL != mq->current_envelope);
618 GNUNET_assert (NULL != mq->current_envelope->mh);
619 return mq->current_envelope->mh;
620}
621
622
623/**
624 * Get the implementation state associated with the
625 * message queue.
626 *
627 * While the GNUNET_MQ_Impl* callbacks receive the
628 * implementation state, continuations that are scheduled
629 * by the implementation function often only have one closure
630 * argument, with this function it is possible to get at the
631 * implementation state when only passing the GNUNET_MQ_Handle
632 * as closure.
633 *
634 * @param mq message queue with the current message
635 * @return message to send, never NULL
636 */
637void *
638GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
639{
640 return mq->impl_state;
641}
642
643
644struct GNUNET_MQ_Envelope *
645GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
646{
647 struct GNUNET_MQ_Envelope *ev;
648
649 ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
650 ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
651 ev->mh->size = htons (size);
652 ev->mh->type = htons (type);
653 if (NULL != mhp)
654 *mhp = ev->mh;
655 return ev;
656}
657
658
659/**
660 * Create a new envelope by copying an existing message.
661 *
662 * @param hdr header of the message to copy
663 * @return envelope containing @a hdr
664 */
665struct GNUNET_MQ_Envelope *
666GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
667{
668 struct GNUNET_MQ_Envelope *mqm;
669 uint16_t size = ntohs (hdr->size);
670
671 mqm = GNUNET_malloc (sizeof(*mqm) + size);
672 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
673 GNUNET_memcpy (mqm->mh,
674 hdr,
675 size);
676 return mqm;
677}
678
679
680/**
681 * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
682 *
683 * @param mhp pointer to the message header pointer that will be changed to allocate at
684 * the newly allocated space for the message.
685 * @param base_size size of the data before the nested message
686 * @param type type of the message in the envelope
687 * @param nested_mh the message to append to the message after base_size
688 */
689struct GNUNET_MQ_Envelope *
690GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
691 uint16_t base_size,
692 uint16_t type,
693 const struct GNUNET_MessageHeader *nested_mh)
694{
695 struct GNUNET_MQ_Envelope *mqm;
696 uint16_t size;
697
698 if (NULL == nested_mh)
699 return GNUNET_MQ_msg_ (mhp, base_size, type);
700
701 size = base_size + ntohs (nested_mh->size);
702
703 /* check for uint16_t overflow */
704 if (size < base_size)
705 return NULL;
706
707 mqm = GNUNET_MQ_msg_ (mhp, size, type);
708 GNUNET_memcpy ((char *) mqm->mh + base_size,
709 nested_mh,
710 ntohs (nested_mh->size));
711
712 return mqm;
713}
714
715
716/**
717 * Associate the assoc_data in mq with a unique request id.
718 *
719 * @param mq message queue, id will be unique for the queue
720 * @param assoc_data to associate
721 */
722uint32_t
723GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
724{
725 uint32_t id;
726
727 if (NULL == mq->assoc_map)
728 {
729 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
730 mq->assoc_id = 1;
731 }
732 id = mq->assoc_id++;
733 GNUNET_assert (GNUNET_OK ==
734 GNUNET_CONTAINER_multihashmap32_put (
735 mq->assoc_map,
736 id,
737 assoc_data,
738 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
739 return id;
740}
741
742
743/**
744 * Get the data associated with a @a request_id in a queue
745 *
746 * @param mq the message queue with the association
747 * @param request_id the request id we are interested in
748 * @return the associated data
749 */
750void *
751GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
752{
753 if (NULL == mq->assoc_map)
754 return NULL;
755 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
756}
757
758
759/**
760 * Remove the association for a @a request_id
761 *
762 * @param mq the message queue with the association
763 * @param request_id the request id we want to remove
764 * @return the associated data
765 */
766void *
767GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
768{
769 void *val;
770
771 if (NULL == mq->assoc_map)
772 return NULL;
773 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
774 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
775 return val;
776}
777
778
779/**
780 * Call a callback once the envelope has been sent, that is,
781 * sending it can not be canceled anymore.
782 * There can be only one notify sent callback per envelope.
783 *
784 * @param ev message to call the notify callback for
785 * @param cb the notify callback
786 * @param cb_cls closure for the callback
787 */
788void
789GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
790 GNUNET_SCHEDULER_TaskCallback cb,
791 void *cb_cls)
792{
793 /* allow setting *OR* clearing callback */
794 GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
795 ev->sent_cb = cb;
796 ev->sent_cls = cb_cls;
797}
798
799
800/**
801 * Handle we return for callbacks registered to be
802 * notified when #GNUNET_MQ_destroy() is called on a queue.
803 */
804struct GNUNET_MQ_DestroyNotificationHandle
805{
806 /**
807 * Kept in a DLL.
808 */
809 struct GNUNET_MQ_DestroyNotificationHandle *prev;
810
811 /**
812 * Kept in a DLL.
813 */
814 struct GNUNET_MQ_DestroyNotificationHandle *next;
815
816 /**
817 * Queue to notify about.
818 */
819 struct GNUNET_MQ_Handle *mq;
820
821 /**
822 * Function to call.
823 */
824 GNUNET_SCHEDULER_TaskCallback cb;
825
826 /**
827 * Closure for @e cb.
828 */
829 void *cb_cls;
830};
831
832
833/**
834 * Destroy the message queue.
835 *
836 * @param mq message queue to destroy
837 */
838void
839GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
840{
841 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
842
843 if (NULL != mq->destroy_impl)
844 {
845 mq->destroy_impl (mq, mq->impl_state);
846 }
847 if (NULL != mq->send_task)
848 {
849 GNUNET_SCHEDULER_cancel (mq->send_task);
850 mq->send_task = NULL;
851 }
852 while (NULL != mq->envelope_head)
853 {
854 struct GNUNET_MQ_Envelope *ev;
855
856 ev = mq->envelope_head;
857 ev->parent_queue = NULL;
858 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
859 GNUNET_assert (0 < mq->queue_length);
860 mq->queue_length--;
861 LOG (GNUNET_ERROR_TYPE_DEBUG,
862 "MQ destroy drops message of type %u\n",
863 ntohs (ev->mh->type));
864 GNUNET_MQ_discard (ev);
865 }
866 if (NULL != mq->current_envelope)
867 {
868 /* we can only discard envelopes that
869 * are not queued! */
870 mq->current_envelope->parent_queue = NULL;
871 LOG (GNUNET_ERROR_TYPE_DEBUG,
872 "MQ destroy drops current message of type %u\n",
873 ntohs (mq->current_envelope->mh->type));
874 GNUNET_MQ_discard (mq->current_envelope);
875 mq->current_envelope = NULL;
876 GNUNET_assert (0 < mq->queue_length);
877 mq->queue_length--;
878 }
879 GNUNET_assert (0 == mq->queue_length);
880 while (NULL != (dnh = mq->dnh_head))
881 {
882 dnh->cb (dnh->cb_cls);
883 GNUNET_MQ_destroy_notify_cancel (dnh);
884 }
885 if (NULL != mq->assoc_map)
886 {
887 GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
888 mq->assoc_map = NULL;
889 }
890 GNUNET_free (mq->handlers);
891 GNUNET_free (mq);
892}
893
894
895const struct GNUNET_MessageHeader *
896GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
897 uint16_t base_size)
898{
899 uint16_t whole_size;
900 uint16_t nested_size;
901 const struct GNUNET_MessageHeader *nested_msg;
902
903 whole_size = ntohs (mh->size);
904 GNUNET_assert (whole_size >= base_size);
905 nested_size = whole_size - base_size;
906 if (0 == nested_size)
907 return NULL;
908 if (nested_size < sizeof(struct GNUNET_MessageHeader))
909 {
910 GNUNET_break_op (0);
911 return NULL;
912 }
913 nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
914 if (ntohs (nested_msg->size) != nested_size)
915 {
916 GNUNET_break_op (0);
917 return NULL;
918 }
919 return nested_msg;
920}
921
922
923/**
924 * Cancel sending the message. Message must have been sent with
925 * #GNUNET_MQ_send before. May not be called after the notify sent
926 * callback has been called
927 *
928 * @param ev queued envelope to cancel
929 */
930void
931GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
932{
933 struct GNUNET_MQ_Handle *mq = ev->parent_queue;
934
935 GNUNET_assert (NULL != mq);
936 GNUNET_assert (NULL != mq->cancel_impl);
937
938 mq->evacuate_called = GNUNET_NO;
939
940 if (mq->current_envelope == ev)
941 {
942 /* complex case, we already started with transmitting
943 the message using the callbacks. */
944 GNUNET_assert (GNUNET_NO == mq->in_flight);
945 GNUNET_assert (0 < mq->queue_length);
946 mq->queue_length--;
947 mq->cancel_impl (mq,
948 mq->impl_state);
949 /* continue sending the next message, if any */
950 mq->current_envelope = mq->envelope_head;
951 if (NULL != mq->current_envelope)
952 {
953 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
954 mq->envelope_tail,
955 mq->current_envelope);
956
957 LOG (GNUNET_ERROR_TYPE_DEBUG,
958 "sending canceled message of type %u queue\n",
959 ntohs (ev->mh->type));
960 mq->send_impl (mq,
961 mq->current_envelope->mh,
962 mq->impl_state);
963 }
964 }
965 else
966 {
967 /* simple case, message is still waiting in the queue */
968 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
969 mq->envelope_tail,
970 ev);
971 GNUNET_assert (0 < mq->queue_length);
972 mq->queue_length--;
973 }
974
975 if (GNUNET_YES != mq->evacuate_called)
976 {
977 ev->parent_queue = NULL;
978 ev->mh = NULL;
979 /* also frees ev */
980 GNUNET_free (ev);
981 }
982}
983
984
985/**
986 * Function to obtain the current envelope
987 * from within #GNUNET_MQ_SendImpl implementations.
988 *
989 * @param mq message queue to interrogate
990 * @return the current envelope
991 */
992struct GNUNET_MQ_Envelope *
993GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
994{
995 return mq->current_envelope;
996}
997
998
999/**
1000 * Function to obtain the last envelope in the queue.
1001 *
1002 * @param mq message queue to interrogate
1003 * @return the last envelope in the queue
1004 */
1005struct GNUNET_MQ_Envelope *
1006GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1007{
1008 if (NULL != mq->envelope_tail)
1009 return mq->envelope_tail;
1010
1011 return mq->current_envelope;
1012}
1013
1014
1015/**
1016 * Set application-specific preferences for this envelope.
1017 * Overrides the options set for the queue with
1018 * #GNUNET_MQ_set_options() for this message only.
1019 *
1020 * @param env message to set options for
1021 * @param pp priorities and preferences to apply
1022 */
1023void
1024GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1025 enum GNUNET_MQ_PriorityPreferences pp)
1026{
1027 env->priority = pp;
1028 env->have_custom_options = GNUNET_YES;
1029}
1030
1031
1032/**
1033 * Get application-specific options for this envelope.
1034 *
1035 * @param env message to set options for
1036 * @return priorities and preferences to apply for @a env
1037 */
1038enum GNUNET_MQ_PriorityPreferences
1039GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
1040{
1041 struct GNUNET_MQ_Handle *mq = env->parent_queue;
1042
1043 if (GNUNET_YES == env->have_custom_options)
1044 return env->priority;
1045 if (NULL == mq)
1046 return 0;
1047 return mq->priority;
1048}
1049
1050
1051/**
1052 * Combine performance preferences set for different
1053 * envelopes that are being combined into one larger envelope.
1054 *
1055 * @param p1 one set of preferences
1056 * @param p2 second set of preferences
1057 * @return combined priority and preferences to use
1058 */
1059enum GNUNET_MQ_PriorityPreferences
1060GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
1061 enum GNUNET_MQ_PriorityPreferences p2)
1062{
1063 enum GNUNET_MQ_PriorityPreferences ret;
1064
1065 ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1066 ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1067 ret |=
1068 ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1069 ret |=
1070 ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1071 ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1072 ret |=
1073 ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
1074 return ret;
1075}
1076
1077
1078/**
1079 * Set application-specific default options for this queue.
1080 *
1081 * @param mq message queue to set options for
1082 * @param pp priorities and preferences to apply
1083 */
1084void
1085GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1086 enum GNUNET_MQ_PriorityPreferences pp)
1087{
1088 mq->priority = pp;
1089}
1090
1091
1092/**
1093 * Obtain message contained in envelope.
1094 *
1095 * @param env the envelope
1096 * @return message contained in the envelope
1097 */
1098const struct GNUNET_MessageHeader *
1099GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1100{
1101 return env->mh;
1102}
1103
1104
1105/**
1106 * Return next envelope in queue.
1107 *
1108 * @param env a queued envelope
1109 * @return next one, or NULL
1110 */
1111const struct GNUNET_MQ_Envelope *
1112GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1113{
1114 return env->next;
1115}
1116
1117
1118/**
1119 * Register function to be called whenever @a mq is being
1120 * destroyed.
1121 *
1122 * @param mq message queue to watch
1123 * @param cb function to call on @a mq destruction
1124 * @param cb_cls closure for @a cb
1125 * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1126 */
1127struct GNUNET_MQ_DestroyNotificationHandle *
1128GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1129 GNUNET_SCHEDULER_TaskCallback cb,
1130 void *cb_cls)
1131{
1132 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1133
1134 dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1135 dnh->mq = mq;
1136 dnh->cb = cb;
1137 dnh->cb_cls = cb_cls;
1138 GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
1139 return dnh;
1140}
1141
1142
1143/**
1144 * Cancel registration from #GNUNET_MQ_destroy_notify().
1145 *
1146 * @param dnh handle for registration to cancel
1147 */
1148void
1149GNUNET_MQ_destroy_notify_cancel (struct
1150 GNUNET_MQ_DestroyNotificationHandle *dnh)
1151{
1152 struct GNUNET_MQ_Handle *mq = dnh->mq;
1153
1154 GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
1155 GNUNET_free (dnh);
1156}
1157
1158
1159/**
1160 * Insert @a env into the envelope DLL starting at @a env_head
1161 * Note that @a env must not be in any MQ while this function
1162 * is used with DLLs defined outside of the MQ module. This
1163 * is just in case some application needs to also manage a
1164 * FIFO of envelopes independent of MQ itself and wants to
1165 * re-use the pointers internal to @a env. Use with caution.
1166 *
1167 * @param[in|out] env_head of envelope DLL
1168 * @param[in|out] env_tail tail of envelope DLL
1169 * @param[in|out] env element to insert at the tail
1170 */
1171void
1172GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
1173 struct GNUNET_MQ_Envelope **env_tail,
1174 struct GNUNET_MQ_Envelope *env)
1175{
1176 GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
1177}
1178
1179
1180/**
1181 * Insert @a env into the envelope DLL starting at @a env_head
1182 * Note that @a env must not be in any MQ while this function
1183 * is used with DLLs defined outside of the MQ module. This
1184 * is just in case some application needs to also manage a
1185 * FIFO of envelopes independent of MQ itself and wants to
1186 * re-use the pointers internal to @a env. Use with caution.
1187 *
1188 * @param[in|out] env_head of envelope DLL
1189 * @param[in|out] env_tail tail of envelope DLL
1190 * @param[in|out] env element to insert at the tail
1191 */
1192void
1193GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1194 struct GNUNET_MQ_Envelope **env_tail,
1195 struct GNUNET_MQ_Envelope *env)
1196{
1197 GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1198}
1199
1200
1201/**
1202 * Remove @a env from the envelope DLL starting at @a env_head.
1203 * Note that @a env must not be in any MQ while this function
1204 * is used with DLLs defined outside of the MQ module. This
1205 * is just in case some application needs to also manage a
1206 * FIFO of envelopes independent of MQ itself and wants to
1207 * re-use the pointers internal to @a env. Use with caution.
1208 *
1209 * @param[in|out] env_head of envelope DLL
1210 * @param[in|out] env_tail tail of envelope DLL
1211 * @param[in|out] env element to remove from the DLL
1212 */
1213void
1214GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1215 struct GNUNET_MQ_Envelope **env_tail,
1216 struct GNUNET_MQ_Envelope *env)
1217{
1218 GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1219}
1220
1221
1222/**
1223 * Copy an array of handlers.
1224 *
1225 * Useful if the array has been declared in local memory and needs to be
1226 * persisted for future use.
1227 *
1228 * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1229 * @return A newly allocated array of handlers.
1230 * Needs to be freed with #GNUNET_free.
1231 */
1232struct GNUNET_MQ_MessageHandler *
1233GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1234{
1235 struct GNUNET_MQ_MessageHandler *copy;
1236 unsigned int count;
1237
1238 if (NULL == handlers)
1239 return NULL;
1240
1241 count = GNUNET_MQ_count_handlers (handlers);
1242 copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1243 GNUNET_memcpy (copy,
1244 handlers,
1245 count * sizeof(struct GNUNET_MQ_MessageHandler));
1246 return copy;
1247}
1248
1249
1250/**
1251 * Copy an array of handlers, appending AGPL handler.
1252 *
1253 * Useful if the array has been declared in local memory and needs to be
1254 * persisted for future use.
1255 *
1256 * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1257 * @param agpl_handler function to call for AGPL handling
1258 * @param agpl_cls closure for @a agpl_handler
1259 * @return A newly allocated array of handlers.
1260 * Needs to be freed with #GNUNET_free.
1261 */
1262struct GNUNET_MQ_MessageHandler *
1263GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1264 GNUNET_MQ_MessageCallback agpl_handler,
1265 void *agpl_cls)
1266{
1267 struct GNUNET_MQ_MessageHandler *copy;
1268 unsigned int count;
1269
1270 if (NULL == handlers)
1271 return NULL;
1272 count = GNUNET_MQ_count_handlers (handlers);
1273 copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1274 GNUNET_memcpy (copy,
1275 handlers,
1276 count * sizeof(struct GNUNET_MQ_MessageHandler));
1277 copy[count].mv = NULL;
1278 copy[count].cb = agpl_handler;
1279 copy[count].cls = agpl_cls;
1280 copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1281 copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1282 return copy;
1283}
1284
1285
1286/**
1287 * Count the handlers in a handler array.
1288 *
1289 * @param handlers Array of handlers to be counted.
1290 * @return The number of handlers in the array.
1291 */
1292unsigned int
1293GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1294{
1295 unsigned int i;
1296
1297 if (NULL == handlers)
1298 return 0;
1299
1300 for (i = 0; NULL != handlers[i].cb; i++)
1301 ;
1302
1303 return i;
1304}
1305
1306
1307/**
1308 * Convert an `enum GNUNET_MQ_PreferenceType` to a string
1309 *
1310 * @param type the preference type
1311 * @return a string or NULL if invalid
1312 */
1313const char *
1314GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1315{
1316 switch (type)
1317 {
1318 case GNUNET_MQ_PREFERENCE_NONE:
1319 return "NONE";
1320
1321 case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1322 return "BANDWIDTH";
1323
1324 case GNUNET_MQ_PREFERENCE_LATENCY:
1325 return "LATENCY";
1326
1327 case GNUNET_MQ_PREFERENCE_RELIABILITY:
1328 return "RELIABILITY";
1329 }
1330 ;
1331 return NULL;
1332}
1333
1334
1335/* end of mq.c */