aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-05-04 22:16:24 +0200
committerChristian Grothoff <christian@grothoff.org>2019-05-04 22:16:24 +0200
commit80f480c752fd8dfa1be51d78fce314d9f0650b50 (patch)
tree459c21a3fde3b5f66194ac9960632de53586c5f0 /src/util/mq.c
parent3d2a951fa12546c09809f0a4d7e789ef8e971b03 (diff)
downloadgnunet-80f480c752fd8dfa1be51d78fce314d9f0650b50.tar.gz
gnunet-80f480c752fd8dfa1be51d78fce314d9f0650b50.zip
simplify MQ logic to always carry the same kinds of flags, and extend transport API to pass them to (TNG) service
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c277
1 files changed, 115 insertions, 162 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
index 513c008ee..2f9e650b6 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -26,7 +26,7 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28 28
29#define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__) 29#define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30 30
31 31
32struct GNUNET_MQ_Envelope 32struct GNUNET_MQ_Envelope
@@ -70,14 +70,7 @@ struct GNUNET_MQ_Envelope
70 * #GNUNET_MQ_env_set_options(). Only valid if 70 * #GNUNET_MQ_env_set_options(). Only valid if
71 * @e have_custom_options is set. 71 * @e have_custom_options is set.
72 */ 72 */
73 uint64_t flags; 73 enum GNUNET_MQ_PriorityPreferences priority;
74
75 /**
76 * Additional options buffer set for this envelope by
77 * #GNUNET_MQ_env_set_options(). Only valid if
78 * @e have_custom_options is set.
79 */
80 const void *extra;
81 74
82 /** 75 /**
83 * Did the application call #GNUNET_MQ_env_set_options()? 76 * Did the application call #GNUNET_MQ_env_set_options()?
@@ -165,16 +158,10 @@ struct GNUNET_MQ_Handle
165 struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail; 158 struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
166 159
167 /** 160 /**
168 * Additional options buffer set for this queue by
169 * #GNUNET_MQ_set_options(). Default is 0.
170 */
171 const void *default_extra;
172
173 /**
174 * Flags that were set for this queue by 161 * Flags that were set for this queue by
175 * #GNUNET_MQ_set_options(). Default is 0. 162 * #GNUNET_MQ_set_options(). Default is 0.
176 */ 163 */
177 uint64_t default_flags; 164 enum GNUNET_MQ_PriorityPreferences priority;
178 165
179 /** 166 /**
180 * Next id that should be used for the @e assoc_map, 167 * Next id that should be used for the @e assoc_map,
@@ -217,12 +204,10 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
217{ 204{
218 int ret; 205 int ret;
219 206
220 ret = GNUNET_MQ_handle_message (mq->handlers, 207 ret = GNUNET_MQ_handle_message (mq->handlers, mh);
221 mh);
222 if (GNUNET_SYSERR == ret) 208 if (GNUNET_SYSERR == ret)
223 { 209 {
224 GNUNET_MQ_inject_error (mq, 210 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
225 GNUNET_MQ_ERROR_MALFORMED);
226 return; 211 return;
227 } 212 }
228} 213}
@@ -251,7 +236,8 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
251 236
252 LOG (GNUNET_ERROR_TYPE_DEBUG, 237 LOG (GNUNET_ERROR_TYPE_DEBUG,
253 "Received message of type %u and size %u\n", 238 "Received message of type %u and size %u\n",
254 mtype, msize); 239 mtype,
240 msize);
255 241
256 if (NULL == handlers) 242 if (NULL == handlers)
257 goto done; 243 goto done;
@@ -260,41 +246,40 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
260 if (handler->type == mtype) 246 if (handler->type == mtype)
261 { 247 {
262 handled = GNUNET_YES; 248 handled = GNUNET_YES;
263 if ( (handler->expected_size > msize) || 249 if ((handler->expected_size > msize) ||
264 ( (handler->expected_size != msize) && 250 ((handler->expected_size != msize) && (NULL == handler->mv)))
265 (NULL == handler->mv) ) )
266 { 251 {
267 /* Too small, or not an exact size and 252 /* Too small, or not an exact size and
268 no 'mv' handler to check rest */ 253 no 'mv' handler to check rest */
269 LOG (GNUNET_ERROR_TYPE_ERROR, 254 LOG (GNUNET_ERROR_TYPE_ERROR,
270 "Received malformed message of type %u\n", 255 "Received malformed message of type %u\n",
271 (unsigned int) handler->type); 256 (unsigned int) handler->type);
272 return GNUNET_SYSERR; 257 return GNUNET_SYSERR;
273 } 258 }
274 if ( (NULL == handler->mv) || 259 if ((NULL == handler->mv) ||
275 (GNUNET_OK == 260 (GNUNET_OK == handler->mv (handler->cls, mh)))
276 handler->mv (handler->cls, mh)) )
277 { 261 {
278 /* message well-formed, pass to handler */ 262 /* message well-formed, pass to handler */
279 handler->cb (handler->cls, mh); 263 handler->cb (handler->cls, mh);
280 } 264 }
281 else 265 else
282 { 266 {
283 /* Message rejected by check routine */ 267 /* Message rejected by check routine */
284 LOG (GNUNET_ERROR_TYPE_ERROR, 268 LOG (GNUNET_ERROR_TYPE_ERROR,
285 "Received malformed message of type %u\n", 269 "Received malformed message of type %u\n",
286 (unsigned int) handler->type); 270 (unsigned int) handler->type);
287 return GNUNET_SYSERR; 271 return GNUNET_SYSERR;
288 } 272 }
289 break; 273 break;
290 } 274 }
291 } 275 }
292 done: 276done:
293 if (GNUNET_NO == handled) 277 if (GNUNET_NO == handled)
294 { 278 {
295 LOG (GNUNET_ERROR_TYPE_INFO, 279 LOG (GNUNET_ERROR_TYPE_INFO,
296 "No handler for message of type %u and size %u\n", 280 "No handler for message of type %u and size %u\n",
297 mtype, msize); 281 mtype,
282 msize);
298 return GNUNET_NO; 283 return GNUNET_NO;
299 } 284 }
300 return GNUNET_OK; 285 return GNUNET_OK;
@@ -312,8 +297,7 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
312 * @param error the error type 297 * @param error the error type
313 */ 298 */
314void 299void
315GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, 300GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
316 enum GNUNET_MQ_Error error)
317{ 301{
318 if (NULL == mq->error_handler) 302 if (NULL == mq->error_handler)
319 { 303 {
@@ -322,8 +306,7 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
322 (int) error); 306 (int) error);
323 return; 307 return;
324 } 308 }
325 mq->error_handler (mq->error_handler_cls, 309 mq->error_handler (mq->error_handler_cls, error);
326 error);
327} 310}
328 311
329 312
@@ -367,8 +350,7 @@ GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
367 * @param ev the envelope with the message to send. 350 * @param ev the envelope with the message to send.
368 */ 351 */
369void 352void
370GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, 353GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
371 struct GNUNET_MQ_Envelope *ev)
372{ 354{
373 GNUNET_assert (NULL != mq); 355 GNUNET_assert (NULL != mq);
374 GNUNET_assert (NULL == ev->parent_queue); 356 GNUNET_assert (NULL == ev->parent_queue);
@@ -378,18 +360,15 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
378 { 360 {
379 /* This would seem like a bug... */ 361 /* This would seem like a bug... */
380 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 362 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
381 "MQ with %u entries extended by message of type %u (FC broken?)\n", 363 "MQ with %u entries extended by message of type %u (FC broken?)\n",
382 (unsigned int) mq->queue_length, 364 (unsigned int) mq->queue_length,
383 (unsigned int) ntohs (ev->mh->type)); 365 (unsigned int) ntohs (ev->mh->type));
384 } 366 }
385 ev->parent_queue = mq; 367 ev->parent_queue = mq;
386 /* is the implementation busy? queue it! */ 368 /* is the implementation busy? queue it! */
387 if ( (NULL != mq->current_envelope) || 369 if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
388 (NULL != mq->send_task) )
389 { 370 {
390 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, 371 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
391 mq->envelope_tail,
392 ev);
393 return; 372 return;
394 } 373 }
395 GNUNET_assert (NULL == mq->envelope_head); 374 GNUNET_assert (NULL == mq->envelope_head);
@@ -397,12 +376,10 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
397 376
398 LOG (GNUNET_ERROR_TYPE_DEBUG, 377 LOG (GNUNET_ERROR_TYPE_DEBUG,
399 "sending message of type %u, queue empty (MQ: %p)\n", 378 "sending message of type %u, queue empty (MQ: %p)\n",
400 ntohs(ev->mh->type), 379 ntohs (ev->mh->type),
401 mq); 380 mq);
402 381
403 mq->send_impl (mq, 382 mq->send_impl (mq, ev->mh, mq->impl_state);
404 ev->mh,
405 mq->impl_state);
406} 383}
407 384
408 385
@@ -419,9 +396,7 @@ GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
419 struct GNUNET_MQ_Envelope *env; 396 struct GNUNET_MQ_Envelope *env;
420 397
421 env = mq->envelope_head; 398 env = mq->envelope_head;
422 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 399 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
423 mq->envelope_tail,
424 env);
425 mq->queue_length--; 400 mq->queue_length--;
426 env->parent_queue = NULL; 401 env->parent_queue = NULL;
427 return env; 402 return env;
@@ -461,16 +436,12 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
461 uint16_t msize; 436 uint16_t msize;
462 437
463 msize = ntohs (ev->mh->size); 438 msize = ntohs (ev->mh->size);
464 env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) + 439 env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) + msize);
465 msize);
466 env->mh = (struct GNUNET_MessageHeader *) &env[1]; 440 env->mh = (struct GNUNET_MessageHeader *) &env[1];
467 env->sent_cb = ev->sent_cb; 441 env->sent_cb = ev->sent_cb;
468 env->sent_cls = ev->sent_cls; 442 env->sent_cls = ev->sent_cls;
469 GNUNET_memcpy (&env[1], 443 GNUNET_memcpy (&env[1], ev->mh, msize);
470 ev->mh, 444 GNUNET_MQ_send (mq, env);
471 msize);
472 GNUNET_MQ_send (mq,
473 env);
474} 445}
475 446
476 447
@@ -493,16 +464,14 @@ impl_send_continue (void *cls)
493 return; 464 return;
494 mq->current_envelope = mq->envelope_head; 465 mq->current_envelope = mq->envelope_head;
495 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 466 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
496 mq->envelope_tail, 467 mq->envelope_tail,
497 mq->current_envelope); 468 mq->current_envelope);
498 469
499 LOG (GNUNET_ERROR_TYPE_DEBUG, 470 LOG (GNUNET_ERROR_TYPE_DEBUG,
500 "sending message of type %u from queue\n", 471 "sending message of type %u from queue\n",
501 ntohs(mq->current_envelope->mh->type)); 472 ntohs (mq->current_envelope->mh->type));
502 473
503 mq->send_impl (mq, 474 mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
504 mq->current_envelope->mh,
505 mq->impl_state);
506} 475}
507 476
508 477
@@ -526,8 +495,7 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
526 current_envelope->parent_queue = NULL; 495 current_envelope->parent_queue = NULL;
527 mq->current_envelope = NULL; 496 mq->current_envelope = NULL;
528 GNUNET_assert (NULL == mq->send_task); 497 GNUNET_assert (NULL == mq->send_task);
529 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, 498 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
530 mq);
531 if (NULL != (cb = current_envelope->sent_cb)) 499 if (NULL != (cb = current_envelope->sent_cb))
532 { 500 {
533 current_envelope->sent_cb = NULL; 501 current_envelope->sent_cb = NULL;
@@ -612,12 +580,11 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
612 * @param handlers_cls new closure to use 580 * @param handlers_cls new closure to use
613 */ 581 */
614void 582void
615GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, 583GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
616 void *handlers_cls)
617{ 584{
618 if (NULL == mq->handlers) 585 if (NULL == mq->handlers)
619 return; 586 return;
620 for (unsigned int i=0;NULL != mq->handlers[i].cb; i++) 587 for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
621 mq->handlers[i].cls = handlers_cls; 588 mq->handlers[i].cls = handlers_cls;
622} 589}
623 590
@@ -662,9 +629,7 @@ GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
662 629
663 630
664struct GNUNET_MQ_Envelope * 631struct GNUNET_MQ_Envelope *
665GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, 632GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
666 uint16_t size,
667 uint16_t type)
668{ 633{
669 struct GNUNET_MQ_Envelope *ev; 634 struct GNUNET_MQ_Envelope *ev;
670 635
@@ -692,9 +657,7 @@ GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
692 657
693 mqm = GNUNET_malloc (sizeof (*mqm) + size); 658 mqm = GNUNET_malloc (sizeof (*mqm) + size);
694 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; 659 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
695 GNUNET_memcpy (mqm->mh, 660 GNUNET_memcpy (mqm->mh, hdr, size);
696 hdr,
697 size);
698 return mqm; 661 return mqm;
699} 662}
700 663
@@ -728,8 +691,8 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
728 691
729 mqm = GNUNET_MQ_msg_ (mhp, size, type); 692 mqm = GNUNET_MQ_msg_ (mhp, size, type);
730 GNUNET_memcpy ((char *) mqm->mh + base_size, 693 GNUNET_memcpy ((char *) mqm->mh + base_size,
731 nested_mh, 694 nested_mh,
732 ntohs (nested_mh->size)); 695 ntohs (nested_mh->size));
733 696
734 return mqm; 697 return mqm;
735} 698}
@@ -742,8 +705,7 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
742 * @param assoc_data to associate 705 * @param assoc_data to associate
743 */ 706 */
744uint32_t 707uint32_t
745GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, 708GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
746 void *assoc_data)
747{ 709{
748 uint32_t id; 710 uint32_t id;
749 711
@@ -754,10 +716,11 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
754 } 716 }
755 id = mq->assoc_id++; 717 id = mq->assoc_id++;
756 GNUNET_assert (GNUNET_OK == 718 GNUNET_assert (GNUNET_OK ==
757 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, 719 GNUNET_CONTAINER_multihashmap32_put (
758 id, 720 mq->assoc_map,
759 assoc_data, 721 id,
760 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 722 assoc_data,
723 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
761 return id; 724 return id;
762} 725}
763 726
@@ -770,13 +733,11 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
770 * @return the associated data 733 * @return the associated data
771 */ 734 */
772void * 735void *
773GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, 736GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
774 uint32_t request_id)
775{ 737{
776 if (NULL == mq->assoc_map) 738 if (NULL == mq->assoc_map)
777 return NULL; 739 return NULL;
778 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, 740 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
779 request_id);
780} 741}
781 742
782 743
@@ -788,17 +749,14 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
788 * @return the associated data 749 * @return the associated data
789 */ 750 */
790void * 751void *
791GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, 752GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
792 uint32_t request_id)
793{ 753{
794 void *val; 754 void *val;
795 755
796 if (NULL == mq->assoc_map) 756 if (NULL == mq->assoc_map)
797 return NULL; 757 return NULL;
798 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, 758 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
799 request_id); 759 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
800 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
801 request_id);
802 return val; 760 return val;
803} 761}
804 762
@@ -818,8 +776,7 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
818 void *cb_cls) 776 void *cb_cls)
819{ 777{
820 /* allow setting *OR* clearing callback */ 778 /* allow setting *OR* clearing callback */
821 GNUNET_assert ( (NULL == ev->sent_cb) || 779 GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
822 (NULL == cb) );
823 ev->sent_cb = cb; 780 ev->sent_cb = cb;
824 ev->sent_cls = cb_cls; 781 ev->sent_cls = cb_cls;
825} 782}
@@ -883,9 +840,7 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
883 840
884 ev = mq->envelope_head; 841 ev = mq->envelope_head;
885 ev->parent_queue = NULL; 842 ev->parent_queue = NULL;
886 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 843 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
887 mq->envelope_tail,
888 ev);
889 GNUNET_assert (0 < mq->queue_length); 844 GNUNET_assert (0 < mq->queue_length);
890 mq->queue_length--; 845 mq->queue_length--;
891 LOG (GNUNET_ERROR_TYPE_DEBUG, 846 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -974,8 +929,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
974 GNUNET_assert (GNUNET_NO == mq->in_flight); 929 GNUNET_assert (GNUNET_NO == mq->in_flight);
975 GNUNET_assert (0 < mq->queue_length); 930 GNUNET_assert (0 < mq->queue_length);
976 mq->queue_length--; 931 mq->queue_length--;
977 mq->cancel_impl (mq, 932 mq->cancel_impl (mq, mq->impl_state);
978 mq->impl_state);
979 /* continue sending the next message, if any */ 933 /* continue sending the next message, if any */
980 mq->current_envelope = mq->envelope_head; 934 mq->current_envelope = mq->envelope_head;
981 if (NULL != mq->current_envelope) 935 if (NULL != mq->current_envelope)
@@ -986,19 +940,15 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
986 940
987 LOG (GNUNET_ERROR_TYPE_DEBUG, 941 LOG (GNUNET_ERROR_TYPE_DEBUG,
988 "sending canceled message of type %u queue\n", 942 "sending canceled message of type %u queue\n",
989 ntohs(ev->mh->type)); 943 ntohs (ev->mh->type));
990 944
991 mq->send_impl (mq, 945 mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
992 mq->current_envelope->mh,
993 mq->impl_state);
994 } 946 }
995 } 947 }
996 else 948 else
997 { 949 {
998 /* simple case, message is still waiting in the queue */ 950 /* simple case, message is still waiting in the queue */
999 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 951 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
1000 mq->envelope_tail,
1001 ev);
1002 GNUNET_assert (0 < mq->queue_length); 952 GNUNET_assert (0 < mq->queue_length);
1003 mq->queue_length--; 953 mq->queue_length--;
1004 } 954 }
@@ -1044,21 +994,18 @@ GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1044 994
1045 995
1046/** 996/**
1047 * Set application-specific options for this envelope. 997 * Set application-specific preferences for this envelope.
1048 * Overrides the options set for the queue with 998 * Overrides the options set for the queue with
1049 * #GNUNET_MQ_set_options() for this message only. 999 * #GNUNET_MQ_set_options() for this message only.
1050 * 1000 *
1051 * @param env message to set options for 1001 * @param env message to set options for
1052 * @param flags flags to use (meaning is queue-specific) 1002 * @param pp priorities and preferences to apply
1053 * @param extra additional buffer for further data (also queue-specific)
1054 */ 1003 */
1055void 1004void
1056GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, 1005GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1057 uint64_t flags, 1006 enum GNUNET_MQ_PriorityPreferences pp)
1058 const void *extra)
1059{ 1007{
1060 env->flags = flags; 1008 env->priority = pp;
1061 env->extra = extra;
1062 env->have_custom_options = GNUNET_YES; 1009 env->have_custom_options = GNUNET_YES;
1063} 1010}
1064 1011
@@ -1067,44 +1014,57 @@ GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1067 * Get application-specific options for this envelope. 1014 * Get application-specific options for this envelope.
1068 * 1015 *
1069 * @param env message to set options for 1016 * @param env message to set options for
1070 * @param[out] flags set to flags to use (meaning is queue-specific) 1017 * @return priorities and preferences to apply for @a env
1071 * @return extra additional buffer for further data (also queue-specific)
1072 */ 1018 */
1073const void * 1019enum GNUNET_MQ_PriorityPreferences
1074GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env, 1020GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
1075 uint64_t *flags)
1076{ 1021{
1077 struct GNUNET_MQ_Handle *mq = env->parent_queue; 1022 struct GNUNET_MQ_Handle *mq = env->parent_queue;
1078 1023
1079 if (GNUNET_YES == env->have_custom_options) 1024 if (GNUNET_YES == env->have_custom_options)
1080 { 1025 return env->priority;
1081 *flags = env->flags;
1082 return env->extra;
1083 }
1084 if (NULL == mq) 1026 if (NULL == mq)
1085 { 1027 return 0;
1086 *flags = 0; 1028 return mq->priority;
1087 return NULL; 1029}
1088 } 1030
1089 *flags = mq->default_flags; 1031
1090 return mq->default_extra; 1032/**
1033 * Combine performance preferences set for different
1034 * envelopes that are being combined into one larger envelope.
1035 *
1036 * @param p1 one set of preferences
1037 * @param p2 second set of preferences
1038 * @return combined priority and preferences to use
1039 */
1040enum GNUNET_MQ_PriorityPreferences
1041GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
1042 enum GNUNET_MQ_PriorityPreferences p2)
1043{
1044 enum GNUNET_MQ_PriorityPreferences ret;
1045
1046 ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1047 ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1048 ret |=
1049 ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1050 ret |=
1051 ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1052 ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1053 return ret;
1091} 1054}
1092 1055
1093 1056
1094/** 1057/**
1095 * Set application-specific options for this queue. 1058 * Set application-specific default options for this queue.
1096 * 1059 *
1097 * @param mq message queue to set options for 1060 * @param mq message queue to set options for
1098 * @param flags flags to use (meaning is queue-specific) 1061 * @param pp priorities and preferences to apply
1099 * @param extra additional buffer for further data (also queue-specific)
1100 */ 1062 */
1101void 1063void
1102GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, 1064GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1103 uint64_t flags, 1065 enum GNUNET_MQ_PriorityPreferences pp)
1104 const void *extra)
1105{ 1066{
1106 mq->default_flags = flags; 1067 mq->priority = pp;
1107 mq->default_extra = extra;
1108} 1068}
1109 1069
1110 1070
@@ -1145,8 +1105,8 @@ GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1145 */ 1105 */
1146struct GNUNET_MQ_DestroyNotificationHandle * 1106struct GNUNET_MQ_DestroyNotificationHandle *
1147GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq, 1107GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1148 GNUNET_SCHEDULER_TaskCallback cb, 1108 GNUNET_SCHEDULER_TaskCallback cb,
1149 void *cb_cls) 1109 void *cb_cls)
1150{ 1110{
1151 struct GNUNET_MQ_DestroyNotificationHandle *dnh; 1111 struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1152 1112
@@ -1154,9 +1114,7 @@ GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1154 dnh->mq = mq; 1114 dnh->mq = mq;
1155 dnh->cb = cb; 1115 dnh->cb = cb;
1156 dnh->cb_cls = cb_cls; 1116 dnh->cb_cls = cb_cls;
1157 GNUNET_CONTAINER_DLL_insert (mq->dnh_head, 1117 GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
1158 mq->dnh_tail,
1159 dnh);
1160 return dnh; 1118 return dnh;
1161} 1119}
1162 1120
@@ -1167,13 +1125,12 @@ GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1167 * @param dnh handle for registration to cancel 1125 * @param dnh handle for registration to cancel
1168 */ 1126 */
1169void 1127void
1170GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh) 1128GNUNET_MQ_destroy_notify_cancel (
1129 struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1171{ 1130{
1172 struct GNUNET_MQ_Handle *mq = dnh->mq; 1131 struct GNUNET_MQ_Handle *mq = dnh->mq;
1173 1132
1174 GNUNET_CONTAINER_DLL_remove (mq->dnh_head, 1133 GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
1175 mq->dnh_tail,
1176 dnh);
1177 GNUNET_free (dnh); 1134 GNUNET_free (dnh);
1178} 1135}
1179 1136
@@ -1195,9 +1152,7 @@ GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1195 struct GNUNET_MQ_Envelope **env_tail, 1152 struct GNUNET_MQ_Envelope **env_tail,
1196 struct GNUNET_MQ_Envelope *env) 1153 struct GNUNET_MQ_Envelope *env)
1197{ 1154{
1198 GNUNET_CONTAINER_DLL_insert_tail (*env_head, 1155 GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1199 *env_tail,
1200 env);
1201} 1156}
1202 1157
1203 1158
@@ -1218,9 +1173,7 @@ GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1218 struct GNUNET_MQ_Envelope **env_tail, 1173 struct GNUNET_MQ_Envelope **env_tail,
1219 struct GNUNET_MQ_Envelope *env) 1174 struct GNUNET_MQ_Envelope *env)
1220{ 1175{
1221 GNUNET_CONTAINER_DLL_remove (*env_head, 1176 GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1222 *env_tail,
1223 env);
1224} 1177}
1225 1178
1226 1179
@@ -1244,8 +1197,7 @@ GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1244 return NULL; 1197 return NULL;
1245 1198
1246 count = GNUNET_MQ_count_handlers (handlers); 1199 count = GNUNET_MQ_count_handlers (handlers);
1247 copy = GNUNET_new_array (count + 1, 1200 copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1248 struct GNUNET_MQ_MessageHandler);
1249 GNUNET_memcpy (copy, 1201 GNUNET_memcpy (copy,
1250 handlers, 1202 handlers,
1251 count * sizeof (struct GNUNET_MQ_MessageHandler)); 1203 count * sizeof (struct GNUNET_MQ_MessageHandler));
@@ -1276,8 +1228,7 @@ GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1276 if (NULL == handlers) 1228 if (NULL == handlers)
1277 return NULL; 1229 return NULL;
1278 count = GNUNET_MQ_count_handlers (handlers); 1230 count = GNUNET_MQ_count_handlers (handlers);
1279 copy = GNUNET_new_array (count + 2, 1231 copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1280 struct GNUNET_MQ_MessageHandler);
1281 GNUNET_memcpy (copy, 1232 GNUNET_memcpy (copy,
1282 handlers, 1233 handlers,
1283 count * sizeof (struct GNUNET_MQ_MessageHandler)); 1234 count * sizeof (struct GNUNET_MQ_MessageHandler));
@@ -1304,7 +1255,8 @@ GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1304 if (NULL == handlers) 1255 if (NULL == handlers)
1305 return 0; 1256 return 0;
1306 1257
1307 for (i=0; NULL != handlers[i].cb; i++) ; 1258 for (i = 0; NULL != handlers[i].cb; i++)
1259 ;
1308 1260
1309 return i; 1261 return i;
1310} 1262}
@@ -1319,7 +1271,8 @@ GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1319const char * 1271const char *
1320GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type) 1272GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1321{ 1273{
1322 switch (type) { 1274 switch (type)
1275 {
1323 case GNUNET_MQ_PREFERENCE_NONE: 1276 case GNUNET_MQ_PREFERENCE_NONE:
1324 return "NONE"; 1277 return "NONE";
1325 case GNUNET_MQ_PREFERENCE_BANDWIDTH: 1278 case GNUNET_MQ_PREFERENCE_BANDWIDTH: