aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c1032
1 files changed, 0 insertions, 1032 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
deleted file mode 100644
index 8749d5d21..000000000
--- a/src/util/mq.c
+++ /dev/null
@@ -1,1032 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @author Florian Dold
23 * @file util/mq.c
24 * @brief general purpose request queue
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28
29#define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30
31
32struct GNUNET_MQ_Envelope
33{
34 /**
35 * Messages are stored in a linked list.
36 * Each queue has its own list of envelopes.
37 */
38 struct GNUNET_MQ_Envelope *next;
39
40 /**
41 * Messages are stored in a linked list
42 * Each queue has its own list of envelopes.
43 */
44 struct GNUNET_MQ_Envelope *prev;
45
46 /**
47 * Actual allocated message header.
48 * The GNUNET_MQ_Envelope header is allocated at
49 * the end of the message.
50 */
51 struct GNUNET_MessageHeader *mh;
52
53 /**
54 * Queue the message is queued in, NULL if message is not queued.
55 */
56 struct GNUNET_MQ_Handle *parent_queue;
57
58 /**
59 * Called after the message was sent irrevocably.
60 */
61 GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63 /**
64 * Closure for @e send_cb
65 */
66 void *sent_cls;
67
68 /**
69 * Flags that were set for this envelope by
70 * #GNUNET_MQ_env_set_options(). Only valid if
71 * @e have_custom_options is set.
72 */
73 enum GNUNET_MQ_PriorityPreferences priority;
74
75 /**
76 * Did the application call #GNUNET_MQ_env_set_options()?
77 */
78 int have_custom_options;
79};
80
81
82/**
83 * Handle to a message queue.
84 */
85struct GNUNET_MQ_Handle
86{
87 /**
88 * Handlers array, or NULL if the queue should not receive messages
89 */
90 struct GNUNET_MQ_MessageHandler *handlers;
91
92 /**
93 * Actual implementation of message sending,
94 * called when a message is added
95 */
96 GNUNET_MQ_SendImpl send_impl;
97
98 /**
99 * Implementation-dependent queue destruction function
100 */
101 GNUNET_MQ_DestroyImpl destroy_impl;
102
103 /**
104 * Implementation-dependent send cancel function
105 */
106 GNUNET_MQ_CancelImpl cancel_impl;
107
108 /**
109 * Implementation-specific state
110 */
111 void *impl_state;
112
113 /**
114 * Callback will be called when an error occurs.
115 */
116 GNUNET_MQ_ErrorHandler error_handler;
117
118 /**
119 * Closure for the error handler.
120 */
121 void *error_handler_cls;
122
123 /**
124 * Task to asynchronously run #impl_send_continue().
125 */
126 struct GNUNET_SCHEDULER_Task *send_task;
127
128 /**
129 * Linked list of messages pending to be sent
130 */
131 struct GNUNET_MQ_Envelope *envelope_head;
132
133 /**
134 * Linked list of messages pending to be sent
135 */
136 struct GNUNET_MQ_Envelope *envelope_tail;
137
138 /**
139 * Message that is currently scheduled to be
140 * sent. Not the head of the message queue, as the implementation
141 * needs to know if sending has been already scheduled or not.
142 */
143 struct GNUNET_MQ_Envelope *current_envelope;
144
145 /**
146 * Map of associations, lazily allocated
147 */
148 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
149
150 /**
151 * Functions to call on queue destruction; kept in a DLL.
152 */
153 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
154
155 /**
156 * Functions to call on queue destruction; kept in a DLL.
157 */
158 struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
159
160 /**
161 * Flags that were set for this queue by
162 * #GNUNET_MQ_set_options(). Default is 0.
163 */
164 enum GNUNET_MQ_PriorityPreferences priority;
165
166 /**
167 * Next id that should be used for the @e assoc_map,
168 * initialized lazily to a random value together with
169 * @e assoc_map
170 */
171 uint32_t assoc_id;
172
173 /**
174 * Number of entries we have in the envelope-DLL.
175 */
176 unsigned int queue_length;
177
178 /**
179 * True if GNUNET_MQ_impl_send_in_flight() was called.
180 */
181 bool in_flight;
182};
183
184
185void
186GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
187 const struct GNUNET_MessageHeader *mh)
188{
189 enum GNUNET_GenericReturnValue ret;
190
191 ret = GNUNET_MQ_handle_message (mq->handlers,
192 mh);
193 if (GNUNET_SYSERR == ret)
194 {
195 GNUNET_MQ_inject_error (mq,
196 GNUNET_MQ_ERROR_MALFORMED);
197 return;
198 }
199}
200
201
202enum GNUNET_GenericReturnValue
203GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
204 const struct GNUNET_MessageHeader *mh)
205{
206 bool handled = false;
207 uint16_t msize = ntohs (mh->size);
208 uint16_t mtype = ntohs (mh->type);
209
210 LOG (GNUNET_ERROR_TYPE_DEBUG,
211 "Received message of type %u and size %u\n",
212 mtype,
213 msize);
214 if (NULL == handlers)
215 goto done;
216 for (const struct GNUNET_MQ_MessageHandler *handler = handlers;
217 NULL != handler->cb;
218 handler++)
219 {
220 if (handler->type == mtype)
221 {
222 handled = true;
223 if ( (handler->expected_size > msize) ||
224 ( (handler->expected_size != msize) &&
225 (NULL == handler->mv) ) )
226 {
227 /* Too small, or not an exact size and
228 no 'mv' handler to check rest */
229 LOG (GNUNET_ERROR_TYPE_ERROR,
230 "Received malformed message of type %u\n",
231 (unsigned int) handler->type);
232 return GNUNET_SYSERR;
233 }
234 if ( (NULL == handler->mv) ||
235 (GNUNET_OK ==
236 handler->mv (handler->cls,
237 mh)) )
238 {
239 /* message well-formed, pass to handler */
240 handler->cb (handler->cls, mh);
241 }
242 else
243 {
244 /* Message rejected by check routine */
245 LOG (GNUNET_ERROR_TYPE_ERROR,
246 "Received malformed message of type %u\n",
247 (unsigned int) handler->type);
248 return GNUNET_SYSERR;
249 }
250 break;
251 }
252 }
253done:
254 if (! handled)
255 {
256 LOG (GNUNET_ERROR_TYPE_INFO,
257 "No handler for message of type %u and size %u\n",
258 mtype,
259 msize);
260 return GNUNET_NO;
261 }
262 return GNUNET_OK;
263}
264
265
266void
267GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
268 enum GNUNET_MQ_Error error)
269{
270 if (NULL == mq->error_handler)
271 {
272 LOG (GNUNET_ERROR_TYPE_WARNING,
273 "Got error %d, but no handler installed\n",
274 (int) error);
275 return;
276 }
277 mq->error_handler (mq->error_handler_cls,
278 error);
279}
280
281
282void
283GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
284{
285 GNUNET_assert (NULL == ev->parent_queue);
286 GNUNET_free (ev);
287}
288
289
290unsigned int
291GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
292{
293 if (! mq->in_flight)
294 {
295 return mq->queue_length;
296 }
297 return mq->queue_length - 1;
298}
299
300
301void
302GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
303 struct GNUNET_MQ_Envelope *ev)
304{
305 if (NULL == mq)
306 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
307 "mq is NUll when sending message of type %u\n",
308 (unsigned int) ntohs (ev->mh->type));
309 GNUNET_assert (NULL != mq);
310 GNUNET_assert (NULL == ev->parent_queue);
311
312 mq->queue_length++;
313 if (mq->queue_length >= 10000000)
314 {
315 /* This would seem like a bug... */
316 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
317 "MQ with %u entries extended by message of type %u (FC broken?)\n",
318 (unsigned int) mq->queue_length,
319 (unsigned int) ntohs (ev->mh->type));
320 }
321 ev->parent_queue = mq;
322 /* is the implementation busy? queue it! */
323 if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
324 {
325 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
326 mq->envelope_tail,
327 ev);
328 return;
329 }
330 GNUNET_assert (NULL == mq->envelope_head);
331 mq->current_envelope = ev;
332
333 LOG (GNUNET_ERROR_TYPE_DEBUG,
334 "sending message of type %u and size %u, queue empty (MQ: %p)\n",
335 ntohs (ev->mh->type),
336 ntohs (ev->mh->size),
337 mq);
338
339 mq->send_impl (mq,
340 ev->mh,
341 mq->impl_state);
342}
343
344
345struct GNUNET_MQ_Envelope *
346GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
347{
348 struct GNUNET_MQ_Envelope *env;
349
350 env = mq->envelope_head;
351 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
352 mq->envelope_tail,
353 env);
354 mq->queue_length--;
355 env->parent_queue = NULL;
356 return env;
357}
358
359
360struct GNUNET_MQ_Envelope *
361GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
362{
363 GNUNET_assert (NULL == env->next);
364 GNUNET_assert (NULL == env->parent_queue);
365 GNUNET_assert (NULL == env->sent_cb);
366 GNUNET_assert (GNUNET_NO == env->have_custom_options);
367 return GNUNET_MQ_msg_copy (env->mh);
368}
369
370
371void
372GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
373 const struct GNUNET_MQ_Envelope *ev)
374{
375 struct GNUNET_MQ_Envelope *env;
376 uint16_t msize;
377
378 msize = ntohs (ev->mh->size);
379 env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
380 env->mh = (struct GNUNET_MessageHeader *) &env[1];
381 env->sent_cb = ev->sent_cb;
382 env->sent_cls = ev->sent_cls;
383 GNUNET_memcpy (&env[1], ev->mh, msize);
384 GNUNET_MQ_send (mq, env);
385}
386
387
388/**
389 * Task run to call the send implementation for the next queued
390 * message, if any. Only useful for implementing message queues,
391 * results in undefined behavior if not used carefully.
392 *
393 * @param cls message queue to send the next message with
394 */
395static void
396impl_send_continue (void *cls)
397{
398 struct GNUNET_MQ_Handle *mq = cls;
399
400 mq->send_task = NULL;
401 /* call is only valid if we're actually currently sending
402 * a message */
403 if (NULL == mq->envelope_head)
404 return;
405 mq->current_envelope = mq->envelope_head;
406 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
407 mq->envelope_tail,
408 mq->current_envelope);
409
410 LOG (GNUNET_ERROR_TYPE_DEBUG,
411 "sending message of type %u and size %u from queue (MQ: %p)\n",
412 ntohs (mq->current_envelope->mh->type),
413 ntohs (mq->current_envelope->mh->size),
414 mq);
415
416 mq->send_impl (mq,
417 mq->current_envelope->mh,
418 mq->impl_state);
419}
420
421
422void
423GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
424{
425 struct GNUNET_MQ_Envelope *current_envelope;
426 GNUNET_SCHEDULER_TaskCallback cb;
427
428 GNUNET_assert (0 < mq->queue_length);
429 mq->queue_length--;
430 mq->in_flight = false;
431 current_envelope = mq->current_envelope;
432 current_envelope->parent_queue = NULL;
433 mq->current_envelope = NULL;
434 GNUNET_assert (NULL == mq->send_task);
435 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
436 if (NULL != (cb = current_envelope->sent_cb))
437 {
438 current_envelope->sent_cb = NULL;
439 cb (current_envelope->sent_cls);
440 }
441 GNUNET_free (current_envelope);
442}
443
444
445void
446GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
447{
448 struct GNUNET_MQ_Envelope *current_envelope;
449 GNUNET_SCHEDULER_TaskCallback cb;
450
451 mq->in_flight = true;
452 /* call is only valid if we're actually currently sending
453 * a message */
454 current_envelope = mq->current_envelope;
455 GNUNET_assert (NULL != current_envelope);
456 /* can't call cancel from now on anymore */
457 current_envelope->parent_queue = NULL;
458 if (NULL != (cb = current_envelope->sent_cb))
459 {
460 current_envelope->sent_cb = NULL;
461 cb (current_envelope->sent_cls);
462 }
463}
464
465
466struct GNUNET_MQ_Handle *
467GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
468 GNUNET_MQ_DestroyImpl destroy,
469 GNUNET_MQ_CancelImpl cancel,
470 void *impl_state,
471 const struct GNUNET_MQ_MessageHandler *handlers,
472 GNUNET_MQ_ErrorHandler error_handler,
473 void *error_handler_cls)
474{
475 struct GNUNET_MQ_Handle *mq;
476
477 mq = GNUNET_new (struct GNUNET_MQ_Handle);
478 mq->send_impl = send;
479 mq->destroy_impl = destroy;
480 mq->cancel_impl = cancel;
481 mq->handlers = GNUNET_MQ_copy_handlers (handlers);
482 mq->error_handler = error_handler;
483 mq->error_handler_cls = error_handler_cls;
484 mq->impl_state = impl_state;
485
486 return mq;
487}
488
489
490void
491GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
492 void *handlers_cls)
493{
494 if (NULL == mq->handlers)
495 return;
496 for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
497 mq->handlers[i].cls = handlers_cls;
498}
499
500
501const struct GNUNET_MessageHeader *
502GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
503{
504 GNUNET_assert (NULL != mq->current_envelope);
505 GNUNET_assert (NULL != mq->current_envelope->mh);
506 return mq->current_envelope->mh;
507}
508
509
510void *
511GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
512{
513 return mq->impl_state;
514}
515
516
517struct GNUNET_MQ_Envelope *
518GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
519 uint16_t size,
520 uint16_t type)
521{
522 struct GNUNET_MQ_Envelope *ev;
523
524 ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
525 ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
526 ev->mh->size = htons (size);
527 ev->mh->type = htons (type);
528 if (NULL != mhp)
529 *mhp = ev->mh;
530 return ev;
531}
532
533
534struct GNUNET_MQ_Envelope *
535GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
536{
537 struct GNUNET_MQ_Envelope *mqm;
538 uint16_t size = ntohs (hdr->size);
539
540 mqm = GNUNET_malloc (sizeof(*mqm) + size);
541 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
542 GNUNET_memcpy (mqm->mh,
543 hdr,
544 size);
545 return mqm;
546}
547
548
549struct GNUNET_MQ_Envelope *
550GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
551 uint16_t base_size,
552 uint16_t type,
553 const struct GNUNET_MessageHeader *nested_mh)
554{
555 struct GNUNET_MQ_Envelope *mqm;
556 uint16_t size;
557
558 if (NULL == nested_mh)
559 return GNUNET_MQ_msg_ (mhp,
560 base_size,
561 type);
562 size = base_size + ntohs (nested_mh->size);
563 /* check for uint16_t overflow */
564 if (size < base_size)
565 return NULL;
566 mqm = GNUNET_MQ_msg_ (mhp,
567 size,
568 type);
569 GNUNET_memcpy ((char *) mqm->mh + base_size,
570 nested_mh,
571 ntohs (nested_mh->size));
572 return mqm;
573}
574
575
576uint32_t
577GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
578 void *assoc_data)
579{
580 uint32_t id;
581
582 if (NULL == mq->assoc_map)
583 {
584 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
585 mq->assoc_id = 1;
586 }
587 id = mq->assoc_id++;
588 GNUNET_assert (GNUNET_OK ==
589 GNUNET_CONTAINER_multihashmap32_put (
590 mq->assoc_map,
591 id,
592 assoc_data,
593 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
594 return id;
595}
596
597
598/**
599 * Get the data associated with a @a request_id in a queue
600 *
601 * @param mq the message queue with the association
602 * @param request_id the request id we are interested in
603 * @return the associated data
604 */
605void *
606GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
607 uint32_t request_id)
608{
609 if (NULL == mq->assoc_map)
610 return NULL;
611 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
612 request_id);
613}
614
615
616/**
617 * Remove the association for a @a request_id
618 *
619 * @param mq the message queue with the association
620 * @param request_id the request id we want to remove
621 * @return the associated data
622 */
623void *
624GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
625 uint32_t request_id)
626{
627 void *val;
628
629 if (NULL == mq->assoc_map)
630 return NULL;
631 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
632 request_id);
633 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
634 request_id);
635 return val;
636}
637
638
639void
640GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
641 GNUNET_SCHEDULER_TaskCallback cb,
642 void *cb_cls)
643{
644 /* allow setting *OR* clearing callback */
645 GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
646 ev->sent_cb = cb;
647 ev->sent_cls = cb_cls;
648}
649
650
651/**
652 * Handle we return for callbacks registered to be
653 * notified when #GNUNET_MQ_destroy() is called on a queue.
654 */
655struct GNUNET_MQ_DestroyNotificationHandle
656{
657 /**
658 * Kept in a DLL.
659 */
660 struct GNUNET_MQ_DestroyNotificationHandle *prev;
661
662 /**
663 * Kept in a DLL.
664 */
665 struct GNUNET_MQ_DestroyNotificationHandle *next;
666
667 /**
668 * Queue to notify about.
669 */
670 struct GNUNET_MQ_Handle *mq;
671
672 /**
673 * Function to call.
674 */
675 GNUNET_SCHEDULER_TaskCallback cb;
676
677 /**
678 * Closure for @e cb.
679 */
680 void *cb_cls;
681};
682
683
684void
685GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
686{
687 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
688
689 if (NULL != mq->destroy_impl)
690 {
691 mq->destroy_impl (mq, mq->impl_state);
692 }
693 if (NULL != mq->send_task)
694 {
695 GNUNET_SCHEDULER_cancel (mq->send_task);
696 mq->send_task = NULL;
697 }
698 while (NULL != mq->envelope_head)
699 {
700 struct GNUNET_MQ_Envelope *ev;
701
702 ev = mq->envelope_head;
703 ev->parent_queue = NULL;
704 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
705 GNUNET_assert (0 < mq->queue_length);
706 mq->queue_length--;
707 LOG (GNUNET_ERROR_TYPE_DEBUG,
708 "MQ destroy drops message of type %u\n",
709 ntohs (ev->mh->type));
710 GNUNET_MQ_discard (ev);
711 }
712 if (NULL != mq->current_envelope)
713 {
714 /* we can only discard envelopes that
715 * are not queued! */
716 mq->current_envelope->parent_queue = NULL;
717 LOG (GNUNET_ERROR_TYPE_DEBUG,
718 "MQ destroy drops current message of type %u\n",
719 ntohs (mq->current_envelope->mh->type));
720 GNUNET_MQ_discard (mq->current_envelope);
721 mq->current_envelope = NULL;
722 GNUNET_assert (0 < mq->queue_length);
723 mq->queue_length--;
724 }
725 GNUNET_assert (0 == mq->queue_length);
726 while (NULL != (dnh = mq->dnh_head))
727 {
728 dnh->cb (dnh->cb_cls);
729 GNUNET_MQ_destroy_notify_cancel (dnh);
730 }
731 if (NULL != mq->assoc_map)
732 {
733 GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
734 mq->assoc_map = NULL;
735 }
736 GNUNET_free (mq->handlers);
737 GNUNET_free (mq);
738}
739
740
741const struct GNUNET_MessageHeader *
742GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
743 uint16_t base_size)
744{
745 uint16_t whole_size;
746 uint16_t nested_size;
747 const struct GNUNET_MessageHeader *nested_msg;
748
749 whole_size = ntohs (mh->size);
750 GNUNET_assert (whole_size >= base_size);
751 nested_size = whole_size - base_size;
752 if (0 == nested_size)
753 return NULL;
754 if (nested_size < sizeof(struct GNUNET_MessageHeader))
755 {
756 GNUNET_break_op (0);
757 return NULL;
758 }
759 nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
760 if (ntohs (nested_msg->size) != nested_size)
761 {
762 GNUNET_break_op (0);
763 return NULL;
764 }
765 return nested_msg;
766}
767
768
769void
770GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
771{
772 struct GNUNET_MQ_Handle *mq = ev->parent_queue;
773
774 GNUNET_assert (NULL != mq);
775 GNUNET_assert (NULL != mq->cancel_impl);
776 GNUNET_assert (0 < mq->queue_length);
777 mq->queue_length--;
778 if (mq->current_envelope == ev)
779 {
780 /* complex case, we already started with transmitting
781 the message using the callbacks. */
782 GNUNET_assert (! mq->in_flight);
783 mq->cancel_impl (mq,
784 mq->impl_state);
785 /* continue sending the next message, if any */
786 mq->current_envelope = mq->envelope_head;
787 if (NULL != mq->current_envelope)
788 {
789 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
790 mq->envelope_tail,
791 mq->current_envelope);
792 LOG (GNUNET_ERROR_TYPE_DEBUG,
793 "sending canceled message of type %u queue\n",
794 ntohs (ev->mh->type));
795 mq->send_impl (mq,
796 mq->current_envelope->mh,
797 mq->impl_state);
798 }
799 }
800 else
801 {
802 /* simple case, message is still waiting in the queue */
803 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
804 mq->envelope_tail,
805 ev);
806 }
807 ev->parent_queue = NULL;
808 ev->mh = NULL;
809 /* also frees ev */
810 GNUNET_free (ev);
811}
812
813
814struct GNUNET_MQ_Envelope *
815GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
816{
817 return mq->current_envelope;
818}
819
820
821struct GNUNET_MQ_Envelope *
822GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
823{
824 if (NULL != mq->envelope_tail)
825 return mq->envelope_tail;
826
827 return mq->current_envelope;
828}
829
830
831void
832GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
833 enum GNUNET_MQ_PriorityPreferences pp)
834{
835 env->priority = pp;
836 env->have_custom_options = GNUNET_YES;
837}
838
839
840enum GNUNET_MQ_PriorityPreferences
841GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
842{
843 struct GNUNET_MQ_Handle *mq = env->parent_queue;
844
845 if (GNUNET_YES == env->have_custom_options)
846 return env->priority;
847 if (NULL == mq)
848 return 0;
849 return mq->priority;
850}
851
852
853enum GNUNET_MQ_PriorityPreferences
854GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
855 enum GNUNET_MQ_PriorityPreferences p2)
856{
857 enum GNUNET_MQ_PriorityPreferences ret;
858
859 ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
860 ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
861 ret |=
862 ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
863 ret |=
864 ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
865 ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
866 ret |=
867 ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
868 return ret;
869}
870
871
872void
873GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
874 enum GNUNET_MQ_PriorityPreferences pp)
875{
876 mq->priority = pp;
877}
878
879
880const struct GNUNET_MessageHeader *
881GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
882{
883 return env->mh;
884}
885
886
887const struct GNUNET_MQ_Envelope *
888GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
889{
890 return env->next;
891}
892
893
894struct GNUNET_MQ_DestroyNotificationHandle *
895GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
896 GNUNET_SCHEDULER_TaskCallback cb,
897 void *cb_cls)
898{
899 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
900
901 dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
902 dnh->mq = mq;
903 dnh->cb = cb;
904 dnh->cb_cls = cb_cls;
905 GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
906 mq->dnh_tail,
907 dnh);
908 return dnh;
909}
910
911
912void
913GNUNET_MQ_destroy_notify_cancel (
914 struct GNUNET_MQ_DestroyNotificationHandle *dnh)
915{
916 struct GNUNET_MQ_Handle *mq = dnh->mq;
917
918 GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
919 mq->dnh_tail,
920 dnh);
921 GNUNET_free (dnh);
922}
923
924
925void
926GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
927 struct GNUNET_MQ_Envelope **env_tail,
928 struct GNUNET_MQ_Envelope *env)
929{
930 GNUNET_CONTAINER_DLL_insert (*env_head,
931 *env_tail,
932 env);
933}
934
935
936void
937GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
938 struct GNUNET_MQ_Envelope **env_tail,
939 struct GNUNET_MQ_Envelope *env)
940{
941 GNUNET_CONTAINER_DLL_insert_tail (*env_head,
942 *env_tail,
943 env);
944}
945
946
947void
948GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
949 struct GNUNET_MQ_Envelope **env_tail,
950 struct GNUNET_MQ_Envelope *env)
951{
952 GNUNET_CONTAINER_DLL_remove (*env_head,
953 *env_tail,
954 env);
955}
956
957
958struct GNUNET_MQ_MessageHandler *
959GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
960{
961 struct GNUNET_MQ_MessageHandler *copy;
962 unsigned int count;
963
964 if (NULL == handlers)
965 return NULL;
966 count = GNUNET_MQ_count_handlers (handlers);
967 copy = GNUNET_new_array (count + 1,
968 struct GNUNET_MQ_MessageHandler);
969 GNUNET_memcpy (copy,
970 handlers,
971 count * sizeof(struct GNUNET_MQ_MessageHandler));
972 return copy;
973}
974
975
976struct GNUNET_MQ_MessageHandler *
977GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
978 GNUNET_MQ_MessageCallback agpl_handler,
979 void *agpl_cls)
980{
981 struct GNUNET_MQ_MessageHandler *copy;
982 unsigned int count;
983
984 if (NULL == handlers)
985 return NULL;
986 count = GNUNET_MQ_count_handlers (handlers);
987 copy = GNUNET_new_array (count + 2,
988 struct GNUNET_MQ_MessageHandler);
989 GNUNET_memcpy (copy,
990 handlers,
991 count * sizeof(struct GNUNET_MQ_MessageHandler));
992 copy[count].mv = NULL;
993 copy[count].cb = agpl_handler;
994 copy[count].cls = agpl_cls;
995 copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
996 copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
997 return copy;
998}
999
1000
1001unsigned int
1002GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1003{
1004 unsigned int i;
1005
1006 if (NULL == handlers)
1007 return 0;
1008 for (i = 0; NULL != handlers[i].cb; i++)
1009 ;
1010 return i;
1011}
1012
1013
1014const char *
1015GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1016{
1017 switch (type)
1018 {
1019 case GNUNET_MQ_PREFERENCE_NONE:
1020 return "NONE";
1021 case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1022 return "BANDWIDTH";
1023 case GNUNET_MQ_PREFERENCE_LATENCY:
1024 return "LATENCY";
1025 case GNUNET_MQ_PREFERENCE_RELIABILITY:
1026 return "RELIABILITY";
1027 }
1028 return NULL;
1029}
1030
1031
1032/* end of mq.c */