aboutsummaryrefslogtreecommitdiff
path: root/gnu/gnunet/mq/envelope.scm
blob: 4826722e824e6075a5ddb5b3003e02c3a17ab2fb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
;; This file is part of GNUnet.
;; Copyright (C) 2012-2019 GNUnet e.V.
;; Copyright (C) 2021 Maxime Devos (<maximedevos@telenet.be>)
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later

;; Author: Florian Dold
;; Author: Maxime Devos
;; C file: util/mq.c
;; Scheme module: (gnu gnunet mq envelope)
;;
;; Limitation: the format of messages is still in flux,
;; so no type checks there.
(define-library (gnu gnunet mq envelope)
  (export <envelope> make-envelope envelope?
	  attempt-cancel! attempt-irrevocable-sent!
	  envelope-peek-cancelled?
	  ;; TODO find a better place
	  (rename (bind-atomic-boxen %%bind-atomic-boxen)))
  (import (gnu gnunet utils hat-let)
	  (only (guile) define* lambda* exact-integer?)
	  (only (ice-9 match) match)
	  (only (ice-9 atomic)
		make-atomic-box atomic-box-ref
		atomic-box-compare-and-swap!)
	  (only (rnrs base)
		lambda assert letrec let begin define
		syntax-rules let-syntax define-syntax
		procedure? eq? >= = <= < if quote ...
		identifier-syntax values and let*
		vector vector-ref vector? vector-length)
	  (only (rnrs records syntactic) define-record-type))
  (begin
    (define-record-type (<envelope> make-envelope envelope?)
      ;; The single field holds an atomic box.  The value inside the
      ;; box encodes the state of the envelope:
      ;;
      ;;    #t: the envelope has been cancelled
      ;;    #f: too late to cancel, the message has been irrevocably sent!
      ;;
      ;;        (Unless you play tricks like pulling out the Ethernet
      ;;        cable before the message is received by the router)
      ;;    #(message prio notify-sent! cancel!): neither cancelled
      ;;        nor sent yet
      (fields (immutable state %cancellation-state))
      (protocol
       (lambda (%construct)
         (lambda* (cancel! message #:key (priority 0) (notify-sent! values))
           "Construct a message envelope: a record holding the message
(@var{message} and its @var{priority}), the thunk to call when the sending
of the message is cancelled (@var{cancel!}) and the thunk to call when the
message cannot be unsent anymore (@var{notify-sent!}).

@var{cancel!} and @var{notify-sent!} must be procedures and @var{priority}
must be an exact integer in the range from 0 to 511 (inclusive).

As soon as the envelope is marked as cancelled or as irrevocably sent,
it no longer references @var{message}, @var{cancel!} and @var{notify-sent!}.
The thunk @var{cancel!} is called when the envelope is marked as cancelled."
           (assert (and (procedure? cancel!) (procedure? notify-sent!)
                        (exact-integer? priority)
                        (<= 0 priority 511)))
           ;; Initial state: a four-element vector, stored in a fresh
           ;; atomic box.
           (let* ((pending (vector message priority notify-sent! cancel!))
                  (box (make-atomic-box pending)))
             (%construct box))))))

    (define (envelope-peek-cancelled? envelope)
      "Return @code{#t} if @var{envelope} is marked as cancelled at the
moment its state is read, and @code{#f} otherwise."
      (let* ((box (%cancellation-state envelope))
             (contents (atomic-box-ref box)))
        ;; Only the exact value #t means `cancelled'; #f and the
        ;; vector of a pending envelope both yield #f here.
        (eq? contents #t)))

    (define (%attempt-irrevocable-sent! envelope already-sent go cancelled)
      ;; Procedural counterpart of the @code{attempt-irrevocable-sent!}
      ;; macro.  @var{already-sent} and @var{cancelled} are thunks,
      ;; @var{go} is a procedure accepting the message and its priority.
      ;; Exactly one of them is called, in tail position.
      (bind-atomic-boxen
       ((current (%cancellation-state envelope) cas!))
       (let retry ((seen current))
         (match seen
           (#t (cancelled))
           (#f (already-sent))
           ;; Deliberately not the pattern
           ;; #(message prio notify-sent! cancel!): the length check and
           ;; the element accesses are delayed until after the swap.
           ((? vector?)
            (let ((previous (cas! seen #f)))
              (if (eq? previous seen)
                  ;; The swap succeeded, so the envelope is now marked
                  ;; as irrevocably sent.
                  (let^ ((!! (= (vector-length seen) 4))
                         (! message (vector-ref seen 0))
                         (! prio (vector-ref seen 1))
                         (! notify-sent! (vector-ref seen 2)))
                        (notify-sent!)
                        (go message prio))
                  ;; The box changed in the meantime: read it again
                  ;; and start over.
                  (retry current))))))))

    (define-syntax attempt-irrevocable-sent!
      (syntax-rules (go cancelled already-sent)
        "If @var{envelope} is not cancelled and has not yet been sent,
mark the message as irrevocably sent, call the notify-sent callback and
evaluate @var{exp/go} in an environment where the message @var{message}
and its priority @var{priority} are bound.

If the message has already been marked as irrevocably sent,
evaluate @var{exp/already-sent} instead. If the message is cancelled,
evaluate @var{exp/cancelled} instead.

Even if this operation (and perhaps @code{attempt-cancel!}) is used concurrently
on the same @var{envelope}, whether by multi-threading, asynchronicities
(via @code{system-async-mark}) or by recursion, the following properties hold:

@itemize @bullet
@item the notify-sent callback of @var{envelope} is called at most once
@item the notify-sent callback is never called if @var{envelope} is cancelled
  at any point in time
@item likewise, the code in @var{exp/go} is evaluated at most once
@end itemize"
        ;; The three clauses must be written in exactly this order:
        ;; go, cancelled, already-sent.  Each clause body is wrapped
        ;; in a lambda, so only the selected one is evaluated.
        ((_ envelope
            ((go message priority) . exp/go)
            ((cancelled) . exp/cancelled)
            ((already-sent) . exp/already-sent))
         (%attempt-irrevocable-sent! envelope
                                     (lambda () . exp/already-sent)
                                     (lambda (message priority) . exp/go)
                                     (lambda () . exp/cancelled)))))

    (define (%attempt-cancel! envelope now-cancelled already-cancelled
                              already-sent)
      ;; Procedural counterpart of the @code{attempt-cancel!} macro.
      ;; @var{now-cancelled}, @var{already-cancelled} and
      ;; @var{already-sent} are thunks; exactly one of them is called,
      ;; in tail position.
      (bind-atomic-boxen
       ((current (%cancellation-state envelope) cas!))
       (let retry ((seen current))
         (match seen
           (#t (already-cancelled))
           ;; XXX maybe make the meager excuse (see below) less meager
           ;; and add a 'default' case where @var{current} is
           ;; set to #f when bad (and an exception is raised)?
           ;; Seems like some dedicated exception types for
           ;; memory corruption are required then ...
           ;; And tests.
           (#f (already-sent))
           ;; Do _not_ use the pattern
           ;; #(message prio notify-sent! cancel!) here!  Instead,
           ;; the bounds check and the accesses to the elements of
           ;; the vector are delayed until after the swap.  That way:
           ;;
           ;; Premature optimisation.
           ;;   We save a little time in case two threads try to
           ;;   concurrently modify @var{current}.
           ;;
           ;; Meager excuse: self-healing (in case of memory corruption).
           ;;   Suppose a cosmic ray flipped a few bits and now
           ;;   @var{current} contains another vector, of different
           ;;   length.  Then by performing the swap before the bounds
           ;;   check, the envelope is brought into a valid state.
           ;;   (And an exception will still result.)
           ((? vector?)
            (let ((previous (cas! seen #t)))
              (if (eq? previous seen)
                  ;; The swap succeeded, so the envelope is now marked
                  ;; as cancelled.
                  (let^ ((!! (= (vector-length seen) 4))
                         (! cancel! (vector-ref seen 3)))
                        (cancel!)
                        (now-cancelled))
                  ;; The box changed in the meantime: read it again
                  ;; and start over.
                  (retry current))))))))

    (define-syntax attempt-cancel!
      (syntax-rules (now-cancelled already-cancelled already-sent)
        "If @var{envelope} is not yet marked as cancelled or sent,
mark it as cancelled, call the corresponding cancellation callback
and evaluate @var{exp/now-cancelled}.

If @var{envelope} is already marked as cancelled, do not mutate
anything or call any callback and evaluate @var{exp/already-cancelled}.
Likewise, if @var{envelope} is marked as irrevocably sent, evaluate
@var{exp/already-sent} instead.

If this operation is interrupted before @var{exp/now-cancelled} is
evaluated, the envelope may be marked as cancelled even if the
cancellation callback has not yet been called or has not yet returned.

However, by tolerating this limitation, it can be (and is) guaranteed
that the cancellation callback is called at most once. Likewise, the
code in @var{exp/now-cancelled} is evaluated at most once.
Also, the cancellation callback is never called (and @var{exp/now-cancelled}
never evaluated) if @var{envelope} is marked as sent at any point in time."
        ;; The three clauses must be written in exactly this order:
        ;; now-cancelled, already-cancelled, already-sent.  Each clause
        ;; body is wrapped in a thunk, so only the selected one is
        ;; evaluated.
        ((_ envelope
            ((now-cancelled) . exp/now-cancelled)
            ((already-cancelled) . exp/already-cancelled)
            ((already-sent) . exp/already-sent))
         (%attempt-cancel! envelope
                           (lambda () . exp/now-cancelled)
                           (lambda () . exp/already-cancelled)
                           (lambda () . exp/already-sent)))))

    (define-syntax bind-atomic-boxen
      (syntax-rules ()
        "Evaluate the body expressions with a number of atomic boxes bound.
Each binding has the form (@var{variable} @var{box} @var{swap!}).

The expression @var{box} is evaluated exactly once and must evaluate
to an atomic box.  Inside the body, every reference to @var{variable}
reads the current value of that box with @code{atomic-box-ref}, and
@var{swap!} is bound to a procedure of two arguments, the expected and
the desired value, that calls @code{atomic-box-compare-and-swap!} on
that box and returns its result."
        ((_ () exp exp* ...)
         (let () exp exp* ...))
        ((_ ((variable box swap!) . etc) exp exp* ...)
         ;; Stash the box in a variable, such that the (possibly
         ;; side-effecting or allocating) @var{box} expression is not
         ;; evaluated again on every read and every swap.
         (let ((stashed-box box))
           (let-syntax ((variable (identifier-syntax
                                   (atomic-box-ref stashed-box))))
             (let ((swap! (lambda (expected desired)
                            (atomic-box-compare-and-swap! stashed-box expected
                                                          desired))))
               (bind-atomic-boxen etc exp exp* ...)))))))))