aboutsummaryrefslogtreecommitdiff
path: root/gnu/gnunet/mq.scm
blob: a980c56d15f4661a3e5ad1b957f7ed5bd15da697 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
;; This file is part of GNUnet.
;; Copyright (C) 2012-2019, 2021 GNUnet e.V.
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later

;; Author: Florian Dold
;; Author: Maxime Devos
;; C file: util/mq.c
;; Scheme module: (gnu gnunet mq)
;;
;; A message queue for GNUnet messages.
;; Messages are made of bytes. In particular,
;; messages must be prefixed by a /:message-header.
(define-library (gnu gnunet mq)
  (export <message-queue> make-message-queue message-queue?
	  make-one-by-one-sender
	  inject-message! inject-error! send-message!
	  message-queue-length
	  %message-queue-garbagitude
	  try-send-again!
	  close-queue!

	  &missing-header-error make-missing-header-error
	  missing-header-error? missing-header-error-received-size
	  &size-mismatch-error make-size-mismatch-error
	  size-mismatch-error? size-mismatch-error-expected-size
	  size-mismatch-error-received-size

	  &overly-full-queue-warning
	  make-overly-full-queue-warning overly-full-queue-warning?
	  overly-full-queue-current-length overly-full-queue-suspicious-when

	  ;; Can be adjusted for debugging -- no guarantees it
	  ;; will not be removed!
	  %suspicious-length)
  (import (gnu gnunet mq handler)
	  (gnu gnunet mq envelope)
	  (gnu gnunet utils hat-let)
	  (only (gnu gnunet utils bv-slice)
		slice-slice slice-length slice?)
	  (only (gnu gnunet util struct)
		/:message-header)
	  (only (gnu gnunet netstruct syntactic)
		sizeof read%)
	  (only (guile) lambda* define* exact-integer?)
	  (only (ice-9 weak-vector)
		weak-vector weak-vector-ref)
	  (only (ice-9 atomic)
		make-atomic-box atomic-box-ref
		atomic-box-compare-and-swap!)
	  (only (rnrs base)
		lambda assert let begin define
		procedure? eq? >= = <= < if quote
		values and let* not cons car cdr
		cond + - > * apply)
	  (only (rnrs control)
		when unless)
	  (only (rnrs conditions)
		define-condition-type &warning &error
		make-who-condition condition)
	  (only (rnrs exceptions)
		raise raise-continuable)
	  (only (rnrs records syntactic) define-record-type)
	  (only (srfi srfi-1) filter)
	  (only (srfi srfi-8) receive)
	  (only (srfi srfi-39) make-parameter)
	  (prefix (only (pfds queues)
			make-queue dequeue enqueue queue-length
			queue-empty? queue->list list->queue)
		  #{pfds:}#))
  (begin
    ;; Record type for message queues.  The public constructor
    ;; @code{make-message-queue} takes the handlers, the error handler,
    ;; the sender and (optionally) the closer; the atomic box holding
    ;; the pending envelopes is created by the constructor itself.
    (define-record-type (<message-queue> make-message-queue message-queue?)
      ;; @code{handlers}: the message handlers, looked up by message type
      ;; in @code{inject-message!}.
      ;; @code{error-handler}: the procedure to which @code{inject-error!}
      ;; applies the injected error.
      (fields (immutable handlers message-queue-handlers)
	      (immutable error-handler message-queue-error-handler)
	      ;; Atomic box of a queue of messages to send (as @code{<envelope>}
	      ;; objects), together with an over-estimate of how many items in
	      ;; the queue are already cancelled, used as a heuristic for when
	      ;; optimising the message queue is required.
	      ;;
	      ;; It can occasionally be an under-estimate due to marking
	      ;; envelopes as cancelled and updating the estimate not being
	      ;; an atomic operation.
	      ;;
	      ;; The contents of the box are a pair: the car is the queue,
	      ;; the cdr is the estimate.
	      (immutable messages+garbage/box message-queue-messages+garbage/box)
	      ;; A procedure for actually sending the messages.
	      ;; It accepts a single argument, the message queue.
	      ;;
	      ;; It is run each time a message has been
	      ;; enqueued. It is not obligated to send the messages
	      ;; right now, though it probably should send them
	      ;; soonish. It can be run at any time, with
	      ;; @code{try-send-again!}.
	      (immutable sender message-queue-sender)
	      ;; A thunk to ‘close’ the message queue, usually telling the
	      ;; remote peer that no additional data will be transmitted
	      ;; in either direction and stopping associated threads.
	      (immutable closer message-queue-closer))
      (protocol
       (lambda (%make)
	 ;; @var{closer} defaults to @code{values}, i.e. closing does
	 ;; nothing unless a closer is supplied.
	 (lambda* (handlers error-handler sender #:optional (closer values))
	   "Make a message queue with message handlers @var{handlers}.

The message handlers are expected to handle bytevector slices
that start with a @code{/:message-header}. The index of the message
handler is the ‘message type’.  Note that, unlike in the C implementation,
messages are not serialised.  As such, some synchronisation or punting
messages onto a separate thread may be necessary.

Injected errors are passed to @var{error-handler}, a variadic procedure.
A list of possible errors can be found in the manual.

Messages are sent with @var{sender}. It can be created with
@code{make-one-by-one-sender}.  Optionally, a @var{closer} procedure can
be passed.  Such a procedure is expected to be idempotent, see
@code{close-queue!} for details."
	   ;; Predicate does not exist yet ...
	   #;(assert (message-handlers? handlers))
	   #;(assert (message-handler? error-handler))
	   (assert (procedure? closer))
	   ;; A new queue starts out empty, with a garbage estimate of zero.
	   (%make handlers error-handler
		  (make-atomic-box (cons (pfds:make-queue) 0))
		  sender
		  closer)))))

    (define (make-one-by-one-sender proc)
      "Make a message sender, sending messages one-by-one with @var{proc}.

The procedure @var{proc} must accept a single argument,
the message envelope to send. This procedure should
use @code{attempt-irrevocable-sent!} when it feels ready.
It must not return any values currently.

The message does not need to be sent directly.
However, remember that unless the priority allows otherwise,
messages must be sent in-order (TODO really received in-order?).

The returned sender accepts a single argument, the message queue.
It keeps dequeueing envelopes until it observes an empty queue.
Envelopes that are already cancelled are dropped from the queue
without being passed to @var{proc}."
      (assert (procedure? proc))
      (lambda (mq)
        (assert (message-queue? mq))
        (%%bind-atomic-boxen
         ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
         ;; First extract an envelope ...
         (let spin ((old queue+garbage))
           (define old-queue (car old))
           (define old-garbage (cdr old))
           (assert (<= 0 old-garbage))
           ;; ... unless there isn't anything to remove anymore.
           ;; This check cannot be moved outside the (let spin ...),
           ;; as message senders may be called at any time
           ;; (even if there are no messages!). Also, in case of
           ;; concurrency, the queue may become empty after a spin
           ;; iteration.
           (unless (pfds:queue-empty? old-queue)
             (receive (envelope new-queue) (pfds:dequeue old-queue)
               (cond ((envelope-peek-cancelled? envelope)
                      ;; There is no need to pass already cancelled
                      ;; envelopes to @var{proc} (although passing them
                      ;; anyway should be harmless), so remove them
                      ;; from the queue. Also try to keep the estimate
                      ;; accurate.
                      ;;
                      ;; The estimate may lag behind (an envelope can be
                      ;; marked as cancelled before the counter has been
                      ;; incremented), so it can be zero here.  Never let
                      ;; it become negative: that would trip the
                      ;; non-negativity assertions on the counter.
                      (swap! old
                             (cons new-queue
                                   (if (> old-garbage 0)
                                       (- old-garbage 1)
                                       0))))
                     ;; We extracted a (not-yet-cancelled) envelope.
                     ;; Now do something with it!  (Only if we won the
                     ;; race for removing it from the queue.)
                     ((eq? old (swap! old (cons new-queue old-garbage)))
                      ;; Make sure @var{proc} does not return
                      ;; any values, as we may want to assign
                      ;; meaning to return values later.
                      (receive ()
                          ;; Process the message.
                          (proc envelope)
                        (values))))
               ;; Process remaining messages (or retry in case there was
               ;; a race and we lost it).
               ;; TODO: if someone else modified the message queue,
               ;; does that mean we don't have to anymore?
               (spin queue+garbage)))))))

    (define (inject-message! mq message)
      "Call the message handler that was registered in the message queue
@var{mq} for the type of the message @var{message}, with the message
@var{message}. In case the message is malformed (according to the
message handler), inject a @code{logic:ill-formed} error instead.
In case no appropriate message handler exists, inject a
@code{logic:no-handler} error instead.

It is an error for @var{message} to be so small it doesn't have
a @code{/:message-header}. Likewise, it is also an error for the
size in the message header not to correspond to the size of the
slice @var{message}.  In the first case, a @code{&missing-header-error}
is raised.  In the second case, a @code{&size-mismatch-error} is raised.

In both cases, a @code{&who} condition with as value @code{inject-message!}
(a symbol) is included as well.

This procedure is intended to be used by the implementation
of message queues."
      (let^ ((! message-header-size (sizeof /:message-header '()))
             (! message-size (slice-length message))
             ;; Too small to even contain a header: nothing can be parsed.
             (? (< message-size message-header-size)
                (raise (condition
                        (make-missing-header-error message-size)
                        (make-who-condition 'inject-message!))))
             ;; Reuse the size computed above, such that the size check
             ;; and the slicing cannot disagree.
             (! header
                (slice-slice message 0 message-header-size))
             (! type (read% /:message-header '(type) header))
             (! supposed-size (read% /:message-header '(size) header))
             ;; The size claimed by the header must be the slice size.
             (? (not (= message-size supposed-size))
                (raise (condition
                        (make-size-mismatch-error supposed-size message-size)
                        (make-who-condition 'inject-message!))))
             ;; #f if does not exist
             (! handler (message-handler-for
                         (message-queue-handlers mq)
                         type))
             (? (not handler)
                (inject-error! mq 'logic:no-handler type))
             (? (not (verify-message? handler message))
                (inject-error! mq 'logic:ill-formed type)))
            (handle-message! handler message)))

    (define (inject-error! mq key . rest)
      "Report the error @code{key . rest} to the error handler of the
message queue @var{mq}.

The error handler is applied to @var{key} followed by the elements
of @var{rest}.  Message queue implementations use this, e.g., in
response to an I/O error, but users of the message queue may call it
as well.  Whether @var{mq} remains usable afterwards depends on the
message queue implementation and on the injected error."
      (let ((report-error (message-queue-error-handler mq)))
        (apply report-error key rest)))

    (define (message-queue-length mq)
      "Return the number of envelopes currently in the message queue
@var{mq}.  Cancelled envelopes that were not yet removed from the queue
are counted as well."
      (let* ((box (message-queue-messages+garbage/box mq))
             ;; A pair: the queue in the car, the garbage estimate in the cdr.
             (snapshot (atomic-box-ref box)))
        (pfds:queue-length (car snapshot))))

    (define (%message-queue-garbagitude mq)
      "Return the current estimate of the number of cancelled envelopes
in the message queue @var{mq}.  This procedure is not part of the API;
it only exists for the test suite."
      (let* ((box (message-queue-messages+garbage/box mq))
             ;; A pair: the queue in the car, the garbage estimate in the cdr.
             (snapshot (atomic-box-ref box)))
        (cdr snapshot)))

    ;; Raised by @code{inject-message!} when the message is smaller than
    ;; a @code{/:message-header}.  The field @code{received-size} is the
    ;; length of the message slice that was passed.
    (define-condition-type &missing-header-error &error
      make-missing-header-error missing-header-error?
      (received-size missing-header-error-received-size))

    ;; Raised by @code{inject-message!} when the size in the message header
    ;; differs from the length of the message slice.  The field
    ;; @code{expected-size} is the size read from the header, and
    ;; @code{received-size} is the length of the slice that was passed.
    (define-condition-type &size-mismatch-error &error
      make-size-mismatch-error size-mismatch-error?
      (expected-size size-mismatch-error-expected-size)
      (received-size size-mismatch-error-received-size))

    ;; Raised (continuably) by @code{send-message!} when, after enqueueing,
    ;; the queue length is at least the value of @code{%suspicious-length}.
    ;; The field @code{current-length} is the queue length after enqueueing,
    ;; and @code{suspicious-when} is the threshold it was compared against.
    (define-condition-type &overly-full-queue-warning &warning
      make-overly-full-queue-warning overly-full-queue-warning?
      (current-length  overly-full-queue-current-length)
      (suspicious-when overly-full-queue-suspicious-when))

    ;; Parameter holding the queue length from which @code{send-message!}
    ;; raises an @code{&overly-full-queue-warning}.  The default is 10000.
    (define %suspicious-length
      (make-parameter 10000))

    (define* (send-message! mq message #:key (priority 0)
                            (notify-sent! values))
      "Enqueue the message @var{message} (a bytevector slice) in the message
queue @var{mq} and ask the queue to try sending.  A continuable
@code{&warning} may be raised, e.g. a @code{&overly-full-queue-warning}
in case the queue is suspiciously long. The message queue implementation
can raise errors of its own as well, as usual.

@var{priority} is a numeric priority-preference value for @var{message},
from @code{(gnu gnunet mq prio-prefs)}; it must be an exact integer in
the range [0, 512). By default, @var{message} will be sent as reliable
background traffic (@code{prio:background}).

The in-order sending of ordered messages (when requested by @var{priority})
is only guaranteed when supported by the message queue implementation
and when @code{try-send-again!} and @code{send-message!} are not being
used concurrently on the same message queue.

When the message has been irrevocably sent, the thunk @var{notify-sent!}
will be called.

After normal execution, the message envelope is returned,
but in case of an exception (for example, an out-of-memory exception during
the handling of a @code{&overly-full-queue-warning}), it is possible
the envelope isn't returned even though it has been enqueued and it might
perhaps be sent."
      ;; The cancellation callback only refers to the queue through a
      ;; weak vector, so it does not hold a strong reference to the queue.
      (define weak-mq (weak-vector mq))
      (define (cancel!)
        (let ((queue (weak-vector-ref weak-mq 0)))
          (if queue
              (increment-garbage&maybe-cleanup queue)
              (values))))
      (assert (and (slice? message)
                   (exact-integer? priority)
                   (<= 0 priority) (< priority 512)))
      (%%bind-atomic-boxen
       ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
       (let ((envelope (make-envelope cancel!
                                      message
                                      #:priority priority
                                      #:notify-sent! notify-sent!)))
         ;; Append the envelope with a compare-and-swap loop, leaving the
         ;; garbage estimate unchanged.  Remember the length of the queue
         ;; that was actually installed; it is needed below.
         (define new-length
           (let retry ((seen queue+garbage))
             (let* ((extended (pfds:enqueue (car seen) envelope))
                    (proposed (cons extended (cdr seen))))
               (if (eq? seen (swap! seen proposed))
                   (pfds:queue-length extended)
                   ;; Someone else modified the queue first; start over.
                   (retry queue+garbage)))))
         (define threshold (%suspicious-length))
         ;; The C implementation emits a warning if the queue has
         ;; many entries, as this may indicate a bug (in the scheduler,
         ;; in the queue implementation, ...). This seems a good idea.
         (when (>= new-length threshold)
           (raise-continuable
            (condition (make-overly-full-queue-warning new-length threshold)
                       ;; TODO: consider
                       ;; (@ (gnu gnunet mq) send!) here and elsewhere.
                       (make-who-condition 'send-message!))))
         (try-send-again! mq)
         envelope)))

    (define (try-send-again! mq)
      "Run the sender of the message queue @var{mq} on @var{mq}, giving it
an opportunity to send messages that are still in the queue.
This is expected to be called from the message queue implementation."
      (let ((send! (message-queue-sender mq)))
        (send! mq)))

    (define (close-queue! mq)
      "Close the message queue @var{mq} by calling its closer thunk.
What exactly happens is implementation-dependent.  By convention, closing
releases resources such as sockets and threads, is idempotent and does
not block.  Messages still in the queue are not guaranteed to be sent
before the queue is closed."
      (let ((close! (message-queue-closer mq)))
        (close!)))

    (define (queue-filter ? queue)
      "Return a new queue holding the elements of @var{queue} that satisfy
the predicate @var{?}.  The queue is converted to a list, filtered and
converted back into a queue."
      (let* ((elements (pfds:queue->list queue))
             (kept (filter ? elements)))
        (pfds:list->queue kept)))

    (define (increment-garbage&maybe-cleanup mq)
      "Account for one more cancelled envelope in the message queue @var{mq}.

Usually this only increments the garbage counter.  However, when the
(estimated) share of garbage in the queue exceeds 3/4, the cancelled
envelopes are removed from the queue and the garbage counter is reset to
zero instead.  Either way, the queue and the counter are replaced together
with a single compare-and-swap, which is retried until it succeeds."
      (define (live? envelope)
        (not (envelope-peek-cancelled? envelope)))
      (%%bind-atomic-boxen
       ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
       (let retry ((seen queue+garbage))
         (let* ((queue (car seen))
                (queue-size (pfds:queue-length queue))
                (garbage (cdr seen))
                (garbage+1 (+ 1 garbage)))
           (assert (<= 0 garbage))
           ;; Decide what the box should contain next.
           ;;
           ;; There are no deep theoretical reasons for the ratio
           ;; 3/4=0.75, only that it is between 1/2 and 1.  Choosing
           ;; a ratio seemed less arbitrary than, say, only collecting
           ;; garbage if the garbage exceeds some fixed amount.
           (let ((replacement
                  (if (> (* 4 garbage+1) (* 3 queue-size))
                      ;; Mostly garbage: keep only the envelopes that are
                      ;; not cancelled and reset the counter.
                      (cons (queue-filter live? queue) 0)
                      ;; Not yet worth cleaning up: only count.
                      (cons queue garbage+1))))
             ;; Try to install the replacement; if someone else modified
             ;; the box in the meantime, start over with the new contents.
             ;; (The trailing parentheses also close the enclosing
             ;; `begin' and `define-library' forms.)
             (if (eq? seen (swap! seen replacement))
                 (values)
                 (retry queue+garbage)))))))))