aboutsummaryrefslogtreecommitdiff
path: root/gnu/gnunet/mq-impl/stream.scm
blob: ae79ae1ecba3e0301443d79c9ab54b93e6d3c21a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
;; This file is part of scheme-GNUnet.
;; Copyright (C) 2021 GNUnet e.V.
;;
;; scheme-GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; scheme-GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL3.0-or-later

;; C source: src/util/client.c (not completely ported).
;; The Scheme implementation is rather different from the C implementation
;; though.
;;
;; This module allows communication between GNUnet services using stream
;; sockets.

(define-library (gnu gnunet mq-impl stream)
  (export write-envelope! handle-input! handle-output!
	  port->message-queue connect/fibers)
  (import (only (gnu gnunet mq)
		make-one-by-one-sender inject-message! inject-error!
		make-message-queue)
	  (only (gnu gnunet concurrency repeated-condition)
		make-repeated-condition await-trigger! prepare-await-trigger!
		trigger-condition!)
	  (only (gnu gnunet utils bv-slice)
		slice-bv slice-offset slice-length
		slice-readable? bv-slice/read-write
		slice/read-only)
	  (only (gnu gnunet mq envelope)
		attempt-irrevocable-sent!)
	  (only (gnu gnunet utils tokeniser)
		make-tokeniser add-from-port!)
	  (only (gnu gnunet config db)
		read-value)
	  (only (gnu gnunet utils hat-let)
		let^)
	  (only (gnu gnunet util time)
		standard-back-off time-unit:second)
	  (only (rnrs base)
		begin define let values quote apply
		assert = or and eq?
		list if lambda car /)
	  (only (rnrs arithmetic bitwise)
		bitwise-ior)
	  (only (rnrs exceptions)
		guard)
	  (only (rnrs control)
		when)
	  (only (fibers)
		spawn-fiber)
	  (only (fibers timers)
		sleep-operation)
	  (only (fibers io-wakeup)
		wait-until-port-readable-operation
		wait-until-port-writable-operation)
	  (only (fibers conditions)
		make-condition signal-condition! wait-operation)
	  (only (fibers operations)
		wrap-operation perform-operation choice-operation)
	  (srfi srfi-26)
	  (only (srfi srfi-39)
		parameterize)
	  (only (ice-9 suspendable-ports)
		current-read-waiter current-write-waiter)
	  (only (guile)
		error define* identity define-values
		EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
		EPROTOTYPE EPIPE ECONNRESET
		PF_UNIX SOCK_STREAM F_GETFD F_SETFD F_GETFL F_SETFL FD_CLOEXEC
		O_NONBLOCK AF_UNIX
		socket connect fcntl force-output close-port
		make-socket-address
		exception-args exception-kind)
	  (only (rnrs io ports)
		put-bytevector)
	  (only (ice-9 atomic)
		make-atomic-box atomic-box-set! atomic-box-ref)
	  (only (ice-9 control)
		let/ec)
	  (only (srfi srfi-1)
		memv list-ref)
	  (srfi srfi-26))
  (begin
    (define (write-envelope! output envelope)
      "Write the envelope @var{envelope} to the output port @var{output},
unless it is cancelled.  @var{envelope} may not be already sent.  This
can block and raise I/O errors, depending on the port @var{output} and
(in Guile) the current write waiter.  As such, the caller might need to
parameterise the current write waiter and install exception handlers."
      (attempt-irrevocable-sent!
       envelope
       ((go message priority)
	(assert (slice-readable? message))
	;; TODO: how does the port API react to OUTPUT being closed
	;; by the remote peer?
	(put-bytevector output (slice-bv message)
			(slice-offset message) (slice-length message))
	(values))
       ((cancelled) (values))
       ((already-sent) (error "tried to send an envelope twice"))))

    ;; TODO: maybe note that this procedure blocks?
    (define (handle-input! mq input)
      "Keep reading message from the input port @var{input}.

Feed each read message in-order to @var{mq} with @code{inject-message!}.
This procedure might inject errors by its own as usual (e.g. when
no appropriate message handler exists).  This does not include
@code{input:overly-small}, @code{input:premature-end-of-file} or
@code{input:regular-end-of-file}.

If a message with an overly small message size it its header
is encountered, return the error @code{input:overly-small type size},
where @var{type} is the message type as an integer (or @code{#f} if it
could not be determined) and @var{size} is the message size in the header.

When the end-of-file has been reached, return (not inject) the error
@code{input:regular-end-of-file} into @var{mq}.  If the end-of-file
happened while inside a (partial) message, return
@code{input:premature-end-of-file} instead.

@code{ECONNRESET} is treated as @code{input:regular-end-of-file}.
This might or might not be correct.  In case of an I/O error, TODO.

TODO closing message queues."
      (let^ ((! tok (make-tokeniser))
	     (! (handle/message bv offset length)
		(inject-message!
		 mq
		 (slice/read-only
		  (bv-slice/read-write bv offset length))))
	     (! (return/overly-small type size)
		(values 'input:overly-small type size))
	     (! (return/premature-eof)
		(values 'input:premature-end-of-file))
	     (! (return/done-eof)
		(values 'input:regular-end-of-file)))
	    ;; Prevent ‘In procedure fport_read: Connection reset by peer’
	    ;; in tests/network-size.scm.  XXX can this also happen in
	    ;; 'handle-output!'?
	    (guard (c ((and (eq? 'system-error (exception-kind c))
			    (= ECONNRESET (car (list-ref (exception-args c) 3))))
		       (return/done-eof)))
	      (add-from-port! tok input handle/message return/overly-small
			      return/done-eof return/premature-eof))))

    (define (handle-output! mq output wait!)
      "Keep sending message envelopes over the output port @var{output}.

The messages to send are taken in-order from the message queue @var{mq}.
In case of an I/O error, ???.  When the message queue is (temporarily)
empty, the thunk @var{wait!} is called.  It should return when messages
have been added to the queue.

When using guile-fibers, @var{wait!} can be implemented with
@code{await-trigger!} and by calling @code{trigger-condition!}
from the ‘message sender’ of @var{mq}.

When the port @var{output} has been closed for writing, this procedure
returns.  This is detected with the @code{EPIPE} error, so don't block
@code{SIGPIPE} signals.

TODO: detect it has been closed even when not actually writing,
with EPOLLERR -- needs fibers support."
      (define (one-by-one-proc ev)
	(write-envelope! output ev))
      (define send-round
	(cute (make-one-by-one-sender one-by-one-proc)
	      mq))
      (guard (c ((and (eq? 'system-error (exception-kind c))
		      (= EPIPE (car (list-ref (exception-args c) 3))))
		 (values)))
	(let loop ()
	  ;; Doing 'wait!' or 'send-round' the other way around
	  ;; should be acceptable as well.
	  (send-round)
	  ;; If 'output' is buffered, make sure bytes don't just sit
	  ;; in the buffer forever.  Don't flush after each individual
	  ;; envelope for performance.  TODO: should connect-unix enable
	  ;; buffering?
	  (force-output output)
	  (wait!)
	  (loop))))

    ;; See, e.g., the Linux man page path_resolution(7).
    (define %path-resolution-errors
      (list EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG))

    (define (connect-unix config service-name sleep)
      "Try connecting to the server using UNIX domain sockets.

On success, the socket is returned.  If the server has bound
the socket but is not yet listening, wait a little and retry.
If the socket file does not yet exist, wait until it does exist
and retry.  It is assumed the file name of the socket is set
in the configuration @var{config}.  If it is not set there,
an appropriate @code{&undefined-key-error} is raised."
      ;; TODO: use a mechanism like 'inotify' instead of sleeping
      ;; when the socket file does not exist.
      (let^ ((! unix-path
		(read-value identity config service-name "UNIXPATH"))
	     (! address (make-socket-address PF_UNIX unix-path))
	     ;; TODO: maybe catch ENOMEM, ENOBUFS, EMFILE ...
	     (! socket (socket PF_UNIX SOCK_STREAM 0))
	     (_ (fcntl socket F_SETFD
		       (bitwise-ior (fcntl socket F_GETFD) FD_CLOEXEC)))
	     (_ (fcntl socket F_SETFL
		       (bitwise-ior (fcntl socket F_GETFL) O_NONBLOCK)))
	     ;; Grrr why can't we just use 'select' on socket to wait
	     ;; for the connection to complete, like with Internet sockets?
	     (/o/ retry (timeout (standard-back-off 0)))
	     (! (retry)
		(sleep (/ timeout time-unit:second))
		(retry (standard-back-off timeout)))
	     ;; 'system-error-errno' returns #f if 'condition'
	     ;; is not a 'system-error'.
	     (! (eagain? errno)
		(= EAGAIN errno))
	     (! (connection-refused? errno)
		(= ECONNREFUSED errno))
	     (! (path-resolution-error? errno)
		(memv errno %path-resolution-errors))
	     (! (wrong-type-socket-error? errno)
		(= EPROTOTYPE errno))
	     (! (retry-errno? errno)
		;; On Linux, EAGAIN can happen if the receive queue
		;; of the listening socket is full.  I.e., the listening
		;; is being connected to more frequently than the corresponding
		;; proces can keep up with.  This situation should resolve
		;; itself automatically.
		(or (eagain? errno)
		    ;; This can happen if the listening socket is
		    ;; not actually listening yet.  Give the
		    ;; corresponding process a little more time.
		    (connection-refused? errno)
		    ;; See "connect-unix, will connect even if previous socket
		    ;; is different type" test case.
		    (wrong-type-socket-error? errno)
		    ;; Give the process implementing the process some
		    ;; time to set up directory structures, set the
		    ;; permissions appropriately ...
		    (path-resolution-error? errno)))
	     (! ok? (guard (c ((and (eq? (exception-kind c) 'system-error)
				    (retry-errno?
				     (car (list-ref (exception-args c) 3))))
			       #f))
		      (connect socket AF_UNIX unix-path))))
	    ;; Guile returns #f if SOCKET is non-blocking
	    ;; and the connection cannot be made immediately.
	    (if ok?
		socket
		(retry))))

    ;; See 'port->message-queue'.  Also used by connect/fibers.
    (define* (prepare-port-message-queue spawn)
      (define rcvar (make-repeated-condition))
      (define (interrupt! mq)
	(trigger-condition! rcvar))
      ;; 'closed-condition' is used to coordinate the termination of the
      ;; two fibers.  When one fiber detects an EOF condition (or half-duplex),
      ;; it informs the other fiber by signalling the condition and injects
      ;; an appropriate error, unless the other fiber will do it already.
      ;;
      ;; It is also used to determine which of the two fibers should close
      ;; the port.  The port is closed by the fiber for which signal-condition!
      ;; on closed-condition returns #f, as in that case, the other fiber has
      ;; already done all its I/O and won't need the port anymore.
      (define closed-condition (make-condition))
      ;; 'request-close-condition' is used by 'close!' to stop the read fiber,
      ;; such that it will signal 'closed-condition', then the 'write-fiber'
      ;; will also stop and close the port.  Closing it from the write fiber
      ;; would also be possible, but in this implementation, closing happens
      ;; in the read fiber.
      (define request-close-condition (make-condition))
      (define (start-reader! mq port)
	(define-values (key . rest)
	  (let/ec escape
	    (define wait-op
	      (choice-operation
	       (wait-until-port-readable-operation port)
	       (wrap-operation (wait-operation request-close-condition)
			       (lambda ()
				 (escape 'input:regular-end-of-file)))
	       (wrap-operation (wait-operation closed-condition)
			       (lambda ()
				 (escape 'input:regular-end-fof-file)))))
	    (define (new-waiter . _)
	      (perform-operation wait-op))
	    ;; XXX: if (define-values error ...) is written and
	    ;; 'handle-input!' raises an error (resulting in a backtrace),
	    ;; a segfault can
	    ;; happen: <https://debbugs.gnu.org/cgi/bugreport.cgi?bug=50153>.
	    (parameterize ((current-read-waiter new-waiter))
	      (handle-input! mq port))))
	(if (signal-condition! closed-condition)
	    (apply inject-error! mq key rest)
	    ;; TODO: close-port can block!
	    (close-port port)))
      (define (start-writer! mq port)
	(let/ec escape
	  ;; operation for calling the escape continuation when
	  ;; when the other fiber detected the connection is broken
	  (define escape-when-closed-operation
	    (wrap-operation (wait-operation closed-condition)
			    escape))
	  ;; operation for waiting until the port is writable
	  ;; or the other fiber detected the connection is broken.
	  (define wait-writable-operation
	    (choice-operation
	     escape-when-closed-operation
	     (wait-until-port-writable-operation port)))
	  (define (wait!)
	    (perform-operation
	     (choice-operation
	      (prepare-await-trigger! rcvar)
              ;; Don't wait for the port to be writable here!
              ;;
              ;; Otherwise, if the port is writable, but the message
              ;; queue has nothing buffered for a while, the fiber
              ;; keeps spinning.
              ;;
              ;; XXX it would be nice if it could be detected when the
              ;; write end of the port is closed.
	      escape-when-closed-operation)))
	  (define (wait!/blocking)
	    (perform-operation wait-writable-operation))
	  (define old-waiter (current-write-waiter))
	  (define (new-waiter p)
	    (if (eq? p port)
		(wait!/blocking)
		;; Maybe a backtrace is being printed,
		;; 'system-async-mark' is used ...
		(old-waiter p)))
	  (parameterize ((current-write-waiter new-waiter))
	    (handle-output! mq port wait!)))
	(if (signal-condition! closed-condition)
	    (inject-error! mq 'input:regular-end-of-file)
	    ;; TODO: close-port can block!
	    (close-port port)))
      (define (close!)
	(signal-condition! request-close-condition))
      (values (lambda (mq port)
		(spawn (lambda () (start-reader! mq port)))
		(spawn (lambda () (start-writer! mq port))))
	      ;; Pass this to make-message-queue as the 'sender'.
	      interrupt!
	      close!))

    (define* (port->message-queue port handlers error-handler
				  #:key (spawn spawn-fiber))
      "Create a message queue sending and receiving data over @var{port}.

This creates some fibers with @var{spawn} (@code{spawn-fiber} by default).
As such, @var{port} must be non-blocking if @code{spawn-fiber} is used.
All fibers will complete when the end-of-file has been encountered.

When the connection is broken, the error @code{input:regular-end-of-file}
is injected.  A half-duplex port is treated as a broken connection.
XXX: half-duplex connections cannot always be detected
XXX: Likewise for connect/fibers?

The port will be closed when the connection is broken or after
@ode{close-queue!} is called."
      (define-values (start-fibers interrupt! close!)
	(prepare-port-message-queue spawn))
      (define mq
	(make-message-queue handlers error-handler interrupt! close!))
      (start-fibers mq port)
      mq)

    (define* (connect/fibers config service-name handlers error-handler
			     #:key (spawn spawn-fiber))
      "Create a message queue that will be connected in the background
to a GNUnet service @var{service-name}.  The message queue can be used
before the message queue is connected; any send messages will be buffered
until they can be sent.  Some fibers may be created with @var{spawn}
(@code{spawn-fiber} by default).

When the connection has been established, the error @code{connection:connected}
(a symbol) is injected into the message queue.  When the connection has been
closed by the server (e.g. because the server was stopped or is restarting)
the error @code{input:regular-end-of-file} is injected into the message queue.

If the message queue is closed while establishing the connection, the error
@code{connection:interrupted} (a symbol) is injected and
@code{connection:connected} is not injected."
      (define-values (start-fibers interrupt! close!)
	(prepare-port-message-queue spawn))
      (define interrupt-condition (make-condition))
      (define (close*!)
	(signal-condition! interrupt-condition)
	(close!))
      (define mq
	(make-message-queue handlers error-handler interrupt! close*!))
      (spawn (lambda ()
	       ;; Use 'sleep*' to allow 'close*!' to stop the connection
	       ;; forming.
	       (define socket (let/ec escape
				(define allow-interrupt-operation
				  (wrap-operation
				   (wait-operation interrupt-condition)
				   (lambda () (escape #f))))
				(define (sleep* time)
				  (perform-operation
				   (choice-operation
				    (sleep-operation time)
				    allow-interrupt-operation)))
				(connect-unix config service-name sleep*)))
	       (if socket
		   (begin (inject-error! mq 'connection:connected)
			  (start-fibers mq socket))
		   (inject-error! mq 'connection:interrupted))))
      mq)))