diff options
Diffstat (limited to 'src/util/mq.c')
-rw-r--r-- | src/util/mq.c | 135 |
1 files changed, 57 insertions, 78 deletions
diff --git a/src/util/mq.c b/src/util/mq.c index 4ba6c5ff8..ba947d5b8 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -128,6 +128,11 @@ struct GNUNET_MQ_Handle | |||
128 | void *error_handler_cls; | 128 | void *error_handler_cls; |
129 | 129 | ||
130 | /** | 130 | /** |
131 | * Task to asynchronously run #impl_send_continue(). | ||
132 | */ | ||
133 | struct GNUNET_SCHEDULER_Task *send_task; | ||
134 | |||
135 | /** | ||
131 | * Linked list of messages pending to be sent | 136 | * Linked list of messages pending to be sent |
132 | */ | 137 | */ |
133 | struct GNUNET_MQ_Envelope *envelope_head; | 138 | struct GNUNET_MQ_Envelope *envelope_head; |
@@ -145,23 +150,11 @@ struct GNUNET_MQ_Handle | |||
145 | struct GNUNET_MQ_Envelope *current_envelope; | 150 | struct GNUNET_MQ_Envelope *current_envelope; |
146 | 151 | ||
147 | /** | 152 | /** |
148 | * GNUNET_YES if the sent notification was called | ||
149 | * for the current envelope. | ||
150 | */ | ||
151 | int send_notification_called; | ||
152 | |||
153 | /** | ||
154 | * Map of associations, lazily allocated | 153 | * Map of associations, lazily allocated |
155 | */ | 154 | */ |
156 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | 155 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; |
157 | 156 | ||
158 | /** | 157 | /** |
159 | * Task scheduled during #GNUNET_MQ_impl_send_continue | ||
160 | * or #GNUNET_MQ_impl_send_in_flight | ||
161 | */ | ||
162 | struct GNUNET_SCHEDULER_Task *send_task; | ||
163 | |||
164 | /** | ||
165 | * Functions to call on queue destruction; kept in a DLL. | 158 | * Functions to call on queue destruction; kept in a DLL. |
166 | */ | 159 | */ |
167 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; | 160 | struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; |
@@ -196,9 +189,15 @@ struct GNUNET_MQ_Handle | |||
196 | unsigned int queue_length; | 189 | unsigned int queue_length; |
197 | 190 | ||
198 | /** | 191 | /** |
199 | * GNUNET_YES if GNUNET_MQ_impl_evacuate was called. | 192 | * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called. |
193 | * FIXME: is this dead? | ||
200 | */ | 194 | */ |
201 | int evacuate_called; | 195 | int evacuate_called; |
196 | |||
197 | /** | ||
198 | * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called. | ||
199 | */ | ||
200 | int in_flight; | ||
202 | }; | 201 | }; |
203 | 202 | ||
204 | 203 | ||
@@ -364,7 +363,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) | |||
364 | unsigned int | 363 | unsigned int |
365 | GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) | 364 | GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) |
366 | { | 365 | { |
367 | return mq->queue_length; | 366 | return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0; |
368 | } | 367 | } |
369 | 368 | ||
370 | 369 | ||
@@ -385,7 +384,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, | |||
385 | mq->queue_length++; | 384 | mq->queue_length++; |
386 | ev->parent_queue = mq; | 385 | ev->parent_queue = mq; |
387 | /* is the implementation busy? queue it! */ | 386 | /* is the implementation busy? queue it! */ |
388 | if (NULL != mq->current_envelope) | 387 | if ( (NULL != mq->current_envelope) || |
388 | (NULL != mq->send_task) ) | ||
389 | { | 389 | { |
390 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, | 390 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, |
391 | mq->envelope_tail, | 391 | mq->envelope_tail, |
@@ -428,35 +428,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, | |||
428 | 428 | ||
429 | 429 | ||
430 | /** | 430 | /** |
431 | * Task run to call the send notification for the next queued | ||
432 | * message, if any. Only useful for implementing message queues, | ||
433 | * results in undefined behavior if not used carefully. | ||
434 | * | ||
435 | * @param cls message queue to send the next message with | ||
436 | */ | ||
437 | static void | ||
438 | impl_send_in_flight (void *cls) | ||
439 | { | ||
440 | struct GNUNET_MQ_Handle *mq = cls; | ||
441 | struct GNUNET_MQ_Envelope *current_envelope; | ||
442 | |||
443 | mq->send_task = NULL; | ||
444 | /* call is only valid if we're actually currently sending | ||
445 | * a message */ | ||
446 | current_envelope = mq->current_envelope; | ||
447 | GNUNET_assert (NULL != current_envelope); | ||
448 | /* can't call cancel from now on anymore */ | ||
449 | current_envelope->parent_queue = NULL; | ||
450 | if ( (GNUNET_NO == mq->send_notification_called) && | ||
451 | (NULL != current_envelope->sent_cb) ) | ||
452 | { | ||
453 | current_envelope->sent_cb (current_envelope->sent_cls); | ||
454 | } | ||
455 | mq->send_notification_called = GNUNET_YES; | ||
456 | } | ||
457 | |||
458 | |||
459 | /** | ||
460 | * Task run to call the send implementation for the next queued | 431 | * Task run to call the send implementation for the next queued |
461 | * message, if any. Only useful for implementing message queues, | 432 | * message, if any. Only useful for implementing message queues, |
462 | * results in undefined behavior if not used carefully. | 433 | * results in undefined behavior if not used carefully. |
@@ -467,32 +438,19 @@ static void | |||
467 | impl_send_continue (void *cls) | 438 | impl_send_continue (void *cls) |
468 | { | 439 | { |
469 | struct GNUNET_MQ_Handle *mq = cls; | 440 | struct GNUNET_MQ_Handle *mq = cls; |
470 | struct GNUNET_MQ_Envelope *current_envelope; | 441 | |
471 | |||
472 | mq->send_task = NULL; | 442 | mq->send_task = NULL; |
473 | /* call is only valid if we're actually currently sending | 443 | /* call is only valid if we're actually currently sending |
474 | * a message */ | 444 | * a message */ |
475 | current_envelope = mq->current_envelope; | ||
476 | GNUNET_assert (NULL != current_envelope); | ||
477 | impl_send_in_flight (mq); | ||
478 | GNUNET_assert (0 < mq->queue_length); | ||
479 | mq->queue_length--; | ||
480 | if (NULL == mq->envelope_head) | 445 | if (NULL == mq->envelope_head) |
481 | { | 446 | return; |
482 | mq->current_envelope = NULL; | 447 | mq->current_envelope = mq->envelope_head; |
483 | } | 448 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
484 | else | 449 | mq->envelope_tail, |
485 | { | 450 | mq->current_envelope); |
486 | mq->current_envelope = mq->envelope_head; | 451 | mq->send_impl (mq, |
487 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 452 | mq->current_envelope->mh, |
488 | mq->envelope_tail, | 453 | mq->impl_state); |
489 | mq->current_envelope); | ||
490 | mq->send_notification_called = GNUNET_NO; | ||
491 | mq->send_impl (mq, | ||
492 | mq->current_envelope->mh, | ||
493 | mq->impl_state); | ||
494 | } | ||
495 | GNUNET_free (current_envelope); | ||
496 | } | 454 | } |
497 | 455 | ||
498 | 456 | ||
@@ -506,22 +464,32 @@ impl_send_continue (void *cls) | |||
506 | void | 464 | void |
507 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) | 465 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) |
508 | { | 466 | { |
509 | /* maybe #GNUNET_MQ_impl_send_in_flight was called? */ | 467 | struct GNUNET_MQ_Envelope *current_envelope; |
510 | if (NULL != mq->send_task) | 468 | GNUNET_MQ_NotifyCallback cb; |
511 | { | 469 | |
512 | GNUNET_SCHEDULER_cancel (mq->send_task); | 470 | GNUNET_assert (0 < mq->queue_length); |
513 | } | 471 | mq->queue_length--; |
472 | current_envelope = mq->current_envelope; | ||
473 | current_envelope->parent_queue = NULL; | ||
474 | mq->current_envelope = NULL; | ||
475 | GNUNET_assert (NULL == mq->send_task); | ||
514 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, | 476 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, |
515 | mq); | 477 | mq); |
478 | if (NULL != (cb = current_envelope->sent_cb)) | ||
479 | { | ||
480 | current_envelope->sent_cb = NULL; | ||
481 | cb (current_envelope->sent_cls); | ||
482 | } | ||
483 | GNUNET_free (current_envelope); | ||
516 | } | 484 | } |
517 | 485 | ||
518 | 486 | ||
519 | /** | 487 | /** |
520 | * Call the send notification for the current message, but do not | 488 | * Call the send notification for the current message, but do not |
521 | * try to send the next message until #gnunet_mq_impl_send_continue | 489 | * try to send the next message until #GNUNET_MQ_impl_send_continue |
522 | * is called. | 490 | * is called. |
523 | * | 491 | * |
524 | * only useful for implementing message queues, results in undefined | 492 | * Only useful for implementing message queues, results in undefined |
525 | * behavior if not used carefully. | 493 | * behavior if not used carefully. |
526 | * | 494 | * |
527 | * @param mq message queue to send the next message with | 495 | * @param mq message queue to send the next message with |
@@ -529,9 +497,21 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) | |||
529 | void | 497 | void |
530 | GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) | 498 | GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) |
531 | { | 499 | { |
532 | GNUNET_assert (NULL == mq->send_task); | 500 | struct GNUNET_MQ_Envelope *current_envelope; |
533 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight, | 501 | GNUNET_MQ_NotifyCallback cb; |
534 | mq); | 502 | |
503 | mq->in_flight = GNUNET_YES; | ||
504 | /* call is only valid if we're actually currently sending | ||
505 | * a message */ | ||
506 | current_envelope = mq->current_envelope; | ||
507 | GNUNET_assert (NULL != current_envelope); | ||
508 | /* can't call cancel from now on anymore */ | ||
509 | current_envelope->parent_queue = NULL; | ||
510 | if (NULL != (cb = current_envelope->sent_cb)) | ||
511 | { | ||
512 | current_envelope->sent_cb = NULL; | ||
513 | cb (current_envelope->sent_cls); | ||
514 | } | ||
535 | } | 515 | } |
536 | 516 | ||
537 | 517 | ||
@@ -1187,7 +1167,6 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
1187 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 1167 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
1188 | mq->envelope_tail, | 1168 | mq->envelope_tail, |
1189 | mq->current_envelope); | 1169 | mq->current_envelope); |
1190 | mq->send_notification_called = GNUNET_NO; | ||
1191 | mq->send_impl (mq, | 1170 | mq->send_impl (mq, |
1192 | mq->current_envelope->mh, | 1171 | mq->current_envelope->mh, |
1193 | mq->impl_state); | 1172 | mq->impl_state); |