diff options
author | Maxime Devos <maximedevos@telenet.be> | 2021-03-04 18:56:33 +0100 |
---|---|---|
committer | Maxime Devos <maximedevos@telenet.be> | 2021-09-21 12:08:36 +0200 |
commit | a851800e1c471adc51b65f6298907fbc84e9073c (patch) | |
tree | eb71848eb6a6b83dfec6093bb697efa480379d28 /gnu/gnunet/mq | |
parent | 8313e48bfdcb6ed9b005dc83676750e6b0abf902 (diff) | |
download | gnunet-scheme-a851800e1c471adc51b65f6298907fbc84e9073c.tar.gz gnunet-scheme-a851800e1c471adc51b65f6298907fbc84e9073c.zip |
mq: define message queue module
* gnu/gnunet/mq/message-io.scm: new module.
* tests/message-io.scm: test it.
* README.org (Modules): document message queues.
* Makefile.am: compile new module and run its tests.
Diffstat (limited to 'gnu/gnunet/mq')
-rw-r--r-- | gnu/gnunet/mq/message-io.scm | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/gnu/gnunet/mq/message-io.scm b/gnu/gnunet/mq/message-io.scm new file mode 100644 index 0000000..b19fb6b --- /dev/null +++ b/gnu/gnunet/mq/message-io.scm | |||
@@ -0,0 +1,238 @@ | |||
1 | ;; This file is part of scheme-GNUnet. | ||
2 | ;; Copyright (C) 2021 Maxime Devos | ||
3 | ;; | ||
4 | ;; scheme-GNUnet is free software: you can redistribute it and/or modify it | ||
5 | ;; under the terms of the GNU Affero General Public License as published | ||
6 | ;; by the Free Software Foundation, either version 3 of the License, | ||
7 | ;; or (at your option) any later version. | ||
8 | ;; | ||
9 | ;; scheme-GNUnet is distributed in the hope that it will be useful, but | ||
10 | ;; WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
12 | ;; Affero General Public License for more details. | ||
13 | ;; | ||
14 | ;; You should have received a copy of the GNU Affero General Public License | ||
15 | ;; along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
16 | ;; | ||
17 | ;; SPDX-License-Identifier: AGPL3.0-or-later | ||
18 | |||
19 | ;; @author Maxime Devos (scheme-GNUnet) | ||
20 | ;; | ||
21 | ;; @brief Generic interface for sending / receiving messages | ||
22 | ;; TODO perhaps some kind of buffering would be useful. | ||
23 | ;; | ||
24 | ;; What's the impact on performance from having to wait on a fiber | ||
25 | ;; when sending / receving a message? Maybe put the 'queue' in | ||
26 | ;; message queue when it makes sense. | ||
27 | ;; | ||
28 | ;; TODO integrate with message envelopes. Maybe change the | ||
29 | ;; definition of envelopes. | ||
30 | (define-library (gnu gnunet mq message-io) | ||
31 | (export <message-input> message-transport->input message-input? | ||
32 | read-message-operation | ||
33 | read-input-error-operation | ||
34 | close-input! | ||
35 | wait-for-input-closed-operation | ||
36 | |||
37 | <message-output> message-transport->output message-output? | ||
38 | send-message-operation | ||
39 | read-output-error-operation | ||
40 | wait-for-output-closed-operation | ||
41 | close-output! | ||
42 | |||
43 | <message-transport> make-message-transport message-transport? | ||
44 | wait-for-transport-close-operation | ||
45 | notice-input-error-operation | ||
46 | notice-output-error-operation | ||
47 | close-transport!) | ||
48 | (import (only (rnrs base) | ||
49 | begin let define lambda assert) | ||
50 | (only (fibers operations) | ||
51 | wrap-operation) | ||
52 | (only (fibers conditions) | ||
53 | make-condition signal-condition! wait-operation) | ||
54 | (only (fibers channels) | ||
55 | make-channel get-operation put-operation) | ||
56 | (only (ice-9 atomic) | ||
57 | make-atomic-box atomic-box-ref atomic-box-set!) | ||
58 | (only (rnrs records syntactic) | ||
59 | define-record-type) | ||
60 | (only (rnrs conditions) | ||
61 | define-condition-type | ||
62 | &violation | ||
63 | make-message-condition | ||
64 | make-who-condition)) | ||
65 | (begin | ||
66 | |||
67 | (define-record-type (<message-transport> make-message-transport message-transport?) | ||
68 | (fields (immutable close? transport-close-condition) ; condition | ||
69 | (immutable closed? transport-closed-condition) ; condition | ||
70 | (immutable messages transport-messages) ; fibers channel | ||
71 | (immutable input-errors transport-input-errors) ; fibers channel | ||
72 | (immutable output-errors transport-output-errors) ; fibers channel | ||
73 | ;; TODO I don't think atomic boxes are strictly required here. | ||
74 | ;; atomic box of (#f or the errors) | ||
75 | (immutable input-errors/close transport-input-errors/close) | ||
76 | ;; atomic box of (#f or the errors) | ||
77 | (immutable output-errors/close transport-output-errors/close)) | ||
78 | (protocol | ||
79 | (lambda (%make) | ||
80 | (lambda () | ||
81 | "Return a fresh message transport. | ||
82 | |||
83 | Messages will be sent from the output half-pipe to the input half-pipe. | ||
84 | By default, closing the half-pipes will do nothing, and the half-pipes | ||
85 | will remain marked as open. Use @code{wait-for-transport-close-operation} | ||
86 | and @code{close-transport!} to react to close requests. | ||
87 | |||
88 | Errors can be sent with @code{notice-input-error-operation} and | ||
89 | @code{notice-output-error-operation}. Note that input and output | ||
90 | errors are separated. | ||
91 | |||
92 | No restrictions are placed upon the types of messages sent." | ||
93 | (%make (make-condition) | ||
94 | (make-condition) | ||
95 | (make-channel) | ||
96 | (make-channel) | ||
97 | (make-channel) | ||
98 | (make-atomic-box #f) | ||
99 | (make-atomic-box #f))))) | ||
100 | (opaque #t) | ||
101 | (sealed #f)) | ||
102 | |||
103 | (define-record-type (<message-input> message-transport->input message-input?) | ||
104 | (fields (immutable transport message-input-transport)) | ||
105 | (protocol | ||
106 | (lambda (%make) | ||
107 | (lambda (transport) | ||
108 | "Return an input queue corresponding to the transport | ||
109 | @var{transport}. Currently, this is a fresh object, but that might | ||
110 | change in the future." | ||
111 | (assert (message-transport? transport)) | ||
112 | (%make transport)))) | ||
113 | (opaque #t) | ||
114 | (sealed #f)) | ||
115 | |||
116 | (define-record-type (<message-output> message-transport->output message-output?) | ||
117 | (fields (immutable transport message-output-transport)) | ||
118 | (protocol | ||
119 | (lambda (%make) | ||
120 | (lambda (transport) | ||
121 | "Return an output queue corresponding to the transport | ||
122 | @var{transport}. Currently, this is a fresh object, but that might | ||
123 | change in the future." | ||
124 | (assert (message-transport? transport)) | ||
125 | (%make transport)))) | ||
126 | (opaque #t) | ||
127 | (sealed #f)) | ||
128 | |||
129 | (define (close-input! in) | ||
130 | "Close the input queue @var{in} (asynchronuous). | ||
131 | @code{wait-for-input-closed-operation} can be used to wait | ||
132 | until the queue has been closed. This has the same effect | ||
133 | as @code{close-output!} on the output queue." | ||
134 | (assert (message-input? in)) | ||
135 | (signal-condition! | ||
136 | (transport-close-condition (message-input-transport in)))) | ||
137 | |||
138 | (define (close-output! out) | ||
139 | "Close the output queue @var{in} (asynchronuous). | ||
140 | @code{wait-for-output-closed-operation} can be used to wait | ||
141 | until the queue has been closed. This has the same effect | ||
142 | as @code{close-input!} on the input queue." | ||
143 | (assert (message-output? out)) | ||
144 | (signal-condition! | ||
145 | (transport-close-condition (message-output-transport out)))) | ||
146 | |||
147 | (define (read-message-operation in) | ||
148 | "Return an operation for reading a message from the input queue @var{in}. | ||
149 | |||
150 | The operation will block until a message has been read, so this should probably | ||
151 | be combined with @code{wait-for-input-closed-operation} and | ||
152 | @code{read-input-error-operation}." | ||
153 | (assert (message-input? in)) | ||
154 | (get-operation (transport-messages (message-input-transport in)))) | ||
155 | |||
156 | (define (send-message-operation out msg) | ||
157 | "Make an operation for sending a message @var{msg} into the output queue | ||
158 | @var{out}. | ||
159 | |||
160 | The operation will block until the message has been sent (though it may take | ||
161 | some time before it ends up on the other side of the network, and some kind | ||
162 | of output error could happen in-between), so this should probably be combined | ||
163 | with @code{wait-for-output-closed-operation} and @code{read-output-error-operation}" | ||
164 | (assert (message-output? out)) | ||
165 | (put-operation (transport-messages (message-output-transport out)) msg)) | ||
166 | |||
167 | (define (read-input-error-operation in) | ||
168 | "Return an operation for reading the next input error from the | ||
169 | input queue @var{in}." | ||
170 | (assert (message-input? in)) | ||
171 | (get-operation (transport-input-errors (message-input-transport in)))) | ||
172 | |||
173 | (define (read-output-error-operation out) | ||
174 | "Return an operation for reading the next output error from the | ||
175 | output queue @var{out}." | ||
176 | (assert (message-output? out)) | ||
177 | (get-operation (transport-output-errors (message-output-transport out)))) | ||
178 | |||
179 | (define (wait-for-transport-close-operation transport) | ||
180 | "Return an operation for waiting upon a close request | ||
181 | from the input or output queue." | ||
182 | (assert (message-transport? transport)) | ||
183 | (wait-operation (transport-close-condition transport))) | ||
184 | |||
185 | (define (close-transport! transport input-errors output-errors) | ||
186 | "Close the transport @var{transport}, with some closing input errors | ||
187 | and closing output errors @var{input-errors} and @var{output-errors}. | ||
188 | This marks the input and output queues as closed. | ||
189 | XXX double closes probably should be detected." | ||
190 | (assert (message-transport? transport)) | ||
191 | (atomic-box-set! (transport-input-errors/close transport) | ||
192 | input-errors) | ||
193 | (atomic-box-set! (transport-output-errors/close transport) | ||
194 | output-errors) | ||
195 | (signal-condition! | ||
196 | (transport-closed-condition transport))) | ||
197 | |||
198 | (define (notice-input-error-operation transport error) | ||
199 | "Return an operation for indicating the transport @var{transport} | ||
200 | noticed an input error @var{error}. This will block if no fiber is waiting | ||
201 | for an input error, so this procedure should probably not be used after | ||
202 | the transport has been closed." | ||
203 | (assert (message-transport? transport)) | ||
204 | (put-operation (transport-input-errors transport) error)) | ||
205 | |||
206 | (define (notice-output-error-operation transport error) | ||
207 | "Return an operation for indicating the transport @var{transport} | ||
208 | noticed an output error @var{error}. This will block if no fiber is waiting | ||
209 | for an output error, so this procedure should probably not be used after | ||
210 | the transport has been closed." | ||
211 | (assert (message-transport? transport)) | ||
212 | (put-operation (transport-output-errors transport) error)) | ||
213 | |||
214 | (define (wait-for-output-closed-operation out) | ||
215 | "Return an operation for waiting until the output queue @var{out} | ||
216 | has been closed. This has the same effect as waiting until the corresponding | ||
217 | input queue has been closed, except the return values are presumably different. | ||
218 | Any output errors happening during the closing are returned in a data structure | ||
219 | according to the transport." | ||
220 | (assert (message-output? out)) | ||
221 | (let ((transport (message-output-transport out))) | ||
222 | (wrap-operation | ||
223 | (wait-operation (transport-closed-condition transport)) | ||
224 | (lambda () | ||
225 | (atomic-box-ref (transport-output-errors/close transport)))))) | ||
226 | |||
227 | (define (wait-for-input-closed-operation in) | ||
228 | "Return an operation for waiting until the input queue @var{in} | ||
229 | has been closed. This has the same effect as waiting until the corresponding | ||
230 | output queue has been closed, except the return values are presumably different. | ||
231 | Any input errors happening during the closing are returned in a data structure | ||
232 | according to the transport." | ||
233 | (assert (message-input? in)) | ||
234 | (let ((transport (message-input-transport in))) | ||
235 | (wrap-operation | ||
236 | (wait-operation (transport-closed-condition transport)) | ||
237 | (lambda () | ||
238 | (atomic-box-ref (transport-input-errors/close transport)))))))) | ||