aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c135
1 files changed, 57 insertions, 78 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
index 4ba6c5ff8..ba947d5b8 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -128,6 +128,11 @@ struct GNUNET_MQ_Handle
128 void *error_handler_cls; 128 void *error_handler_cls;
129 129
130 /** 130 /**
131 * Task to asynchronously run #impl_send_continue().
132 */
133 struct GNUNET_SCHEDULER_Task *send_task;
134
135 /**
131 * Linked list of messages pending to be sent 136 * Linked list of messages pending to be sent
132 */ 137 */
133 struct GNUNET_MQ_Envelope *envelope_head; 138 struct GNUNET_MQ_Envelope *envelope_head;
@@ -145,23 +150,11 @@ struct GNUNET_MQ_Handle
145 struct GNUNET_MQ_Envelope *current_envelope; 150 struct GNUNET_MQ_Envelope *current_envelope;
146 151
147 /** 152 /**
148 * GNUNET_YES if the sent notification was called
149 * for the current envelope.
150 */
151 int send_notification_called;
152
153 /**
154 * Map of associations, lazily allocated 153 * Map of associations, lazily allocated
155 */ 154 */
156 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; 155 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
157 156
158 /** 157 /**
159 * Task scheduled during #GNUNET_MQ_impl_send_continue
160 * or #GNUNET_MQ_impl_send_in_flight
161 */
162 struct GNUNET_SCHEDULER_Task *send_task;
163
164 /**
165 * Functions to call on queue destruction; kept in a DLL. 158 * Functions to call on queue destruction; kept in a DLL.
166 */ 159 */
167 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; 160 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
@@ -196,9 +189,15 @@ struct GNUNET_MQ_Handle
196 unsigned int queue_length; 189 unsigned int queue_length;
197 190
198 /** 191 /**
199 * GNUNET_YES if GNUNET_MQ_impl_evacuate was called. 192 * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
193 * FIXME: is this dead?
200 */ 194 */
201 int evacuate_called; 195 int evacuate_called;
196
197 /**
198 * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
199 */
200 int in_flight;
202}; 201};
203 202
204 203
@@ -364,7 +363,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
364unsigned int 363unsigned int
365GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) 364GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
366{ 365{
367 return mq->queue_length; 366 return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
368} 367}
369 368
370 369
@@ -385,7 +384,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
385 mq->queue_length++; 384 mq->queue_length++;
386 ev->parent_queue = mq; 385 ev->parent_queue = mq;
387 /* is the implementation busy? queue it! */ 386 /* is the implementation busy? queue it! */
388 if (NULL != mq->current_envelope) 387 if ( (NULL != mq->current_envelope) ||
388 (NULL != mq->send_task) )
389 { 389 {
390 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, 390 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
391 mq->envelope_tail, 391 mq->envelope_tail,
@@ -428,35 +428,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
428 428
429 429
430/** 430/**
431 * Task run to call the send notification for the next queued
432 * message, if any. Only useful for implementing message queues,
433 * results in undefined behavior if not used carefully.
434 *
435 * @param cls message queue to send the next message with
436 */
437static void
438impl_send_in_flight (void *cls)
439{
440 struct GNUNET_MQ_Handle *mq = cls;
441 struct GNUNET_MQ_Envelope *current_envelope;
442
443 mq->send_task = NULL;
444 /* call is only valid if we're actually currently sending
445 * a message */
446 current_envelope = mq->current_envelope;
447 GNUNET_assert (NULL != current_envelope);
448 /* can't call cancel from now on anymore */
449 current_envelope->parent_queue = NULL;
450 if ( (GNUNET_NO == mq->send_notification_called) &&
451 (NULL != current_envelope->sent_cb) )
452 {
453 current_envelope->sent_cb (current_envelope->sent_cls);
454 }
455 mq->send_notification_called = GNUNET_YES;
456}
457
458
459/**
460 * Task run to call the send implementation for the next queued 431 * Task run to call the send implementation for the next queued
461 * message, if any. Only useful for implementing message queues, 432 * message, if any. Only useful for implementing message queues,
462 * results in undefined behavior if not used carefully. 433 * results in undefined behavior if not used carefully.
@@ -467,32 +438,19 @@ static void
467impl_send_continue (void *cls) 438impl_send_continue (void *cls)
468{ 439{
469 struct GNUNET_MQ_Handle *mq = cls; 440 struct GNUNET_MQ_Handle *mq = cls;
470 struct GNUNET_MQ_Envelope *current_envelope; 441
471
472 mq->send_task = NULL; 442 mq->send_task = NULL;
473 /* call is only valid if we're actually currently sending 443 /* call is only valid if we're actually currently sending
474 * a message */ 444 * a message */
475 current_envelope = mq->current_envelope;
476 GNUNET_assert (NULL != current_envelope);
477 impl_send_in_flight (mq);
478 GNUNET_assert (0 < mq->queue_length);
479 mq->queue_length--;
480 if (NULL == mq->envelope_head) 445 if (NULL == mq->envelope_head)
481 { 446 return;
482 mq->current_envelope = NULL; 447 mq->current_envelope = mq->envelope_head;
483 } 448 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
484 else 449 mq->envelope_tail,
485 { 450 mq->current_envelope);
486 mq->current_envelope = mq->envelope_head; 451 mq->send_impl (mq,
487 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 452 mq->current_envelope->mh,
488 mq->envelope_tail, 453 mq->impl_state);
489 mq->current_envelope);
490 mq->send_notification_called = GNUNET_NO;
491 mq->send_impl (mq,
492 mq->current_envelope->mh,
493 mq->impl_state);
494 }
495 GNUNET_free (current_envelope);
496} 454}
497 455
498 456
@@ -506,22 +464,32 @@ impl_send_continue (void *cls)
506void 464void
507GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) 465GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
508{ 466{
509 /* maybe #GNUNET_MQ_impl_send_in_flight was called? */ 467 struct GNUNET_MQ_Envelope *current_envelope;
510 if (NULL != mq->send_task) 468 GNUNET_MQ_NotifyCallback cb;
511 { 469
512 GNUNET_SCHEDULER_cancel (mq->send_task); 470 GNUNET_assert (0 < mq->queue_length);
513 } 471 mq->queue_length--;
472 current_envelope = mq->current_envelope;
473 current_envelope->parent_queue = NULL;
474 mq->current_envelope = NULL;
475 GNUNET_assert (NULL == mq->send_task);
514 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, 476 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
515 mq); 477 mq);
478 if (NULL != (cb = current_envelope->sent_cb))
479 {
480 current_envelope->sent_cb = NULL;
481 cb (current_envelope->sent_cls);
482 }
483 GNUNET_free (current_envelope);
516} 484}
517 485
518 486
519/** 487/**
520 * Call the send notification for the current message, but do not 488 * Call the send notification for the current message, but do not
521 * try to send the next message until #gnunet_mq_impl_send_continue 489 * try to send the next message until #GNUNET_MQ_impl_send_continue
522 * is called. 490 * is called.
523 * 491 *
524 * only useful for implementing message queues, results in undefined 492 * Only useful for implementing message queues, results in undefined
525 * behavior if not used carefully. 493 * behavior if not used carefully.
526 * 494 *
527 * @param mq message queue to send the next message with 495 * @param mq message queue to send the next message with
@@ -529,9 +497,21 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
529void 497void
530GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) 498GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
531{ 499{
532 GNUNET_assert (NULL == mq->send_task); 500 struct GNUNET_MQ_Envelope *current_envelope;
533 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight, 501 GNUNET_MQ_NotifyCallback cb;
534 mq); 502
503 mq->in_flight = GNUNET_YES;
504 /* call is only valid if we're actually currently sending
505 * a message */
506 current_envelope = mq->current_envelope;
507 GNUNET_assert (NULL != current_envelope);
508 /* can't call cancel from now on anymore */
509 current_envelope->parent_queue = NULL;
510 if (NULL != (cb = current_envelope->sent_cb))
511 {
512 current_envelope->sent_cb = NULL;
513 cb (current_envelope->sent_cls);
514 }
535} 515}
536 516
537 517
@@ -1187,7 +1167,6 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1187 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 1167 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1188 mq->envelope_tail, 1168 mq->envelope_tail,
1189 mq->current_envelope); 1169 mq->current_envelope);
1190 mq->send_notification_called = GNUNET_NO;
1191 mq->send_impl (mq, 1170 mq->send_impl (mq,
1192 mq->current_envelope->mh, 1171 mq->current_envelope->mh,
1193 mq->impl_state); 1172 mq->impl_state);