aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c71
1 files changed, 40 insertions, 31 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
index aff9f465c..6f8c04224 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -302,13 +302,8 @@ static void
302impl_send_continue (void *cls) 302impl_send_continue (void *cls)
303{ 303{
304 struct GNUNET_MQ_Handle *mq = cls; 304 struct GNUNET_MQ_Handle *mq = cls;
305 const struct GNUNET_SCHEDULER_TaskContext *tc;
306 struct GNUNET_MQ_Envelope *current_envelope; 305 struct GNUNET_MQ_Envelope *current_envelope;
307 306
308 tc = GNUNET_SCHEDULER_get_task_context ();
309 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
310 return;
311
312 mq->continue_task = NULL; 307 mq->continue_task = NULL;
313 /* call is only valid if we're actually currently sending 308 /* call is only valid if we're actually currently sending
314 * a message */ 309 * a message */
@@ -325,7 +320,9 @@ impl_send_continue (void *cls)
325 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 320 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
326 mq->envelope_tail, 321 mq->envelope_tail,
327 mq->current_envelope); 322 mq->current_envelope);
328 mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); 323 mq->send_impl (mq,
324 mq->current_envelope->mh,
325 mq->impl_state);
329 } 326 }
330 if (NULL != current_envelope->sent_cb) 327 if (NULL != current_envelope->sent_cb)
331 current_envelope->sent_cb (current_envelope->sent_cls); 328 current_envelope->sent_cb (current_envelope->sent_cls);
@@ -334,10 +331,9 @@ impl_send_continue (void *cls)
334 331
335 332
336/** 333/**
337 * Call the send implementation for the next queued message, 334 * Call the send implementation for the next queued message, if any.
338 * if any. 335 * Only useful for implementing message queues, results in undefined
339 * Only useful for implementing message queues, 336 * behavior if not used carefully.
340 * results in undefined behavior if not used carefully.
341 * 337 *
342 * @param mq message queue to send the next message with 338 * @param mq message queue to send the next message with
343 */ 339 */
@@ -471,7 +467,9 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
471 return NULL; 467 return NULL;
472 468
473 mqm = GNUNET_MQ_msg_ (mhp, size, type); 469 mqm = GNUNET_MQ_msg_ (mhp, size, type);
474 memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size)); 470 memcpy ((char *) mqm->mh + base_size,
471 nested_mh,
472 ntohs (nested_mh->size));
475 473
476 return mqm; 474 return mqm;
477} 475}
@@ -481,9 +479,9 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
481 * Transmit a queued message to the session's client. 479 * Transmit a queued message to the session's client.
482 * 480 *
483 * @param cls consensus session 481 * @param cls consensus session
484 * @param size number of bytes available in buf 482 * @param size number of bytes available in @a buf
485 * @param buf where the callee should write the message 483 * @param buf where the callee should write the message
486 * @return number of bytes written to buf 484 * @return number of bytes written to @a buf
487 */ 485 */
488static size_t 486static size_t
489transmit_queued (void *cls, size_t size, 487transmit_queued (void *cls, size_t size,
@@ -535,10 +533,10 @@ server_client_send_impl (struct GNUNET_MQ_Handle *mq,
535 533
536 GNUNET_assert (NULL != mq); 534 GNUNET_assert (NULL != mq);
537 GNUNET_assert (NULL != state); 535 GNUNET_assert (NULL != state);
538 state->th = 536 state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
539 GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), 537 ntohs (msg->size),
540 GNUNET_TIME_UNIT_FOREVER_REL, 538 GNUNET_TIME_UNIT_FOREVER_REL,
541 &transmit_queued, mq); 539 &transmit_queued, mq);
542} 540}
543 541
544 542
@@ -580,10 +578,10 @@ handle_client_message (void *cls,
580 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); 578 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
581 return; 579 return;
582 } 580 }
583 581 GNUNET_CLIENT_receive (state->connection,
584 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, 582 &handle_client_message,
583 mq,
585 GNUNET_TIME_UNIT_FOREVER_REL); 584 GNUNET_TIME_UNIT_FOREVER_REL);
586
587 GNUNET_MQ_inject_message (mq, msg); 585 GNUNET_MQ_inject_message (mq, msg);
588} 586}
589 587
@@ -652,7 +650,8 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
652 GNUNET_assert (NULL != state); 650 GNUNET_assert (NULL != state);
653 GNUNET_assert (NULL == state->th); 651 GNUNET_assert (NULL == state->th);
654 state->th = 652 state->th =
655 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), 653 GNUNET_CLIENT_notify_transmit_ready (state->connection,
654 ntohs (msg->size),
656 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 655 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
657 &connection_client_transmit_queued, mq); 656 &connection_client_transmit_queued, mq);
658 GNUNET_assert (NULL != state->th); 657 GNUNET_assert (NULL != state->th);
@@ -752,8 +751,10 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
752 751
753 if (NULL == mq->assoc_map) 752 if (NULL == mq->assoc_map)
754 return NULL; 753 return NULL;
755 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); 754 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
756 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id); 755 request_id);
756 GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
757 request_id);
757 return val; 758 return val;
758} 759}
759 760
@@ -785,10 +786,11 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
785 struct GNUNET_MQ_Envelope *ev; 786 struct GNUNET_MQ_Envelope *ev;
786 ev = mq->envelope_head; 787 ev = mq->envelope_head;
787 ev->parent_queue = NULL; 788 ev->parent_queue = NULL;
788 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); 789 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
790 mq->envelope_tail,
791 ev);
789 GNUNET_MQ_discard (ev); 792 GNUNET_MQ_discard (ev);
790 } 793 }
791
792 if (NULL != mq->current_envelope) 794 if (NULL != mq->current_envelope)
793 { 795 {
794 /* we can only discard envelopes that 796 /* we can only discard envelopes that
@@ -797,7 +799,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
797 GNUNET_MQ_discard (mq->current_envelope); 799 GNUNET_MQ_discard (mq->current_envelope);
798 mq->current_envelope = NULL; 800 mq->current_envelope = NULL;
799 } 801 }
800
801 if (NULL != mq->assoc_map) 802 if (NULL != mq->assoc_map)
802 { 803 {
803 GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); 804 GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
@@ -851,10 +852,12 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
851 GNUNET_assert (NULL != mq); 852 GNUNET_assert (NULL != mq);
852 GNUNET_assert (NULL != mq->cancel_impl); 853 GNUNET_assert (NULL != mq->cancel_impl);
853 854
854 if (mq->current_envelope == ev) { 855 if (mq->current_envelope == ev)
856 {
855 // complex case, we already started with transmitting 857 // complex case, we already started with transmitting
856 // the message 858 // the message
857 mq->cancel_impl (mq, mq->impl_state); 859 mq->cancel_impl (mq,
860 mq->impl_state);
858 // continue sending the next message, if any 861 // continue sending the next message, if any
859 if (NULL == mq->envelope_head) 862 if (NULL == mq->envelope_head)
860 { 863 {
@@ -866,11 +869,17 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
866 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 869 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
867 mq->envelope_tail, 870 mq->envelope_tail,
868 mq->current_envelope); 871 mq->current_envelope);
869 mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); 872 mq->send_impl (mq,
873 mq->current_envelope->mh,
874 mq->impl_state);
870 } 875 }
871 } else { 876 }
877 else
878 {
872 // simple case, message is still waiting in the queue 879 // simple case, message is still waiting in the queue
873 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); 880 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
881 mq->envelope_tail,
882 ev);
874 } 883 }
875 884
876 ev->parent_queue = NULL; 885 ev->parent_queue = NULL;