diff options
Diffstat (limited to 'src/lib/util/mq.c')
-rw-r--r-- | src/lib/util/mq.c | 1030 |
1 files changed, 1030 insertions, 0 deletions
diff --git a/src/lib/util/mq.c b/src/lib/util/mq.c new file mode 100644 index 000000000..de0cff0c2 --- /dev/null +++ b/src/lib/util/mq.c | |||
@@ -0,0 +1,1030 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2012-2019 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @author Florian Dold | ||
23 | * @file util/mq.c | ||
24 | * @brief general purpose request queue | ||
25 | */ | ||
26 | |||
27 | #include "platform.h" | ||
28 | #include "gnunet_util_lib.h" | ||
29 | |||
30 | #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__) | ||
31 | |||
32 | |||
33 | struct GNUNET_MQ_Envelope | ||
34 | { | ||
35 | /** | ||
36 | * Messages are stored in a linked list. | ||
37 | * Each queue has its own list of envelopes. | ||
38 | */ | ||
39 | struct GNUNET_MQ_Envelope *next; | ||
40 | |||
41 | /** | ||
42 | * Messages are stored in a linked list | ||
43 | * Each queue has its own list of envelopes. | ||
44 | */ | ||
45 | struct GNUNET_MQ_Envelope *prev; | ||
46 | |||
47 | /** | ||
48 | * Actual allocated message header. | ||
49 | * The GNUNET_MQ_Envelope header is allocated at | ||
50 | * the end of the message. | ||
51 | */ | ||
52 | struct GNUNET_MessageHeader *mh; | ||
53 | |||
54 | /** | ||
55 | * Queue the message is queued in, NULL if message is not queued. | ||
56 | */ | ||
57 | struct GNUNET_MQ_Handle *parent_queue; | ||
58 | |||
59 | /** | ||
60 | * Called after the message was sent irrevocably. | ||
61 | */ | ||
62 | GNUNET_SCHEDULER_TaskCallback sent_cb; | ||
63 | |||
64 | /** | ||
65 | * Closure for @e send_cb | ||
66 | */ | ||
67 | void *sent_cls; | ||
68 | |||
69 | /** | ||
70 | * Flags that were set for this envelope by | ||
71 | * #GNUNET_MQ_env_set_options(). Only valid if | ||
72 | * @e have_custom_options is set. | ||
73 | */ | ||
74 | enum GNUNET_MQ_PriorityPreferences priority; | ||
75 | |||
76 | /** | ||
77 | * Did the application call #GNUNET_MQ_env_set_options()? | ||
78 | */ | ||
79 | int have_custom_options; | ||
80 | }; | ||
81 | |||
82 | |||
83 | /** | ||
84 | * Handle to a message queue. | ||
85 | */ | ||
86 | struct GNUNET_MQ_Handle | ||
87 | { | ||
88 | /** | ||
89 | * Handlers array, or NULL if the queue should not receive messages | ||
90 | */ | ||
91 | struct GNUNET_MQ_MessageHandler *handlers; | ||
92 | |||
93 | /** | ||
94 | * Actual implementation of message sending, | ||
95 | * called when a message is added | ||
96 | */ | ||
97 | GNUNET_MQ_SendImpl send_impl; | ||
98 | |||
99 | /** | ||
100 | * Implementation-dependent queue destruction function | ||
101 | */ | ||
102 | GNUNET_MQ_DestroyImpl destroy_impl; | ||
103 | |||
104 | /** | ||
105 | * Implementation-dependent send cancel function | ||
106 | */ | ||
107 | GNUNET_MQ_CancelImpl cancel_impl; | ||
108 | |||
109 | /** | ||
110 | * Implementation-specific state | ||
111 | */ | ||
112 | void *impl_state; | ||
113 | |||
114 | /** | ||
115 | * Callback will be called when an error occurs. | ||
116 | */ | ||
117 | GNUNET_MQ_ErrorHandler error_handler; | ||
118 | |||
119 | /** | ||
120 | * Closure for the error handler. | ||
121 | */ | ||
122 | void *error_handler_cls; | ||
123 | |||
124 | /** | ||
125 | * Task to asynchronously run #impl_send_continue(). | ||
126 | */ | ||
127 | struct GNUNET_SCHEDULER_Task *send_task; | ||
128 | |||
129 | /** | ||
130 | * Linked list of messages pending to be sent | ||
131 | */ | ||
132 | struct GNUNET_MQ_Envelope *envelope_head; | ||
133 | |||
134 | /** | ||
135 | * Linked list of messages pending to be sent | ||
136 | */ | ||
137 | struct GNUNET_MQ_Envelope *envelope_tail; | ||
138 | |||
139 | /** | ||
140 | * Message that is currently scheduled to be | ||
141 | * sent. Not the head of the message queue, as the implementation | ||
142 | * needs to know if sending has been already scheduled or not. | ||
143 | */ | ||
144 | struct GNUNET_MQ_Envelope *current_envelope; | ||
145 | |||
146 | /** | ||
147 | * Map of associations, lazily allocated | ||
148 | */ | ||
149 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
150 | |||
151 | /** | ||
152 | * Functions to call on queue destruction; kept in a DLL. | ||
153 | */ | ||
154 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; | ||
155 | |||
156 | /** | ||
157 | * Functions to call on queue destruction; kept in a DLL. | ||
158 | */ | ||
159 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail; | ||
160 | |||
161 | /** | ||
162 | * Flags that were set for this queue by | ||
163 | * #GNUNET_MQ_set_options(). Default is 0. | ||
164 | */ | ||
165 | enum GNUNET_MQ_PriorityPreferences priority; | ||
166 | |||
167 | /** | ||
168 | * Next id that should be used for the @e assoc_map, | ||
169 | * initialized lazily to a random value together with | ||
170 | * @e assoc_map | ||
171 | */ | ||
172 | uint32_t assoc_id; | ||
173 | |||
174 | /** | ||
175 | * Number of entries we have in the envelope-DLL. | ||
176 | */ | ||
177 | unsigned int queue_length; | ||
178 | |||
179 | /** | ||
180 | * True if GNUNET_MQ_impl_send_in_flight() was called. | ||
181 | */ | ||
182 | bool in_flight; | ||
183 | }; | ||
184 | |||
185 | |||
186 | void | ||
187 | GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, | ||
188 | const struct GNUNET_MessageHeader *mh) | ||
189 | { | ||
190 | enum GNUNET_GenericReturnValue ret; | ||
191 | |||
192 | ret = GNUNET_MQ_handle_message (mq->handlers, | ||
193 | mh); | ||
194 | if (GNUNET_SYSERR == ret) | ||
195 | { | ||
196 | GNUNET_break_op (0); | ||
197 | GNUNET_MQ_inject_error (mq, | ||
198 | GNUNET_MQ_ERROR_MALFORMED); | ||
199 | return; | ||
200 | } | ||
201 | } | ||
202 | |||
203 | |||
204 | enum GNUNET_GenericReturnValue | ||
205 | GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, | ||
206 | const struct GNUNET_MessageHeader *mh) | ||
207 | { | ||
208 | bool handled = false; | ||
209 | uint16_t msize = ntohs (mh->size); | ||
210 | uint16_t mtype = ntohs (mh->type); | ||
211 | |||
212 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
213 | "Received message of type %u and size %u\n", | ||
214 | mtype, | ||
215 | msize); | ||
216 | if (NULL == handlers) | ||
217 | goto done; | ||
218 | for (const struct GNUNET_MQ_MessageHandler *handler = handlers; | ||
219 | NULL != handler->cb; | ||
220 | handler++) | ||
221 | { | ||
222 | if (handler->type == mtype) | ||
223 | { | ||
224 | handled = true; | ||
225 | if ( (handler->expected_size > msize) || | ||
226 | ( (handler->expected_size != msize) && | ||
227 | (NULL == handler->mv) ) ) | ||
228 | { | ||
229 | /* Too small, or not an exact size and | ||
230 | no 'mv' handler to check rest */ | ||
231 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
232 | "Received malformed message of type %u\n", | ||
233 | (unsigned int) handler->type); | ||
234 | return GNUNET_SYSERR; | ||
235 | } | ||
236 | if ( (NULL == handler->mv) || | ||
237 | (GNUNET_OK == | ||
238 | handler->mv (handler->cls, | ||
239 | mh)) ) | ||
240 | { | ||
241 | /* message well-formed, pass to handler */ | ||
242 | handler->cb (handler->cls, mh); | ||
243 | } | ||
244 | else | ||
245 | { | ||
246 | /* Message rejected by check routine */ | ||
247 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
248 | "Received malformed message of type %u\n", | ||
249 | (unsigned int) handler->type); | ||
250 | return GNUNET_SYSERR; | ||
251 | } | ||
252 | break; | ||
253 | } | ||
254 | } | ||
255 | done: | ||
256 | if (! handled) | ||
257 | { | ||
258 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
259 | "No handler for message of type %u and size %u\n", | ||
260 | mtype, | ||
261 | msize); | ||
262 | return GNUNET_NO; | ||
263 | } | ||
264 | return GNUNET_OK; | ||
265 | } | ||
266 | |||
267 | |||
268 | void | ||
269 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, | ||
270 | enum GNUNET_MQ_Error error) | ||
271 | { | ||
272 | if (NULL == mq->error_handler) | ||
273 | { | ||
274 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
275 | "Got error %d, but no handler installed\n", | ||
276 | (int) error); | ||
277 | return; | ||
278 | } | ||
279 | mq->error_handler (mq->error_handler_cls, | ||
280 | error); | ||
281 | } | ||
282 | |||
283 | |||
284 | void | ||
285 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) | ||
286 | { | ||
287 | GNUNET_assert (NULL == ev->parent_queue); | ||
288 | GNUNET_free (ev); | ||
289 | } | ||
290 | |||
291 | |||
292 | unsigned int | ||
293 | GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) | ||
294 | { | ||
295 | if (! mq->in_flight) | ||
296 | { | ||
297 | return mq->queue_length; | ||
298 | } | ||
299 | return mq->queue_length - 1; | ||
300 | } | ||
301 | |||
302 | |||
303 | void | ||
304 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, | ||
305 | struct GNUNET_MQ_Envelope *ev) | ||
306 | { | ||
307 | GNUNET_assert (NULL != mq); | ||
308 | GNUNET_assert (NULL == ev->parent_queue); | ||
309 | |||
310 | mq->queue_length++; | ||
311 | if (mq->queue_length >= 10000000) | ||
312 | { | ||
313 | /* This would seem like a bug... */ | ||
314 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
315 | "MQ with %u entries extended by message of type %u (FC broken?)\n", | ||
316 | (unsigned int) mq->queue_length, | ||
317 | (unsigned int) ntohs (ev->mh->type)); | ||
318 | } | ||
319 | ev->parent_queue = mq; | ||
320 | /* is the implementation busy? queue it! */ | ||
321 | if ((NULL != mq->current_envelope) || (NULL != mq->send_task)) | ||
322 | { | ||
323 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, | ||
324 | mq->envelope_tail, | ||
325 | ev); | ||
326 | return; | ||
327 | } | ||
328 | GNUNET_assert (NULL == mq->envelope_head); | ||
329 | mq->current_envelope = ev; | ||
330 | |||
331 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
332 | "sending message of type %u and size %u, queue empty (MQ: %p)\n", | ||
333 | ntohs (ev->mh->type), | ||
334 | ntohs (ev->mh->size), | ||
335 | mq); | ||
336 | |||
337 | mq->send_impl (mq, | ||
338 | ev->mh, | ||
339 | mq->impl_state); | ||
340 | } | ||
341 | |||
342 | |||
343 | struct GNUNET_MQ_Envelope * | ||
344 | GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq) | ||
345 | { | ||
346 | struct GNUNET_MQ_Envelope *env; | ||
347 | |||
348 | env = mq->envelope_head; | ||
349 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
350 | mq->envelope_tail, | ||
351 | env); | ||
352 | mq->queue_length--; | ||
353 | env->parent_queue = NULL; | ||
354 | return env; | ||
355 | } | ||
356 | |||
357 | |||
358 | struct GNUNET_MQ_Envelope * | ||
359 | GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env) | ||
360 | { | ||
361 | GNUNET_assert (NULL == env->next); | ||
362 | GNUNET_assert (NULL == env->parent_queue); | ||
363 | GNUNET_assert (NULL == env->sent_cb); | ||
364 | GNUNET_assert (GNUNET_NO == env->have_custom_options); | ||
365 | return GNUNET_MQ_msg_copy (env->mh); | ||
366 | } | ||
367 | |||
368 | |||
369 | void | ||
370 | GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, | ||
371 | const struct GNUNET_MQ_Envelope *ev) | ||
372 | { | ||
373 | struct GNUNET_MQ_Envelope *env; | ||
374 | uint16_t msize; | ||
375 | |||
376 | msize = ntohs (ev->mh->size); | ||
377 | env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize); | ||
378 | env->mh = (struct GNUNET_MessageHeader *) &env[1]; | ||
379 | env->sent_cb = ev->sent_cb; | ||
380 | env->sent_cls = ev->sent_cls; | ||
381 | GNUNET_memcpy (&env[1], ev->mh, msize); | ||
382 | GNUNET_MQ_send (mq, env); | ||
383 | } | ||
384 | |||
385 | |||
386 | /** | ||
387 | * Task run to call the send implementation for the next queued | ||
388 | * message, if any. Only useful for implementing message queues, | ||
389 | * results in undefined behavior if not used carefully. | ||
390 | * | ||
391 | * @param cls message queue to send the next message with | ||
392 | */ | ||
393 | static void | ||
394 | impl_send_continue (void *cls) | ||
395 | { | ||
396 | struct GNUNET_MQ_Handle *mq = cls; | ||
397 | |||
398 | mq->send_task = NULL; | ||
399 | /* call is only valid if we're actually currently sending | ||
400 | * a message */ | ||
401 | if (NULL == mq->envelope_head) | ||
402 | return; | ||
403 | mq->current_envelope = mq->envelope_head; | ||
404 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
405 | mq->envelope_tail, | ||
406 | mq->current_envelope); | ||
407 | |||
408 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
409 | "sending message of type %u and size %u from queue (MQ: %p)\n", | ||
410 | ntohs (mq->current_envelope->mh->type), | ||
411 | ntohs (mq->current_envelope->mh->size), | ||
412 | mq); | ||
413 | |||
414 | mq->send_impl (mq, | ||
415 | mq->current_envelope->mh, | ||
416 | mq->impl_state); | ||
417 | } | ||
418 | |||
419 | |||
420 | void | ||
421 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) | ||
422 | { | ||
423 | struct GNUNET_MQ_Envelope *current_envelope; | ||
424 | GNUNET_SCHEDULER_TaskCallback cb; | ||
425 | |||
426 | GNUNET_assert (0 < mq->queue_length); | ||
427 | mq->queue_length--; | ||
428 | mq->in_flight = false; | ||
429 | current_envelope = mq->current_envelope; | ||
430 | current_envelope->parent_queue = NULL; | ||
431 | mq->current_envelope = NULL; | ||
432 | GNUNET_assert (NULL == mq->send_task); | ||
433 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq); | ||
434 | if (NULL != (cb = current_envelope->sent_cb)) | ||
435 | { | ||
436 | current_envelope->sent_cb = NULL; | ||
437 | cb (current_envelope->sent_cls); | ||
438 | } | ||
439 | GNUNET_free (current_envelope); | ||
440 | } | ||
441 | |||
442 | |||
443 | void | ||
444 | GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) | ||
445 | { | ||
446 | struct GNUNET_MQ_Envelope *current_envelope; | ||
447 | GNUNET_SCHEDULER_TaskCallback cb; | ||
448 | |||
449 | mq->in_flight = true; | ||
450 | /* call is only valid if we're actually currently sending | ||
451 | * a message */ | ||
452 | current_envelope = mq->current_envelope; | ||
453 | GNUNET_assert (NULL != current_envelope); | ||
454 | /* can't call cancel from now on anymore */ | ||
455 | current_envelope->parent_queue = NULL; | ||
456 | if (NULL != (cb = current_envelope->sent_cb)) | ||
457 | { | ||
458 | current_envelope->sent_cb = NULL; | ||
459 | cb (current_envelope->sent_cls); | ||
460 | } | ||
461 | } | ||
462 | |||
463 | |||
464 | struct GNUNET_MQ_Handle * | ||
465 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | ||
466 | GNUNET_MQ_DestroyImpl destroy, | ||
467 | GNUNET_MQ_CancelImpl cancel, | ||
468 | void *impl_state, | ||
469 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
470 | GNUNET_MQ_ErrorHandler error_handler, | ||
471 | void *error_handler_cls) | ||
472 | { | ||
473 | struct GNUNET_MQ_Handle *mq; | ||
474 | |||
475 | mq = GNUNET_new (struct GNUNET_MQ_Handle); | ||
476 | mq->send_impl = send; | ||
477 | mq->destroy_impl = destroy; | ||
478 | mq->cancel_impl = cancel; | ||
479 | mq->handlers = GNUNET_MQ_copy_handlers (handlers); | ||
480 | mq->error_handler = error_handler; | ||
481 | mq->error_handler_cls = error_handler_cls; | ||
482 | mq->impl_state = impl_state; | ||
483 | |||
484 | return mq; | ||
485 | } | ||
486 | |||
487 | |||
488 | void | ||
489 | GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, | ||
490 | void *handlers_cls) | ||
491 | { | ||
492 | if (NULL == mq->handlers) | ||
493 | return; | ||
494 | for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++) | ||
495 | mq->handlers[i].cls = handlers_cls; | ||
496 | } | ||
497 | |||
498 | |||
499 | const struct GNUNET_MessageHeader * | ||
500 | GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) | ||
501 | { | ||
502 | GNUNET_assert (NULL != mq->current_envelope); | ||
503 | GNUNET_assert (NULL != mq->current_envelope->mh); | ||
504 | return mq->current_envelope->mh; | ||
505 | } | ||
506 | |||
507 | |||
508 | void * | ||
509 | GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) | ||
510 | { | ||
511 | return mq->impl_state; | ||
512 | } | ||
513 | |||
514 | |||
515 | struct GNUNET_MQ_Envelope * | ||
516 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, | ||
517 | uint16_t size, | ||
518 | uint16_t type) | ||
519 | { | ||
520 | struct GNUNET_MQ_Envelope *ev; | ||
521 | |||
522 | ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope)); | ||
523 | ev->mh = (struct GNUNET_MessageHeader *) &ev[1]; | ||
524 | ev->mh->size = htons (size); | ||
525 | ev->mh->type = htons (type); | ||
526 | if (NULL != mhp) | ||
527 | *mhp = ev->mh; | ||
528 | return ev; | ||
529 | } | ||
530 | |||
531 | |||
532 | struct GNUNET_MQ_Envelope * | ||
533 | GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr) | ||
534 | { | ||
535 | struct GNUNET_MQ_Envelope *mqm; | ||
536 | uint16_t size = ntohs (hdr->size); | ||
537 | |||
538 | mqm = GNUNET_malloc (sizeof(*mqm) + size); | ||
539 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | ||
540 | GNUNET_memcpy (mqm->mh, | ||
541 | hdr, | ||
542 | size); | ||
543 | return mqm; | ||
544 | } | ||
545 | |||
546 | |||
547 | struct GNUNET_MQ_Envelope * | ||
548 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, | ||
549 | uint16_t base_size, | ||
550 | uint16_t type, | ||
551 | const struct GNUNET_MessageHeader *nested_mh) | ||
552 | { | ||
553 | struct GNUNET_MQ_Envelope *mqm; | ||
554 | uint16_t size; | ||
555 | |||
556 | if (NULL == nested_mh) | ||
557 | return GNUNET_MQ_msg_ (mhp, | ||
558 | base_size, | ||
559 | type); | ||
560 | size = base_size + ntohs (nested_mh->size); | ||
561 | /* check for uint16_t overflow */ | ||
562 | if (size < base_size) | ||
563 | return NULL; | ||
564 | mqm = GNUNET_MQ_msg_ (mhp, | ||
565 | size, | ||
566 | type); | ||
567 | GNUNET_memcpy ((char *) mqm->mh + base_size, | ||
568 | nested_mh, | ||
569 | ntohs (nested_mh->size)); | ||
570 | return mqm; | ||
571 | } | ||
572 | |||
573 | |||
574 | uint32_t | ||
575 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, | ||
576 | void *assoc_data) | ||
577 | { | ||
578 | uint32_t id; | ||
579 | |||
580 | if (NULL == mq->assoc_map) | ||
581 | { | ||
582 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); | ||
583 | mq->assoc_id = 1; | ||
584 | } | ||
585 | id = mq->assoc_id++; | ||
586 | GNUNET_assert (GNUNET_OK == | ||
587 | GNUNET_CONTAINER_multihashmap32_put ( | ||
588 | mq->assoc_map, | ||
589 | id, | ||
590 | assoc_data, | ||
591 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
592 | return id; | ||
593 | } | ||
594 | |||
595 | |||
596 | /** | ||
597 | * Get the data associated with a @a request_id in a queue | ||
598 | * | ||
599 | * @param mq the message queue with the association | ||
600 | * @param request_id the request id we are interested in | ||
601 | * @return the associated data | ||
602 | */ | ||
603 | void * | ||
604 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, | ||
605 | uint32_t request_id) | ||
606 | { | ||
607 | if (NULL == mq->assoc_map) | ||
608 | return NULL; | ||
609 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, | ||
610 | request_id); | ||
611 | } | ||
612 | |||
613 | |||
614 | /** | ||
615 | * Remove the association for a @a request_id | ||
616 | * | ||
617 | * @param mq the message queue with the association | ||
618 | * @param request_id the request id we want to remove | ||
619 | * @return the associated data | ||
620 | */ | ||
621 | void * | ||
622 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, | ||
623 | uint32_t request_id) | ||
624 | { | ||
625 | void *val; | ||
626 | |||
627 | if (NULL == mq->assoc_map) | ||
628 | return NULL; | ||
629 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, | ||
630 | request_id); | ||
631 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, | ||
632 | request_id); | ||
633 | return val; | ||
634 | } | ||
635 | |||
636 | |||
637 | void | ||
638 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, | ||
639 | GNUNET_SCHEDULER_TaskCallback cb, | ||
640 | void *cb_cls) | ||
641 | { | ||
642 | /* allow setting *OR* clearing callback */ | ||
643 | GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb)); | ||
644 | ev->sent_cb = cb; | ||
645 | ev->sent_cls = cb_cls; | ||
646 | } | ||
647 | |||
648 | |||
649 | /** | ||
650 | * Handle we return for callbacks registered to be | ||
651 | * notified when #GNUNET_MQ_destroy() is called on a queue. | ||
652 | */ | ||
653 | struct GNUNET_MQ_DestroyNotificationHandle | ||
654 | { | ||
655 | /** | ||
656 | * Kept in a DLL. | ||
657 | */ | ||
658 | struct GNUNET_MQ_DestroyNotificationHandle *prev; | ||
659 | |||
660 | /** | ||
661 | * Kept in a DLL. | ||
662 | */ | ||
663 | struct GNUNET_MQ_DestroyNotificationHandle *next; | ||
664 | |||
665 | /** | ||
666 | * Queue to notify about. | ||
667 | */ | ||
668 | struct GNUNET_MQ_Handle *mq; | ||
669 | |||
670 | /** | ||
671 | * Function to call. | ||
672 | */ | ||
673 | GNUNET_SCHEDULER_TaskCallback cb; | ||
674 | |||
675 | /** | ||
676 | * Closure for @e cb. | ||
677 | */ | ||
678 | void *cb_cls; | ||
679 | }; | ||
680 | |||
681 | |||
682 | void | ||
683 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | ||
684 | { | ||
685 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | ||
686 | |||
687 | if (NULL != mq->destroy_impl) | ||
688 | { | ||
689 | mq->destroy_impl (mq, mq->impl_state); | ||
690 | } | ||
691 | if (NULL != mq->send_task) | ||
692 | { | ||
693 | GNUNET_SCHEDULER_cancel (mq->send_task); | ||
694 | mq->send_task = NULL; | ||
695 | } | ||
696 | while (NULL != mq->envelope_head) | ||
697 | { | ||
698 | struct GNUNET_MQ_Envelope *ev; | ||
699 | |||
700 | ev = mq->envelope_head; | ||
701 | ev->parent_queue = NULL; | ||
702 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); | ||
703 | GNUNET_assert (0 < mq->queue_length); | ||
704 | mq->queue_length--; | ||
705 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
706 | "MQ destroy drops message of type %u\n", | ||
707 | ntohs (ev->mh->type)); | ||
708 | GNUNET_MQ_discard (ev); | ||
709 | } | ||
710 | if (NULL != mq->current_envelope) | ||
711 | { | ||
712 | /* we can only discard envelopes that | ||
713 | * are not queued! */ | ||
714 | mq->current_envelope->parent_queue = NULL; | ||
715 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
716 | "MQ destroy drops current message of type %u\n", | ||
717 | ntohs (mq->current_envelope->mh->type)); | ||
718 | GNUNET_MQ_discard (mq->current_envelope); | ||
719 | mq->current_envelope = NULL; | ||
720 | GNUNET_assert (0 < mq->queue_length); | ||
721 | mq->queue_length--; | ||
722 | } | ||
723 | GNUNET_assert (0 == mq->queue_length); | ||
724 | while (NULL != (dnh = mq->dnh_head)) | ||
725 | { | ||
726 | dnh->cb (dnh->cb_cls); | ||
727 | GNUNET_MQ_destroy_notify_cancel (dnh); | ||
728 | } | ||
729 | if (NULL != mq->assoc_map) | ||
730 | { | ||
731 | GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); | ||
732 | mq->assoc_map = NULL; | ||
733 | } | ||
734 | GNUNET_free (mq->handlers); | ||
735 | GNUNET_free (mq); | ||
736 | } | ||
737 | |||
738 | |||
739 | const struct GNUNET_MessageHeader * | ||
740 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, | ||
741 | uint16_t base_size) | ||
742 | { | ||
743 | uint16_t whole_size; | ||
744 | uint16_t nested_size; | ||
745 | const struct GNUNET_MessageHeader *nested_msg; | ||
746 | |||
747 | whole_size = ntohs (mh->size); | ||
748 | GNUNET_assert (whole_size >= base_size); | ||
749 | nested_size = whole_size - base_size; | ||
750 | if (0 == nested_size) | ||
751 | return NULL; | ||
752 | if (nested_size < sizeof(struct GNUNET_MessageHeader)) | ||
753 | { | ||
754 | GNUNET_break_op (0); | ||
755 | return NULL; | ||
756 | } | ||
757 | nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size); | ||
758 | if (ntohs (nested_msg->size) != nested_size) | ||
759 | { | ||
760 | GNUNET_break_op (0); | ||
761 | return NULL; | ||
762 | } | ||
763 | return nested_msg; | ||
764 | } | ||
765 | |||
766 | |||
767 | void | ||
768 | GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | ||
769 | { | ||
770 | struct GNUNET_MQ_Handle *mq = ev->parent_queue; | ||
771 | |||
772 | GNUNET_assert (NULL != mq); | ||
773 | GNUNET_assert (NULL != mq->cancel_impl); | ||
774 | GNUNET_assert (0 < mq->queue_length); | ||
775 | mq->queue_length--; | ||
776 | if (mq->current_envelope == ev) | ||
777 | { | ||
778 | /* complex case, we already started with transmitting | ||
779 | the message using the callbacks. */ | ||
780 | GNUNET_assert (! mq->in_flight); | ||
781 | mq->cancel_impl (mq, | ||
782 | mq->impl_state); | ||
783 | /* continue sending the next message, if any */ | ||
784 | mq->current_envelope = mq->envelope_head; | ||
785 | if (NULL != mq->current_envelope) | ||
786 | { | ||
787 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
788 | mq->envelope_tail, | ||
789 | mq->current_envelope); | ||
790 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
791 | "sending canceled message of type %u queue\n", | ||
792 | ntohs (ev->mh->type)); | ||
793 | mq->send_impl (mq, | ||
794 | mq->current_envelope->mh, | ||
795 | mq->impl_state); | ||
796 | } | ||
797 | } | ||
798 | else | ||
799 | { | ||
800 | /* simple case, message is still waiting in the queue */ | ||
801 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
802 | mq->envelope_tail, | ||
803 | ev); | ||
804 | } | ||
805 | ev->parent_queue = NULL; | ||
806 | ev->mh = NULL; | ||
807 | /* also frees ev */ | ||
808 | GNUNET_free (ev); | ||
809 | } | ||
810 | |||
811 | |||
812 | struct GNUNET_MQ_Envelope * | ||
813 | GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq) | ||
814 | { | ||
815 | return mq->current_envelope; | ||
816 | } | ||
817 | |||
818 | |||
819 | struct GNUNET_MQ_Envelope * | ||
820 | GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq) | ||
821 | { | ||
822 | if (NULL != mq->envelope_tail) | ||
823 | return mq->envelope_tail; | ||
824 | |||
825 | return mq->current_envelope; | ||
826 | } | ||
827 | |||
828 | |||
829 | void | ||
830 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, | ||
831 | enum GNUNET_MQ_PriorityPreferences pp) | ||
832 | { | ||
833 | env->priority = pp; | ||
834 | env->have_custom_options = GNUNET_YES; | ||
835 | } | ||
836 | |||
837 | |||
838 | enum GNUNET_MQ_PriorityPreferences | ||
839 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env) | ||
840 | { | ||
841 | struct GNUNET_MQ_Handle *mq = env->parent_queue; | ||
842 | |||
843 | if (GNUNET_YES == env->have_custom_options) | ||
844 | return env->priority; | ||
845 | if (NULL == mq) | ||
846 | return 0; | ||
847 | return mq->priority; | ||
848 | } | ||
849 | |||
850 | |||
851 | enum GNUNET_MQ_PriorityPreferences | ||
852 | GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1, | ||
853 | enum GNUNET_MQ_PriorityPreferences p2) | ||
854 | { | ||
855 | enum GNUNET_MQ_PriorityPreferences ret; | ||
856 | |||
857 | ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK); | ||
858 | ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE)); | ||
859 | ret |= | ||
860 | ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY)); | ||
861 | ret |= | ||
862 | ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED)); | ||
863 | ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT)); | ||
864 | ret |= | ||
865 | ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER)); | ||
866 | return ret; | ||
867 | } | ||
868 | |||
869 | |||
870 | void | ||
871 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, | ||
872 | enum GNUNET_MQ_PriorityPreferences pp) | ||
873 | { | ||
874 | mq->priority = pp; | ||
875 | } | ||
876 | |||
877 | |||
878 | const struct GNUNET_MessageHeader * | ||
879 | GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env) | ||
880 | { | ||
881 | return env->mh; | ||
882 | } | ||
883 | |||
884 | |||
885 | const struct GNUNET_MQ_Envelope * | ||
886 | GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) | ||
887 | { | ||
888 | return env->next; | ||
889 | } | ||
890 | |||
891 | |||
892 | struct GNUNET_MQ_DestroyNotificationHandle * | ||
893 | GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, | ||
894 | GNUNET_SCHEDULER_TaskCallback cb, | ||
895 | void *cb_cls) | ||
896 | { | ||
897 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | ||
898 | |||
899 | dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle); | ||
900 | dnh->mq = mq; | ||
901 | dnh->cb = cb; | ||
902 | dnh->cb_cls = cb_cls; | ||
903 | GNUNET_CONTAINER_DLL_insert (mq->dnh_head, | ||
904 | mq->dnh_tail, | ||
905 | dnh); | ||
906 | return dnh; | ||
907 | } | ||
908 | |||
909 | |||
910 | void | ||
911 | GNUNET_MQ_destroy_notify_cancel ( | ||
912 | struct GNUNET_MQ_DestroyNotificationHandle *dnh) | ||
913 | { | ||
914 | struct GNUNET_MQ_Handle *mq = dnh->mq; | ||
915 | |||
916 | GNUNET_CONTAINER_DLL_remove (mq->dnh_head, | ||
917 | mq->dnh_tail, | ||
918 | dnh); | ||
919 | GNUNET_free (dnh); | ||
920 | } | ||
921 | |||
922 | |||
923 | void | ||
924 | GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head, | ||
925 | struct GNUNET_MQ_Envelope **env_tail, | ||
926 | struct GNUNET_MQ_Envelope *env) | ||
927 | { | ||
928 | GNUNET_CONTAINER_DLL_insert (*env_head, | ||
929 | *env_tail, | ||
930 | env); | ||
931 | } | ||
932 | |||
933 | |||
934 | void | ||
935 | GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head, | ||
936 | struct GNUNET_MQ_Envelope **env_tail, | ||
937 | struct GNUNET_MQ_Envelope *env) | ||
938 | { | ||
939 | GNUNET_CONTAINER_DLL_insert_tail (*env_head, | ||
940 | *env_tail, | ||
941 | env); | ||
942 | } | ||
943 | |||
944 | |||
945 | void | ||
946 | GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head, | ||
947 | struct GNUNET_MQ_Envelope **env_tail, | ||
948 | struct GNUNET_MQ_Envelope *env) | ||
949 | { | ||
950 | GNUNET_CONTAINER_DLL_remove (*env_head, | ||
951 | *env_tail, | ||
952 | env); | ||
953 | } | ||
954 | |||
955 | |||
956 | struct GNUNET_MQ_MessageHandler * | ||
957 | GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | ||
958 | { | ||
959 | struct GNUNET_MQ_MessageHandler *copy; | ||
960 | unsigned int count; | ||
961 | |||
962 | if (NULL == handlers) | ||
963 | return NULL; | ||
964 | count = GNUNET_MQ_count_handlers (handlers); | ||
965 | copy = GNUNET_new_array (count + 1, | ||
966 | struct GNUNET_MQ_MessageHandler); | ||
967 | GNUNET_memcpy (copy, | ||
968 | handlers, | ||
969 | count * sizeof(struct GNUNET_MQ_MessageHandler)); | ||
970 | return copy; | ||
971 | } | ||
972 | |||
973 | |||
974 | struct GNUNET_MQ_MessageHandler * | ||
975 | GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, | ||
976 | GNUNET_MQ_MessageCallback agpl_handler, | ||
977 | void *agpl_cls) | ||
978 | { | ||
979 | struct GNUNET_MQ_MessageHandler *copy; | ||
980 | unsigned int count; | ||
981 | |||
982 | if (NULL == handlers) | ||
983 | return NULL; | ||
984 | count = GNUNET_MQ_count_handlers (handlers); | ||
985 | copy = GNUNET_new_array (count + 2, | ||
986 | struct GNUNET_MQ_MessageHandler); | ||
987 | GNUNET_memcpy (copy, | ||
988 | handlers, | ||
989 | count * sizeof(struct GNUNET_MQ_MessageHandler)); | ||
990 | copy[count].mv = NULL; | ||
991 | copy[count].cb = agpl_handler; | ||
992 | copy[count].cls = agpl_cls; | ||
993 | copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL; | ||
994 | copy[count].expected_size = sizeof(struct GNUNET_MessageHeader); | ||
995 | return copy; | ||
996 | } | ||
997 | |||
998 | |||
999 | unsigned int | ||
1000 | GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | ||
1001 | { | ||
1002 | unsigned int i; | ||
1003 | |||
1004 | if (NULL == handlers) | ||
1005 | return 0; | ||
1006 | for (i = 0; NULL != handlers[i].cb; i++) | ||
1007 | ; | ||
1008 | return i; | ||
1009 | } | ||
1010 | |||
1011 | |||
1012 | const char * | ||
1013 | GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type) | ||
1014 | { | ||
1015 | switch (type) | ||
1016 | { | ||
1017 | case GNUNET_MQ_PREFERENCE_NONE: | ||
1018 | return "NONE"; | ||
1019 | case GNUNET_MQ_PREFERENCE_BANDWIDTH: | ||
1020 | return "BANDWIDTH"; | ||
1021 | case GNUNET_MQ_PREFERENCE_LATENCY: | ||
1022 | return "LATENCY"; | ||
1023 | case GNUNET_MQ_PREFERENCE_RELIABILITY: | ||
1024 | return "RELIABILITY"; | ||
1025 | } | ||
1026 | return NULL; | ||
1027 | } | ||
1028 | |||
1029 | |||
1030 | /* end of mq.c */ | ||