;; This file is part of GNUnet.
;; Copyright (C) 2012, 2018 GNUnet e.V.
;; Copyright (C) 2021 Maxime Devos
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later

;; Author: Florian Dold
;; Author: Christian Grothoff
;; Author: Maxime Devos
(define-module (tests mq))
(use-modules (ice-9 control)
             (tests utils) ; for conservative-gc?
             (fibers conditions)
             (fibers)
             (srfi srfi-1)
             (srfi srfi-26)
             (srfi srfi-39)
             (srfi srfi-43)
             (srfi srfi-64)
             (srfi srfi-111)
             (ice-9 match)
             ((rnrs base) #:select (assert mod))
             ((rnrs exceptions) #:select (guard))
             ((rnrs conditions) #:select (condition-who))
             ((rnrs arithmetic bitwise) #:select (bitwise-ior))
             (gnu gnunet netstruct syntactic)
             ((gnu gnunet netstruct procedural) #:select (u32/big))
             (gnu gnunet mq prio-prefs)
             (gnu gnunet mq prio-prefs2)
             (gnu gnunet util struct)
             (gnu gnunet utils bv-slice)
             (gnu gnunet utils hat-let)
             ((gnu extractor enum) #:select (symbol-value value->index))
             (gnu gnunet message protocols)
             (gnu gnunet mq)
             (gnu gnunet mq envelope)
             (gnu gnunet mq handler)
             (quickcheck property)
             (quickcheck)
             (quickcheck arbitrary))

;; The client code sends the numbers 0 to
;; NUM_TRANSMISSIONS-1 over the message queue.
;; The notify-sent callback verifies whether
;; messages were sent in-order. The fake
;; ‘sender’ procedure verifies whether it received
;; the messages in order.
;;
;; Note that in more realistic situations, some
;; queueing can happen! A very special case
;; is being tested here.

;; Number of messages the fake client pushes through the queue.
(define NUM_TRANSMISSIONS 100)

(eval-when (expand load eval)
  (define-type /:msg:our-test:dummy
    (structure/packed
     (synopsis "A test message, containing an index")
     (documentation
      "The first time, a message with index 0 is sent. Then each time the index is increased.")
     (field (header /:message-header))
     (field (index u32/big)))))

;; Build a fresh /:msg:our-test:dummy message carrying the index I.
;; The header type is msg:util:dummy and the header size is the size
;; of the whole structure.  Returns the read-write slice.
(define (index->dummy i)
  (let ((s (make-slice/read-write (sizeof /:msg:our-test:dummy '()))))
    (set%! /:msg:our-test:dummy '(header type) s
           (value->index (symbol-value message-type msg:util:dummy)))
    (set%! /:msg:our-test:dummy '(header size) s
           (sizeof /:msg:our-test:dummy '()))
    (set%! /:msg:our-test:dummy '(index) s i)
    s))

;; Inverse of index->dummy: read the index field of the message S.
(define (dummy->index s)
  (read% /:msg:our-test:dummy '(index) s))

;; Send the indices 0 .. NUM_TRANSMISSIONS-1 over MQ, in order.
;; NOTIFY-SENT-BOX holds the index the next notify-sent callback is
;; expected to report; each callback bumps it by one or raises an error
;; when it is called with another index.  SENT-BOX is only read here,
;; for the error message.
(define (client mq notify-sent-box sent-box)
  (define (see i)
    (if (= i (unbox notify-sent-box))
        (set-box! notify-sent-box (+ 1 i))
        (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
               i (unbox notify-sent-box) (unbox sent-box))))
  (do ((i 0 (+ 1 i)))
      ((>= i NUM_TRANSMISSIONS))
    (send-message! mq (index->dummy i) #:notify-sent! (cut see i))))

;; Fake ‘sender’ procedure: mark ENVELOPE as sent and verify ordering.
;; In the 'go' branch, NOTIFY-SENT-BOX must already be one past the
;; index of the message, and SENT-BOX must equal the index; afterwards
;; SENT-BOX is advanced.  Cancelled or already-sent envelopes are
;; treated as errors.
(define (send-proc notify-sent-box sent-box envelope)
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let ((index (dummy->index message)))
      (unless (= (+ index 1) (unbox notify-sent-box))
        (error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
               index (unbox notify-sent-box) (unbox sent-box)))
      (unless (= index (unbox sent-box))
        (error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
               index (unbox notify-sent-box) (unbox sent-box)))
      (set-box! sent-box (+ 1 index))
      (values)))
   ((cancelled) (error "how did this cancelling happen?"))
   ((already-sent) (error "forgot to remove envelope from queue"))))

;; A message queue without any message handlers ...
(define no-handlers (message-handlers))
;; ... and an error handler that must never be invoked.
(define (no-error-handler . what)
  (error "where did this error come from?"))

(test-equal "in-order, no queuing"
  (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
  (let* ((notify-sent-box (box 0))
         (sent-box (box 0))
         (mq (make-message-queue
              no-handlers
              no-error-handler
              (make-one-by-one-sender
               (cut send-proc notify-sent-box sent-box <>)))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))

;; Simulate buffering, by only ‘truly’ sending after each three messages.
;; This does _not_ test the queueing code!  See the next test for that.
;; Make sure messages aren't lost, and that they are still sent in-order!
;;
;; (Assuming the sender is well-implemented. A buggy sender could send
;; things out-of-order.)

;; Buffering sender.  STASHED is a vector of envelopes (#f marks a free
;; slot) and MOD-BOX holds the number of slots expected to be filled.
;; While there is a free slot, ENVELOPE is stored there.  When the
;; vector is full, every stashed envelope is passed to send-proc, the
;; slots are reset to #f and ENVELOPE becomes the first stashed one.
(define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
  (let ((first-free (vector-index not stashed))
        (expected-filled (unbox mod-box)))
    ;; A full vector (first-free is #f) corresponds to a counter that
    ;; wrapped around to 0.
    (unless (= (or first-free 0) expected-filled)
      (error "did we lose a message?"))
    (set-box! mod-box (mod (+ 1 expected-filled) (vector-length stashed)))
    (if (not first-free)
        (begin
          (vector-map! (lambda (i envelope)
                         (send-proc notify-sent-box sent-box envelope)
                         #f)
                       stashed)
          (vector-set! stashed 0 envelope))
        ;; @var{stashed} is not yet full; send the
        ;; envelope later!
        (vector-set! stashed first-free envelope))
    (values)))

;; How many of N messages have truly been sent when they are flushed
;; in groups of K and the last (possibly full) group is still held
;; back: N minus (N mod K), or N minus K when K divides N.
(define (expected-sent n k)
  (let ((r (mod n k)))
    (- n (if (= r 0) k r))))

;; Size of the buffer used by the "in-order, some buffering" test.
(define k 3)
(test-equal "in-order, some buffering"
  ;; Use the same K as the buffer below, instead of repeating the
  ;; literal, such that the expectation follows the buffer size.
  (map (cut expected-sent <> k)
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notify-sent-box (box 0))
         (sent-box (box 0))
         (mod-box (box 0))
         (stashed (make-vector k #f))
         (mq (make-message-queue
              no-handlers
              no-error-handler
              (make-one-by-one-sender
               (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))

;; Test the queueing code by only flushing
;; the queue every N messages. Also check,
;; using flushing-allowed?, that sending
;; only happens when we expect it to happen.
;; #t only while a wrapped sender is deliberately being invoked.
(define flushing-allowed? (make-parameter #f))

;; Like send-proc, but additionally insist that we are inside a
;; deliberate flush.
(define (send-proc/check notify-sent-box sent-box envelope)
  (assert (flushing-allowed?))
  (send-proc notify-sent-box sent-box envelope))

(define (make-every-n proc k)
  "Return a sender that calls the sender @var{proc} on every @var{k}th
invocation and does nothing on the other invocations."
  ;; In theory the counter should be atomic, but this test only
  ;; uses a single thread, so a plain variable suffices.
  (let ((calls 0))
    (lambda (mq)
      (assert (not (flushing-allowed?)))
      (let ((calls+1 (+ 1 calls)))
        (if (< calls+1 k)
            (set! calls calls+1)
            (begin
              (set! calls 0)
              (parameterize ((flushing-allowed? #t))
                (proc mq))))))))

(test-equal "in-order, some queueing"
  (map (cut expected-sent <> 3)
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notified (box 0))
         (sent (box 0))
         (one-by-one
          (make-one-by-one-sender (cut send-proc/check notified sent <>)))
         (queue
          (make-message-queue no-handlers
                              no-error-handler
                              (make-every-n one-by-one 3))))
    (client queue notified sent)
    (list (unbox notified) (unbox sent))))

;; Test that concurrency interacts well with queueing.
;;
;; The situation under consideration is a number of different
;; threads concurrently sending messages.  The test verifies
;; whether all messages were, in fact, sent.
;;
;; To make things complicated, some queueing is introduced.
;; The sender will only proceed each time the current thread
;; has tried to send @var{k/thread} messages, and the sender
;; will only try to send at most @code{(+ k/thread e)}, where
;; @var{e} is a random number from @var{e/min} to @var{e/max}.

;; The tests detect the following potential problems in the code
;; by crashing (but not always, so you may need to re-run a few
;; times, three times tends to be enough in practice for me):
;;
;;  * Replacing 'old' with 'queue' in
;;    unless (pfds:queue-empty? old)
;;  * Replacing 'old' with 'queue' in
;;    receive (envelope new) (pfds:dequeue old)
;;  * Replacing the first 'old' with 'queue' in
;;    (eq? old (swap! old new)), in 'make-one-by-one-sender'
;;  * Replacing the second 'old' with 'queue' in
;;    (eq? old (swap! old new)), in 'make-one-by-one-sender'
;;  * Replacing 'old' by 'queue' in
;;    (pfds:enqueue old envelope)
;;    (only detected infrequently, odds 1 to 7 or so)
;;  * Replacing the first 'old' by 'queue' in
;;    (eq? old (swap-queue! old new))
;;    in 'send-message!'
;;  * Replacing the second 'old' by 'queue' in
;;    (eq? old (swap-queue! old new))
;;    in 'send-message!'
;;
;; The following problems cause a hang when testing:
;;  * Replacing 'queue' by 'old' in (spin queue)
;;    in 'make-one-by-one-sender'
;;  * Replacing 'queue' by 'old' in (spin queue)
;;    in 'send-message!'.
;;
;; The following problems cause a hang in a preceding
;; test:
;;
;;  * Replacing the first 'old' by 'new' in
;;    (eq? old (swap-queue! old new))
;;    in 'send-message!'
;;  * Replacing 'queue' by 'old' in
;;    (spin queue)
;;    in 'send-message!'
;;  * Replacing 'queue' by 'new' in
;;    (spin queue)
;;    in 'send-message!'
;;
;; Some potential problems currently remain undetected:
;;  * Replacing the 'new' by 'queue' in
;;    (pfds:queue-length new)
;;
;;    However, it is only for printing a warning
;;    when the queue is rather full. Being slightly
;;    off in queue length shouldn't be a problem
;;    there, as the 'maximum reasonable bound'
;;    is just a wild guess and not some exact
;;    cut-off.
;;
;; Cancellation will be tested separately.

;; Random state of the current thread.
(define random/thread (fluid->parameter (make-unbound-fluid)))
;; Parameters of the queueing described above.
(define k/thread 12)
(define e/min 2)
(define e/max 7)
;; Messages sent per thread, and number of threads.
(define N_MESSAGES 1000)
(define N_THREAD 40)
;; List of (thread-index . message-index)
;; received by current thread.
(define received/thread (fluid->parameter (make-unbound-fluid)))
;; Invocation counter of the current thread, used by make-every-n/thread.
(define i/thread (fluid->parameter (make-unbound-fluid)))

;; The sending is happening concurrently,
;; so in-order delivery cannot be guaranteed.
;; Thus, requesting in-order delivery seems
;; silly.
;; Background priority, with the ‘out-of-order’ preference bit set.
(define prio
  (let ((background (prio->integer 'prio:background))
        (out-of-order
         (value->index (symbol-value priority-preference pref:out-of-order))))
    (bitwise-ior background out-of-order)))

(eval-when (expand load eval)
  (define-type /:msg:our-test:concurrency
    (structure/packed
     (synopsis "A test message, containing an thread and message index")
     (documentation
      "The first time, a message with index 0 is sent. Then each time the index is increased.")
     (field (header /:message-header))
     (field (index u32/big))
     (field (thread u32/big)))))

;; Build a fresh /:msg:our-test:concurrency message identifying
;; message number I of thread THREAD-INDEX.
(define (make-thread-message thread-index i)
  (let* ((size (sizeof /:msg:our-test:concurrency '()))
         (message (make-slice/read-write size)))
    (set%! /:msg:our-test:concurrency '(header type) message
           (value->index (symbol-value message-type msg:util:dummy)))
    (set%! /:msg:our-test:concurrency '(header size) message size)
    (set%! /:msg:our-test:concurrency '(index) message i)
    (set%! /:msg:our-test:concurrency '(thread) message thread-index)
    message))

;; Decode a message made by make-thread-message into a pair
;; (thread-index . message-index).
(define (decode-thread-message s)
  (let ((thread-index (read% /:msg:our-test:concurrency '(thread) s))
        (message-index (read% /:msg:our-test:concurrency '(index) s)))
    (cons thread-index message-index)))

(define (make-every-n/thread proc k)
  "Return a sender that calls the sender @var{proc} on every @var{k}th
invocation and does nothing otherwise.  The invocation counter is kept
in @code{i/thread}."
  (lambda (mq)
    (assert (not (flushing-allowed?)))
    (let ((invocations (+ 1 (i/thread))))
      (i/thread invocations)
      (when (>= invocations k)
        (i/thread 0)
        (parameterize ((flushing-allowed? #t))
          (proc mq))))))

;; Body of a single sending thread: send N_MESSAGES messages over MQ
;; and return the list of decoded messages that inner-send recorded
;; while running in this thread.
(define (thread mq thread-index)
  (parameterize ((received/thread '())
                 (i/thread 0)
                 (random/thread (seed->random-state thread-index)))
    (let loop ((i 0))
      (when (< i N_MESSAGES)
        (send-message! mq (make-thread-message thread-index i)
                       #:priority prio)
        (loop (+ 1 i))))
    (received/thread)))

(define (make-restricted-sender how-many make-sender inner-proc)
  "Return a sender that, on each invocation, tries to send
@code{(how-many)} messages, by means of @var{make-sender} and
@var{inner-proc}."
  (define bail-out (fluid->parameter (make-unbound-fluid)))
  (define sent (fluid->parameter (make-unbound-fluid)))
  (define limit (fluid->parameter (make-unbound-fluid)))
  ;; Count one more message; escape once the limit has been reached.
  (define (tick!)
    (sent (+ 1 (sent)))
    (when (= (sent) (limit))
      (sent 0)
      ((bail-out))))
  (define (send-and-count envelope)
    (inner-proc envelope)
    ;; Only look at the counter AFTER the envelope has been
    ;; passed on!  Otherwise, the message would be lost.
    (tick!)
    (values))
  (lambda (mq)
    (let/ec stop
      (parameterize ((limit (how-many))
                     (sent 0)
                     (bail-out stop))
        ((make-sender send-and-count) mq)))))

;; After all threads have exited, we'll ‘drain’ out
;; the left-overs.
(define drain? (make-parameter #f))

(define (make-sender/choice y? x y)
  "Return a sender that sends with @var{y} when @code{(y?)} holds,
and with @var{x} otherwise."
  (lambda (mq)
    ((if (y?) y x) mq)))

;; Mark ENVELOPE as sent and record its decoded message in
;; received/thread.  Cancelled or already-sent envelopes are errors.
(define (inner-send envelope)
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let ((decoded (decode-thread-message message)))
      (received/thread (cons decoded (received/thread))))
    (values))
   ((cancelled) (error "what/cancelled"))
   ((already-sent) (error "what/already-sent"))))

;; The sender of the concurrency test: a restricted, only-every-k/thread
;; sender normally, and a plain one-by-one sender while draining.
(define sender/thread
  (let* ((burst-size
          (lambda ()
            (+ k/thread e/min
               (random (- e/max e/min -1) (random/thread)))))
         (restricted
          (make-restricted-sender burst-size make-one-by-one-sender inner-send))
         (queueing (make-every-n/thread restricted k/thread))
         (draining (make-one-by-one-sender inner-send)))
    (make-sender/choice drain? queueing draining)))

;; Turn the vector PER-THREAD-SENT of lists of
;; (thread-index . message-index) pairs into an N_MESSAGES × N_THREAD
;; bit array that is #t for every message the send function has seen.
(define (results->array per-thread-sent)
  (let ((seen (make-typed-array 'b #f N_MESSAGES N_THREAD)))
    (vector-for-each
     (lambda (_ messages)
       (for-each (lambda (entry)
                   (array-set! seen #t (cdr entry) (car entry)))
                 messages))
     per-thread-sent)
    seen))

;; Return the list of index pairs (i . j) at which the array A is #f.
;; array-index-map! stores the value that was already there, so A
;; is left unchanged.
(define (array-missing a)
  (let ((missing '()))
    (array-index-map!
     a
     (lambda (i j)
       (or (array-ref a i j)
           (begin
             (set! missing (cons (cons i j) missing))
             #f))))
    missing))

;; But possibly out-of-order!
(test-equal "nothing lost when sending concurrently"
  '()
  (let* ((mq (make-message-queue no-handlers no-error-handler sender/thread))
         (thread-indices (iota N_THREAD))
         ;; The ‘drained-out’ messages are put
         ;; at index N_THREAD.
         (results (make-vector (+ 1 N_THREAD)))
         (done? (vector-unfold (lambda (_) (make-condition)) N_THREAD))
         (ready? (make-condition)))
    (run-fibers
     (lambda ()
       (define (run! thread-index)
         (spawn-fiber
          (lambda ()
            (wait ready?)
            (vector-set! results thread-index
                         (thread mq thread-index))
            (signal-condition! (vector-ref done? thread-index)))))
       (for-each run! thread-indices)
       ;; Try to start every thread at the same time!
       (signal-condition! ready?)
       ;; #:drain? #t with parallelism is broken (the reference to
       ;; the bug report was lost from this comment).
       ;; So explicitly wait on each fiber.
       (vector-for-each (lambda (_ c) (wait c)) done?))
     #:drain? #t
     ;; No need
     #:install-suspendable-ports? #f
     ;; More interrupts --> more switches
     ;; --> more test coverage. At least,
     ;; that's the idea. Not really tested.
     #:hz 700)
    ;; Drain the left-overs.
    (parameterize ((drain? #t)
                   (received/thread '()))
      (try-send-again! mq)
      (vector-set! results N_THREAD (received/thread)))
    (array-missing (results->array results))))

;; Test message injection / handling (no exceptions).

;; Per message type: a parameter holding the handler procedure (mhp)
;; and a parameter holding the verifier procedure (mhv).  The tests
;; below install the actual procedures with 'parameterize'.
(define mhp (vector-unfold (lambda (_) (make-parameter #f)) 4))
(define mhv (vector-unfold (lambda (_) (make-parameter #f)) 4))
;; Message handlers for the types 0 to 3, delegating to whatever
;; procedure is currently in the corresponding parameter.
(define mh
  (apply message-handlers
         (map (lambda (i)
                (message-handler
                 (type i)
                 ((interpose code) code)
                 ((well-formed? slice)
                  (((vector-ref mhv i)) slice))
                 ((handle! slice)
                  (((vector-ref mhp i)) slice))))
              (iota (vector-length mhp)))))

;; FWIW, passing #f is not really allowed.
(define mq (make-message-queue mh #f #f))

(test-eq "when injecting, handled message is eq?"
  #t
  (let ((m (make-slice/read-write 40))) ; could as well have been 20
    (set%! /:message-header '(size)
           (slice-slice m 0 (sizeof /:message-header '()))
           40)
    (let/ec ec
      (parameterize (((vector-ref mhp 0)
                      (lambda (x) (ec (eq? x m))))
                     ((vector-ref mhv 0)
                      (lambda (x) (assert (eq? x m)) #t)))
        (inject-message! mq m)
        'unreachable))))

(test-eq "non-zero types ok"
  #t
  (let ((s (make-slice/read-write (sizeof /:message-header '()))))
    (set%! /:message-header '(type) s 3)
    (set%! /:message-header '(size) s (sizeof /:message-header '()))
    (let/ec ec
      (parameterize (((vector-ref mhp 3)
                      (lambda (x) (ec (eq? x s))))
                     ((vector-ref mhv 3)
                      (lambda (x) (assert (eq? s x)) #t)))
        (inject-message! mq s)
        'unreachable))))

(test-equal "verifier & handler only called once"
  '(1 . 1)
  (let ((hcount 0)
        (vcount 0)
        (s (make-slice/read-write (sizeof /:message-header '()))))
    (set%! /:message-header '(size) s (sizeof /:message-header '()))
    (parameterize (((vector-ref mhp 0)
                    (lambda (x)
                      (set! hcount (+ 1 hcount))
                      (assert (eq? x s))
                      (values)))
                   ((vector-ref mhv 0)
                    (lambda (x)
                      (set! vcount (+ 1 vcount))
                      (assert (eq? x s))
                      #t)))
      (inject-message! mq s)
      (cons hcount vcount))))

;; Test message injection (exceptions)
(test-equal "missing header error"
  (map (lambda (i)
         `(missing-header-error (size . ,i) (who . inject-message!)))
       (iota (sizeof /:message-header '())))
  ;; Every slice shorter than a message header must be rejected.
  (map (lambda (i)
         (guard (e ((missing-header-error? e)
                    `(missing-header-error
                      (size . ,(missing-header-error-received-size e))
                      (who . ,(condition-who e)))))
           (inject-message! mq (make-slice/read-write i))
           'unreachable))
       (iota (sizeof /:message-header '()))))

(test-assert "[prop] wrong header size error"
  (quickcheck
   (property
    ((%real-length $natural)
     (supposed-length $natural))
    (let* (;; The slice must at least be large enough for the header.
           (real-length (+ (sizeof /:message-header '()) %real-length))
           ;; Make sure the size in the header is wrong.
           (supposed-length
            (if (= real-length supposed-length)
                (+ 1 supposed-length)
                supposed-length))
           (s (make-slice/read-write real-length))
           (sheader (slice-slice s 0 (sizeof /:message-header '()))))
      ;; Write through the header sub-slice that was bound above,
      ;; instead of constructing the same sub-slice a second time.
      (set%! /:message-header '(size) sheader supposed-length)
      (guard (e ((size-mismatch-error? e)
                 (equal? `(,(size-mismatch-error-expected-size e)
                           ,(size-mismatch-error-received-size e)
                           ,(condition-who e))
                         `(,supposed-length ,real-length inject-message!))))
        (inject-message! mq s)
        #f)))))

(test-assert "no applicable message handler error"
  (let^ ((! errored? #f)
         (! slice (bv-slice/read-write #vu8(0 4 0 0)))
         (! (error-handler . e)
            (match e
              ('(logic:no-handler 0)
               (assert (not errored?))
               (set! errored? #t)
               (values))))
         (! mq (make-message-queue no-handlers error-handler #f)))
        (inject-message! mq slice)
        errored?))

(test-assert "ill-formed message error"
  (let^ ((! errored? #f)
         (! slice (bv-slice/read-write #vu8(0 4 0 0)))
         (! handlers
            (message-handlers
             (message-handler
              (type 0)
              ((interpose code) code)
              ((well-formed? s)
               (assert (eq? s slice))
               #f)
              ((handle! slice)
               (error "unreachable")))))
         (! (error-handler . e)
            (match e
              ;; Note: it theoretically may have some unspecified
              ;; rest arguments.  In ‘real code’, use
              ;; (logic:ill-formed 0 . rest) instead.
              ('(logic:ill-formed 0)
               (assert (not errored?))
               (set! errored? #t))))
         (! mq (make-message-queue handlers error-handler #f)))
        (inject-message! mq slice)
        errored?))

;; Test the following part of the send-message! docstring:
;; ‘After normal execution, the message envelope is returned,
;; but in case of an exception (for example, an out-of-memory exception
;; during the handling of a @code{&overly-full-queue-warning}), it is
;; possible the envelope isn't returned even though it has been enqueued
;; and it might perhaps be sent.’
(test-assert "returned envelope and sent envelope are equal"
  (let* ((returned-values #f)
         (sent-values #f)
         (sender (make-one-by-one-sender
                  (lambda envelope-arguments
                    ;; The sender may only be called once.
                    (assert (eq? sent-values #f))
                    (set! sent-values envelope-arguments)
                    (values))))
         (mq (make-message-queue #f #f sender))
         (msg (index->dummy #xdeadbeef)))
    (call-with-values (lambda () (send-message! mq msg))
      (lambda return-values
        (set! returned-values return-values)))
    (and (equal? sent-values returned-values)
         (= (length sent-values) 1)
         (every envelope? sent-values))))

;; Strictly speaking, this test is allowed to fail
;; (as it is only ‘might’, not ‘it must be possible’),
;; but it seems a good idea to check our understanding is correct.
(test-assert "message might be enqueued & sent but not returned"
  (let* ((enqueued? #f)
         (flush? (make-parameter #f))
         (sender/flush
          (make-one-by-one-sender
           (lambda (envelope)
             (set! enqueued? envelope)
             (values))))
         (sender/hold (lambda _ (values)))
         (sender (make-sender/choice flush? sender/hold sender/flush))
         (mq (make-message-queue #f #f sender))
         (msg (index->dummy 0))
         (exceptional #f)
         (enveloped #f))
    ;; Outer handler: catches the simulated out-of-memory exception,
    ;; after unwinding.
    (with-exception-handler
        (lambda (_)
          (assert exceptional)
          (assert (envelope? enqueued?))
          (assert (not enveloped)))
      (lambda ()
        ;; Inner handler: runs (without unwinding) when send-message!
        ;; warns about the queue length; it flushes the queue and then
        ;; simulates running out of memory.
        (with-exception-handler
            (lambda (e)
              (if (overly-full-queue-warning? e)
                  (begin
                    (set! exceptional #t)
                    (parameterize ((flush? #t))
                      (try-send-again! mq)
                      ;; At least in the current implementation,
                      ;; this holds.
                      ;;
                      ;; In a different implementation, the
                      ;; envelope could be enqueued after
                      ;; checking the queue length.
                      (assert enqueued?))
                    (throw 'out-of-memory))
                  (raise-exception e #:continuable? #t)))
          (lambda ()
            (call-with-values
                (lambda ()
                  (parameterize ((%suspicious-length 0))
                    (send-message! mq msg)))
              (lambda args
                (set! enveloped args))))
          #:unwind? #f))
      #:unwind? #t
      #:unwind-for-type 'out-of-memory)
    (and enqueued? exceptional (not enveloped))))

;; Message cancellation.
;;
;; Cancellation is already tested in tests/envelope.scm.
;; However, the interaction with message queues has not
;; yet been tested.
;; This test detected (not detected by previous tests):
;;  * the cdr of the contents of messages+garbage/box
;;    being initialised incorrectly in make-message-queue
;;  * using car instead of cdr in increment-garbage&maybe-cleanup
(test-assert "envelopes do not keep a strong reference to the message queue"
  (let* ((mq (make-message-queue #f #f (lambda _ (values))))
         (mq-guard (make-guardian))
         (envelope (send-message! mq (index->dummy 0))))
    (mq-guard mq)
    (attempt-cancel!
     envelope
     ((now-cancelled)
      (gc)
      ;; The guardian only returns the queue if it was collectable.
      (->bool (mq-guard)))
     ((already-cancelled) (error "what/cancelled"))
     ((already-sent) (error "what/sent")))))

(define (count-guardian/cancelled guardian)
  "Count how many elements are present in @var{guardian}.
While we're at it, verify each element is a cancelled envelope."
  (let loop ((n 0))
    (let ((e (guardian)))
      (cond ((not e) n)
            ((envelope-peek-cancelled? e)
             (loop (+ n 1)))
            (else (error "a not-cancelled envelope was freed!"))))))

(define (count-guardian/uncancelled guardian)
  "Count how many elements are present in @var{guardian}.
While we're at it, verify each element is an uncancelled envelope."
  (let loop ((n 0))
    (let ((e (guardian)))
      (cond ((not e) n)
            ((not (envelope-peek-cancelled? e))
             (loop (+ n 1)))
            (else (error "a cancelled envelope was freed!"))))))

;; This is a variant of
;; "the one-by-one message sender removes cancelled envelopes",
;; using guardians, and purely testing the cancelling code, and
;; not the sending code.
;;
;; It detects the following mutations:
;;  * removing (spin queue+garbage) after swap! in the 'envelope-peek-cancelled?'
;;    branch of 'make-one-by-one-sender'
(test-assert "cancelling envelopes eventually frees memory even if message sender is dead"
  (let* ((mq (make-message-queue #f #f (lambda _ (values))))
         (cancelled-guard (make-guardian))
         (uncancelled-guard (make-guardian))
         ;; Number of envelopes that are enqueued, and how many of
         ;; them (the first ones) are cancelled afterwards.
         (total 50)
         (cancelled 40))
    ;; Add a bunch of messages.
    (let ((messages
           (map (lambda (i) (send-message! mq (index->dummy i)))
                (iota total))))
      ;; Cancel most of them.  This should trigger collection of
      ;; cancelled envelopes.
      (for-each (lambda (e)
                  (cancelled-guard e)
                  (attempt-cancel!
                   e
                   ((now-cancelled) (values))
                   ((already-cancelled) (error "what/cancelled"))
                   ((already-sent) (error "what/sent"))))
                (list-head messages cancelled))
      ;; Watch the remaining (uncancelled) envelopes as well.
      ;; Previously nothing was ever registered with this guardian,
      ;; which made the ‘freed/uncancelled’ check below vacuous.
      (for-each uncancelled-guard (list-tail messages cancelled)))
    ;; Move freed envelopes to the guardian.
    (gc)
    ;; How many were freed?
    (let ((freed/cancelled (count-guardian/cancelled cancelled-guard))
          (freed/uncancelled (count-guardian/uncancelled uncancelled-guard)))
      (pk 'total total
          'cancelled cancelled
          'freed/cancelled freed/cancelled
          'freed/uncancelled freed/uncancelled
          'queue-length (message-queue-length mq))
      ;; Only cancelled messages were supposed to be freed.
      (assert (= freed/uncancelled 0))
      (assert (<= freed/cancelled cancelled))
      ;; A large fraction of cancelled messages should be freed.
      (assert (>= (/ freed/cancelled cancelled) 7/8))
      ;; If the GC is exact, all messages removed from the message
      ;; queue (due to cancelling) should be removed.
      (unless (conservative-gc?)
        (assert (= freed/cancelled (- total (message-queue-length mq)))))
      #t)))

;; A one-by-one sender that insists it is never handed a cancelled
;; envelope (and prints each envelope it sees).
(define sender/no-cancelled
  (make-one-by-one-sender
   (lambda (e)
     (pk 'ee e)
     (assert (not (envelope-peek-cancelled? e)))
     (values))))

;; Not strictly necessary (and also undocumented), but this should
;; improve the accuracy of the garbage counter.  Maybe not trying
;; to send useless (cancelled) envelopes could help with performance
;; as well (untested)?
;;
;; Also, this caught a bug in (gnu gnunet mq) -- the procedure returned
;; by 'make-one-by-one-sender' went into an infinite loop if it encountered
;; a cancelled envelope.
;;
;; This test detects negating the test
;;   (eq? old (swap! old (cons old-queue incremented-garbage)))
;; in increment-garbage&maybe-cleanup.
(test-assert "the one-by-one message sender removes cancelled envelopes"
  (let* ((flush? (make-parameter #f))
         (sender (make-sender/choice flush?
                                     (lambda _ (values))
                                     sender/no-cancelled))
         (mq (make-message-queue #f #f sender)))
    ;; Fill the queue with many uncancelled messages, such that
    ;; the logic for collecting cancelled envelopes doesn't kick in too early.
    (do ((i 0 (+ i 1)))
        ((>= i 30))
      (send-message! mq (index->dummy i)))
    (assert (= (message-queue-length mq) 30))
    ;; Now add some envelopes to the queue & cancel them.
    (do ((i 0 (+ i 1)))
        ((>= i 4))
      (attempt-cancel!
       (send-message! mq (index->dummy (+ 30 i)))
       ((now-cancelled) (values))
       ((already-cancelled) (error "what / cancelled"))
       ((already-sent) (error "what / sent"))))
    (assert (= (message-queue-length mq) 34))
    (parameterize ((flush? #t))
      (try-send-again! mq))
    (assert (= (message-queue-length mq) 0))
    (assert (= (%message-queue-garbagitude mq) 0))
    #t))

;; This is a variation of "nothing lost when sending concurrently",
;; but for cancellation.
;;
;; This test fails in case of the following mutations:
;;  * replace 0 with 1 (or some other number) in
;;    (swap! old (cons filtered 0))
;;    in increment-garbage&maybe-cleanup
(test-assert "the (approximate) cancellation count is accurate, when not sending, even when cancelling concurrently (also, uncancelled messages are not lost)"
  (let* ((messages/cancellation 10000)
         ;; Set to 0 right before flushing, counted by sender/check.
         (n/not-cancelled #f)
         (flush? (make-parameter #f))
         (sender/check
          (lambda (e)
            (unless (envelope-peek-cancelled? e)
              (set! n/not-cancelled (+ 1 n/not-cancelled)))
            (values)))
         (sender (make-sender/choice
                  flush?
                  (lambda _ (values))
                  (make-one-by-one-sender sender/check)))
         (mq (make-message-queue #f #f sender))
         (ready? (make-condition))
         ;; One condition per envelope that is cancelled concurrently.
         (done? (vector-unfold (lambda (_) (make-condition))
                               (/ messages/cancellation 2)))
         ;; Enqueue all the messages, ignoring warnings about the
         ;; queue becoming long.
         (messages
          (with-exception-handler
              (lambda (e)
                (if (overly-full-queue-warning? e)
                    (values)
                    (raise-exception e #:continuable? #t)))
            (lambda ()
              (vector-unfold
               (compose (cut send-message! mq <>) index->dummy)
               messages/cancellation)))))
    (run-fibers
     (lambda ()
       ;; Cancel half of the messages, concurrently.
       ;; Only half of all the messages are cancelled,
       ;; to avoid resetting the garbage counter.
       (vector-for-each
        (lambda (i done? message)
          (when (< i (/ messages/cancellation 2))
            (spawn-fiber
             (lambda ()
               (wait ready?)
               (attempt-cancel!
                message
                ((now-cancelled)
                 (signal-condition! done?)
                 (values))
                ((already-cancelled)
                 (signal-condition! done?)
                 (error "what/cancelled"))
                ((already-sent)
                 (signal-condition! done?)
                 (error "what/sent")))))))
        done? messages)
       (signal-condition! ready?)
       (vector-for-each (lambda (_ c) (wait c)) done?))
     #:hz 4000)
    ;; Verify the estimate is accurate, at least in this
    ;; situation.
    (assert (= (pk 'garbagitude (%message-queue-garbagitude mq))
               (pk 'expected (/ messages/cancellation 2))))
    ;; Cancel more messages (until 7/8 are cancelled),
    ;; to trigger collection.  While we're at it, verify
    ;; the estimate is still correct.
    (do ((i (/ messages/cancellation 2) (+ i 1)))
        ((>= (/ i messages/cancellation) 7/8))
      (attempt-cancel!
       (vector-ref messages i)
       ((now-cancelled)
        ;; 3/4 is the (arbitrary) ratio at which
        ;; the garbage is thrown out
        (if (< (* 4 i) (* 3 messages/cancellation))
            (assert (= (%message-queue-garbagitude mq) (+ i 1)))
            (assert (= (%message-queue-garbagitude mq)
                       (- i (* 3/4 messages/cancellation))))))
       ((already-cancelled) (error "what/cancelled2"))
       ((already-sent) (error "what/sent2"))))
    ;; Now send the envelopes, to verify uncancelled messages
    ;; are still in the queue.
    (parameterize ((flush? #t))
      (set! n/not-cancelled 0)
      (try-send-again! mq))
    (assert (= n/not-cancelled (* 1/8 messages/cancellation)))
    ;; As everything has been removed from the queue,
    ;; the estimate should now be zero.
    (assert (= (pk 'final-garbagitude (%message-queue-garbagitude mq)) 0))
    #t))