diff options
Diffstat (limited to 'src/util/mq.c')
-rw-r--r-- | src/util/mq.c | 71 |
1 files changed, 40 insertions, 31 deletions
diff --git a/src/util/mq.c b/src/util/mq.c index aff9f465c..6f8c04224 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -302,13 +302,8 @@ static void | |||
302 | impl_send_continue (void *cls) | 302 | impl_send_continue (void *cls) |
303 | { | 303 | { |
304 | struct GNUNET_MQ_Handle *mq = cls; | 304 | struct GNUNET_MQ_Handle *mq = cls; |
305 | const struct GNUNET_SCHEDULER_TaskContext *tc; | ||
306 | struct GNUNET_MQ_Envelope *current_envelope; | 305 | struct GNUNET_MQ_Envelope *current_envelope; |
307 | 306 | ||
308 | tc = GNUNET_SCHEDULER_get_task_context (); | ||
309 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
310 | return; | ||
311 | |||
312 | mq->continue_task = NULL; | 307 | mq->continue_task = NULL; |
313 | /* call is only valid if we're actually currently sending | 308 | /* call is only valid if we're actually currently sending |
314 | * a message */ | 309 | * a message */ |
@@ -325,7 +320,9 @@ impl_send_continue (void *cls) | |||
325 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 320 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
326 | mq->envelope_tail, | 321 | mq->envelope_tail, |
327 | mq->current_envelope); | 322 | mq->current_envelope); |
328 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); | 323 | mq->send_impl (mq, |
324 | mq->current_envelope->mh, | ||
325 | mq->impl_state); | ||
329 | } | 326 | } |
330 | if (NULL != current_envelope->sent_cb) | 327 | if (NULL != current_envelope->sent_cb) |
331 | current_envelope->sent_cb (current_envelope->sent_cls); | 328 | current_envelope->sent_cb (current_envelope->sent_cls); |
@@ -334,10 +331,9 @@ impl_send_continue (void *cls) | |||
334 | 331 | ||
335 | 332 | ||
336 | /** | 333 | /** |
337 | * Call the send implementation for the next queued message, | 334 | * Call the send implementation for the next queued message, if any. |
338 | * if any. | 335 | * Only useful for implementing message queues, results in undefined |
339 | * Only useful for implementing message queues, | 336 | * behavior if not used carefully. |
340 | * results in undefined behavior if not used carefully. | ||
341 | * | 337 | * |
342 | * @param mq message queue to send the next message with | 338 | * @param mq message queue to send the next message with |
343 | */ | 339 | */ |
@@ -471,7 +467,9 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, | |||
471 | return NULL; | 467 | return NULL; |
472 | 468 | ||
473 | mqm = GNUNET_MQ_msg_ (mhp, size, type); | 469 | mqm = GNUNET_MQ_msg_ (mhp, size, type); |
474 | memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size)); | 470 | memcpy ((char *) mqm->mh + base_size, |
471 | nested_mh, | ||
472 | ntohs (nested_mh->size)); | ||
475 | 473 | ||
476 | return mqm; | 474 | return mqm; |
477 | } | 475 | } |
@@ -481,9 +479,9 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, | |||
481 | * Transmit a queued message to the session's client. | 479 | * Transmit a queued message to the session's client. |
482 | * | 480 | * |
483 | * @param cls consensus session | 481 | * @param cls consensus session |
484 | * @param size number of bytes available in buf | 482 | * @param size number of bytes available in @a buf |
485 | * @param buf where the callee should write the message | 483 | * @param buf where the callee should write the message |
486 | * @return number of bytes written to buf | 484 | * @return number of bytes written to @a buf |
487 | */ | 485 | */ |
488 | static size_t | 486 | static size_t |
489 | transmit_queued (void *cls, size_t size, | 487 | transmit_queued (void *cls, size_t size, |
@@ -535,10 +533,10 @@ server_client_send_impl (struct GNUNET_MQ_Handle *mq, | |||
535 | 533 | ||
536 | GNUNET_assert (NULL != mq); | 534 | GNUNET_assert (NULL != mq); |
537 | GNUNET_assert (NULL != state); | 535 | GNUNET_assert (NULL != state); |
538 | state->th = | 536 | state->th = GNUNET_SERVER_notify_transmit_ready (state->client, |
539 | GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), | 537 | ntohs (msg->size), |
540 | GNUNET_TIME_UNIT_FOREVER_REL, | 538 | GNUNET_TIME_UNIT_FOREVER_REL, |
541 | &transmit_queued, mq); | 539 | &transmit_queued, mq); |
542 | } | 540 | } |
543 | 541 | ||
544 | 542 | ||
@@ -580,10 +578,10 @@ handle_client_message (void *cls, | |||
580 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); | 578 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
581 | return; | 579 | return; |
582 | } | 580 | } |
583 | 581 | GNUNET_CLIENT_receive (state->connection, | |
584 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | 582 | &handle_client_message, |
583 | mq, | ||
585 | GNUNET_TIME_UNIT_FOREVER_REL); | 584 | GNUNET_TIME_UNIT_FOREVER_REL); |
586 | |||
587 | GNUNET_MQ_inject_message (mq, msg); | 585 | GNUNET_MQ_inject_message (mq, msg); |
588 | } | 586 | } |
589 | 587 | ||
@@ -652,7 +650,8 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, | |||
652 | GNUNET_assert (NULL != state); | 650 | GNUNET_assert (NULL != state); |
653 | GNUNET_assert (NULL == state->th); | 651 | GNUNET_assert (NULL == state->th); |
654 | state->th = | 652 | state->th = |
655 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), | 653 | GNUNET_CLIENT_notify_transmit_ready (state->connection, |
654 | ntohs (msg->size), | ||
656 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 655 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
657 | &connection_client_transmit_queued, mq); | 656 | &connection_client_transmit_queued, mq); |
658 | GNUNET_assert (NULL != state->th); | 657 | GNUNET_assert (NULL != state->th); |
@@ -752,8 +751,10 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, | |||
752 | 751 | ||
753 | if (NULL == mq->assoc_map) | 752 | if (NULL == mq->assoc_map) |
754 | return NULL; | 753 | return NULL; |
755 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); | 754 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, |
756 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id); | 755 | request_id); |
756 | GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, | ||
757 | request_id); | ||
757 | return val; | 758 | return val; |
758 | } | 759 | } |
759 | 760 | ||
@@ -785,10 +786,11 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | |||
785 | struct GNUNET_MQ_Envelope *ev; | 786 | struct GNUNET_MQ_Envelope *ev; |
786 | ev = mq->envelope_head; | 787 | ev = mq->envelope_head; |
787 | ev->parent_queue = NULL; | 788 | ev->parent_queue = NULL; |
788 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); | 789 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
790 | mq->envelope_tail, | ||
791 | ev); | ||
789 | GNUNET_MQ_discard (ev); | 792 | GNUNET_MQ_discard (ev); |
790 | } | 793 | } |
791 | |||
792 | if (NULL != mq->current_envelope) | 794 | if (NULL != mq->current_envelope) |
793 | { | 795 | { |
794 | /* we can only discard envelopes that | 796 | /* we can only discard envelopes that |
@@ -797,7 +799,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | |||
797 | GNUNET_MQ_discard (mq->current_envelope); | 799 | GNUNET_MQ_discard (mq->current_envelope); |
798 | mq->current_envelope = NULL; | 800 | mq->current_envelope = NULL; |
799 | } | 801 | } |
800 | |||
801 | if (NULL != mq->assoc_map) | 802 | if (NULL != mq->assoc_map) |
802 | { | 803 | { |
803 | GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); | 804 | GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map); |
@@ -851,10 +852,12 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
851 | GNUNET_assert (NULL != mq); | 852 | GNUNET_assert (NULL != mq); |
852 | GNUNET_assert (NULL != mq->cancel_impl); | 853 | GNUNET_assert (NULL != mq->cancel_impl); |
853 | 854 | ||
854 | if (mq->current_envelope == ev) { | 855 | if (mq->current_envelope == ev) |
856 | { | ||
855 | // complex case, we already started with transmitting | 857 | // complex case, we already started with transmitting |
856 | // the message | 858 | // the message |
857 | mq->cancel_impl (mq, mq->impl_state); | 859 | mq->cancel_impl (mq, |
860 | mq->impl_state); | ||
858 | // continue sending the next message, if any | 861 | // continue sending the next message, if any |
859 | if (NULL == mq->envelope_head) | 862 | if (NULL == mq->envelope_head) |
860 | { | 863 | { |
@@ -866,11 +869,17 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
866 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 869 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
867 | mq->envelope_tail, | 870 | mq->envelope_tail, |
868 | mq->current_envelope); | 871 | mq->current_envelope); |
869 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); | 872 | mq->send_impl (mq, |
873 | mq->current_envelope->mh, | ||
874 | mq->impl_state); | ||
870 | } | 875 | } |
871 | } else { | 876 | } |
877 | else | ||
878 | { | ||
872 | // simple case, message is still waiting in the queue | 879 | // simple case, message is still waiting in the queue |
873 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev); | 880 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
881 | mq->envelope_tail, | ||
882 | ev); | ||
874 | } | 883 | } |
875 | 884 | ||
876 | ev->parent_queue = NULL; | 885 | ev->parent_queue = NULL; |