diff options
author | Maxime Devos <maximedevos@telenet.be> | 2021-05-24 18:07:44 +0200 |
---|---|---|
committer | Maxime Devos <maximedevos@telenet.be> | 2021-09-21 12:08:41 +0200 |
commit | 08d98c025e7f50d1c6bafd94dfadd2c384fe8260 (patch) | |
tree | f3b9053612de1b17d0c7f192c2a8d1ba4e5137ea /gnu/gnunet/mq/envelope.scm | |
parent | 8f041a1762721dd25dcf1959d04816b8f8d3974a (diff) | |
download | gnunet-scheme-08d98c025e7f50d1c6bafd94dfadd2c384fe8260.tar.gz gnunet-scheme-08d98c025e7f50d1c6bafd94dfadd2c384fe8260.zip |
mq: Define envelope data type, again.
The new envelope data type can be used without
fibers or multi-threading.
* Makefile.am (modules): Remove replaced
gnu/gnunet/message/envelope.scm.
(%.go: %.scm): Do not unset GUILE_LOAD_COMPILED_PATH as that
would interfere with guile-pfds.
* README.org (Modules): Remove the obsolete
gnu/gnunet/message/envelope.scm.
(Message queues): Document new envelope module. Adjust
message queue blurb for the future.
* gnu/gnunet/mq/envelope.scm: Define new envelope module.
* gnu/gnunet/message/envelope.scm: Delete.
* tests/envelope.scm: Test the new envelope module.
Diffstat (limited to 'gnu/gnunet/mq/envelope.scm')
-rw-r--r-- | gnu/gnunet/mq/envelope.scm | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/gnu/gnunet/mq/envelope.scm b/gnu/gnunet/mq/envelope.scm new file mode 100644 index 0000000..e0c94a2 --- /dev/null +++ b/gnu/gnunet/mq/envelope.scm | |||
@@ -0,0 +1,195 @@ | |||
1 | ;; This file is part of GNUnet. | ||
2 | ;; Copyright (C) 2012-2019 GNUnet e.V. | ||
3 | ;; Copyright (C) 2021 Maxime Devos (<maximedevos@telenet.be>) | ||
4 | ;; | ||
5 | ;; GNUnet is free software: you can redistribute it and/or modify it | ||
6 | ;; under the terms of the GNU Affero General Public License as published | ||
7 | ;; by the Free Software Foundation, either version 3 of the License, | ||
8 | ;; or (at your option) any later version. | ||
9 | ;; | ||
10 | ;; GNUnet is distributed in the hope that it will be useful, but | ||
11 | ;; WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | ;; Affero General Public License for more details. | ||
14 | ;; | ||
15 | ;; You should have received a copy of the GNU Affero General Public License | ||
16 | ;; along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | ;; | ||
18 | ;; SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | |||
20 | ;; Author: Florian Dold | ||
21 | ;; Author: Maxime Devos | ||
22 | ;; C file: util/mq.c | ||
23 | ;; Scheme module: (gnu gnunet mq envelope) | ||
24 | ;; | ||
25 | ;; Limitation: the format of messages is still in flux, | ||
26 | ;; so no type checks there. | ||
27 | (define-library (gnu gnunet mq envelope) | ||
28 | (export <envelope> make-envelope envelope? | ||
29 | attempt-cancel! attempt-irrevocable-sent!) | ||
30 | (import (gnu gnunet utils hat-let) | ||
31 | (only (guile) define* lambda* exact-integer?) | ||
32 | (only (ice-9 match) match) | ||
33 | (only (ice-9 atomic) | ||
34 | make-atomic-box atomic-box-ref | ||
35 | atomic-box-compare-and-swap!) | ||
36 | (only (rnrs base) | ||
37 | lambda assert letrec let begin define | ||
38 | syntax-rules let-syntax define-syntax | ||
39 | procedure? eq? >= = <= < if quote ... | ||
40 | identifier-syntax values and let* | ||
41 | vector vector-ref vector? vector-length) | ||
42 | (only (rnrs records syntactic) define-record-type)) | ||
43 | (begin | ||
44 | (define-record-type (<envelope> make-envelope envelope?) | ||
45 | ;; Atomic box: | ||
46 | ;; #t: cancelled | ||
47 | ;; #f: to late to cancel, message has been irrevocabily sent! | ||
48 | ;; | ||
49 | ;; (Unless you play tricks like pulling out the Ethernet | ||
50 | ;; cable before the message is received by the router) | ||
51 | ;; #(message prio notify-sent! cancel!) | ||
52 | (fields (immutable state %cancellation-state)) | ||
53 | (protocol | ||
54 | (lambda (%make) | ||
55 | (lambda* (cancel! message #:key (priority 0) (notify-sent! values)) | ||
56 | "Make a message envelope; i.e., a record containing the message | ||
57 | (@var{message}, @var{priority}) and information on how to cancel the sending | ||
58 | of the message (@var{cancel!}) and who should be notified when the message | ||
59 | cannot be unsent anymore (@var{notify-sent!}). | ||
60 | |||
61 | Once marked as cancelled or irrevocabily sent, the record drops its | ||
62 | references to @var{message}, @var{cancel!} and @var{notify-sent!}. | ||
63 | When being marked as cancelled, the thunk @var{cancel!} is called." | ||
64 | (assert (and (procedure? cancel!) (procedure? notify-sent!) | ||
65 | (exact-integer? priority) | ||
66 | (<= 0 priority 511))) | ||
67 | (%make (make-atomic-box | ||
68 | (vector message priority notify-sent! cancel!))))))) | ||
69 | |||
70 | (define (%attempt-irrevocable-sent! envelope already-sent go cancelled) | ||
71 | (bind-atomic-boxen | ||
72 | ((state (%cancellation-state envelope) swap!)) | ||
73 | (let spin ((old state)) | ||
74 | (match old | ||
75 | ;; See comment at %attempt-cancel! for | ||
76 | ;; why we don't do #(message prio notify-sent! cancel!) | ||
77 | ((? vector?) | ||
78 | (if (eq? old (swap! old #f)) | ||
79 | (let^ ((!! (= (vector-length old) 4)) | ||
80 | (! message (vector-ref old 0)) | ||
81 | (! prio (vector-ref old 1)) | ||
82 | (! notify-sent! (vector-ref old 2))) | ||
83 | (notify-sent!) | ||
84 | (go message prio)) | ||
85 | (spin state))) | ||
86 | (#t (cancelled)) | ||
87 | (#f (already-sent)))))) | ||
88 | |||
89 | (define-syntax attempt-irrevocable-sent! | ||
90 | (syntax-rules (go cancelled already-sent) | ||
91 | "If @var{envelope} is not cancelled and has not yet been sent, | ||
92 | mark the message as irrevocably sent, call the notify-sent callback and | ||
93 | evaluate @var{exp/go} in an environment where the message @var{message} | ||
94 | and its priority @var{priority} are bound. | ||
95 | |||
96 | If the message has already been marked as irrevocabily sent, | ||
97 | evaluate @var{exp/already-sent} instead. If the message is cancelled, | ||
98 | evaluate @var{exp/cancelled} instead. | ||
99 | |||
100 | Even if this operation (and perhaps @code{attempt-cancel!}) is used concurrently | ||
101 | on the same @var{envelope}, whether by multi-threading, asynchronicities | ||
102 | (via @code{system-async-mark}) or by recursion, the following properties hold: | ||
103 | |||
104 | @begin itemize | ||
105 | @item the notify-sent callback of @var{envelope} is called at most once | ||
106 | @item the notify-sent callback is never called if @var{envelope} is cancelled | ||
107 | at any point in time | ||
108 | @item likewise, the code in @var{exp/go} is at most evaluated once | ||
109 | @end itemize" | ||
110 | ((_ envelope | ||
111 | ((go message priority) . exp/go) | ||
112 | ((cancelled) . exp/cancelled) | ||
113 | ((already-sent) . exp/already-sent)) | ||
114 | (%attempt-irrevocable-sent! envelope | ||
115 | (lambda () . exp/already-sent) | ||
116 | (lambda (message priority) . exp/go) | ||
117 | (lambda () . exp/cancelled))))) | ||
118 | |||
119 | (define (%attempt-cancel! envelope now-cancelled already-cancelled | ||
120 | already-sent) | ||
121 | (bind-atomic-boxen | ||
122 | ((state (%cancellation-state envelope) swap!)) | ||
123 | (let spin ((old state)) | ||
124 | (match old | ||
125 | ;; Do _not_ use #(message prio notify-sent! cancel!) | ||
126 | ;; here! Instead, delay the bounds check and accessing | ||
127 | ;; the elements of the vector after the swap!. That way: | ||
128 | ;; | ||
129 | ;; Premature optimisation. | ||
130 | ;; We save a little time in case two threads try to concurrently | ||
131 | ;; @var{state}. | ||
132 | ;; | ||
133 | ;; Meager excuse: self-healing (in case of memory corruption). | ||
134 | ;; Suppose a cosmic ray flipped a few bits and now | ||
135 | ;; @var{state} contains another vector, of different length. | ||
136 | ;; Then by performing the swap before the bounds check, | ||
137 | ;; the envelope is brought into a valid state. (And an | ||
138 | ;; exception will still result.) | ||
139 | ((? vector?) | ||
140 | (if (eq? old (swap! old #t)) | ||
141 | (let^ ((!! (= (vector-length old) 4)) | ||
142 | (! cancel! (vector-ref old 3))) | ||
143 | (cancel!) | ||
144 | (now-cancelled)) | ||
145 | (spin state))) | ||
146 | (#t (already-cancelled)) | ||
147 | ;; XXX maybe make the meager excuse less meager | ||
148 | ;; and add a 'default' case where @var{state} is | ||
149 | ;; set to #f when bad (and an exception is raised)? | ||
150 | ;; Seems like some dedicated exception types for | ||
151 | ;; memory corruption are required then ... | ||
152 | ;; And tests. | ||
153 | (#f (already-sent)))))) | ||
154 | |||
155 | (define-syntax attempt-cancel! | ||
156 | (syntax-rules (now-cancelled already-cancelled already-sent) | ||
157 | "If @var{envelope} is not yet marked as cancelled or sent, | ||
158 | mark it as cancelled, call the corresponding cancellation callback | ||
159 | and evaluate @var{exp/now-cancelled}. | ||
160 | |||
161 | If @var{envelope} is already marked as cancelled, do not mutate | ||
162 | anything or call any callback and evaluate @var{exp/already-cancelled}. | ||
163 | Likewise, if @var{envelope} is marked as irrevocably sent, evaluate | ||
164 | @var{exp/already-sent} instead. | ||
165 | |||
166 | If this operation is interrupted before @var{exp/now-cancelled} is | ||
167 | evaluated, the envelope may be marked as cancelled even if the | ||
168 | cancellation callback has not yet been called or has not yet returned. | ||
169 | |||
170 | However, by tolerating this limitation, it can be (and is) guaranteed | ||
171 | that the cancellation callback is called at most once. Likewise, the | ||
172 | code in @var{exp/now-cancelled} is only be called at most once. | ||
173 | Also, the cancellation callback and is never called (and @var{exp/now-cancelled} | ||
174 | never evaluated) if @var{envelope} is marked as sent at any point in time." | ||
175 | ((_ envelope | ||
176 | ((now-cancelled) . exp/now-cancelled) | ||
177 | ((already-cancelled) . exp/already-cancelled) | ||
178 | ((already-sent) . exp/already-sent)) | ||
179 | (%attempt-cancel! envelope | ||
180 | (lambda () . exp/now-cancelled) | ||
181 | (lambda () . exp/already-cancelled) | ||
182 | (lambda () . exp/already-sent))))) | ||
183 | |||
184 | (define-syntax bind-atomic-boxen | ||
185 | (syntax-rules () | ||
186 | ((_ () exp exp* ...) | ||
187 | (let () exp exp* ...)) | ||
188 | ((_ ((variable box swap!) . etc) exp exp* ...) | ||
189 | (let ((stashed-box box)) | ||
190 | (let-syntax ((variable (identifier-syntax | ||
191 | (atomic-box-ref box)))) | ||
192 | (let ((swap! (lambda (expected desired) | ||
193 | (atomic-box-compare-and-swap! box expected | ||
194 | desired)))) | ||
195 | (bind-atomic-boxen etc exp exp* ...))))))))) | ||