diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-05-04 22:16:24 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-05-04 22:16:24 +0200 |
commit | 80f480c752fd8dfa1be51d78fce314d9f0650b50 (patch) | |
tree | 459c21a3fde3b5f66194ac9960632de53586c5f0 /src/util/mq.c | |
parent | 3d2a951fa12546c09809f0a4d7e789ef8e971b03 (diff) | |
download | gnunet-80f480c752fd8dfa1be51d78fce314d9f0650b50.tar.gz gnunet-80f480c752fd8dfa1be51d78fce314d9f0650b50.zip |
simplify MQ logic to always carry the same kinds of flags, and extend transport API to pass them to (TNG) service
Diffstat (limited to 'src/util/mq.c')
-rw-r--r-- | src/util/mq.c | 277 |
1 files changed, 115 insertions, 162 deletions
diff --git a/src/util/mq.c b/src/util/mq.c index 513c008ee..2f9e650b6 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -26,7 +26,7 @@ | |||
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 27 | #include "gnunet_util_lib.h" |
28 | 28 | ||
29 | #define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__) | 29 | #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__) |
30 | 30 | ||
31 | 31 | ||
32 | struct GNUNET_MQ_Envelope | 32 | struct GNUNET_MQ_Envelope |
@@ -70,14 +70,7 @@ struct GNUNET_MQ_Envelope | |||
70 | * #GNUNET_MQ_env_set_options(). Only valid if | 70 | * #GNUNET_MQ_env_set_options(). Only valid if |
71 | * @e have_custom_options is set. | 71 | * @e have_custom_options is set. |
72 | */ | 72 | */ |
73 | uint64_t flags; | 73 | enum GNUNET_MQ_PriorityPreferences priority; |
74 | |||
75 | /** | ||
76 | * Additional options buffer set for this envelope by | ||
77 | * #GNUNET_MQ_env_set_options(). Only valid if | ||
78 | * @e have_custom_options is set. | ||
79 | */ | ||
80 | const void *extra; | ||
81 | 74 | ||
82 | /** | 75 | /** |
83 | * Did the application call #GNUNET_MQ_env_set_options()? | 76 | * Did the application call #GNUNET_MQ_env_set_options()? |
@@ -165,16 +158,10 @@ struct GNUNET_MQ_Handle | |||
165 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail; | 158 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail; |
166 | 159 | ||
167 | /** | 160 | /** |
168 | * Additional options buffer set for this queue by | ||
169 | * #GNUNET_MQ_set_options(). Default is 0. | ||
170 | */ | ||
171 | const void *default_extra; | ||
172 | |||
173 | /** | ||
174 | * Flags that were set for this queue by | 161 | * Flags that were set for this queue by |
175 | * #GNUNET_MQ_set_options(). Default is 0. | 162 | * #GNUNET_MQ_set_options(). Default is 0. |
176 | */ | 163 | */ |
177 | uint64_t default_flags; | 164 | enum GNUNET_MQ_PriorityPreferences priority; |
178 | 165 | ||
179 | /** | 166 | /** |
180 | * Next id that should be used for the @e assoc_map, | 167 | * Next id that should be used for the @e assoc_map, |
@@ -217,12 +204,10 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, | |||
217 | { | 204 | { |
218 | int ret; | 205 | int ret; |
219 | 206 | ||
220 | ret = GNUNET_MQ_handle_message (mq->handlers, | 207 | ret = GNUNET_MQ_handle_message (mq->handlers, mh); |
221 | mh); | ||
222 | if (GNUNET_SYSERR == ret) | 208 | if (GNUNET_SYSERR == ret) |
223 | { | 209 | { |
224 | GNUNET_MQ_inject_error (mq, | 210 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED); |
225 | GNUNET_MQ_ERROR_MALFORMED); | ||
226 | return; | 211 | return; |
227 | } | 212 | } |
228 | } | 213 | } |
@@ -251,7 +236,8 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, | |||
251 | 236 | ||
252 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 237 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
253 | "Received message of type %u and size %u\n", | 238 | "Received message of type %u and size %u\n", |
254 | mtype, msize); | 239 | mtype, |
240 | msize); | ||
255 | 241 | ||
256 | if (NULL == handlers) | 242 | if (NULL == handlers) |
257 | goto done; | 243 | goto done; |
@@ -260,41 +246,40 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, | |||
260 | if (handler->type == mtype) | 246 | if (handler->type == mtype) |
261 | { | 247 | { |
262 | handled = GNUNET_YES; | 248 | handled = GNUNET_YES; |
263 | if ( (handler->expected_size > msize) || | 249 | if ((handler->expected_size > msize) || |
264 | ( (handler->expected_size != msize) && | 250 | ((handler->expected_size != msize) && (NULL == handler->mv))) |
265 | (NULL == handler->mv) ) ) | ||
266 | { | 251 | { |
267 | /* Too small, or not an exact size and | 252 | /* Too small, or not an exact size and |
268 | no 'mv' handler to check rest */ | 253 | no 'mv' handler to check rest */ |
269 | LOG (GNUNET_ERROR_TYPE_ERROR, | 254 | LOG (GNUNET_ERROR_TYPE_ERROR, |
270 | "Received malformed message of type %u\n", | 255 | "Received malformed message of type %u\n", |
271 | (unsigned int) handler->type); | 256 | (unsigned int) handler->type); |
272 | return GNUNET_SYSERR; | 257 | return GNUNET_SYSERR; |
273 | } | 258 | } |
274 | if ( (NULL == handler->mv) || | 259 | if ((NULL == handler->mv) || |
275 | (GNUNET_OK == | 260 | (GNUNET_OK == handler->mv (handler->cls, mh))) |
276 | handler->mv (handler->cls, mh)) ) | ||
277 | { | 261 | { |
278 | /* message well-formed, pass to handler */ | 262 | /* message well-formed, pass to handler */ |
279 | handler->cb (handler->cls, mh); | 263 | handler->cb (handler->cls, mh); |
280 | } | 264 | } |
281 | else | 265 | else |
282 | { | 266 | { |
283 | /* Message rejected by check routine */ | 267 | /* Message rejected by check routine */ |
284 | LOG (GNUNET_ERROR_TYPE_ERROR, | 268 | LOG (GNUNET_ERROR_TYPE_ERROR, |
285 | "Received malformed message of type %u\n", | 269 | "Received malformed message of type %u\n", |
286 | (unsigned int) handler->type); | 270 | (unsigned int) handler->type); |
287 | return GNUNET_SYSERR; | 271 | return GNUNET_SYSERR; |
288 | } | 272 | } |
289 | break; | 273 | break; |
290 | } | 274 | } |
291 | } | 275 | } |
292 | done: | 276 | done: |
293 | if (GNUNET_NO == handled) | 277 | if (GNUNET_NO == handled) |
294 | { | 278 | { |
295 | LOG (GNUNET_ERROR_TYPE_INFO, | 279 | LOG (GNUNET_ERROR_TYPE_INFO, |
296 | "No handler for message of type %u and size %u\n", | 280 | "No handler for message of type %u and size %u\n", |
297 | mtype, msize); | 281 | mtype, |
282 | msize); | ||
298 | return GNUNET_NO; | 283 | return GNUNET_NO; |
299 | } | 284 | } |
300 | return GNUNET_OK; | 285 | return GNUNET_OK; |
@@ -312,8 +297,7 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, | |||
312 | * @param error the error type | 297 | * @param error the error type |
313 | */ | 298 | */ |
314 | void | 299 | void |
315 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, | 300 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error) |
316 | enum GNUNET_MQ_Error error) | ||
317 | { | 301 | { |
318 | if (NULL == mq->error_handler) | 302 | if (NULL == mq->error_handler) |
319 | { | 303 | { |
@@ -322,8 +306,7 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, | |||
322 | (int) error); | 306 | (int) error); |
323 | return; | 307 | return; |
324 | } | 308 | } |
325 | mq->error_handler (mq->error_handler_cls, | 309 | mq->error_handler (mq->error_handler_cls, error); |
326 | error); | ||
327 | } | 310 | } |
328 | 311 | ||
329 | 312 | ||
@@ -367,8 +350,7 @@ GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) | |||
367 | * @param ev the envelope with the message to send. | 350 | * @param ev the envelope with the message to send. |
368 | */ | 351 | */ |
369 | void | 352 | void |
370 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, | 353 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) |
371 | struct GNUNET_MQ_Envelope *ev) | ||
372 | { | 354 | { |
373 | GNUNET_assert (NULL != mq); | 355 | GNUNET_assert (NULL != mq); |
374 | GNUNET_assert (NULL == ev->parent_queue); | 356 | GNUNET_assert (NULL == ev->parent_queue); |
@@ -378,18 +360,15 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, | |||
378 | { | 360 | { |
379 | /* This would seem like a bug... */ | 361 | /* This would seem like a bug... */ |
380 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 362 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
381 | "MQ with %u entries extended by message of type %u (FC broken?)\n", | 363 | "MQ with %u entries extended by message of type %u (FC broken?)\n", |
382 | (unsigned int) mq->queue_length, | 364 | (unsigned int) mq->queue_length, |
383 | (unsigned int) ntohs (ev->mh->type)); | 365 | (unsigned int) ntohs (ev->mh->type)); |
384 | } | 366 | } |
385 | ev->parent_queue = mq; | 367 | ev->parent_queue = mq; |
386 | /* is the implementation busy? queue it! */ | 368 | /* is the implementation busy? queue it! */ |
387 | if ( (NULL != mq->current_envelope) || | 369 | if ((NULL != mq->current_envelope) || (NULL != mq->send_task)) |
388 | (NULL != mq->send_task) ) | ||
389 | { | 370 | { |
390 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, | 371 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); |
391 | mq->envelope_tail, | ||
392 | ev); | ||
393 | return; | 372 | return; |
394 | } | 373 | } |
395 | GNUNET_assert (NULL == mq->envelope_head); | 374 | GNUNET_assert (NULL == mq->envelope_head); |
@@ -397,12 +376,10 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, | |||
397 | 376 | ||
398 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 377 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
399 | "sending message of type %u, queue empty (MQ: %p)\n", | 378 | "sending message of type %u, queue empty (MQ: %p)\n", |
400 | ntohs(ev->mh->type), | 379 | ntohs (ev->mh->type), |
401 | mq); | 380 | mq); |
402 | 381 | ||
403 | mq->send_impl (mq, | 382 | mq->send_impl (mq, ev->mh, mq->impl_state); |
404 | ev->mh, | ||
405 | mq->impl_state); | ||
406 | } | 383 | } |
407 | 384 | ||
408 | 385 | ||
@@ -419,9 +396,7 @@ GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq) | |||
419 | struct GNUNET_MQ_Envelope *env; | 396 | struct GNUNET_MQ_Envelope *env; |
420 | 397 | ||
421 | env = mq->envelope_head; | 398 | env = mq->envelope_head; |
422 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 399 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env); |
423 | mq->envelope_tail, | ||
424 | env); | ||
425 | mq->queue_length--; | 400 | mq->queue_length--; |
426 | env->parent_queue = NULL; | 401 | env->parent_queue = NULL; |
427 | return env; | 402 | return env; |
@@ -461,16 +436,12 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, | |||
461 | uint16_t msize; | 436 | uint16_t msize; |
462 | 437 | ||
463 | msize = ntohs (ev->mh->size); | 438 | msize = ntohs (ev->mh->size); |
464 | env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) + | 439 | env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) + msize); |
465 | msize); | ||
466 | env->mh = (struct GNUNET_MessageHeader *) &env[1]; | 440 | env->mh = (struct GNUNET_MessageHeader *) &env[1]; |
467 | env->sent_cb = ev->sent_cb; | 441 | env->sent_cb = ev->sent_cb; |
468 | env->sent_cls = ev->sent_cls; | 442 | env->sent_cls = ev->sent_cls; |
469 | GNUNET_memcpy (&env[1], | 443 | GNUNET_memcpy (&env[1], ev->mh, msize); |
470 | ev->mh, | 444 | GNUNET_MQ_send (mq, env); |
471 | msize); | ||
472 | GNUNET_MQ_send (mq, | ||
473 | env); | ||
474 | } | 445 | } |
475 | 446 | ||
476 | 447 | ||
@@ -493,16 +464,14 @@ impl_send_continue (void *cls) | |||
493 | return; | 464 | return; |
494 | mq->current_envelope = mq->envelope_head; | 465 | mq->current_envelope = mq->envelope_head; |
495 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 466 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
496 | mq->envelope_tail, | 467 | mq->envelope_tail, |
497 | mq->current_envelope); | 468 | mq->current_envelope); |
498 | 469 | ||
499 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 470 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
500 | "sending message of type %u from queue\n", | 471 | "sending message of type %u from queue\n", |
501 | ntohs(mq->current_envelope->mh->type)); | 472 | ntohs (mq->current_envelope->mh->type)); |
502 | 473 | ||
503 | mq->send_impl (mq, | 474 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); |
504 | mq->current_envelope->mh, | ||
505 | mq->impl_state); | ||
506 | } | 475 | } |
507 | 476 | ||
508 | 477 | ||
@@ -526,8 +495,7 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) | |||
526 | current_envelope->parent_queue = NULL; | 495 | current_envelope->parent_queue = NULL; |
527 | mq->current_envelope = NULL; | 496 | mq->current_envelope = NULL; |
528 | GNUNET_assert (NULL == mq->send_task); | 497 | GNUNET_assert (NULL == mq->send_task); |
529 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, | 498 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq); |
530 | mq); | ||
531 | if (NULL != (cb = current_envelope->sent_cb)) | 499 | if (NULL != (cb = current_envelope->sent_cb)) |
532 | { | 500 | { |
533 | current_envelope->sent_cb = NULL; | 501 | current_envelope->sent_cb = NULL; |
@@ -612,12 +580,11 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | |||
612 | * @param handlers_cls new closure to use | 580 | * @param handlers_cls new closure to use |
613 | */ | 581 | */ |
614 | void | 582 | void |
615 | GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, | 583 | GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls) |
616 | void *handlers_cls) | ||
617 | { | 584 | { |
618 | if (NULL == mq->handlers) | 585 | if (NULL == mq->handlers) |
619 | return; | 586 | return; |
620 | for (unsigned int i=0;NULL != mq->handlers[i].cb; i++) | 587 | for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++) |
621 | mq->handlers[i].cls = handlers_cls; | 588 | mq->handlers[i].cls = handlers_cls; |
622 | } | 589 | } |
623 | 590 | ||
@@ -662,9 +629,7 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) | |||
662 | 629 | ||
663 | 630 | ||
664 | struct GNUNET_MQ_Envelope * | 631 | struct GNUNET_MQ_Envelope * |
665 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, | 632 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) |
666 | uint16_t size, | ||
667 | uint16_t type) | ||
668 | { | 633 | { |
669 | struct GNUNET_MQ_Envelope *ev; | 634 | struct GNUNET_MQ_Envelope *ev; |
670 | 635 | ||
@@ -692,9 +657,7 @@ GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr) | |||
692 | 657 | ||
693 | mqm = GNUNET_malloc (sizeof (*mqm) + size); | 658 | mqm = GNUNET_malloc (sizeof (*mqm) + size); |
694 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | 659 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; |
695 | GNUNET_memcpy (mqm->mh, | 660 | GNUNET_memcpy (mqm->mh, hdr, size); |
696 | hdr, | ||
697 | size); | ||
698 | return mqm; | 661 | return mqm; |
699 | } | 662 | } |
700 | 663 | ||
@@ -728,8 +691,8 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, | |||
728 | 691 | ||
729 | mqm = GNUNET_MQ_msg_ (mhp, size, type); | 692 | mqm = GNUNET_MQ_msg_ (mhp, size, type); |
730 | GNUNET_memcpy ((char *) mqm->mh + base_size, | 693 | GNUNET_memcpy ((char *) mqm->mh + base_size, |
731 | nested_mh, | 694 | nested_mh, |
732 | ntohs (nested_mh->size)); | 695 | ntohs (nested_mh->size)); |
733 | 696 | ||
734 | return mqm; | 697 | return mqm; |
735 | } | 698 | } |
@@ -742,8 +705,7 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, | |||
742 | * @param assoc_data to associate | 705 | * @param assoc_data to associate |
743 | */ | 706 | */ |
744 | uint32_t | 707 | uint32_t |
745 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, | 708 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data) |
746 | void *assoc_data) | ||
747 | { | 709 | { |
748 | uint32_t id; | 710 | uint32_t id; |
749 | 711 | ||
@@ -754,10 +716,11 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, | |||
754 | } | 716 | } |
755 | id = mq->assoc_id++; | 717 | id = mq->assoc_id++; |
756 | GNUNET_assert (GNUNET_OK == | 718 | GNUNET_assert (GNUNET_OK == |
757 | GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, | 719 | GNUNET_CONTAINER_multihashmap32_put ( |
758 | id, | 720 | mq->assoc_map, |
759 | assoc_data, | 721 | id, |
760 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 722 | assoc_data, |
723 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
761 | return id; | 724 | return id; |
762 | } | 725 | } |
763 | 726 | ||
@@ -770,13 +733,11 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, | |||
770 | * @return the associated data | 733 | * @return the associated data |
771 | */ | 734 | */ |
772 | void * | 735 | void * |
773 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, | 736 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
774 | uint32_t request_id) | ||
775 | { | 737 | { |
776 | if (NULL == mq->assoc_map) | 738 | if (NULL == mq->assoc_map) |
777 | return NULL; | 739 | return NULL; |
778 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, | 740 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); |
779 | request_id); | ||
780 | } | 741 | } |
781 | 742 | ||
782 | 743 | ||
@@ -788,17 +749,14 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, | |||
788 | * @return the associated data | 749 | * @return the associated data |
789 | */ | 750 | */ |
790 | void * | 751 | void * |
791 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, | 752 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
792 | uint32_t request_id) | ||
793 | { | 753 | { |
794 | void *val; | 754 | void *val; |
795 | 755 | ||
796 | if (NULL == mq->assoc_map) | 756 | if (NULL == mq->assoc_map) |
797 | return NULL; | 757 | return NULL; |
798 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, | 758 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); |
799 | request_id); | 759 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id); |
800 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, | ||
801 | request_id); | ||
802 | return val; | 760 | return val; |
803 | } | 761 | } |
804 | 762 | ||
@@ -818,8 +776,7 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, | |||
818 | void *cb_cls) | 776 | void *cb_cls) |
819 | { | 777 | { |
820 | /* allow setting *OR* clearing callback */ | 778 | /* allow setting *OR* clearing callback */ |
821 | GNUNET_assert ( (NULL == ev->sent_cb) || | 779 | GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb)); |
822 | (NULL == cb) ); | ||
823 | ev->sent_cb = cb; | 780 | ev->sent_cb = cb; |
824 | ev->sent_cls = cb_cls; | 781 | ev->sent_cls = cb_cls; |
825 | } | 782 | } |
@@ -883,9 +840,7 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | |||
883 | 840 | ||
884 | ev = mq->envelope_head; | 841 | ev = mq->envelope_head; |
885 | ev->parent_queue = NULL; | 842 | ev->parent_queue = NULL; |
886 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 843 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); |
887 | mq->envelope_tail, | ||
888 | ev); | ||
889 | GNUNET_assert (0 < mq->queue_length); | 844 | GNUNET_assert (0 < mq->queue_length); |
890 | mq->queue_length--; | 845 | mq->queue_length--; |
891 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 846 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -974,8 +929,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
974 | GNUNET_assert (GNUNET_NO == mq->in_flight); | 929 | GNUNET_assert (GNUNET_NO == mq->in_flight); |
975 | GNUNET_assert (0 < mq->queue_length); | 930 | GNUNET_assert (0 < mq->queue_length); |
976 | mq->queue_length--; | 931 | mq->queue_length--; |
977 | mq->cancel_impl (mq, | 932 | mq->cancel_impl (mq, mq->impl_state); |
978 | mq->impl_state); | ||
979 | /* continue sending the next message, if any */ | 933 | /* continue sending the next message, if any */ |
980 | mq->current_envelope = mq->envelope_head; | 934 | mq->current_envelope = mq->envelope_head; |
981 | if (NULL != mq->current_envelope) | 935 | if (NULL != mq->current_envelope) |
@@ -986,19 +940,15 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
986 | 940 | ||
987 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 941 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
988 | "sending canceled message of type %u queue\n", | 942 | "sending canceled message of type %u queue\n", |
989 | ntohs(ev->mh->type)); | 943 | ntohs (ev->mh->type)); |
990 | 944 | ||
991 | mq->send_impl (mq, | 945 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); |
992 | mq->current_envelope->mh, | ||
993 | mq->impl_state); | ||
994 | } | 946 | } |
995 | } | 947 | } |
996 | else | 948 | else |
997 | { | 949 | { |
998 | /* simple case, message is still waiting in the queue */ | 950 | /* simple case, message is still waiting in the queue */ |
999 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 951 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); |
1000 | mq->envelope_tail, | ||
1001 | ev); | ||
1002 | GNUNET_assert (0 < mq->queue_length); | 952 | GNUNET_assert (0 < mq->queue_length); |
1003 | mq->queue_length--; | 953 | mq->queue_length--; |
1004 | } | 954 | } |
@@ -1044,21 +994,18 @@ GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq) | |||
1044 | 994 | ||
1045 | 995 | ||
1046 | /** | 996 | /** |
1047 | * Set application-specific options for this envelope. | 997 | * Set application-specific preferences for this envelope. |
1048 | * Overrides the options set for the queue with | 998 | * Overrides the options set for the queue with |
1049 | * #GNUNET_MQ_set_options() for this message only. | 999 | * #GNUNET_MQ_set_options() for this message only. |
1050 | * | 1000 | * |
1051 | * @param env message to set options for | 1001 | * @param env message to set options for |
1052 | * @param flags flags to use (meaning is queue-specific) | 1002 | * @param pp priorities and preferences to apply |
1053 | * @param extra additional buffer for further data (also queue-specific) | ||
1054 | */ | 1003 | */ |
1055 | void | 1004 | void |
1056 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, | 1005 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, |
1057 | uint64_t flags, | 1006 | enum GNUNET_MQ_PriorityPreferences pp) |
1058 | const void *extra) | ||
1059 | { | 1007 | { |
1060 | env->flags = flags; | 1008 | env->priority = pp; |
1061 | env->extra = extra; | ||
1062 | env->have_custom_options = GNUNET_YES; | 1009 | env->have_custom_options = GNUNET_YES; |
1063 | } | 1010 | } |
1064 | 1011 | ||
@@ -1067,44 +1014,57 @@ GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, | |||
1067 | * Get application-specific options for this envelope. | 1014 | * Get application-specific options for this envelope. |
1068 | * | 1015 | * |
1069 | * @param env message to set options for | 1016 | * @param env message to set options for |
1070 | * @param[out] flags set to flags to use (meaning is queue-specific) | 1017 | * @return priorities and preferences to apply for @a env |
1071 | * @return extra additional buffer for further data (also queue-specific) | ||
1072 | */ | 1018 | */ |
1073 | const void * | 1019 | enum GNUNET_MQ_PriorityPreferences |
1074 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env, | 1020 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env) |
1075 | uint64_t *flags) | ||
1076 | { | 1021 | { |
1077 | struct GNUNET_MQ_Handle *mq = env->parent_queue; | 1022 | struct GNUNET_MQ_Handle *mq = env->parent_queue; |
1078 | 1023 | ||
1079 | if (GNUNET_YES == env->have_custom_options) | 1024 | if (GNUNET_YES == env->have_custom_options) |
1080 | { | 1025 | return env->priority; |
1081 | *flags = env->flags; | ||
1082 | return env->extra; | ||
1083 | } | ||
1084 | if (NULL == mq) | 1026 | if (NULL == mq) |
1085 | { | 1027 | return 0; |
1086 | *flags = 0; | 1028 | return mq->priority; |
1087 | return NULL; | 1029 | } |
1088 | } | 1030 | |
1089 | *flags = mq->default_flags; | 1031 | |
1090 | return mq->default_extra; | 1032 | /** |
1033 | * Combine performance preferences set for different | ||
1034 | * envelopes that are being combined into one larger envelope. | ||
1035 | * | ||
1036 | * @param p1 one set of preferences | ||
1037 | * @param p2 second set of preferences | ||
1038 | * @return combined priority and preferences to use | ||
1039 | */ | ||
1040 | enum GNUNET_MQ_PriorityPreferences | ||
1041 | GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1, | ||
1042 | enum GNUNET_MQ_PriorityPreferences p2) | ||
1043 | { | ||
1044 | enum GNUNET_MQ_PriorityPreferences ret; | ||
1045 | |||
1046 | ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK); | ||
1047 | ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE)); | ||
1048 | ret |= | ||
1049 | ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY)); | ||
1050 | ret |= | ||
1051 | ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED)); | ||
1052 | ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT)); | ||
1053 | return ret; | ||
1091 | } | 1054 | } |
1092 | 1055 | ||
1093 | 1056 | ||
1094 | /** | 1057 | /** |
1095 | * Set application-specific options for this queue. | 1058 | * Set application-specific default options for this queue. |
1096 | * | 1059 | * |
1097 | * @param mq message queue to set options for | 1060 | * @param mq message queue to set options for |
1098 | * @param flags flags to use (meaning is queue-specific) | 1061 | * @param pp priorities and preferences to apply |
1099 | * @param extra additional buffer for further data (also queue-specific) | ||
1100 | */ | 1062 | */ |
1101 | void | 1063 | void |
1102 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, | 1064 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, |
1103 | uint64_t flags, | 1065 | enum GNUNET_MQ_PriorityPreferences pp) |
1104 | const void *extra) | ||
1105 | { | 1066 | { |
1106 | mq->default_flags = flags; | 1067 | mq->priority = pp; |
1107 | mq->default_extra = extra; | ||
1108 | } | 1068 | } |
1109 | 1069 | ||
1110 | 1070 | ||
@@ -1145,8 +1105,8 @@ GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env) | |||
1145 | */ | 1105 | */ |
1146 | struct GNUNET_MQ_DestroyNotificationHandle * | 1106 | struct GNUNET_MQ_DestroyNotificationHandle * |
1147 | GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, | 1107 | GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, |
1148 | GNUNET_SCHEDULER_TaskCallback cb, | 1108 | GNUNET_SCHEDULER_TaskCallback cb, |
1149 | void *cb_cls) | 1109 | void *cb_cls) |
1150 | { | 1110 | { |
1151 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; | 1111 | struct GNUNET_MQ_DestroyNotificationHandle *dnh; |
1152 | 1112 | ||
@@ -1154,9 +1114,7 @@ GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, | |||
1154 | dnh->mq = mq; | 1114 | dnh->mq = mq; |
1155 | dnh->cb = cb; | 1115 | dnh->cb = cb; |
1156 | dnh->cb_cls = cb_cls; | 1116 | dnh->cb_cls = cb_cls; |
1157 | GNUNET_CONTAINER_DLL_insert (mq->dnh_head, | 1117 | GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh); |
1158 | mq->dnh_tail, | ||
1159 | dnh); | ||
1160 | return dnh; | 1118 | return dnh; |
1161 | } | 1119 | } |
1162 | 1120 | ||
@@ -1167,13 +1125,12 @@ GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, | |||
1167 | * @param dnh handle for registration to cancel | 1125 | * @param dnh handle for registration to cancel |
1168 | */ | 1126 | */ |
1169 | void | 1127 | void |
1170 | GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh) | 1128 | GNUNET_MQ_destroy_notify_cancel ( |
1129 | struct GNUNET_MQ_DestroyNotificationHandle *dnh) | ||
1171 | { | 1130 | { |
1172 | struct GNUNET_MQ_Handle *mq = dnh->mq; | 1131 | struct GNUNET_MQ_Handle *mq = dnh->mq; |
1173 | 1132 | ||
1174 | GNUNET_CONTAINER_DLL_remove (mq->dnh_head, | 1133 | GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh); |
1175 | mq->dnh_tail, | ||
1176 | dnh); | ||
1177 | GNUNET_free (dnh); | 1134 | GNUNET_free (dnh); |
1178 | } | 1135 | } |
1179 | 1136 | ||
@@ -1195,9 +1152,7 @@ GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head, | |||
1195 | struct GNUNET_MQ_Envelope **env_tail, | 1152 | struct GNUNET_MQ_Envelope **env_tail, |
1196 | struct GNUNET_MQ_Envelope *env) | 1153 | struct GNUNET_MQ_Envelope *env) |
1197 | { | 1154 | { |
1198 | GNUNET_CONTAINER_DLL_insert_tail (*env_head, | 1155 | GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env); |
1199 | *env_tail, | ||
1200 | env); | ||
1201 | } | 1156 | } |
1202 | 1157 | ||
1203 | 1158 | ||
@@ -1218,9 +1173,7 @@ GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head, | |||
1218 | struct GNUNET_MQ_Envelope **env_tail, | 1173 | struct GNUNET_MQ_Envelope **env_tail, |
1219 | struct GNUNET_MQ_Envelope *env) | 1174 | struct GNUNET_MQ_Envelope *env) |
1220 | { | 1175 | { |
1221 | GNUNET_CONTAINER_DLL_remove (*env_head, | 1176 | GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env); |
1222 | *env_tail, | ||
1223 | env); | ||
1224 | } | 1177 | } |
1225 | 1178 | ||
1226 | 1179 | ||
@@ -1244,8 +1197,7 @@ GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | |||
1244 | return NULL; | 1197 | return NULL; |
1245 | 1198 | ||
1246 | count = GNUNET_MQ_count_handlers (handlers); | 1199 | count = GNUNET_MQ_count_handlers (handlers); |
1247 | copy = GNUNET_new_array (count + 1, | 1200 | copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler); |
1248 | struct GNUNET_MQ_MessageHandler); | ||
1249 | GNUNET_memcpy (copy, | 1201 | GNUNET_memcpy (copy, |
1250 | handlers, | 1202 | handlers, |
1251 | count * sizeof (struct GNUNET_MQ_MessageHandler)); | 1203 | count * sizeof (struct GNUNET_MQ_MessageHandler)); |
@@ -1276,8 +1228,7 @@ GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers, | |||
1276 | if (NULL == handlers) | 1228 | if (NULL == handlers) |
1277 | return NULL; | 1229 | return NULL; |
1278 | count = GNUNET_MQ_count_handlers (handlers); | 1230 | count = GNUNET_MQ_count_handlers (handlers); |
1279 | copy = GNUNET_new_array (count + 2, | 1231 | copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler); |
1280 | struct GNUNET_MQ_MessageHandler); | ||
1281 | GNUNET_memcpy (copy, | 1232 | GNUNET_memcpy (copy, |
1282 | handlers, | 1233 | handlers, |
1283 | count * sizeof (struct GNUNET_MQ_MessageHandler)); | 1234 | count * sizeof (struct GNUNET_MQ_MessageHandler)); |
@@ -1304,7 +1255,8 @@ GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | |||
1304 | if (NULL == handlers) | 1255 | if (NULL == handlers) |
1305 | return 0; | 1256 | return 0; |
1306 | 1257 | ||
1307 | for (i=0; NULL != handlers[i].cb; i++) ; | 1258 | for (i = 0; NULL != handlers[i].cb; i++) |
1259 | ; | ||
1308 | 1260 | ||
1309 | return i; | 1261 | return i; |
1310 | } | 1262 | } |
@@ -1319,7 +1271,8 @@ GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers) | |||
1319 | const char * | 1271 | const char * |
1320 | GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type) | 1272 | GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type) |
1321 | { | 1273 | { |
1322 | switch (type) { | 1274 | switch (type) |
1275 | { | ||
1323 | case GNUNET_MQ_PREFERENCE_NONE: | 1276 | case GNUNET_MQ_PREFERENCE_NONE: |
1324 | return "NONE"; | 1277 | return "NONE"; |
1325 | case GNUNET_MQ_PREFERENCE_BANDWIDTH: | 1278 | case GNUNET_MQ_PREFERENCE_BANDWIDTH: |