;; This file is part of GNUnet.
;; Copyright (C) 2012, 2018 GNUnet e.V.
;; Copyright (C) 2021 Maxime Devos
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL3.0-or-later
;; Author: Florian Dold
;; Author: Christian Grothoff
;; Author: Maxime Devos
(define-module (tests mq))
(use-modules (ice-9 control)
(tests utils) ; for conservative-gc?
(fibers conditions)
(fibers)
(srfi srfi-1)
(srfi srfi-26)
(srfi srfi-39)
(srfi srfi-43)
(srfi srfi-64)
(srfi srfi-111)
(ice-9 match)
((rnrs base) #:select (assert mod))
((rnrs exceptions) #:select (guard))
((rnrs conditions) #:select (condition-who))
((rnrs arithmetic bitwise)
#:select (bitwise-ior))
(gnu gnunet netstruct syntactic)
((gnu gnunet netstruct procedural)
#:select (u32/big))
(gnu gnunet mq prio-prefs)
(gnu gnunet mq prio-prefs2)
(gnu gnunet util struct)
(gnu gnunet utils bv-slice)
(gnu gnunet utils hat-let)
((gnu extractor enum)
#:select (symbol-value value->index))
(gnu gnunet message protocols)
(gnu gnunet mq)
(gnu gnunet mq envelope)
(gnu gnunet mq handler)
(quickcheck property)
(quickcheck)
(quickcheck arbitrary))
;; The client code sends the numbers 0 to
;; NUM_TRANSMISSIONS-1 over the message queue.
;; The notify-sent callback verifies whether
;; messages were sent in-order. The fake
;; ‘sender’ procedure verifies whether it received
;; the messages in order.
;;
;; Note that in more realistic situations, some
;; queueing can happen! A very special case
;; is being tested here.
;; How many messages the client pushes through the message queue
;; in each of the in-order tests below.
(define NUM_TRANSMISSIONS 100)
;; Wire format of the test message: a message header followed by a
;; 32-bit big-endian index.  The definition is wrapped in eval-when
;; so that it is evaluated at expansion, load and eval time
;; (presumably because sizeof, set%! and read% need the type while
;; expanding -- TODO confirm).
(eval-when (expand load eval)
(define-type /:msg:our-test:dummy
(structure/packed
(synopsis "A test message, containing an index")
(documentation
"The first time, a message with index 0 is sent.
Then each time the index is increased.")
(field (header /:message-header))
(field (index u32/big)))))
(define (index->dummy i)
  ;; Build a fresh read-write slice holding a /:msg:our-test:dummy
  ;; message: the header carries the msg:util:dummy type and the
  ;; size of the whole structure, the payload is the index @var{i}.
  (let* ((byte-size (sizeof /:msg:our-test:dummy '()))
         (type-code (value->index
                     (symbol-value message-type msg:util:dummy)))
         (slice (make-slice/read-write byte-size)))
    (set%! /:msg:our-test:dummy '(header type) slice type-code)
    (set%! /:msg:our-test:dummy '(header size) slice byte-size)
    (set%! /:msg:our-test:dummy '(index) slice i)
    slice))
(define dummy->index
  ;; Read the index field back out of a /:msg:our-test:dummy slice.
  (lambda (s)
    (read% /:msg:our-test:dummy '(index) s)))
(define (client mq notify-sent-box sent-box)
  ;; Send the indices 0 .. NUM_TRANSMISSIONS-1 over @var{mq}.
  ;; Each message gets a notify-sent callback that checks the
  ;; notifications arrive in index order and bumps
  ;; @var{notify-sent-box}; @var{sent-box} is only read for the
  ;; error report.
  (define (see i)
    (unless (= i (unbox notify-sent-box))
      (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
             i
             (unbox notify-sent-box)
             (unbox sent-box)))
    (set-box! notify-sent-box (+ 1 i)))
  (let send-next ((i 0))
    (when (< i NUM_TRANSMISSIONS)
      (send-message! mq (index->dummy i)
                     #:notify-sent! (lambda () (see i)))
      (send-next (+ 1 i)))))
(define (send-proc notify-sent-box sent-box envelope)
  ;; Fake 'sender': mark @var{envelope} as sent and verify its index
  ;; is exactly the next expected one.  By the time the 'go' branch
  ;; runs, the notify-sent counter must already be one past the
  ;; index, and the sent counter must equal the index.
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let* ((index (dummy->index message))
           (next-index (+ index 1)))
      (when (not (= next-index (unbox notify-sent-box)))
        (error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
               index
               (unbox notify-sent-box)
               (unbox sent-box)))
      (when (not (= index (unbox sent-box)))
        (error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
               index
               (unbox notify-sent-box)
               (unbox sent-box)))
      (set-box! sent-box next-index)
      (values)))
   ((cancelled)
    (error "how did this cancelling happen?"))
   ((already-sent)
    (error "forgot to remove envelope from queue"))))
;; A set of message handlers built from no handlers at all, for
;; queues in tests that only send.
(define no-handlers (message-handlers))
(define (no-error-handler . what)
  ;; Error handler for queues that are not expected to report any
  ;; error: whatever arguments it is called with (@var{what} is
  ;; ignored), it raises an error so the test fails loudly.
  (error "where did this error come from?"))
(test-equal "in-order, no queuing"
  (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
  ;; Every message is handed to send-proc immediately, so both
  ;; counters must end up at NUM_TRANSMISSIONS.
  (let ()
    (define notify-sent-box (box 0))
    (define sent-box (box 0))
    (define sender
      (make-one-by-one-sender
       (lambda (envelope)
         (send-proc notify-sent-box sent-box envelope))))
    (define mq
      (make-message-queue no-handlers no-error-handler sender))
    (client mq notify-sent-box sent-box)
    (map unbox (list notify-sent-box sent-box))))
;; Simulate buffering, by only ‘truly’ sending after each three messages.
;; This does _not_ test the queueing code! See the next test for that.
;; Make sure messages aren't lost, and that they are still sent in-order!
;;
;; (Assuming the sender is well-implemented. A buggy sender could send
;; things out-of-order.)
(define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
  ;; Buffering sender.  @var{stashed} is a vector of envelopes held
  ;; back (#f marks a free slot) and @var{mod-box} holds the number
  ;; of slots expected to be in use.  When the vector is full, all
  ;; stashed envelopes are passed to send-proc, the slots are
  ;; cleared, and the new envelope takes slot 0; otherwise the new
  ;; envelope goes into the first free slot.
  (let ((free-slot (vector-index not stashed))
        (expected-fill (unbox mod-box))
        (capacity (vector-length stashed)))
    (when (not (= (or free-slot 0) expected-fill))
      (error "did we lose a message?"))
    (set-box! mod-box (mod (+ 1 expected-fill) capacity))
    (cond (free-slot
           ;; @var{stashed} is not yet full; send the
           ;; envelope later!
           (vector-set! stashed free-slot envelope))
          (else
           (vector-map!
            (lambda (_ stashed-envelope)
              (send-proc notify-sent-box sent-box stashed-envelope)
              #f)
            stashed)
           (vector-set! stashed 0 envelope)))
    (values)))
(define (expected-sent n k)
  ;; How many of @var{n} messages have truly been sent when they are
  ;; released in groups of @var{k}: the last group stays behind, and
  ;; it holds (mod n k) messages, or a full @var{k} when @var{n} is
  ;; a multiple of @var{k}.
  (let ((r (mod n k)))
    (- n (if (zero? r) k r))))
;; Number of envelopes send-proc2 holds back before truly sending.
(define k 3)
(test-equal "in-order, some buffering"
  ;; The expectation is derived from the same @var{k} that sizes
  ;; the stash below, so the two cannot drift apart.
  (map (cut expected-sent <> k)
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notify-sent-box (box 0))
         (sent-box (box 0))
         ;; Number of stash slots send-proc2 expects to be in use.
         (mod-box (box 0))
         ;; Envelopes held back by send-proc2; #f marks a free slot.
         (stashed (make-vector k #f))
         (mq (make-message-queue no-handlers
                                 no-error-handler
                                 (make-one-by-one-sender
                                  (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))
;; Test the queueing code by only flushing
;; the queue every N messages. Also check,
;; using flushing-allowed?, that sending
;; only happens when we expect it to happen.
;; Parameter that is #t only while a sender made by make-every-n
;; (or make-every-n/thread) is running the wrapped sender.
(define flushing-allowed?
(make-parameter #f))
;; Like send-proc, but first assert that sending is currently
;; expected, i.e. that flushing-allowed? is true.
(define (send-proc/check notify-sent-box sent-box envelope)
(assert (flushing-allowed?))
(send-proc notify-sent-box sent-box envelope))
(define (make-every-n proc k)
  "Make a sender using @var{proc} every @var{k}
invocations, and at other times doing nothing."
  ;; The counter should theoretically be an atomic, but the test is
  ;; singly-threaded, so a plain closed-over variable is used.
  (let ((calls-since-flush 0))
    (lambda (mq)
      ;; The sender must not be re-entered while it is flushing.
      (assert (not (flushing-allowed?)))
      (set! calls-since-flush (+ 1 calls-since-flush))
      (when (>= calls-since-flush k)
        (set! calls-since-flush 0)
        (parameterize ((flushing-allowed? #t))
          (proc mq))))))
(test-equal "in-order, some queueing"
  (map (lambda (n) (expected-sent n 3))
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  ;; The queue is only flushed on every third send, and
  ;; send-proc/check asserts sending happens only during a flush.
  (let ()
    (define notify-sent-box (box 0))
    (define sent-box (box 0))
    (define flushing-sender
      (make-one-by-one-sender
       (lambda (envelope)
         (send-proc/check notify-sent-box sent-box envelope))))
    (define mq
      (make-message-queue no-handlers
                          no-error-handler
                          (make-every-n flushing-sender 3)))
    (client mq notify-sent-box sent-box)
    (map unbox (list notify-sent-box sent-box))))
;; Test that concurrency interacts well with queueing.
;;
;; The situation we consider, is a number
;; of different threads concurrently sending messages.
;; The test verifies whether all messages were, in fact, sent.
;;
;; To make things complicated, some queueing is introduced.
;; The sender will only proceed each time the current thread
;; has tried to send @var{k/thread} messages, and the sender
;; will only try to send at most @code{(+ k/thread e)}, where
;; @var{e} is a random number from @var{e/min} to @var{e/max}.
;; The tests detect the following potential problems in the code
;; by crashing (but not always, so you may need to re-run a few
;; times, three times tends to be enough in practice for me):
;;
;; * Replacing 'old' with 'queue' in
;; unless (pfds:queue-empty? old)
;; * Replacing 'old' with 'queue' in
;; receive (envelope new) (pfds:dequeue old)
;; * Replacing the first 'old' with 'queue' in
;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
;; * Replacing the second 'old' with 'queue' in
;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
;; * Replacing 'old' by 'queue' in
;; (pfds:enqueue old envelope)
;; (only detected infrequently, odds 1 to 7 or so)
;; * Replacing the first 'old' by 'queue' in
;; (eq? old (swap-queue! old new))
;; in 'send-message!'
;; * Replacing the second 'old' by 'queue' in
;; (eq? old (swap-queue! old new))
;; in 'send-message!'
;;
;; The following problems cause a hang when testing:
;; * Replacing 'queue' by 'old' in (spin queue)
;; in 'make-one-by-one-sender'
;; * Replacing 'queue' by 'old' in (spin queue)
;; in 'send-message!'.
;;
;; The following problems cause a hang in a preceding
;; test:
;;
;; * Replacing the first 'old' by 'new' in
;; (eq? old (swap-queue! old new))
;; in 'send-message!'
;; * Replacing 'queue' by 'old' in
;; (spin queue)
;; in 'send-message!'
;; * Replacing 'queue' by 'new' in
;; (spin queue)
;; in 'send-message!'
;;
;; Some potential problems currently remain undetected:
;; * Replacing the 'new' by 'queue' in
;; (pfds:queue-length new)
;;
;; However, it is only for printing a warning
;; when the queue is rather full. Being slightly
;; off in queue length shouldn't be a problem
;; there, as the 'maximum reasonable bound'
;; is just a wild guess and not some exact
;; cut-off.
;;
;; Cancellation will be tested separately.
;; Per-thread random state; bound by 'thread' and read when the
;; number of messages to flush is chosen.  Unbound outside of that.
(define random/thread
(fluid->parameter (make-unbound-fluid)))
;; A thread's sender proceeds once that thread has tried to send
;; this many messages.
(define k/thread 12)
;; Bounds (inclusive) of the random extra number of messages a
;; flush may send on top of k/thread.
(define e/min 2)
(define e/max 7)
;; Number of messages sent by each thread.
(define N_MESSAGES 1000)
;; Number of concurrently sending threads (fibers).
(define N_THREAD 40)
;; List of (thread-index . message-index)
;; received by current thread.
(define received/thread
(fluid->parameter (make-unbound-fluid)))
;; Per-thread count of send attempts since the last flush;
;; the state used by make-every-n/thread.
(define i/thread
(fluid->parameter (make-unbound-fluid)))
;; The sending is happening concurrently,
;; so in-order delivery cannot be guaranteed.
;; Thus, requesting in-order delivery seems
;; silly.
(define prio
(bitwise-ior
(prio->integer 'prio:background)
(value->index (symbol-value priority-preference
pref:out-of-order))))
;; Wire format of the messages used by the concurrency test: a
;; message header, the index of the message within its thread, and
;; the index of the sending thread (both 32-bit big-endian).
(eval-when (expand load eval)
  (define-type /:msg:our-test:concurrency
    (structure/packed
     (synopsis "A test message, containing a thread and message index")
     (documentation
      "The first time, a message with index 0 is sent.
Then each time the index is increased.")
     (field (header /:message-header))
     (field (index u32/big))
     (field (thread u32/big)))))
(define (make-thread-message thread-index i)
  ;; Build a fresh /:msg:our-test:concurrency message recording
  ;; which thread (@var{thread-index}) sent it and its index
  ;; @var{i} within that thread.
  (let* ((byte-size (sizeof /:msg:our-test:concurrency '()))
         (type-code (value->index
                     (symbol-value message-type msg:util:dummy)))
         (slice (make-slice/read-write byte-size)))
    (set%! /:msg:our-test:concurrency '(header type) slice type-code)
    (set%! /:msg:our-test:concurrency '(header size) slice byte-size)
    (set%! /:msg:our-test:concurrency '(index) slice i)
    (set%! /:msg:our-test:concurrency '(thread) slice thread-index)
    slice))
(define (decode-thread-message s)
  ;; Return (thread-index . message-index) as stored in the
  ;; /:msg:our-test:concurrency slice @var{s}.
  (let ((thread-index (read% /:msg:our-test:concurrency '(thread) s))
        (message-index (read% /:msg:our-test:concurrency '(index) s)))
    (cons thread-index message-index)))
(define (make-every-n/thread proc k)
  "Make a sender using @var{proc} every @var{k}
invocations, and at other times doing nothing.
@code{i/thread} is used for state."
  (lambda (mq)
    ;; The sender must not be re-entered while it is flushing.
    (assert (not (flushing-allowed?)))
    (let ((attempts (+ 1 (i/thread))))
      (i/thread attempts)
      (when (>= attempts k)
        (i/thread 0)
        (parameterize ((flushing-allowed? #t))
          (proc mq))))))
(define (thread mq thread-index)
  ;; Body of one sending thread: with fresh per-thread state, send
  ;; N_MESSAGES messages tagged with @var{thread-index} over
  ;; @var{mq}, then return the list of messages the sender received
  ;; while running in this thread.
  (parameterize ((received/thread '())
                 (i/thread 0)
                 (random/thread (seed->random-state thread-index)))
    (let send-next ((i 0))
      (when (< i N_MESSAGES)
        (send-message! mq (make-thread-message thread-index i)
                       #:priority prio)
        (send-next (+ 1 i))))
    (received/thread)))
(define (make-restricted-sender how-many make-sender inner-proc)
"Make a sender that, when called, tries to send @code{(how-many)}
messages, using @var{make-sender} and @var{inner-proc}."
;; The three parameters below are unbound outside of an invocation
;; of the returned sender; each invocation binds them afresh.
;; Escape continuation used to leave the sender early.
(define escape-thunk
(fluid->parameter (make-unbound-fluid)))
;; Number of envelopes handled so far in this invocation.
(define count
(fluid->parameter (make-unbound-fluid)))
;; Number of envelopes after which this invocation stops.
(define max-count
(fluid->parameter (make-unbound-fluid)))
;; Record one more handled envelope; once the limit is reached,
;; reset the counter and escape out of the sender.
(define (count!)
(count (+ 1 (count)))
(when (= (count) (max-count))
(count 0)
((escape-thunk))))
(lambda (mq)
(let/ec ec
;; how-many is called once per invocation of the sender.
(parameterize ((max-count (how-many))
(count 0)
(escape-thunk ec))
((make-sender
(lambda (envelope)
(inner-proc envelope)
;; Check 'count' AFTER some things
;; have been sent! Otherwise, the
;; message is lost.
(count!)
(values)))
mq)))))
;; After all threads have exited, we'll ‘drain’ out
;; the left-overs.
;; True while the left-over messages are being drained.
(define drain? (make-parameter #f))
(define (make-sender/choice y? x y)
  "When @code{(y?)}, send with @code{y}. Else, send
with @code{x}."
  ;; The predicate is consulted anew on every invocation.
  (lambda (mq)
    ((if (y?) y x) mq)))
(define (inner-send envelope)
  ;; Mark @var{envelope} as sent and record its decoded
  ;; (thread-index . message-index) pair in the received/thread
  ;; list of whichever thread is currently running the sender.
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let ((decoded (decode-thread-message message)))
      (received/thread (cons decoded (received/thread))))
    (values))
   ((cancelled) (error "what/cancelled"))
   ((already-sent) (error "what/already-sent"))))
;; The sender used by the concurrency test.  While draining, every
;; queued envelope is sent.  Otherwise sending only happens on every
;; k/thread-th attempt of the current thread, and then at most
;; k/thread + e messages are sent, with e random in [e/min, e/max].
(define sender/thread
  (let* ((how-many
          (lambda ()
            (+ k/thread e/min
               (random (- e/max e/min -1) (random/thread)))))
         (restricted
          (make-restricted-sender how-many
                                  make-one-by-one-sender
                                  inner-send))
         (throttled (make-every-n/thread restricted k/thread))
         (draining (make-one-by-one-sender inner-send)))
    (make-sender/choice drain? throttled draining)))
(define (results->array per-thread-sent)
  ;; Turn a vector of lists of (thread-index . message-index) pairs
  ;; into an N_MESSAGES x N_THREAD bit array in which entry
  ;; (message-index, thread-index) is set for every message the
  ;; send function has seen.
  (let ((seen (make-typed-array 'b #f N_MESSAGES N_THREAD)))
    (vector-for-each
     (lambda (_ messages)
       (for-each (lambda (message)
                   (array-set! seen #t (cdr message) (car message)))
                 messages))
     per-thread-sent)
    seen))
(define (array-missing a)
  ;; Return the list of index pairs (i . j) whose entry in the
  ;; two-dimensional array @var{a} is false.  The array is walked
  ;; with array-index-map!, which writes every entry back unchanged.
  (let ((missing '()))
    (array-index-map!
     a
     (lambda (i j)
       (let ((found (array-ref a i j)))
         (when (not found)
           (set! missing (cons (cons i j) missing)))
         found)))
    missing))
;; But possibly out-of-order!
;; Spawn N_THREAD fibers that each send N_MESSAGES messages over one
;; shared message queue, drain whatever is still queued afterwards,
;; and expect that no (message-index . thread-index) pair is missing.
(test-equal "nothing lost when sending concurrently"
'()
(let* ((mq (make-message-queue no-handlers
no-error-handler
sender/thread))
(thread-indices (iota N_THREAD))
;; The ‘drained-out’ messages are put
;; at index N_THREAD.
(results (make-vector (+ 1 N_THREAD)))
;; One condition per thread, signalled when that thread is done.
(done? (vector-unfold (lambda (_) (make-condition)) N_THREAD))
;; Signalled once all fibers have been spawned.
(ready? (make-condition)))
(run-fibers
(lambda ()
(define (run! thread-index)
(spawn-fiber
(lambda ()
(wait ready?)
(vector-set! results thread-index
(thread mq thread-index))
(signal-condition! (vector-ref done? thread-index)))))
(for-each run! thread-indices)
;; Try to start every thread at the same time!
(signal-condition! ready?)
;; #:drain? #t with parallelism is broken,
;; see .
;; NOTE(review): the link above appears to have been lost.
;; So explicitly wait on each fiber.
(vector-for-each (lambda (_ c) (wait c)) done?))
#:drain? #t
;; No need
#:install-suspendable-ports? #f
;; More interrupts --> more switches
;; --> more test coverage. At least,
;; that's the idea. Not really tested.
#:hz 700)
;; Drain the left-overs.
(parameterize ((drain? #t)
(received/thread '()))
(try-send-again! mq)
(vector-set! results N_THREAD (received/thread)))
(array-missing (results->array results))))
;; Test message injection / handling (no exceptions).
;; mhp: one parameter per message type (0 to 3) holding the procedure
;; the 'handle!' clause of that type calls; tests bind it with
;; parameterize.  Initially #f.
(define mhp (vector-unfold (lambda (_) (make-parameter #f)) 4))
;; mhv: likewise, for the 'well-formed?' (verifier) clause.
(define mhv (vector-unfold (lambda (_) (make-parameter #f)) 4))
;; Message handlers for the types 0 to 3 that delegate to whatever
;; procedures are currently bound in mhv and mhp.
(define mh (apply message-handlers
(map (lambda (i)
(message-handler
(type i)
((interpose code) code)
((well-formed? slice)
(((vector-ref mhv i)) slice))
((handle! slice)
(((vector-ref mhp i)) slice))))
(iota (vector-length mhp)))))
;; The message queue the injection tests below inject into; it has
;; no error handler and no sender.
;; FWIW, passing #f is not really allowed.
(define mq (make-message-queue mh #f #f))
(test-eq "when injecting, handled message is eq?"
  #t
  ;; The verifier and the handler must both be passed the very
  ;; slice that was injected, not a copy.
  (let* ((m (make-slice/read-write 40)) ; could as well have been 20
         (header (slice-slice m 0 (sizeof /:message-header '()))))
    (set%! /:message-header '(size) header 40)
    (let/ec ec
      (parameterize (((vector-ref mhv 0)
                      (lambda (x)
                        (assert (eq? x m))
                        #t))
                     ((vector-ref mhp 0)
                      ;; Escape with the result of the identity check.
                      (lambda (x)
                        (ec (eq? x m)))))
        (inject-message! mq m)
        'unreachable))))
(test-eq "non-zero types ok"
  #t
  ;; Same as the previous test, but with a header-only message of
  ;; type 3 instead of type 0.
  (let* ((header-size (sizeof /:message-header '()))
         (s (make-slice/read-write header-size)))
    (set%! /:message-header '(type) s 3)
    (set%! /:message-header '(size) s header-size)
    (let/ec ec
      (parameterize (((vector-ref mhv 3)
                      (lambda (x)
                        (assert (eq? s x))
                        #t))
                     ((vector-ref mhp 3)
                      (lambda (x)
                        (ec (eq? x s)))))
        (inject-message! mq s)
        'unreachable))))
(test-equal "verifier & handler only called once"
  '(1 . 1)
  ;; Count the invocations of the handler (car) and of the
  ;; verifier (cdr) for a single injected header-only message.
  (let* ((header-size (sizeof /:message-header '()))
         (s (make-slice/read-write header-size))
         (hcount 0)
         (vcount 0))
    (define (counting-handler x)
      (set! hcount (+ 1 hcount))
      (assert (eq? x s))
      (values))
    (define (counting-verifier x)
      (set! vcount (+ 1 vcount))
      (assert (eq? x s))
      #t)
    (set%! /:message-header '(size) s header-size)
    (parameterize (((vector-ref mhp 0) counting-handler)
                   ((vector-ref mhv 0) counting-verifier))
      (inject-message! mq s)
      (cons hcount vcount))))
;; Test message injection (exceptions)
(test-equal "missing header error"
  ;; Expected: one error description per slice size that is too
  ;; small to hold a message header.
  (map (lambda (size)
         (list 'missing-header-error
               (cons 'size size)
               (cons 'who 'inject-message!)))
       (iota (sizeof /:message-header '())))
  ;; Actual: inject a slice of each such size and describe the
  ;; missing-header-error that is raised.
  (map (lambda (size)
         (guard (e ((missing-header-error? e)
                    (list 'missing-header-error
                          (cons 'size
                                (missing-header-error-received-size e))
                          (cons 'who (condition-who e)))))
           (inject-message! mq (make-slice/read-write size))
           'unreachable))
       (iota (sizeof /:message-header '()))))
;; Property: injecting a message whose header claims a size that
;; differs from the real size of the slice raises a size-mismatch
;; error reporting both sizes and naming inject-message!.
(test-assert "[prop] wrong header size error"
  (quickcheck
   (property ((%real-length $natural)
              (supposed-length $natural))
     (let* (;; The slice must be large enough to hold a header.
            (real-length (+ (sizeof /:message-header '())
                            %real-length))
            ;; Make sure the claimed size really is wrong.
            (supposed-length (if (= real-length supposed-length)
                                 (+ 1 supposed-length)
                                 supposed-length))
            (s (make-slice/read-write real-length))
            ;; The header part of the message; computed once and
            ;; used for writing the (wrong) size.
            (sheader (slice-slice s 0 (sizeof /:message-header '()))))
       (set%! /:message-header '(size) sheader supposed-length)
       (guard (e ((size-mismatch-error? e)
                  (equal? `(,(size-mismatch-error-expected-size e)
                            ,(size-mismatch-error-received-size e)
                            ,(condition-who e))
                          `(,supposed-length
                            ,real-length
                            inject-message!))))
         (inject-message! mq s)
         ;; No error was raised: the property fails.
         #f)))))
;; Inject a message into a queue without handlers and check that
;; the error handler is called exactly once, with the arguments
;; logic:no-handler and 0.
;; NOTE(review): let^ comes from (gnu gnunet utils hat-let);
;; presumably (! name value) binds a variable and
;; (! (name . args) body ...) defines a procedure -- confirm there.
(test-assert "no applicable message handler error"
(let^ ((! errored? #f)
;; Four bytes, i.e. a header only; presumably size 4 and
;; type 0 -- TODO confirm against /:message-header.
(! slice (bv-slice/read-write #vu8(0 4 0 0)))
(! (error-handler . e)
(match e
('(logic:no-handler 0)
(assert (not errored?))
(set! errored? #t)
(values))))
(! mq (make-message-queue no-handlers error-handler #f)))
(inject-message! mq slice)
errored?))
;; Inject a message whose verifier rejects it, and check that the
;; error handler is called exactly once with logic:ill-formed and 0,
;; and that the handle! clause is never run.
(test-assert "ill-formed message error"
(let^ ((! errored? #f)
(! slice (bv-slice/read-write #vu8(0 4 0 0)))
(! handlers
(message-handlers
(message-handler
(type 0)
((interpose code) code)
;; The verifier sees the injected slice itself and rejects it.
((well-formed? s)
(assert (eq? s slice))
#f)
((handle! slice)
(error "unreachable")))))
(! (error-handler . e)
(match e
;; Note: it theoretically may have some unspecified
;; rest arguments. In ‘real code’, use
;; (logic:ill-formed 0 . rest) instead.
('(logic:ill-formed 0)
(assert (not errored?))
(set! errored? #t))))
(! mq (make-message-queue handlers error-handler #f)))
(inject-message! mq slice)
errored?))
;; Test the following part of the send-message! docstring:
;; ‘After normal execution, the message envelope is returned,
;; but in case of an exception (for example, an out-of-memory exception
;; during the handling of a @code{&overly-full-queue-warning}), it is
;; possible the envelope isn't returned even though it has been enqueued
;; and it might perhaps be sent.’
(test-assert "returned envelope and sent envelope are equal"
  ;; Send one message, and compare the list of values returned by
  ;; send-message! with the list of arguments the sender received:
  ;; both must be the same single envelope.
  (let* ((returned-values #f)
         (sent-values #f)
         (record-sent!
          (lambda envelope-arguments
            ;; The sender must be invoked only once.
            (assert (eq? sent-values #f))
            (set! sent-values envelope-arguments)
            (values)))
         (sender (make-one-by-one-sender record-sent!))
         (mq (make-message-queue #f #f sender))
         (msg (index->dummy #xdeadbeef)))
    (set! returned-values
          (call-with-values
              (lambda () (send-message! mq msg))
            list))
    (and (equal? sent-values returned-values)
         (= (length sent-values) 1)
         (every envelope? sent-values))))
;; Strictly speaking, this test is allowed to fail
;; (as it is only ‘might’, not ‘it must be possible’),
;; but it seems a good idea to check our understanding is correct.
;; Force an overly-full-queue warning (by setting %suspicious-length
;; to 0), flush the queue from within the warning handler and then
;; throw out-of-memory, so that send-message! never returns.  The
;; envelope must then have been passed to the sender without
;; having been returned.
(test-assert "message might be enqueued & sent but not returned"
(let* (;; Set to the envelope once the flushing sender has seen it.
(enqueued? #f)
(flush? (make-parameter #f))
(sender/flush
(make-one-by-one-sender
(lambda (envelope)
(set! enqueued? envelope)
(values))))
;; Sender doing nothing, leaving envelopes in the queue.
(sender/hold
(lambda _ (values)))
(sender (make-sender/choice flush? sender/hold
sender/flush))
(mq (make-message-queue #f #f sender))
(msg (index->dummy 0))
;; Set once the warning handler has run.
(exceptional #f)
;; Set to the returned values if send-message! returns.
(enveloped #f))
;; Outer handler: receives the out-of-memory exception thrown by
;; the inner handler, after unwinding.
(with-exception-handler
(lambda (_)
(assert exceptional)
(assert (envelope? enqueued?))
(assert (not enveloped)))
(lambda ()
;; Inner handler: runs without unwinding, i.e. in the middle
;; of send-message!.
(with-exception-handler
(lambda (e)
(if (overly-full-queue-warning? e)
(begin
(set! exceptional #t)
(parameterize ((flush? #t))
(try-send-again! mq)
;; At least in the current implementation,
;; this holds.
;;
;; In a different implementation, the
;; envelope could be enqueued after
;; checking the queue length.
(assert enqueued?))
(throw 'out-of-memory))
(raise-exception e #:continuable? #t)))
(lambda ()
(call-with-values
(lambda ()
(parameterize ((%suspicious-length 0))
(send-message! mq msg)))
(lambda args (set! enveloped args))))
#:unwind? #f))
#:unwind? #t
#:unwind-for-type 'out-of-memory)
(and enqueued? exceptional
(not enveloped))))
;; Message cancellation.
;;
;; Cancellation is already tested in tests/envelope.scm.
;; However, the interaction with message queues has not
;; yet been tested.
;; This test detected (not detected by previous tests):
;; * the cdr of the contents of messages+garbage/box
;; being initialised incorrectly in make-message-queue
;; * using car instead of cdr in increment-garbage&maybe-cleanup
;; Register the message queue with a guardian, cancel the only
;; envelope that was sent through it, collect garbage, and check
;; that the guardian hands the queue back, i.e. that the collector
;; considered it unreachable even though the envelope is still
;; referenced here.
(test-assert "envelopes do not keep a strong reference to the message queue"
(let* (;; The sender does nothing, so the envelope stays queued.
(mq (make-message-queue #f #f (lambda _ (values))))
(mq-guard (make-guardian))
(envelope (send-message! mq (index->dummy 0))))
(mq-guard mq)
(attempt-cancel!
envelope
((now-cancelled)
(gc)
;; A guardian called without arguments returns an object it
;; guards that has become unreachable, or #f if there is none.
(->bool (mq-guard)))
((already-cancelled) (error "what/cancelled"))
((already-sent) (error "what/sent")))))
(define (count-guardian/cancelled guardian)
  "Count how many elements are present in @var{guardian}.
While we're at it, verify each element is a cancelled envelope."
  ;; Keep asking the guardian for freed objects until it answers #f.
  (do ((n 0 (+ n 1))
       (e (guardian) (guardian)))
      ((not e) n)
    (unless (envelope-peek-cancelled? e)
      (error "a not-cancelled envelope was freed!"))))
(define (count-guardian/uncancelled guardian)
  "Count how many elements are present in @var{guardian}.
While we're at it, verify each element is an uncancelled envelope."
  ;; Keep asking the guardian for freed objects until it answers #f.
  (do ((n 0 (+ n 1))
       (e (guardian) (guardian)))
      ((not e) n)
    (when (envelope-peek-cancelled? e)
      (error "a cancelled envelope was freed!"))))
;; This is a variant of
;; "the one-by-one message sender removes cancelled envelopes",
;; using guardians, and purely testing the cancelling code, and
;; not the sending code.
;;
;; It detects the following mutations:
;; * removing (spin queue+garbage) after swap! in the 'envelope-peek-cancelled?'
;; branch of 'make-one-by-one-sender'
;; Enqueue `total' messages on a queue whose sender never sends,
;; cancel the first `cancelled' of them, and use guardians to check
;; that (most of) the cancelled envelopes become garbage while the
;; uncancelled ones stay alive.
(test-assert "cancelling envelopes eventually frees memory even if message sender is dead"
  (let* ((mq (make-message-queue #f #f (lambda _ (values))))
         (cancelled-guard (make-guardian))
         (uncancelled-guard (make-guardian))
         (cancelled 40)
         (total 50))
    ;; Add a bunch of messages.
    (let ((messages
           (map (lambda (i)
                  (send-message! mq (index->dummy i)))
                (iota total))))
      ;; Cancel most of them.  This should trigger collection of
      ;; cancelled envelopes.
      (for-each
       (lambda (e)
         (cancelled-guard e)
         (attempt-cancel!
          e
          ((now-cancelled) (values))
          ((already-cancelled) (error "what/cancelled"))
          ((already-sent) (error "what/sent"))))
       (list-head messages cancelled))
      ;; Watch the remaining, uncancelled envelopes as well.  Without
      ;; this, nothing is ever registered with uncancelled-guard and
      ;; the check on freed/uncancelled below cannot fail.
      (for-each uncancelled-guard (list-tail messages cancelled)))
    ;; Move freed envelopes to the guardian.
    (gc)
    ;; How many were freed?
    (let ((freed/cancelled (count-guardian/cancelled cancelled-guard))
          (freed/uncancelled (count-guardian/uncancelled uncancelled-guard)))
      (pk 'total total 'cancelled cancelled 'freed/cancelled freed/cancelled
          'freed/uncancelled freed/uncancelled
          'queue-length (message-queue-length mq))
      ;; Only cancelled messages were supposed to be freed.
      (assert (= freed/uncancelled 0))
      (assert (<= freed/cancelled cancelled))
      ;; A large fraction of cancelled messages should be freed.
      (assert (>= (/ freed/cancelled cancelled) 7/8))
      ;; If the GC is exact, all messages removed from the message
      ;; queue (due to cancelling) should be removed.
      (unless (conservative-gc?)
        (assert (= freed/cancelled (- total (message-queue-length mq)))))
      #t)))
;; A one-by-one sender that asserts it is never handed a cancelled
;; envelope.  Every envelope is also printed with pk (debug output).
(define sender/no-cancelled
(make-one-by-one-sender
(lambda (e)
(pk 'ee e)
(assert (not (envelope-peek-cancelled? e)))
(values))))
;; Not strictly necessary (and also undocumented), but this should
;; improve the accuracy of the garbage counter. Maybe not trying
;; to send useless (cancelled) envelopes could help with performance
;; as well (untested)?
;;
;; Also, this caught a bug in (gnu gnunet mq) -- the procedure returned
;; by 'make-one-by-one-sender' went into an infinite loop if it encountered
;; a cancelled envelope.
;;
;; This test detects negating the test
;; (eq? old (swap! old (cons old-queue incremented-garbage)))
;; in increment-garbage&maybe-cleanup.
(test-assert "the one-by-one message sender removes cancelled envelopes"
  (let* ((flush? (make-parameter #f))
         ;; Used while not flushing: leaves everything in the queue.
         (sender/hold (lambda _ (values)))
         (sender (make-sender/choice flush? sender/hold
                                     sender/no-cancelled))
         (mq (make-message-queue #f #f sender)))
    ;; Fill the queue with many uncancelled messages, such that
    ;; the logic for collecting cancelled envelopes doesn't kick in too early.
    (for-each (lambda (i)
                (send-message! mq (index->dummy i)))
              (iota 30))
    (assert (= (message-queue-length mq) 30))
    ;; Now add some envelopes to the queue & cancel them.
    (for-each (lambda (i)
                (attempt-cancel!
                 (send-message! mq (index->dummy i))
                 ((now-cancelled) (values))
                 ((already-cancelled) (error "what / cancelled"))
                 ((already-sent) (error "what / sent"))))
              (iota 4 30))
    (assert (= (message-queue-length mq) 34))
    ;; Flush: sender/no-cancelled fails if it is handed a cancelled
    ;; envelope.
    (parameterize ((flush? #t))
      (try-send-again! mq))
    (assert (= (message-queue-length mq) 0))
    (assert (= (%message-queue-garbagitude mq) 0))
    #t))
;; This is a variation of "nothing lost when sending concurrently",
;; but for cancelation.
;;
;; This test fails in case of the following mutations:
;; * replace 0 with 1 (or some other number) in
;; (swap! old (cons filtered 0))
;; in increment-garbage&maybe-cleanup
;; Outline: enqueue messages/cancellation messages on a queue that
;; is not sending; cancel the first half concurrently (one fiber per
;; message) and check the garbage estimate; cancel more messages
;; sequentially up to 7/8 of the total, checking the estimate after
;; every cancellation; finally flush the queue and check that
;; exactly the uncancelled 1/8 reaches the sender.
(test-assert "the (approximate) cancellation count is accurate, when not sending, even when cancelling concurrently (also, uncancelled messages are not lost)"
(let* ((messages/cancellation 10000)
;; Number of uncancelled envelopes seen by the sender; reset to
;; 0 right before the final flush.
(n/not-cancelled #f)
(flush? (make-parameter #f))
(sender/check (lambda (e)
(unless (envelope-peek-cancelled? e)
(set! n/not-cancelled (+ 1 n/not-cancelled)))
(values)))
;; Holds envelopes in the queue, except when flush? is true.
(sender (make-sender/choice flush?
(lambda _ (values))
(make-one-by-one-sender sender/check)))
(mq (make-message-queue #f #f sender))
(ready? (make-condition))
;; One condition per concurrently cancelled message.
(done? (vector-unfold
(lambda (_) (make-condition))
(/ messages/cancellation 2)))
;; Vector of the envelopes of all enqueued messages.
;; Warnings about an overly full queue are ignored.
(messages
(with-exception-handler
(lambda (e)
(if (overly-full-queue-warning? e)
(values)
(raise-exception e #:continuable? #t)))
(lambda ()
(vector-unfold (compose (cut send-message! mq <>)
index->dummy)
messages/cancellation)))))
(run-fibers
(lambda ()
;; Cancel half of the messages, concurrently.
;; Only half of all the messages are cancelled,
;; to avoid resetting the garbage counter.
(vector-for-each
(lambda (i done? message)
(when (< i (/ messages/cancellation 2))
(spawn-fiber
(lambda ()
(wait ready?)
(attempt-cancel!
message
((now-cancelled)
(signal-condition! done?)
(values))
((already-cancelled)
(signal-condition! done?)
(error "what/cancelled"))
((already-sent)
(signal-condition! done?)
(error "what/sent")))))))
done? messages)
(signal-condition! ready?)
(vector-for-each (lambda (_ c) (wait c)) done?))
#:hz 4000)
;; Verify the estimate is accurate, at least in this
;; situation.
(assert (= (pk 'garbagitude (%message-queue-garbagitude mq))
(pk 'expected (/ messages/cancellation 2))))
;; Cancel more messages (until 7/8 are cancelled),
;; to trigger collection. While we're at it, verify
;; the estimate is still correct.
(do ((i (/ messages/cancellation 2) (+ i 1)))
((>= (/ i messages/cancellation) 7/8))
(attempt-cancel!
(vector-ref messages i)
((now-cancelled)
;; 3/4 is the (arbitrary) ratio at which
;; the garbage is thrown out
(if (< (* 4 i) (* 3 messages/cancellation))
(assert (= (%message-queue-garbagitude mq)
(+ i 1)))
(assert (= (%message-queue-garbagitude mq)
(- i (* 3/4 messages/cancellation))))))
((already-cancelled) (error "what/cancelled2"))
((already-sent) (error "what/sent2"))))
;; Now send the envelopes, to verify uncancelled messages
;; are still in the queue.
(parameterize ((flush? #t))
(set! n/not-cancelled 0)
(try-send-again! mq))
(assert (= n/not-cancelled (* 1/8 messages/cancellation)))
;; As everything has been removed from the queue,
;; the estimate should now be zero.
(assert (= (pk 'final-garbagitude (%message-queue-garbagitude mq))
0))
#t))