diff options
Diffstat (limited to 'src/util/mq.c')
-rw-r--r-- | src/util/mq.c | 1032 |
1 files changed, 0 insertions, 1032 deletions
diff --git a/src/util/mq.c b/src/util/mq.c deleted file mode 100644 index 8749d5d21..000000000 --- a/src/util/mq.c +++ /dev/null | |||
@@ -1,1032 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2012-2019 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @author Florian Dold | ||
23 | * @file util/mq.c | ||
24 | * @brief general purpose request queue | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | |||
29 | #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__) | ||
30 | |||
31 | |||
32 | struct GNUNET_MQ_Envelope | ||
33 | { | ||
34 | /** | ||
35 | * Messages are stored in a linked list. | ||
36 | * Each queue has its own list of envelopes. | ||
37 | */ | ||
38 | struct GNUNET_MQ_Envelope *next; | ||
39 | |||
40 | /** | ||
41 | * Messages are stored in a linked list | ||
42 | * Each queue has its own list of envelopes. | ||
43 | */ | ||
44 | struct GNUNET_MQ_Envelope *prev; | ||
45 | |||
46 | /** | ||
47 | * Actual allocated message header. | ||
48 | * The GNUNET_MQ_Envelope header is allocated at | ||
49 | * the end of the message. | ||
50 | */ | ||
51 | struct GNUNET_MessageHeader *mh; | ||
52 | |||
53 | /** | ||
54 | * Queue the message is queued in, NULL if message is not queued. | ||
55 | */ | ||
56 | struct GNUNET_MQ_Handle *parent_queue; | ||
57 | |||
58 | /** | ||
59 | * Called after the message was sent irrevocably. | ||
60 | */ | ||
61 | GNUNET_SCHEDULER_TaskCallback sent_cb; | ||
62 | |||
63 | /** | ||
64 | * Closure for @e send_cb | ||
65 | */ | ||
66 | void *sent_cls; | ||
67 | |||
68 | /** | ||
69 | * Flags that were set for this envelope by | ||
70 | * #GNUNET_MQ_env_set_options(). Only valid if | ||
71 | * @e have_custom_options is set. | ||
72 | */ | ||
73 | enum GNUNET_MQ_PriorityPreferences priority; | ||
74 | |||
75 | /** | ||
76 | * Did the application call #GNUNET_MQ_env_set_options()? | ||
77 | */ | ||
78 | int have_custom_options; | ||
79 | }; | ||
80 | |||
81 | |||
82 | /** | ||
83 | * Handle to a message queue. | ||
84 | */ | ||
85 | struct GNUNET_MQ_Handle | ||
86 | { | ||
87 | /** | ||
88 | * Handlers array, or NULL if the queue should not receive messages | ||
89 | */ | ||
90 | struct GNUNET_MQ_MessageHandler *handlers; | ||
91 | |||
92 | /** | ||
93 | * Actual implementation of message sending, | ||
94 | * called when a message is added | ||
95 | */ | ||
96 | GNUNET_MQ_SendImpl send_impl; | ||
97 | |||
98 | /** | ||
99 | * Implementation-dependent queue destruction function | ||
100 | */ | ||
101 | GNUNET_MQ_DestroyImpl destroy_impl; | ||
102 | |||
103 | /** | ||
104 | * Implementation-dependent send cancel function | ||
105 | */ | ||
106 | GNUNET_MQ_CancelImpl cancel_impl; | ||
107 | |||
108 | /** | ||
109 | * Implementation-specific state | ||
110 | */ | ||
111 | void *impl_state; | ||
112 | |||
113 | /** | ||
114 | * Callback will be called when an error occurs. | ||
115 | */ | ||
116 | GNUNET_MQ_ErrorHandler error_handler; | ||
117 | |||
118 | /** | ||
119 | * Closure for the error handler. | ||
120 | */ | ||
121 | void *error_handler_cls; | ||
122 | |||
123 | /** | ||
124 | * Task to asynchronously run #impl_send_continue(). | ||
125 | */ | ||
126 | struct GNUNET_SCHEDULER_Task *send_task; | ||
127 | |||
128 | /** | ||
129 | * Linked list of messages pending to be sent | ||
130 | */ | ||
131 | struct GNUNET_MQ_Envelope *envelope_head; | ||
132 | |||
133 | /** | ||
134 | * Linked list of messages pending to be sent | ||
135 | */ | ||
136 | struct GNUNET_MQ_Envelope *envelope_tail; | ||
137 | |||
138 | /** | ||
139 | * Message that is currently scheduled to be | ||
140 | * sent. Not the head of the message queue, as the implementation | ||
141 | * needs to know if sending has been already scheduled or not. | ||
142 | */ | ||
143 | struct GNUNET_MQ_Envelope *current_envelope; | ||
144 | |||
145 | /** | ||
146 | * Map of associations, lazily allocated | ||
147 | */ | ||
148 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
149 | |||
150 | /** | ||
151 | * Functions to call on queue destruction; kept in a DLL. | ||
152 | */ | ||
153 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; | ||
154 | |||
155 | /** | ||
156 | * Functions to call on queue destruction; kept in a DLL. | ||
157 | */ | ||
158 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail; | ||
159 | |||
160 | /** | ||
161 | * Flags that were set for this queue by | ||
162 | * #GNUNET_MQ_set_options(). Default is 0. | ||
163 | */ | ||
164 | enum GNUNET_MQ_PriorityPreferences priority; | ||
165 | |||
166 | /** | ||
167 | * Next id that should be used for the @e assoc_map, | ||
168 | * initialized lazily to a random value together with | ||
169 | * @e assoc_map | ||
170 | */ | ||
171 | uint32_t assoc_id; | ||
172 | |||
173 | /** | ||
174 | * Number of entries we have in the envelope-DLL. | ||
175 | */ | ||
176 | unsigned int queue_length; | ||
177 | |||
178 | /** | ||
179 | * True if GNUNET_MQ_impl_send_in_flight() was called. | ||
180 | */ | ||
181 | bool in_flight; | ||
182 | }; | ||
183 | |||
184 | |||
185 | void | ||
186 | GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, | ||
187 | const struct GNUNET_MessageHeader *mh) | ||
188 | { | ||
189 | enum GNUNET_GenericReturnValue ret; | ||
190 | |||
191 | ret = GNUNET_MQ_handle_message (mq->handlers, | ||
192 | mh); | ||
193 | if (GNUNET_SYSERR == ret) | ||
194 | { | ||
195 | GNUNET_MQ_inject_error (mq, | ||
196 | GNUNET_MQ_ERROR_MALFORMED); | ||
197 | return; | ||
198 | } | ||
199 | } | ||
200 | |||
201 | |||
202 | enum GNUNET_GenericReturnValue | ||
203 | GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, | ||
204 | const struct GNUNET_MessageHeader *mh) | ||
205 | { | ||
206 | bool handled = false; | ||
207 | uint16_t msize = ntohs (mh->size); | ||
208 | uint16_t mtype = ntohs (mh->type); | ||
209 | |||
210 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
211 | "Received message of type %u and size %u\n", | ||
212 | mtype, | ||
213 | msize); | ||
214 | if (NULL == handlers) | ||
215 | goto done; | ||
216 | for (const struct GNUNET_MQ_MessageHandler *handler = handlers; | ||
217 | NULL != handler->cb; | ||
218 | handler++) | ||
219 | { | ||
220 | if (handler->type == mtype) | ||
221 | { | ||
222 | handled = true; | ||
223 | if ( (handler->expected_size > msize) || | ||
224 | ( (handler->expected_size != msize) && | ||
225 | (NULL == handler->mv) ) ) | ||
226 | { | ||
227 | /* Too small, or not an exact size and | ||
228 | no 'mv' handler to check rest */ | ||
229 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
230 | "Received malformed message of type %u\n", | ||
231 | (unsigned int) handler->type); | ||
232 | return GNUNET_SYSERR; | ||
233 | } | ||
234 | if ( (NULL == handler->mv) || | ||
235 | (GNUNET_OK == | ||
236 | handler->mv (handler->cls, | ||
237 | mh)) ) | ||
238 | { | ||
239 | /* message well-formed, pass to handler */ | ||
240 | handler->cb (handler->cls, mh); | ||
241 | } | ||
242 | else | ||
243 | { | ||
244 | /* Message rejected by check routine */ | ||
245 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
246 | "Received malformed message of type %u\n", | ||
247 | (unsigned int) handler->type); | ||
248 | return GNUNET_SYSERR; | ||
249 | } | ||
250 | break; | ||
251 | } | ||
252 | } | ||
253 | done: | ||
254 | if (! handled) | ||
255 | { | ||
256 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
257 | "No handler for message of type %u and size %u\n", | ||
258 | mtype, | ||
259 | msize); | ||
260 | return GNUNET_NO; | ||
261 | } | ||
262 | return GNUNET_OK; | ||
263 | } | ||
264 | |||
265 | |||
266 | void | ||
267 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, | ||
268 | enum GNUNET_MQ_Error error) | ||
269 | { | ||
270 | if (NULL == mq->error_handler) | ||
271 | { | ||
272 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
273 | "Got error %d, but no handler installed\n", | ||
274 | (int) error); | ||
275 | return; | ||
276 | } | ||
277 | mq->error_handler (mq->error_handler_cls, | ||
278 | error); | ||
279 | } | ||
280 | |||
281 | |||
282 | void | ||
283 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) | ||
284 | { | ||
285 | GNUNET_assert (NULL == ev->parent_queue); | ||
286 | GNUNET_free (ev); | ||
287 | } | ||
288 | |||
289 | |||
290 | unsigned int | ||
291 | GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) | ||
292 | { | ||
293 | if (! mq->in_flight) | ||
294 | { | ||
295 | return mq->queue_length; | ||
296 | } | ||
297 | return mq->queue_length - 1; | ||
298 | } | ||
299 | |||
300 | |||
301 | void | ||
302 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, | ||
303 | struct GNUNET_MQ_Envelope *ev) | ||
304 | { | ||
305 | if (NULL == mq) | ||
306 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
307 | "mq is NUll when sending message of type %u\n", | ||
308 | (unsigned int) ntohs (ev->mh->type)); | ||
309 | GNUNET_assert (NULL != mq); | ||
310 | GNUNET_assert (NULL == ev->parent_queue); | ||
311 | |||
312 | mq->queue_length++; | ||
313 | if (mq->queue_length >= 10000000) | ||
314 | { | ||
315 | /* This would seem like a bug... */ | ||
316 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
317 | "MQ with %u entries extended by message of type %u (FC broken?)\n", | ||
318 | (unsigned int) mq->queue_length, | ||
319 | (unsigned int) ntohs (ev->mh->type)); | ||
320 | } | ||
321 | ev->parent_queue = mq; | ||
322 | /* is the implementation busy? queue it! */ | ||
323 | if ((NULL != mq->current_envelope) || (NULL != mq->send_task)) | ||
324 | { | ||
325 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, | ||
326 | mq->envelope_tail, | ||
327 | ev); | ||
328 | return; | ||
329 | } | ||
330 | GNUNET_assert (NULL == mq->envelope_head); | ||
331 | mq->current_envelope = ev; | ||
332 | |||
333 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
334 | "sending message of type %u and size %u, queue empty (MQ: %p)\n", | ||
335 | ntohs (ev->mh->type), | ||
336 | ntohs (ev->mh->size), | ||
337 | mq); | ||
338 | |||
339 | mq->send_impl (mq, | ||
340 | ev->mh, | ||
341 | mq->impl_state); | ||
342 | } | ||
343 | |||
344 | |||
345 | struct GNUNET_MQ_Envelope * | ||
346 | GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq) | ||
347 | { | ||
348 | struct GNUNET_MQ_Envelope *env; | ||
349 | |||
350 | env = mq->envelope_head; | ||
351 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
352 | mq->envelope_tail, | ||
353 | env); | ||
354 | mq->queue_length--; | ||
355 | env->parent_queue = NULL; | ||
356 | return env; | ||
357 | } | ||
358 | |||
359 | |||
360 | struct GNUNET_MQ_Envelope * | ||
361 | GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env) | ||
362 | { | ||
363 | GNUNET_assert (NULL == env->next); | ||
364 | GNUNET_assert (NULL == env->parent_queue); | ||
365 | GNUNET_assert (NULL == env->sent_cb); | ||
366 | GNUNET_assert (GNUNET_NO == env->have_custom_options); | ||
367 | return GNUNET_MQ_msg_copy (env->mh); | ||
368 | } | ||
369 | |||
370 | |||
371 | void | ||
372 | GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, | ||
373 | const struct GNUNET_MQ_Envelope *ev) | ||
374 | { | ||
375 | struct GNUNET_MQ_Envelope *env; | ||
376 | uint16_t msize; | ||
377 | |||
378 | msize = ntohs (ev->mh->size); | ||
379 | env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize); | ||
380 | env->mh = (struct GNUNET_MessageHeader *) &env[1]; | ||
381 | env->sent_cb = ev->sent_cb; | ||
382 | env->sent_cls = ev->sent_cls; | ||
383 | GNUNET_memcpy (&env[1], ev->mh, msize); | ||
384 | GNUNET_MQ_send (mq, env); | ||
385 | } | ||
386 | |||
387 | |||
388 | /** | ||
389 | * Task run to call the send implementation for the next queued | ||
390 | * message, if any. Only useful for implementing message queues, | ||
391 | * results in undefined behavior if not used carefully. | ||
392 | * | ||
393 | * @param cls message queue to send the next message with | ||
394 | */ | ||
395 | static void | ||
396 | impl_send_continue (void *cls) | ||
397 | { | ||
398 | struct GNUNET_MQ_Handle *mq = cls; | ||
399 | |||
400 | mq->send_task = NULL; | ||
401 | /* call is only valid if we're actually currently sending | ||
402 | * a message */ | ||
403 | if (NULL == mq->envelope_head) | ||
404 | return; | ||
405 | mq->current_envelope = mq->envelope_head; | ||
406 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
407 | mq->envelope_tail, | ||
408 | mq->current_envelope); | ||
409 | |||
410 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
411 | "sending message of type %u and size %u from queue (MQ: %p)\n", | ||
412 | ntohs (mq->current_envelope->mh->type), | ||
413 | ntohs (mq->current_envelope->mh->size), | ||
414 | mq); | ||
415 | |||
416 | mq->send_impl (mq, | ||
417 | mq->current_envelope->mh, | ||
418 | mq->impl_state); | ||
419 | } | ||
420 | |||
421 | |||
422 | void | ||
423 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) | ||
424 | { | ||
425 | struct GNUNET_MQ_Envelope *current_envelope; | ||
426 | GNUNET_SCHEDULER_TaskCallback cb; | ||
427 | |||
428 | GNUNET_assert (0 < mq->queue_length); | ||
429 | mq->queue_length--; | ||
430 | mq->in_flight = false; | ||
431 | current_envelope = mq->current_envelope; | ||
432 | current_envelope->parent_queue = NULL; | ||
433 | mq->current_envelope = NULL; | ||
434 | GNUNET_assert (NULL == mq->send_task); | ||
435 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq); | ||
436 | if (NULL != (cb = current_envelope->sent_cb)) | ||
437 | { | ||
438 | current_envelope->sent_cb = NULL; | ||
439 | cb (current_envelope->sent_cls); | ||
440 | } | ||
441 | GNUNET_free (current_envelope); | ||
442 | } | ||
443 | |||
444 | |||
445 | void | ||
446 | GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) | ||
447 | { | ||
448 | struct GNUNET_MQ_Envelope *current_envelope; | ||
449 | GNUNET_SCHEDULER_TaskCallback cb; | ||
450 | |||
451 | mq->in_flight = true; | ||
452 | /* call is only valid if we're actually currently sending | ||
453 | * a message */ | ||
454 | current_envelope = mq->current_envelope; | ||
455 | GNUNET_assert (NULL != current_envelope); | ||
456 | /* can't call cancel from now on anymore */ | ||
457 | current_envelope->parent_queue = NULL; | ||
458 | if (NULL != (cb = current_envelope->sent_cb)) | ||
459 | { | ||
460 | current_envelope->sent_cb = NULL; | ||
461 | cb (current_envelope->sent_cls); | ||
462 | } | ||
463 | } | ||
464 | |||
465 | |||
466 | struct GNUNET_MQ_Handle * | ||
467 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | ||
468 | GNUNET_MQ_DestroyImpl destroy, | ||
469 | GNUNET_MQ_CancelImpl cancel, | ||
470 | void *impl_state, | ||
471 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
472 | GNUNET_MQ_ErrorHandler error_handler, | ||
473 | void *error_handler_cls) | ||
474 | { | ||
475 | struct GNUNET_MQ_Handle *mq; | ||
476 | |||
477 | mq = GNUNET_new (struct GNUNET_MQ_Handle); | ||
478 | mq->send_impl = send; | ||
479 | mq->destroy_impl = destroy; | ||
480 | mq->cancel_impl = cancel; | ||
481 | mq->handlers = GNUNET_MQ_copy_handlers (handlers); | ||
482 | mq->error_handler = error_handler; | ||
483 | mq->error_handler_cls = error_handler_cls; | ||
484 | mq->impl_state = impl_state; | ||
485 | |||
486 | return mq; | ||
487 | } | ||
488 | |||
489 | |||
490 | void | ||
491 | GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, | ||
492 | void *handlers_cls) | ||
493 | { | ||
494 | if (NULL == mq->handlers) | ||
495 | return; | ||
496 | for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++) | ||
497 | mq->handlers[i].cls = handlers_cls; | ||
498 | } | ||
499 | |||
500 | |||
501 | const struct GNUNET_MessageHeader * | ||
502 | GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) | ||
503 | { | ||
504 | GNUNET_assert (NULL != mq->current_envelope); | ||
505 | GNUNET_assert (NULL != mq->current_envelope->mh); | ||
506 | return mq->current_envelope->mh; | ||
507 | } | ||
508 | |||
509 | |||
510 | void * | ||
511 | GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) | ||
512 | { | ||
513 | return mq->impl_state; | ||
514 | } | ||
515 | |||
516 | |||
517 | struct GNUNET_MQ_Envelope * | ||
518 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, | ||
519 | uint16_t size, | ||
520 | uint16_t type) | ||
521 | { | ||
522 | struct GNUNET_MQ_Envelope *ev; | ||
523 | |||
524 | ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope)); | ||
525 | ev->mh = (struct GNUNET_MessageHeader *) &ev[1]; | ||
526 | ev->mh->size = htons (size); | ||
527 | ev->mh->type = htons (type); | ||
528 | if (NULL != mhp) | ||
529 | *mhp = ev->mh; | ||
530 | return ev; | ||
531 | } | ||
532 | |||
533 | |||
534 | struct GNUNET_MQ_Envelope * | ||
535 | GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr) | ||
536 | { | ||
537 | struct GNUNET_MQ_Envelope *mqm; | ||
538 | uint16_t size = ntohs (hdr->size); | ||
539 | |||
540 | mqm = GNUNET_malloc (sizeof(*mqm) + size); | ||
541 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | ||
542 | GNUNET_memcpy (mqm->mh, | ||
543 | hdr, | ||
544 | size); | ||
545 | return mqm; | ||
546 | } | ||
547 | |||
548 | |||
549 | struct GNUNET_MQ_Envelope * | ||
550 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, | ||
551 | uint16_t base_size, | ||
552 | uint16_t type, | ||
553 | const struct GNUNET_MessageHeader *nested_mh) | ||
554 | { | ||
555 | struct GNUNET_MQ_Envelope *mqm; | ||
556 | uint16_t size; | ||
557 | |||
558 | if (NULL == nested_mh) | ||
559 | return GNUNET_MQ_msg_ (mhp, | ||
560 | base_size, | ||
561 | type); | ||
562 | size = base_size + ntohs (nested_mh->size); | ||
563 | /* check for uint16_t overflow */ | ||
564 | if (size < base_size) | ||
565 | return NULL; | ||
566 | mqm = GNUNET_MQ_msg_ (mhp, | ||
567 | size, | ||
568 | type); | ||
569 | GNUNET_memcpy ((char *) mqm->mh + base_size, | ||
570 | nested_mh, | ||
571 | ntohs (nested_mh->size)); | ||
572 | return mqm; | ||
573 | } | ||
574 | |||
575 | |||
576 | uint32_t | ||
577 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, | ||
578 | void *assoc_data) | ||
579 | { | ||
580 | uint32_t id; | ||
581 | |||
582 | if (NULL == mq->assoc_map) | ||
583 | { | ||
584 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); | ||
585 | mq->assoc_id = 1; | ||
586 | } | ||
587 | id = mq->assoc_id++; | ||
588 | GNUNET_assert (GNUNET_OK == | ||
589 | GNUNET_CONTAINER_multihashmap32_put ( | ||
590 | mq->assoc_map, | ||
591 | id, | ||
592 | assoc_data, | ||
593 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
594 | return id; | ||
595 | } | ||
596 | |||
597 | |||
598 | /** | ||
599 | * Get the data associated with a @a request_id in a queue | ||
600 | * | ||
601 | * @param mq the message queue with the association | ||
602 | * @param request_id the request id we are interested in | ||
603 | * @return the associated data | ||
604 | */ | ||
605 | void * | ||
606 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, | ||
607 | uint32_t request_id) | ||
608 | { | ||
609 | if (NULL == mq->assoc_map) | ||
610 | return NULL; | ||
611 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, | ||
612 | request_id); | ||
613 | } | ||
614 | |||
615 | |||
616 | /** | ||
617 | * Remove the association for a @a request_id | ||
618 | * | ||
619 | * @param mq the message queue with the association | ||
620 | * @param request_id the request id we want to remove | ||
621 | * @return the associated data | ||
622 | */ | ||
623 | void * | ||
624 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, | ||
625 | uint32_t request_id) | ||
626 | { | ||
627 | void *val; | ||
628 | |||
629 | if (NULL == mq->assoc_map) | ||
630 | return NULL; | ||
631 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, | ||
632 | request_id); | ||
633 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, | ||
634 | request_id); | ||
635 | return val; | ||
636 | } | ||
637 | |||
638 | |||
639 | void | ||
640 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, | ||
641 | GNUNET_SCHEDULER_TaskCallback cb, | ||
642 | void *cb_cls) | ||
643 | { | ||
644 | /* allow setting *OR* clearing callback */ | ||
645 | GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb)); | ||
646 | ev->sent_cb = cb; | ||
647 | ev->sent_cls = cb_cls; | ||
648 | } | ||
649 | |||
650 | |||
651 | /** | ||
652 | * Handle we return for callbacks registered to be | ||
653 | * notified when #GNUNET_MQ_destroy() is called on a queue. | ||
654 | */ | ||
655 | struct GNUNET_MQ_DestroyNotificationHandle | ||
656 | { | ||
657 | /** | ||
658 | * Kept in a DLL. | ||
659 | */ | ||
660 | struct GNUNET_MQ_DestroyNotificationHandle *prev; | ||
661 | |||
662 | /** | ||
663 | * Kept in a DLL. | ||
664 | */ | ||
665 | struct GNUNET_MQ_DestroyNotificationHandle *next; | ||
666 | |||
667 | /** | ||
668 | * Queue to notify about. | ||
669 | */ | ||
670 | struct GNUNET_MQ_Handle *mq; | ||
671 | |||
672 | /** | ||
673 | * Function to call. | ||
674 | */ | ||
675 | GNUNET_SCHEDULER_TaskCallback cb; | ||
676 | |||
677 | /** | ||
678 | * Closure for @e cb. | ||
679 | */ | ||
680 | void *cb_cls; | ||
681 | }; | ||
682 | |||
683 | |||
684 | void | ||
685 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | ||
686 | { | ||
687 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | ||
688 | |||
689 | if (NULL != mq->destroy_impl) | ||
690 | { | ||
691 | mq->destroy_impl (mq, mq->impl_state); | ||
692 | } | ||
693 | if (NULL != mq->send_task) | ||
694 | { | ||
695 | GNUNET_SCHEDULER_cancel (mq->send_task); | ||
696 | mq->send_task = NULL; | ||
697 | } | ||
698 | while (NULL != mq->envelope_head) | ||
699 | { | ||
700 | struct GNUNET_MQ_Envelope *ev; | ||
701 | |||
702 | ev = mq->envelope_head; | ||
703 | ev->parent_queue = NULL; | ||
704 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); | ||
705 | GNUNET_assert (0 < mq->queue_length); | ||
706 | mq->queue_length--; | ||
707 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
708 | "MQ destroy drops message of type %u\n", | ||
709 | ntohs (ev->mh->type)); | ||
710 | GNUNET_MQ_discard (ev); | ||
711 | } | ||
712 | if (NULL != mq->current_envelope) | ||
713 | { | ||
714 | /* we can only discard envelopes that | ||
715 | * are not queued! */ | ||
716 | mq->current_envelope->parent_queue = NULL; | ||
717 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
718 | "MQ destroy drops current message of type %u\n", | ||
719 | ntohs (mq->current_envelope->mh->type)); | ||
720 | GNUNET_MQ_discard (mq->current_envelope); | ||
721 | mq->current_envelope = NULL; | ||
722 | GNUNET_assert (0 < mq->queue_length); | ||
723 | mq->queue_length--; | ||
724 | } | ||
725 | GNUNET_assert (0 == mq->queue_length); | ||
726 | while (NULL != (dnh = mq->dnh_head)) | ||
727 | { | ||
728 | dnh->cb (dnh->cb_cls); | ||
729 | GNUNET_MQ_destroy_notify_cancel (dnh); | ||
730 | } | ||
731 | if (NULL != mq->assoc_map) | ||
732 | { | ||
733 | GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); | ||
734 | mq->assoc_map = NULL; | ||
735 | } | ||
736 | GNUNET_free (mq->handlers); | ||
737 | GNUNET_free (mq); | ||
738 | } | ||
739 | |||
740 | |||
741 | const struct GNUNET_MessageHeader * | ||
742 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, | ||
743 | uint16_t base_size) | ||
744 | { | ||
745 | uint16_t whole_size; | ||
746 | uint16_t nested_size; | ||
747 | const struct GNUNET_MessageHeader *nested_msg; | ||
748 | |||
749 | whole_size = ntohs (mh->size); | ||
750 | GNUNET_assert (whole_size >= base_size); | ||
751 | nested_size = whole_size - base_size; | ||
752 | if (0 == nested_size) | ||
753 | return NULL; | ||
754 | if (nested_size < sizeof(struct GNUNET_MessageHeader)) | ||
755 | { | ||
756 | GNUNET_break_op (0); | ||
757 | return NULL; | ||
758 | } | ||
759 | nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size); | ||
760 | if (ntohs (nested_msg->size) != nested_size) | ||
761 | { | ||
762 | GNUNET_break_op (0); | ||
763 | return NULL; | ||
764 | } | ||
765 | return nested_msg; | ||
766 | } | ||
767 | |||
768 | |||
769 | void | ||
770 | GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | ||
771 | { | ||
772 | struct GNUNET_MQ_Handle *mq = ev->parent_queue; | ||
773 | |||
774 | GNUNET_assert (NULL != mq); | ||
775 | GNUNET_assert (NULL != mq->cancel_impl); | ||
776 | GNUNET_assert (0 < mq->queue_length); | ||
777 | mq->queue_length--; | ||
778 | if (mq->current_envelope == ev) | ||
779 | { | ||
780 | /* complex case, we already started with transmitting | ||
781 | the message using the callbacks. */ | ||
782 | GNUNET_assert (! mq->in_flight); | ||
783 | mq->cancel_impl (mq, | ||
784 | mq->impl_state); | ||
785 | /* continue sending the next message, if any */ | ||
786 | mq->current_envelope = mq->envelope_head; | ||
787 | if (NULL != mq->current_envelope) | ||
788 | { | ||
789 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
790 | mq->envelope_tail, | ||
791 | mq->current_envelope); | ||
792 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
793 | "sending canceled message of type %u queue\n", | ||
794 | ntohs (ev->mh->type)); | ||
795 | mq->send_impl (mq, | ||
796 | mq->current_envelope->mh, | ||
797 | mq->impl_state); | ||
798 | } | ||
799 | } | ||
800 | else | ||
801 | { | ||
802 | /* simple case, message is still waiting in the queue */ | ||
803 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | ||
804 | mq->envelope_tail, | ||
805 | ev); | ||
806 | } | ||
807 | ev->parent_queue = NULL; | ||
808 | ev->mh = NULL; | ||
809 | /* also frees ev */ | ||
810 | GNUNET_free (ev); | ||
811 | } | ||
812 | |||
813 | |||
814 | struct GNUNET_MQ_Envelope * | ||
815 | GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq) | ||
816 | { | ||
817 | return mq->current_envelope; | ||
818 | } | ||
819 | |||
820 | |||
821 | struct GNUNET_MQ_Envelope * | ||
822 | GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq) | ||
823 | { | ||
824 | if (NULL != mq->envelope_tail) | ||
825 | return mq->envelope_tail; | ||
826 | |||
827 | return mq->current_envelope; | ||
828 | } | ||
829 | |||
830 | |||
831 | void | ||
832 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, | ||
833 | enum GNUNET_MQ_PriorityPreferences pp) | ||
834 | { | ||
835 | env->priority = pp; | ||
836 | env->have_custom_options = GNUNET_YES; | ||
837 | } | ||
838 | |||
839 | |||
840 | enum GNUNET_MQ_PriorityPreferences | ||
841 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env) | ||
842 | { | ||
843 | struct GNUNET_MQ_Handle *mq = env->parent_queue; | ||
844 | |||
845 | if (GNUNET_YES == env->have_custom_options) | ||
846 | return env->priority; | ||
847 | if (NULL == mq) | ||
848 | return 0; | ||
849 | return mq->priority; | ||
850 | } | ||
851 | |||
852 | |||
853 | enum GNUNET_MQ_PriorityPreferences | ||
854 | GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1, | ||
855 | enum GNUNET_MQ_PriorityPreferences p2) | ||
856 | { | ||
857 | enum GNUNET_MQ_PriorityPreferences ret; | ||
858 | |||
859 | ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK); | ||
860 | ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE)); | ||
861 | ret |= | ||
862 | ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY)); | ||
863 | ret |= | ||
864 | ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED)); | ||
865 | ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT)); | ||
866 | ret |= | ||
867 | ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER)); | ||
868 | return ret; | ||
869 | } | ||
870 | |||
871 | |||
872 | void | ||
873 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, | ||
874 | enum GNUNET_MQ_PriorityPreferences pp) | ||
875 | { | ||
876 | mq->priority = pp; | ||
877 | } | ||
878 | |||
879 | |||
880 | const struct GNUNET_MessageHeader * | ||
881 | GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env) | ||
882 | { | ||
883 | return env->mh; | ||
884 | } | ||
885 | |||
886 | |||
887 | const struct GNUNET_MQ_Envelope * | ||
888 | GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) | ||
889 | { | ||
890 | return env->next; | ||
891 | } | ||
892 | |||
893 | |||
894 | struct GNUNET_MQ_DestroyNotificationHandle * | ||
895 | GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, | ||
896 | GNUNET_SCHEDULER_TaskCallback cb, | ||
897 | void *cb_cls) | ||
898 | { | ||
899 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | ||
900 | |||
901 | dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle); | ||
902 | dnh->mq = mq; | ||
903 | dnh->cb = cb; | ||
904 | dnh->cb_cls = cb_cls; | ||
905 | GNUNET_CONTAINER_DLL_insert (mq->dnh_head, | ||
906 | mq->dnh_tail, | ||
907 | dnh); | ||
908 | return dnh; | ||
909 | } | ||
910 | |||
911 | |||
912 | void | ||
913 | GNUNET_MQ_destroy_notify_cancel ( | ||
914 | struct GNUNET_MQ_DestroyNotificationHandle *dnh) | ||
915 | { | ||
916 | struct GNUNET_MQ_Handle *mq = dnh->mq; | ||
917 | |||
918 | GNUNET_CONTAINER_DLL_remove (mq->dnh_head, | ||
919 | mq->dnh_tail, | ||
920 | dnh); | ||
921 | GNUNET_free (dnh); | ||
922 | } | ||
923 | |||
924 | |||
925 | void | ||
926 | GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head, | ||
927 | struct GNUNET_MQ_Envelope **env_tail, | ||
928 | struct GNUNET_MQ_Envelope *env) | ||
929 | { | ||
930 | GNUNET_CONTAINER_DLL_insert (*env_head, | ||
931 | *env_tail, | ||
932 | env); | ||
933 | } | ||
934 | |||
935 | |||
936 | void | ||
937 | GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head, | ||
938 | struct GNUNET_MQ_Envelope **env_tail, | ||
939 | struct GNUNET_MQ_Envelope *env) | ||
940 | { | ||
941 | GNUNET_CONTAINER_DLL_insert_tail (*env_head, | ||
942 | *env_tail, | ||
943 | env); | ||
944 | } | ||
945 | |||
946 | |||
947 | void | ||
948 | GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head, | ||
949 | struct GNUNET_MQ_Envelope **env_tail, | ||
950 | struct GNUNET_MQ_Envelope *env) | ||
951 | { | ||
952 | GNUNET_CONTAINER_DLL_remove (*env_head, | ||
953 | *env_tail, | ||
954 | env); | ||
955 | } | ||
956 | |||
957 | |||
958 | struct GNUNET_MQ_MessageHandler * | ||
959 | GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | ||
960 | { | ||
961 | struct GNUNET_MQ_MessageHandler *copy; | ||
962 | unsigned int count; | ||
963 | |||
964 | if (NULL == handlers) | ||
965 | return NULL; | ||
966 | count = GNUNET_MQ_count_handlers (handlers); | ||
967 | copy = GNUNET_new_array (count + 1, | ||
968 | struct GNUNET_MQ_MessageHandler); | ||
969 | GNUNET_memcpy (copy, | ||
970 | handlers, | ||
971 | count * sizeof(struct GNUNET_MQ_MessageHandler)); | ||
972 | return copy; | ||
973 | } | ||
974 | |||
975 | |||
976 | struct GNUNET_MQ_MessageHandler * | ||
977 | GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, | ||
978 | GNUNET_MQ_MessageCallback agpl_handler, | ||
979 | void *agpl_cls) | ||
980 | { | ||
981 | struct GNUNET_MQ_MessageHandler *copy; | ||
982 | unsigned int count; | ||
983 | |||
984 | if (NULL == handlers) | ||
985 | return NULL; | ||
986 | count = GNUNET_MQ_count_handlers (handlers); | ||
987 | copy = GNUNET_new_array (count + 2, | ||
988 | struct GNUNET_MQ_MessageHandler); | ||
989 | GNUNET_memcpy (copy, | ||
990 | handlers, | ||
991 | count * sizeof(struct GNUNET_MQ_MessageHandler)); | ||
992 | copy[count].mv = NULL; | ||
993 | copy[count].cb = agpl_handler; | ||
994 | copy[count].cls = agpl_cls; | ||
995 | copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL; | ||
996 | copy[count].expected_size = sizeof(struct GNUNET_MessageHeader); | ||
997 | return copy; | ||
998 | } | ||
999 | |||
1000 | |||
1001 | unsigned int | ||
1002 | GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | ||
1003 | { | ||
1004 | unsigned int i; | ||
1005 | |||
1006 | if (NULL == handlers) | ||
1007 | return 0; | ||
1008 | for (i = 0; NULL != handlers[i].cb; i++) | ||
1009 | ; | ||
1010 | return i; | ||
1011 | } | ||
1012 | |||
1013 | |||
1014 | const char * | ||
1015 | GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type) | ||
1016 | { | ||
1017 | switch (type) | ||
1018 | { | ||
1019 | case GNUNET_MQ_PREFERENCE_NONE: | ||
1020 | return "NONE"; | ||
1021 | case GNUNET_MQ_PREFERENCE_BANDWIDTH: | ||
1022 | return "BANDWIDTH"; | ||
1023 | case GNUNET_MQ_PREFERENCE_LATENCY: | ||
1024 | return "LATENCY"; | ||
1025 | case GNUNET_MQ_PREFERENCE_RELIABILITY: | ||
1026 | return "RELIABILITY"; | ||
1027 | } | ||
1028 | return NULL; | ||
1029 | } | ||
1030 | |||
1031 | |||
1032 | /* end of mq.c */ | ||