aboutsummaryrefslogtreecommitdiff
path: root/src/lib/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/util/mq.c')
-rw-r--r--src/lib/util/mq.c1030
1 files changed, 1030 insertions, 0 deletions
diff --git a/src/lib/util/mq.c b/src/lib/util/mq.c
new file mode 100644
index 000000000..de0cff0c2
--- /dev/null
+++ b/src/lib/util/mq.c
@@ -0,0 +1,1030 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @author Florian Dold
23 * @file util/mq.c
24 * @brief general purpose request queue
25 */
26
27#include "platform.h"
28#include "gnunet_util_lib.h"
29
30#define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
31
32
33struct GNUNET_MQ_Envelope
34{
35 /**
36 * Messages are stored in a linked list.
37 * Each queue has its own list of envelopes.
38 */
39 struct GNUNET_MQ_Envelope *next;
40
41 /**
42 * Messages are stored in a linked list
43 * Each queue has its own list of envelopes.
44 */
45 struct GNUNET_MQ_Envelope *prev;
46
47 /**
48 * Actual allocated message header.
49 * The GNUNET_MQ_Envelope header is allocated at
50 * the end of the message.
51 */
52 struct GNUNET_MessageHeader *mh;
53
54 /**
55 * Queue the message is queued in, NULL if message is not queued.
56 */
57 struct GNUNET_MQ_Handle *parent_queue;
58
59 /**
60 * Called after the message was sent irrevocably.
61 */
62 GNUNET_SCHEDULER_TaskCallback sent_cb;
63
64 /**
65 * Closure for @e send_cb
66 */
67 void *sent_cls;
68
69 /**
70 * Flags that were set for this envelope by
71 * #GNUNET_MQ_env_set_options(). Only valid if
72 * @e have_custom_options is set.
73 */
74 enum GNUNET_MQ_PriorityPreferences priority;
75
76 /**
77 * Did the application call #GNUNET_MQ_env_set_options()?
78 */
79 int have_custom_options;
80};
81
82
83/**
84 * Handle to a message queue.
85 */
86struct GNUNET_MQ_Handle
87{
88 /**
89 * Handlers array, or NULL if the queue should not receive messages
90 */
91 struct GNUNET_MQ_MessageHandler *handlers;
92
93 /**
94 * Actual implementation of message sending,
95 * called when a message is added
96 */
97 GNUNET_MQ_SendImpl send_impl;
98
99 /**
100 * Implementation-dependent queue destruction function
101 */
102 GNUNET_MQ_DestroyImpl destroy_impl;
103
104 /**
105 * Implementation-dependent send cancel function
106 */
107 GNUNET_MQ_CancelImpl cancel_impl;
108
109 /**
110 * Implementation-specific state
111 */
112 void *impl_state;
113
114 /**
115 * Callback will be called when an error occurs.
116 */
117 GNUNET_MQ_ErrorHandler error_handler;
118
119 /**
120 * Closure for the error handler.
121 */
122 void *error_handler_cls;
123
124 /**
125 * Task to asynchronously run #impl_send_continue().
126 */
127 struct GNUNET_SCHEDULER_Task *send_task;
128
129 /**
130 * Linked list of messages pending to be sent
131 */
132 struct GNUNET_MQ_Envelope *envelope_head;
133
134 /**
135 * Linked list of messages pending to be sent
136 */
137 struct GNUNET_MQ_Envelope *envelope_tail;
138
139 /**
140 * Message that is currently scheduled to be
141 * sent. Not the head of the message queue, as the implementation
142 * needs to know if sending has been already scheduled or not.
143 */
144 struct GNUNET_MQ_Envelope *current_envelope;
145
146 /**
147 * Map of associations, lazily allocated
148 */
149 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
150
151 /**
152 * Functions to call on queue destruction; kept in a DLL.
153 */
154 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
155
156 /**
157 * Functions to call on queue destruction; kept in a DLL.
158 */
159 struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
160
161 /**
162 * Flags that were set for this queue by
163 * #GNUNET_MQ_set_options(). Default is 0.
164 */
165 enum GNUNET_MQ_PriorityPreferences priority;
166
167 /**
168 * Next id that should be used for the @e assoc_map,
169 * initialized lazily to a random value together with
170 * @e assoc_map
171 */
172 uint32_t assoc_id;
173
174 /**
175 * Number of entries we have in the envelope-DLL.
176 */
177 unsigned int queue_length;
178
179 /**
180 * True if GNUNET_MQ_impl_send_in_flight() was called.
181 */
182 bool in_flight;
183};
184
185
186void
187GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
188 const struct GNUNET_MessageHeader *mh)
189{
190 enum GNUNET_GenericReturnValue ret;
191
192 ret = GNUNET_MQ_handle_message (mq->handlers,
193 mh);
194 if (GNUNET_SYSERR == ret)
195 {
196 GNUNET_break_op (0);
197 GNUNET_MQ_inject_error (mq,
198 GNUNET_MQ_ERROR_MALFORMED);
199 return;
200 }
201}
202
203
204enum GNUNET_GenericReturnValue
205GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
206 const struct GNUNET_MessageHeader *mh)
207{
208 bool handled = false;
209 uint16_t msize = ntohs (mh->size);
210 uint16_t mtype = ntohs (mh->type);
211
212 LOG (GNUNET_ERROR_TYPE_DEBUG,
213 "Received message of type %u and size %u\n",
214 mtype,
215 msize);
216 if (NULL == handlers)
217 goto done;
218 for (const struct GNUNET_MQ_MessageHandler *handler = handlers;
219 NULL != handler->cb;
220 handler++)
221 {
222 if (handler->type == mtype)
223 {
224 handled = true;
225 if ( (handler->expected_size > msize) ||
226 ( (handler->expected_size != msize) &&
227 (NULL == handler->mv) ) )
228 {
229 /* Too small, or not an exact size and
230 no 'mv' handler to check rest */
231 LOG (GNUNET_ERROR_TYPE_ERROR,
232 "Received malformed message of type %u\n",
233 (unsigned int) handler->type);
234 return GNUNET_SYSERR;
235 }
236 if ( (NULL == handler->mv) ||
237 (GNUNET_OK ==
238 handler->mv (handler->cls,
239 mh)) )
240 {
241 /* message well-formed, pass to handler */
242 handler->cb (handler->cls, mh);
243 }
244 else
245 {
246 /* Message rejected by check routine */
247 LOG (GNUNET_ERROR_TYPE_ERROR,
248 "Received malformed message of type %u\n",
249 (unsigned int) handler->type);
250 return GNUNET_SYSERR;
251 }
252 break;
253 }
254 }
255done:
256 if (! handled)
257 {
258 LOG (GNUNET_ERROR_TYPE_INFO,
259 "No handler for message of type %u and size %u\n",
260 mtype,
261 msize);
262 return GNUNET_NO;
263 }
264 return GNUNET_OK;
265}
266
267
268void
269GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
270 enum GNUNET_MQ_Error error)
271{
272 if (NULL == mq->error_handler)
273 {
274 LOG (GNUNET_ERROR_TYPE_WARNING,
275 "Got error %d, but no handler installed\n",
276 (int) error);
277 return;
278 }
279 mq->error_handler (mq->error_handler_cls,
280 error);
281}
282
283
284void
285GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
286{
287 GNUNET_assert (NULL == ev->parent_queue);
288 GNUNET_free (ev);
289}
290
291
292unsigned int
293GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
294{
295 if (! mq->in_flight)
296 {
297 return mq->queue_length;
298 }
299 return mq->queue_length - 1;
300}
301
302
303void
304GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
305 struct GNUNET_MQ_Envelope *ev)
306{
307 GNUNET_assert (NULL != mq);
308 GNUNET_assert (NULL == ev->parent_queue);
309
310 mq->queue_length++;
311 if (mq->queue_length >= 10000000)
312 {
313 /* This would seem like a bug... */
314 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
315 "MQ with %u entries extended by message of type %u (FC broken?)\n",
316 (unsigned int) mq->queue_length,
317 (unsigned int) ntohs (ev->mh->type));
318 }
319 ev->parent_queue = mq;
320 /* is the implementation busy? queue it! */
321 if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
322 {
323 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
324 mq->envelope_tail,
325 ev);
326 return;
327 }
328 GNUNET_assert (NULL == mq->envelope_head);
329 mq->current_envelope = ev;
330
331 LOG (GNUNET_ERROR_TYPE_DEBUG,
332 "sending message of type %u and size %u, queue empty (MQ: %p)\n",
333 ntohs (ev->mh->type),
334 ntohs (ev->mh->size),
335 mq);
336
337 mq->send_impl (mq,
338 ev->mh,
339 mq->impl_state);
340}
341
342
343struct GNUNET_MQ_Envelope *
344GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
345{
346 struct GNUNET_MQ_Envelope *env;
347
348 env = mq->envelope_head;
349 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
350 mq->envelope_tail,
351 env);
352 mq->queue_length--;
353 env->parent_queue = NULL;
354 return env;
355}
356
357
358struct GNUNET_MQ_Envelope *
359GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
360{
361 GNUNET_assert (NULL == env->next);
362 GNUNET_assert (NULL == env->parent_queue);
363 GNUNET_assert (NULL == env->sent_cb);
364 GNUNET_assert (GNUNET_NO == env->have_custom_options);
365 return GNUNET_MQ_msg_copy (env->mh);
366}
367
368
369void
370GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
371 const struct GNUNET_MQ_Envelope *ev)
372{
373 struct GNUNET_MQ_Envelope *env;
374 uint16_t msize;
375
376 msize = ntohs (ev->mh->size);
377 env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
378 env->mh = (struct GNUNET_MessageHeader *) &env[1];
379 env->sent_cb = ev->sent_cb;
380 env->sent_cls = ev->sent_cls;
381 GNUNET_memcpy (&env[1], ev->mh, msize);
382 GNUNET_MQ_send (mq, env);
383}
384
385
386/**
387 * Task run to call the send implementation for the next queued
388 * message, if any. Only useful for implementing message queues,
389 * results in undefined behavior if not used carefully.
390 *
391 * @param cls message queue to send the next message with
392 */
393static void
394impl_send_continue (void *cls)
395{
396 struct GNUNET_MQ_Handle *mq = cls;
397
398 mq->send_task = NULL;
399 /* call is only valid if we're actually currently sending
400 * a message */
401 if (NULL == mq->envelope_head)
402 return;
403 mq->current_envelope = mq->envelope_head;
404 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
405 mq->envelope_tail,
406 mq->current_envelope);
407
408 LOG (GNUNET_ERROR_TYPE_DEBUG,
409 "sending message of type %u and size %u from queue (MQ: %p)\n",
410 ntohs (mq->current_envelope->mh->type),
411 ntohs (mq->current_envelope->mh->size),
412 mq);
413
414 mq->send_impl (mq,
415 mq->current_envelope->mh,
416 mq->impl_state);
417}
418
419
420void
421GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
422{
423 struct GNUNET_MQ_Envelope *current_envelope;
424 GNUNET_SCHEDULER_TaskCallback cb;
425
426 GNUNET_assert (0 < mq->queue_length);
427 mq->queue_length--;
428 mq->in_flight = false;
429 current_envelope = mq->current_envelope;
430 current_envelope->parent_queue = NULL;
431 mq->current_envelope = NULL;
432 GNUNET_assert (NULL == mq->send_task);
433 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
434 if (NULL != (cb = current_envelope->sent_cb))
435 {
436 current_envelope->sent_cb = NULL;
437 cb (current_envelope->sent_cls);
438 }
439 GNUNET_free (current_envelope);
440}
441
442
443void
444GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
445{
446 struct GNUNET_MQ_Envelope *current_envelope;
447 GNUNET_SCHEDULER_TaskCallback cb;
448
449 mq->in_flight = true;
450 /* call is only valid if we're actually currently sending
451 * a message */
452 current_envelope = mq->current_envelope;
453 GNUNET_assert (NULL != current_envelope);
454 /* can't call cancel from now on anymore */
455 current_envelope->parent_queue = NULL;
456 if (NULL != (cb = current_envelope->sent_cb))
457 {
458 current_envelope->sent_cb = NULL;
459 cb (current_envelope->sent_cls);
460 }
461}
462
463
464struct GNUNET_MQ_Handle *
465GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
466 GNUNET_MQ_DestroyImpl destroy,
467 GNUNET_MQ_CancelImpl cancel,
468 void *impl_state,
469 const struct GNUNET_MQ_MessageHandler *handlers,
470 GNUNET_MQ_ErrorHandler error_handler,
471 void *error_handler_cls)
472{
473 struct GNUNET_MQ_Handle *mq;
474
475 mq = GNUNET_new (struct GNUNET_MQ_Handle);
476 mq->send_impl = send;
477 mq->destroy_impl = destroy;
478 mq->cancel_impl = cancel;
479 mq->handlers = GNUNET_MQ_copy_handlers (handlers);
480 mq->error_handler = error_handler;
481 mq->error_handler_cls = error_handler_cls;
482 mq->impl_state = impl_state;
483
484 return mq;
485}
486
487
488void
489GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
490 void *handlers_cls)
491{
492 if (NULL == mq->handlers)
493 return;
494 for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
495 mq->handlers[i].cls = handlers_cls;
496}
497
498
499const struct GNUNET_MessageHeader *
500GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
501{
502 GNUNET_assert (NULL != mq->current_envelope);
503 GNUNET_assert (NULL != mq->current_envelope->mh);
504 return mq->current_envelope->mh;
505}
506
507
508void *
509GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
510{
511 return mq->impl_state;
512}
513
514
515struct GNUNET_MQ_Envelope *
516GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
517 uint16_t size,
518 uint16_t type)
519{
520 struct GNUNET_MQ_Envelope *ev;
521
522 ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
523 ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
524 ev->mh->size = htons (size);
525 ev->mh->type = htons (type);
526 if (NULL != mhp)
527 *mhp = ev->mh;
528 return ev;
529}
530
531
532struct GNUNET_MQ_Envelope *
533GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
534{
535 struct GNUNET_MQ_Envelope *mqm;
536 uint16_t size = ntohs (hdr->size);
537
538 mqm = GNUNET_malloc (sizeof(*mqm) + size);
539 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
540 GNUNET_memcpy (mqm->mh,
541 hdr,
542 size);
543 return mqm;
544}
545
546
547struct GNUNET_MQ_Envelope *
548GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
549 uint16_t base_size,
550 uint16_t type,
551 const struct GNUNET_MessageHeader *nested_mh)
552{
553 struct GNUNET_MQ_Envelope *mqm;
554 uint16_t size;
555
556 if (NULL == nested_mh)
557 return GNUNET_MQ_msg_ (mhp,
558 base_size,
559 type);
560 size = base_size + ntohs (nested_mh->size);
561 /* check for uint16_t overflow */
562 if (size < base_size)
563 return NULL;
564 mqm = GNUNET_MQ_msg_ (mhp,
565 size,
566 type);
567 GNUNET_memcpy ((char *) mqm->mh + base_size,
568 nested_mh,
569 ntohs (nested_mh->size));
570 return mqm;
571}
572
573
574uint32_t
575GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
576 void *assoc_data)
577{
578 uint32_t id;
579
580 if (NULL == mq->assoc_map)
581 {
582 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
583 mq->assoc_id = 1;
584 }
585 id = mq->assoc_id++;
586 GNUNET_assert (GNUNET_OK ==
587 GNUNET_CONTAINER_multihashmap32_put (
588 mq->assoc_map,
589 id,
590 assoc_data,
591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
592 return id;
593}
594
595
596/**
597 * Get the data associated with a @a request_id in a queue
598 *
599 * @param mq the message queue with the association
600 * @param request_id the request id we are interested in
601 * @return the associated data
602 */
603void *
604GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
605 uint32_t request_id)
606{
607 if (NULL == mq->assoc_map)
608 return NULL;
609 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
610 request_id);
611}
612
613
614/**
615 * Remove the association for a @a request_id
616 *
617 * @param mq the message queue with the association
618 * @param request_id the request id we want to remove
619 * @return the associated data
620 */
621void *
622GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
623 uint32_t request_id)
624{
625 void *val;
626
627 if (NULL == mq->assoc_map)
628 return NULL;
629 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
630 request_id);
631 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
632 request_id);
633 return val;
634}
635
636
637void
638GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
639 GNUNET_SCHEDULER_TaskCallback cb,
640 void *cb_cls)
641{
642 /* allow setting *OR* clearing callback */
643 GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
644 ev->sent_cb = cb;
645 ev->sent_cls = cb_cls;
646}
647
648
649/**
650 * Handle we return for callbacks registered to be
651 * notified when #GNUNET_MQ_destroy() is called on a queue.
652 */
653struct GNUNET_MQ_DestroyNotificationHandle
654{
655 /**
656 * Kept in a DLL.
657 */
658 struct GNUNET_MQ_DestroyNotificationHandle *prev;
659
660 /**
661 * Kept in a DLL.
662 */
663 struct GNUNET_MQ_DestroyNotificationHandle *next;
664
665 /**
666 * Queue to notify about.
667 */
668 struct GNUNET_MQ_Handle *mq;
669
670 /**
671 * Function to call.
672 */
673 GNUNET_SCHEDULER_TaskCallback cb;
674
675 /**
676 * Closure for @e cb.
677 */
678 void *cb_cls;
679};
680
681
682void
683GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
684{
685 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
686
687 if (NULL != mq->destroy_impl)
688 {
689 mq->destroy_impl (mq, mq->impl_state);
690 }
691 if (NULL != mq->send_task)
692 {
693 GNUNET_SCHEDULER_cancel (mq->send_task);
694 mq->send_task = NULL;
695 }
696 while (NULL != mq->envelope_head)
697 {
698 struct GNUNET_MQ_Envelope *ev;
699
700 ev = mq->envelope_head;
701 ev->parent_queue = NULL;
702 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
703 GNUNET_assert (0 < mq->queue_length);
704 mq->queue_length--;
705 LOG (GNUNET_ERROR_TYPE_DEBUG,
706 "MQ destroy drops message of type %u\n",
707 ntohs (ev->mh->type));
708 GNUNET_MQ_discard (ev);
709 }
710 if (NULL != mq->current_envelope)
711 {
712 /* we can only discard envelopes that
713 * are not queued! */
714 mq->current_envelope->parent_queue = NULL;
715 LOG (GNUNET_ERROR_TYPE_DEBUG,
716 "MQ destroy drops current message of type %u\n",
717 ntohs (mq->current_envelope->mh->type));
718 GNUNET_MQ_discard (mq->current_envelope);
719 mq->current_envelope = NULL;
720 GNUNET_assert (0 < mq->queue_length);
721 mq->queue_length--;
722 }
723 GNUNET_assert (0 == mq->queue_length);
724 while (NULL != (dnh = mq->dnh_head))
725 {
726 dnh->cb (dnh->cb_cls);
727 GNUNET_MQ_destroy_notify_cancel (dnh);
728 }
729 if (NULL != mq->assoc_map)
730 {
731 GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
732 mq->assoc_map = NULL;
733 }
734 GNUNET_free (mq->handlers);
735 GNUNET_free (mq);
736}
737
738
739const struct GNUNET_MessageHeader *
740GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
741 uint16_t base_size)
742{
743 uint16_t whole_size;
744 uint16_t nested_size;
745 const struct GNUNET_MessageHeader *nested_msg;
746
747 whole_size = ntohs (mh->size);
748 GNUNET_assert (whole_size >= base_size);
749 nested_size = whole_size - base_size;
750 if (0 == nested_size)
751 return NULL;
752 if (nested_size < sizeof(struct GNUNET_MessageHeader))
753 {
754 GNUNET_break_op (0);
755 return NULL;
756 }
757 nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
758 if (ntohs (nested_msg->size) != nested_size)
759 {
760 GNUNET_break_op (0);
761 return NULL;
762 }
763 return nested_msg;
764}
765
766
767void
768GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
769{
770 struct GNUNET_MQ_Handle *mq = ev->parent_queue;
771
772 GNUNET_assert (NULL != mq);
773 GNUNET_assert (NULL != mq->cancel_impl);
774 GNUNET_assert (0 < mq->queue_length);
775 mq->queue_length--;
776 if (mq->current_envelope == ev)
777 {
778 /* complex case, we already started with transmitting
779 the message using the callbacks. */
780 GNUNET_assert (! mq->in_flight);
781 mq->cancel_impl (mq,
782 mq->impl_state);
783 /* continue sending the next message, if any */
784 mq->current_envelope = mq->envelope_head;
785 if (NULL != mq->current_envelope)
786 {
787 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
788 mq->envelope_tail,
789 mq->current_envelope);
790 LOG (GNUNET_ERROR_TYPE_DEBUG,
791 "sending canceled message of type %u queue\n",
792 ntohs (ev->mh->type));
793 mq->send_impl (mq,
794 mq->current_envelope->mh,
795 mq->impl_state);
796 }
797 }
798 else
799 {
800 /* simple case, message is still waiting in the queue */
801 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
802 mq->envelope_tail,
803 ev);
804 }
805 ev->parent_queue = NULL;
806 ev->mh = NULL;
807 /* also frees ev */
808 GNUNET_free (ev);
809}
810
811
812struct GNUNET_MQ_Envelope *
813GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
814{
815 return mq->current_envelope;
816}
817
818
819struct GNUNET_MQ_Envelope *
820GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
821{
822 if (NULL != mq->envelope_tail)
823 return mq->envelope_tail;
824
825 return mq->current_envelope;
826}
827
828
829void
830GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
831 enum GNUNET_MQ_PriorityPreferences pp)
832{
833 env->priority = pp;
834 env->have_custom_options = GNUNET_YES;
835}
836
837
838enum GNUNET_MQ_PriorityPreferences
839GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
840{
841 struct GNUNET_MQ_Handle *mq = env->parent_queue;
842
843 if (GNUNET_YES == env->have_custom_options)
844 return env->priority;
845 if (NULL == mq)
846 return 0;
847 return mq->priority;
848}
849
850
851enum GNUNET_MQ_PriorityPreferences
852GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
853 enum GNUNET_MQ_PriorityPreferences p2)
854{
855 enum GNUNET_MQ_PriorityPreferences ret;
856
857 ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
858 ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
859 ret |=
860 ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
861 ret |=
862 ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
863 ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
864 ret |=
865 ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
866 return ret;
867}
868
869
870void
871GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
872 enum GNUNET_MQ_PriorityPreferences pp)
873{
874 mq->priority = pp;
875}
876
877
878const struct GNUNET_MessageHeader *
879GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
880{
881 return env->mh;
882}
883
884
885const struct GNUNET_MQ_Envelope *
886GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
887{
888 return env->next;
889}
890
891
892struct GNUNET_MQ_DestroyNotificationHandle *
893GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
894 GNUNET_SCHEDULER_TaskCallback cb,
895 void *cb_cls)
896{
897 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
898
899 dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
900 dnh->mq = mq;
901 dnh->cb = cb;
902 dnh->cb_cls = cb_cls;
903 GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
904 mq->dnh_tail,
905 dnh);
906 return dnh;
907}
908
909
910void
911GNUNET_MQ_destroy_notify_cancel (
912 struct GNUNET_MQ_DestroyNotificationHandle *dnh)
913{
914 struct GNUNET_MQ_Handle *mq = dnh->mq;
915
916 GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
917 mq->dnh_tail,
918 dnh);
919 GNUNET_free (dnh);
920}
921
922
923void
924GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
925 struct GNUNET_MQ_Envelope **env_tail,
926 struct GNUNET_MQ_Envelope *env)
927{
928 GNUNET_CONTAINER_DLL_insert (*env_head,
929 *env_tail,
930 env);
931}
932
933
934void
935GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
936 struct GNUNET_MQ_Envelope **env_tail,
937 struct GNUNET_MQ_Envelope *env)
938{
939 GNUNET_CONTAINER_DLL_insert_tail (*env_head,
940 *env_tail,
941 env);
942}
943
944
945void
946GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
947 struct GNUNET_MQ_Envelope **env_tail,
948 struct GNUNET_MQ_Envelope *env)
949{
950 GNUNET_CONTAINER_DLL_remove (*env_head,
951 *env_tail,
952 env);
953}
954
955
956struct GNUNET_MQ_MessageHandler *
957GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
958{
959 struct GNUNET_MQ_MessageHandler *copy;
960 unsigned int count;
961
962 if (NULL == handlers)
963 return NULL;
964 count = GNUNET_MQ_count_handlers (handlers);
965 copy = GNUNET_new_array (count + 1,
966 struct GNUNET_MQ_MessageHandler);
967 GNUNET_memcpy (copy,
968 handlers,
969 count * sizeof(struct GNUNET_MQ_MessageHandler));
970 return copy;
971}
972
973
974struct GNUNET_MQ_MessageHandler *
975GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
976 GNUNET_MQ_MessageCallback agpl_handler,
977 void *agpl_cls)
978{
979 struct GNUNET_MQ_MessageHandler *copy;
980 unsigned int count;
981
982 if (NULL == handlers)
983 return NULL;
984 count = GNUNET_MQ_count_handlers (handlers);
985 copy = GNUNET_new_array (count + 2,
986 struct GNUNET_MQ_MessageHandler);
987 GNUNET_memcpy (copy,
988 handlers,
989 count * sizeof(struct GNUNET_MQ_MessageHandler));
990 copy[count].mv = NULL;
991 copy[count].cb = agpl_handler;
992 copy[count].cls = agpl_cls;
993 copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
994 copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
995 return copy;
996}
997
998
999unsigned int
1000GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1001{
1002 unsigned int i;
1003
1004 if (NULL == handlers)
1005 return 0;
1006 for (i = 0; NULL != handlers[i].cb; i++)
1007 ;
1008 return i;
1009}
1010
1011
1012const char *
1013GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1014{
1015 switch (type)
1016 {
1017 case GNUNET_MQ_PREFERENCE_NONE:
1018 return "NONE";
1019 case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1020 return "BANDWIDTH";
1021 case GNUNET_MQ_PREFERENCE_LATENCY:
1022 return "LATENCY";
1023 case GNUNET_MQ_PREFERENCE_RELIABILITY:
1024 return "RELIABILITY";
1025 }
1026 return NULL;
1027}
1028
1029
1030/* end of mq.c */