aboutsummaryrefslogtreecommitdiff
path: root/gnu/gnunet/cadet/client.scm
blob: 7de91fb13c2276a824f3b0defbdc66ed020c9986 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
;#!r6rs
;; This file is part of Scheme-GNUnet.
;; Copyright © 2022 GNUnet e.V.
;;
;; Scheme-GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; Scheme-GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later
(define-library (gnu gnunet cadet client)
  (export connect disconnect!
	  make-cadet-address cadet-address? cadet-address-peer cadet-address-port
	  channel? open-channel! close-channel!
	  channel-message-queue
	  port? open-port! close-port!
	  %max-cadet-message-size

	  (rename (server:cadet? server?))

	  ;; Network manipulation procedures
	  ;; (these belong to (gnu gnunet cadet network)).
	  (rename (analyse-local-channel-create
		   | analyse-local-channel-create|)
		  (construct-local-channel-create
		   | construct-local-channel-create|)
		  (analyse-local-channel-destroy
		   | analyse-local-channel-destroy|)
		  (construct-local-channel-destroy
		   | construct-local-channel-destroy|)
		  (analyse-local-data | analyse-local-data|)
		  (construct-local-data | construct-local-data|)
		  (analyse-local-acknowledgement
		   | analyse-local-acknowledgement|)
		  (construct-local-acknowledgement
		   | construct-local-acknowledgement|)))
  (import (only (gnu extractor enum)
		value->index symbol-value)
	  (only (gnu gnunet cadet struct)
		%minimum-local-channel-id
		/:msg:cadet:local:channel:create
		/:msg:cadet:local:channel:destroy
		/:msg:cadet:local:data
		/:msg:cadet:local:acknowledgement)
	  (only (gnu gnunet crypto struct)
		/peer-identity)
	  (only (gnu gnunet concurrency lost-and-found)
	        losable-lost-and-found)
	  (only (gnu gnunet mq handler)
		message-handlers message-handler)
	  (only (gnu gnunet mq)
		close-queue! send-message! make-one-by-one-sender
		message-queue-length)
	  (only (gnu gnunet mq envelope)
		attempt-irrevocable-sent!)
	  (only (gnu gnunet server)
		maybe-ask* answer
		maybe-send-control-message!
		maybe-send-control-message!*
		make-disconnect!
		server-terminal-condition
		server-control-channel
		handle-control-message!
		make-loop run-loop server->loop-arguments loop:control-channel
		loop:terminal-condition)
	  (only (gnu gnunet hashcode struct)
		/hashcode:512)
	  (only (gnu gnunet message protocols) message-type)
	  (only (gnu gnunet mq)
		make-message-queue inject-message!)
	  (only (gnu gnunet netstruct syntactic)
		sizeof select read% set%!)
	  (only (gnu gnunet utils bv-slice)
		make-slice/read-write slice-copy/read-only slice-length
		slice-copy! slice-slice)
	  (only (gnu gnunet utils cut-syntax)
		cut-syntax)
	  (only (gnu gnunet utils hat-let)
		let^)
	  (only (rnrs base)
		begin define lambda assert quote cons apply values
		case else = define-syntax + expt - let and >
		not if <)
	  (only (rnrs control)
		when)
	  (only (pfds bbtrees)
		bbtree-set  make-bbtree bbtree-ref)
	  (only (rnrs records syntactic) define-record-type)
	  (only (ice-9 control) let/ec)
	  (only (ice-9 match) match)
	  (only (guile) define* error)
	  (only (fibers) spawn-fiber)
	  (only (srfi srfi-26)
		cut)
	  (only (srfi srfi-45)
		delay force))
  (begin
    ;; Record type of CADET server objects, a subtype of <server> without
    ;; any fields of its own.  The constructor '%make-server' takes no
    ;; arguments and passes no arguments to the parent's constructor.
    ;; The predicate is exported under the name 'server?'.
    ;;
    ;; NOTE(review): <server> is not in the visible import list of
    ;; (gnu gnunet server) -- confirm that it is in scope.
    (define-record-type (<server:cadet> %make-server server:cadet?)
      (parent <server>)
      (protocol (lambda (%make)
		  (lambda ()
		    ((%make))))))

    ;; The exported 'disconnect!' procedure.  It is constructed by
    ;; 'make-disconnect!' from (gnu gnunet server), given the symbol 'cadet
    ;; and the predicate recognising CADET server objects.
    (define disconnect!
      (make-disconnect! 'cadet server:cadet?))

    ;; Record type of CADET channels, a subtype of <losable>.  The
    ;; constructor '%make-channel' accepts the server, the destination
    ;; address, the options and the message queue (in that order); the
    ;; channel number and 'desire-close?' start as #false and 'allow-send'
    ;; starts as 0.  The parent constructor receives the result of
    ;; (losable-lost-and-found server).
    ;;
    ;; NOTE(review): <losable> is not in the visible import list of
    ;; (gnu gnunet concurrency lost-and-found) -- confirm that it is in scope.
    (define-record-type (<channel> %make-channel channel?)
      (parent <losable>)
      (fields (immutable server channel-server) ; <server>
	      (immutable destination channel-address) ; <cadet-address>
	      (immutable options channel-options)
	      ;; Initially #false, when no channel number has been chosen yet
	      ;; by the client.  When the control loop accepts the <channel>,
	      ;; a channel number is assigned.  When a channel is closed, it is
	      ;; set to #true, but only after the remaining messages have been
	      ;; sent to the service.  Before setting this to #true, 'desire-close?'
	      ;; must be #true.
	      ;;
	      ;; After a reconnect, channel numbers are reset (TODO: implement that).
	      (mutable channel-number channel-channel-number
		       set-channel-channel-number!)
	      ;; Initially #false.  Set to #true when a close is requested.  Cannot
	      ;; revert to #false.  If #true, then once all messages have been sent
	      ;; to the service, channel-number must be set to #true.
	      (mutable desire-close? channel-desire-close?
		       set-channel-desire-close?)
	      (immutable message-queue channel-message-queue) ; <message-queue>
	      ;; (Natural number, possibly zero) The number of messages the service
	      ;; currently allows the client to send to the service.
	      ;; This is decremented after sending a message to the service
	      ;; and incremented after receiving a
	      ;; @code{/:msg:cadet:local:acknowledgement}.
	      ;;
	      ;; Concurrency: this may only be read/written in the main event loop.
	      (mutable allow-send channel-allow-send set-channel-allow-send!))
      (protocol (lambda (%make)
		  (lambda (server destination options message-queue)
		    ((%make (losable-lost-and-found server)) server
		     destination options #false #false message-queue 0)))))

    ;; An empty bbtree whose keys are ordered by '<'.  'connect' passes it
    ;; to 'run-loop' as the initial state; the control loop uses that state
    ;; as the map from channel numbers to <channel> objects.
    (define empty-bbtree (make-bbtree <))

    (define* (connect config #:key (connected values) (disconnected values)
                      (spawn spawn-fiber))
      "Asynchronously connect to the CADET service, using the configuration
@var{config}, and return a CADET server object.  The event loop of the
connection is started with the procedure @var{spawn}; the values of
@var{connected} and @var{disconnected} are passed on to @code{make-loop}."
      (let^ ((! cadet-server (%make-server))
             ;; The loop is configured by the keyword arguments below,
             ;; followed by the arguments derived from the server object.
             (! event-loop
                (apply make-loop
                       #:make-message-handlers make-message-handlers
                       #:control-message-handler control-message-handler
                       #:service-name "cadet"
                       #:configuration config
                       #:connected connected
                       #:disconnected disconnected
                       #:spawn spawn
                       (server->loop-arguments cadet-server))))
            ;; Initial loop state: no channels yet, and the first channel
            ;; number to hand out is %minimum-local-channel-id.
            (spawn
             (lambda ()
               (run-loop event-loop empty-bbtree %minimum-local-channel-id)))
            cadet-server))

    ;; channel-number->channel-map:
    ;;   A 'bbtree' from channel numbers to their corresponding
    ;;   <channel> object, or nothing if the control loop
    ;;   has not processed 'open-channel!' yet or if the channel
    ;;   has been closed.
    ;;
    ;;   TODO: GC problems, split in external and internal parts

    ;; Construct the handlers for the messages that the CADET service sends
    ;; to the client.  @var{loop} is the loop object of the connection; any
    ;; further arguments are ignored.  The handlers do not touch the state of
    ;; the control loop themselves, they communicate with it through the
    ;; control channel of @var{loop}.
    (define (make-message-handlers loop . _)
      (message-handlers
       (message-handler
        (type (symbol-value message-type msg:cadet:local:data))
        ((interpose exp) exp)
        ;; A data message is a fixed-size header followed by the payload
        ;; (which may be empty), so it must be at least as long as the
        ;; header.  Without this check, 'handle!' would try to slice and
        ;; read past the end of a truncated message.
        ((well-formed? slice)
         (not (< (slice-length slice)
                 (sizeof /:msg:cadet:local:data '()))))
        ((handle! slice)
         (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
                (! header (slice-slice slice 0 cadet-data-length))
                (! tail (slice-slice slice cadet-data-length))
                (! channel-number
                   (read% /:msg:cadet:local:data '(channel-number) header))
                ;; The map from channel numbers to channels is owned by
                ;; the control loop, so ask the control loop for the channel.
                (! channel
                   (maybe-ask* (loop:terminal-condition loop)
                               (loop:control-channel loop) 'channel
                               channel-number))
                ;; No channel is known under this number (presumably it
                ;; has been closed, or the loop has terminated -- TODO
                ;; confirm what 'maybe-ask*' returns in the latter case).
                ;; There is nobody to deliver the data to, so drop it
                ;; instead of evaluating an unbound placeholder.
                (? (not channel)
                   (values)))
               ;; TODO: while the message is being processed, other messages
               ;; cannot be accepted -- document this limitation.
               (inject-message! (channel-message-queue channel) tail))))
       (message-handler
        (type (symbol-value message-type msg:cadet:local:acknowledgement))
        ((interpose exp) exp)
        ;; Acknowledgements have a fixed size.
        ((well-formed? slice)
         (= (slice-length slice)
            (sizeof /:msg:cadet:local:acknowledgement '())))
        ((handle! slice)
         ;; The slice needs to be read here (and not in 'control'), as it might
         ;; later be reused for something different.
         (let ((channel-number (analyse-local-acknowledgement slice)))
           (maybe-send-control-message!*
            (loop:terminal-condition loop) (loop:control-channel loop)
            'acknowledgement channel-number))))))

    (define (control-message-handler message control control* message-queue loop
                                     channel-number->channel-map
                                     next-free-channel-number)
      "The main event loop: handle the control message @var{message}.
@var{message-queue} is the message queue used for sending messages to the
service.  @var{channel-number->channel-map} (a bbtree from channel numbers
to channel objects) and @var{next-free-channel-number} are the loop state.
Every clause ends by continuing the loop via @var{control} or
@var{control*} with the (possibly updated) state; unrecognised messages are
delegated to @code{handle-control-message!}."
      (define (k/reconnect! channel-number->channel-map)
        (run-loop loop channel-number->channel-map next-free-channel-number))
      (define (close-if-possible! channel)
        ;; Pre-conditions:
        ;;  * the channel is open
        ;;  * and a close has been requested
        ;;
        ;; TODO: untested.
        ;;
        ;; NOTE(review): nothing records that the destroy message has been
        ;; sent (the channel number is never set to #true here), so it looks
        ;; like this can be sent more than once for the same channel --
        ;; confirm.
        (when (= (message-queue-length (channel-message-queue channel)) 0)
          (send-message! message-queue
                         (construct-local-channel-destroy
                          (channel-channel-number channel)))
          ;; We don't need the envelope.
          (values)))
      (define (continue)
        (control loop channel-number->channel-map next-free-channel-number))
      (define (continue* message)
        (control* message loop channel-number->channel-map
                  next-free-channel-number))
      ;; TODO: what about closed channels?
      (define (send-channel-stuff! channel)
        ;; Send messages one-by-one, keeping in mind that we might not be able
        ;; to send all messages to the service at once, only 'channel-allow-send'
        ;; messages can be sent and this decreases by sending messages.
        ;;
        ;; TODO: use priority information, somehow when cancelling a message
        ;; cancel the corresponding message to be sent to the CADET service when
        ;; there is still time, zero-copy networking.
        (let/ec
         stop
         (define (stop-if-exhausted)
           ;; The mutation 'replace > by >=' is caught by
           ;; "data is not sent before an acknowledgement"
           ;; in form of a hang.
           (if (> (channel-allow-send channel) 0)
               ;; (unless ...) and (when ...) can return *unspecified*,
               ;; but (gnu gnunet mq) expects no return values. Detected
               ;; by the "data is properly sent in response to acknowledgements, in-order"
               ;; test.
               (values)
               (stop)))
         ;; Tested by ‘data is properly sent in response to acknowledgements, in-order’
         ;; -- it catches the mutation 'replace 1 by zero' (as a hang)
         (define (decrement!)
           (set-channel-allow-send! channel
                                    (- (channel-allow-send channel) 1)))
         ;; It is important to check that a message can be sent before
         ;; send! is called, otherwise the message will be removed from
         ;; the message queue and be forgotten without being ever sent.
         ;;
         ;; Tested by ‘data is not sent before an acknowledgement’ -- it catches
         ;; the mutation 'remove this line' (as a hang).
         (stop-if-exhausted)
         (define (send! envelope)
           (attempt-irrevocable-sent!
            envelope
            ((go message priority)
             ;; The mutation ‘don't call send-message!’ is caught by
             ;; ‘data is properly sent in response to acknowledgements, in-order’
             ;; as a hang and an exception.
             ;;
             ;; The mutation 'swap send-message!' and 'decrement!' is uncaught,
             ;; but theoretically harmless.
             ;; TODO: maybe get rid of the message queue limit in (gnu gnunet mq)
             (send-message! message-queue
                            (construct-local-data
                             (channel-channel-number channel) ; TODO: multiple channels is untested
                             0 ;; TODO: relation between priority and priority-preference?
                             message)) ; TODO: sending the _right_ message is untested
             ;; The mutation ‘don't call decrement!' is caught by
             ;; ‘data is properly sent in response to acknowledgements, in-order’,
             ;; as a hang with an exception.
             (decrement!))
            ((cancelled) (values)) ; TODO: untested
            ((already-sent) (error "tried to send an envelope twice (CADET)")))
           ;; Exit once nothing can be sent anymore (TODO check if
           ;; make-one-by-one-sender allows non-local exits).
           ;;
           ;; The mutation 'don't call it' is caught by
           ;; ‘data is properly sent in response to acknowledgements, in-order’
           ;; as a hang and an exception?
           ;;
           ;; The mutation 'duplicate it' is uncaught, but theoretically harmless
           ;; albeit inefficient.
           (stop-if-exhausted))
         ((make-one-by-one-sender send!) (channel-message-queue channel)))
        (when (channel-desire-close? channel)
          (close-if-possible! channel)))
      (match message
        (('open-channel! channel)
         (let^ ((! channel-number next-free-channel-number)
                ;; TODO: handle overflow, and respect bounds
                (! next-free-channel-number (+ 1 next-free-channel-number))
                (_ (set-channel-channel-number! channel channel-number))
                ;; Keep track of the new <channel> object; it will be required
                ;; later by 'acknowledgement'.
                (! channel-number->channel-map
                   (bbtree-set channel-number->channel-map channel-number
                               channel)))
               (send-local-channel-create! message-queue channel)
               (control loop channel-number->channel-map next-free-channel-number)))
        (('close-channel! channel)
         ;; 'close-channel!' can only be sent after the <channel> object
         ;; was returned by the procedure 'open-channel!', because only
         ;; then the channel becomes available. This procedure (synchronously)
         ;; sends a 'open-channel!' message and messages are processed by
         ;; the control loop in-order, so the channel has already been opened.
         ;;
         ;; The only remaining states are: the channel is open / the channel
         ;; is closed.
         (let^ ((! channel-number (channel-channel-number channel))
                (? (channel-desire-close? channel)
                   ;; It has already been requested to close the channel
                   ;; (maybe it even has already been closed).  This is fine,
                   ;; as 'close-channel!' is idempotent.  Nothing to do!
                   ;; TODO: untested.
                   (continue)))
               (set-channel-desire-close? channel #true)
               ;; This procedure will take care of actually closing the channel
               ;; (if currently possible).  If it's not currently possible
               ;; due to a lack of acknowledgements, then a future 'send-channel-stuff!'
               ;; (in response to an 'acknowledgement' message) will take care of things.
               ;;
               ;; TODO: untested.  TODO: untested in case of reconnects.
               (close-if-possible! channel)
               (continue)))
        (('resend-old-operations!)
         ;; TODO: no operations and no channels are implemented yet,
         ;; so for now nothing can be done.
         (continue))
        (('acknowledgement channel-number)
         (let^ ((! channel
                   ;; The third argument of 'bbtree-ref' is the value to
                   ;; return when the key is absent (not a thunk).  Without
                   ;; it, an unknown channel number raises an exception
                   ;; inside the control loop.
                   (bbtree-ref channel-number->channel-map channel-number
                               #false))
                (? (not channel)
                   ;; An acknowledgement for a channel we do not know
                   ;; about; there is nothing to update, so ignore it.
                   (continue)))
               ;; The service is allowing us to send another message;
               ;; update the number of allowed messages.
               (set-channel-allow-send!
                channel (+ 1 (channel-allow-send channel)))
               ;; Actually send some message, if there are any to send.
               (send-channel-stuff! channel)
               (continue)))
        ;; The second element is the message queue of the channel; it is
        ;; not needed here, as it can be found via the channel object.
        (('send-channel-stuff! _ channel)
         ;; Tell the service to send the messages over CADET.
         (send-channel-stuff! channel)
         (continue))
        ;; Respond to a query of the msg:cadet:local:data message handler.
        (('channel answer-box channel-number)
         (answer answer-box
                 ;; Answer #false when the channel is unknown.  The default
                 ;; of 'bbtree-ref' is a plain value; passing a thunk here
                 ;; would make the answer a (true) procedure.
                 (bbtree-ref channel-number->channel-map
                             channel-number #false))
         (continue))
        (('lost . lost)
         (let next ((lost lost))
           (match lost
             (() (continue))
             ((object . rest)
              (match object
                ((? channel? _)
                 ;; TODO: clean up lost channels (presumably by closing
                 ;; them).  For now they are skipped, to avoid taking down
                 ;; the control loop.
                 (next rest))
                ((? server:cadet? _)
                 ;; The server object itself was lost: disconnect.
                 (continue* '(disconnect!))))))))
        (rest
         (handle-control-message!
          rest message-queue (loop:terminal-condition loop)
          (cut k/reconnect! channel-number->channel-map)))))

    ;; Record type of CADET addresses: a peer identity together with a port.
    ;; The constructor 'make-cadet-address' asserts that both slices have
    ;; exactly the size of the corresponding network structure and stores
    ;; read-only copies of them.
    (define-record-type (<cadet-address> make-cadet-address cadet-address?)
      (fields (immutable peer cadet-address-peer)
	      (immutable port cadet-address-port))
      (protocol (lambda (%make)
		  "Make a CADET address for contacting the peer @var{peer}
(a readable bytevector slice containing a @code{/peer-identity}) at port
@var{port} (a readable bytevector slice containing a @code{/hashcode:512}).
The slices @var{peer} and @var{port} are copied, so future changes to them
do not have any impact on the cadet address."
		  (lambda (peer port)
		    (assert (= (sizeof /peer-identity '()) (slice-length peer)))
		    (assert (= (sizeof /hashcode:512 '()) (slice-length port)))
		    (%make (slice-copy/read-only peer)
			   (slice-copy/read-only port))))))

    (define* (construct-local-channel-create cadet-address channel-number
                                             #:optional (options 0))
      "Create a new @code{/:msg:cadet:local:channel:create} message for
contacting the CADET address @var{cadet-address}, using the channel number
@var{channel-number} and the options @var{options} (by default, zero).
The message is returned as a fresh read-write bytevector slice."
      (let ((message
             (make-slice/read-write
              (sizeof /:msg:cadet:local:channel:create '()))))
        ;; Shorthands for writing a field of, and selecting a part of,
        ;; the message under construction.
        (define-syntax set-field!
          (cut-syntax set%! /:msg:cadet:local:channel:create <> message <>))
        (define-syntax field-slice
          (cut-syntax select /:msg:cadet:local:channel:create <> message))
        ;; Message header
        (set-field! '(header size) (slice-length message))
        (set-field! '(header type)
                    (value->index
                     (symbol-value message-type
                                   msg:cadet:local:channel:create)))
        ;; Message body; the peer and port of the address are copied
        ;; into the message.
        (set-field! '(channel-number) channel-number)
        (slice-copy! (cadet-address-peer cadet-address)
                     (field-slice '(peer)))
        (slice-copy! (cadet-address-port cadet-address)
                     (field-slice '(port)))
        (set-field! '(options) options)
        message))

    ;; Send, over the message queue @var{mq}, a channel creation message
    ;; for the address and the channel number of @var{channel}.  The result
    ;; of 'send-message!' is returned.
    (define (send-local-channel-create! mq channel)
      (let ((address (channel-address channel))
            (number (channel-channel-number channel)))
        (send-message! mq (construct-local-channel-create address number))))

    (define (analyse-local-channel-create message)
      "Return the CADET address, channel number and options (as three values,
in that order) corresponding to the @code{/:msg:cadet:local:channel:create}
message @var{message}.  The peer and port are copied into the new address
by @code{make-cadet-address}."
      (define-syntax read*
        (cut-syntax read% /:msg:cadet:local:channel:create <> message))
      (define-syntax select*
        (cut-syntax select /:msg:cadet:local:channel:create <> message))
      ;; Each field is read exactly once.
      (let^ ((! channel-number (read* '(channel-number)))
             (! options (read* '(options)))
             (! address (make-cadet-address (select* '(peer))
                                            (select* '(port)))))
            (values address channel-number options)))

    (define (construct-local-channel-destroy channel-number)
      "Create a @code{/:msg:cadet:local:channel:destroy} message for closing
the CADET channel with channel number @var{channel-number}.  The message is
returned as a fresh read-write bytevector slice."
      (let ((message
             (make-slice/read-write
              (sizeof /:msg:cadet:local:channel:destroy '()))))
        ;; Shorthand for writing a field of the message under construction.
        (define-syntax set-field!
          (cut-syntax set%! /:msg:cadet:local:channel:destroy <> message <>))
        (set-field! '(header size) (slice-length message))
        (set-field! '(header type)
                    (value->index
                     (symbol-value message-type
                                   msg:cadet:local:channel:destroy)))
        (set-field! '(channel-number) channel-number)
        message))

    (define (analyse-local-channel-destroy message)
      "Return the channel number stored in the
@code{/:msg:cadet:local:channel:destroy} message @var{message}."
      (define-syntax read*
        (cut-syntax read% /:msg:cadet:local:channel:destroy <> message))
      (read* '(channel-number)))

    ;; TODO: determine maximum length
    ;;
    ;; Currently: 2^16 - 1, minus the size of the header of a
    ;; /:msg:cadet:local:data message.
    (define %max-cadet-message-size
      (- (expt 2 16)
         1
         (sizeof /:msg:cadet:local:data '())))

    ;; It would be nice to avoid copying the data.
    ;; TODO: direction (service->client, client->service?)
    (define (construct-local-data channel-number priority-preference data)
      "Create a @code{/:msg:cadet:local:data} message for the channel with
channel number @var{channel-number}, with the numeric priority-preference
value @var{priority-preference} and a copy of the readable bytevector slice
@var{data} as payload.  The message is returned as a fresh read-write
bytevector slice."
      (let ((prefix-length (sizeof /:msg:cadet:local:data '())))
        (let ((message
               (make-slice/read-write (+ prefix-length (slice-length data)))))
          ;; The message consists of a fixed-size header, directly followed
          ;; by the payload.
          (let ((header (slice-slice message 0 prefix-length))
                (payload (slice-slice message prefix-length)))
            (define-syntax set-field!
              (cut-syntax set%! /:msg:cadet:local:data <> header <>))
            ;; The size in the header is the size of the whole message,
            ;; payload included.
            (set-field! '(header size) (slice-length message))
            (set-field! '(header type)
                        (value->index
                         (symbol-value message-type msg:cadet:local:data)))
            (set-field! '(channel-number) channel-number)
            (set-field! '(priority-preference) priority-preference)
            (slice-copy! data payload)
            message))))

    (define (analyse-local-data message)
      "Return the channel number, the numeric priority-preference value and
the data (as three values, in that order) in the
@code{/:msg:cadet:local:data} message @var{message}.  The data is returned
as a slice of @var{message} (everything after the header), not as a copy."
      ;; The header size is needed for both the header and the payload,
      ;; so compute it only once.
      (define header-size (sizeof /:msg:cadet:local:data '()))
      (define header (slice-slice message 0 header-size))
      (define-syntax read*
        (cut-syntax read% /:msg:cadet:local:data <> header))
      (values (read* '(channel-number))
              (read* '(priority-preference))
              (slice-slice message header-size)))

    (define (construct-local-acknowledgement channel-number)
      "Create a @code{/:msg:cadet:local:acknowledgement} message, for
informing the client that more data can be sent across the channel
identified by @var{channel-number}.  The message is returned as a fresh
read-write bytevector slice."
      (let ((message
             (make-slice/read-write
              (sizeof /:msg:cadet:local:acknowledgement '()))))
        ;; Shorthand for writing a field of the message under construction.
        (define-syntax set-field!
          (cut-syntax set%! /:msg:cadet:local:acknowledgement <> message <>))
        (set-field! '(header size) (slice-length message))
        (set-field! '(header type)
                    (value->index
                     (symbol-value message-type
                                   msg:cadet:local:acknowledgement)))
        (set-field! '(client-channel-number) channel-number)
        message))

    (define (analyse-local-acknowledgement message)
      "Return the channel number in the
@code{/:msg:cadet:local:acknowledgement} message @var{message}."
      (read% /:msg:cadet:local:acknowledgement '(client-channel-number) message))

    ;; Placeholder for procedures that have not been implemented yet:
    ;; accepts any number of arguments and always raises an error.
    (define stub
      (lambda _
        (error "todo")))

    ;; TODO: callbacks, message queue, actually test it
    (define* (open-channel! server address handlers)
      "Asynchronously connect to the CADET address @var{address} via the CADET
server object @var{server}, returning a CADET channel object.  When a message
is received, it is passed to the appropriate handler in @var{handlers}."
      (assert (and (server:cadet? server) (cadet-address? address)))
      ;; TODO: for now, errors end up in a stub that raises an error.
      (define error-handler stub)
      ;; The sender needs the channel, the channel needs the message queue
      ;; and the message queue needs the sender.  The promise breaks this
      ;; cycle: it is only forced once the sender is actually called.
      (define sender (make-channel-sender (delay channel)))
      (define close-channel!
        (let ((terminal-condition (server-terminal-condition server))
              (control (server-control-channel server)))
          (lambda ()
            ;; To keep channels garbage-collectable, use
            ;; maybe-send-control-message!* instead of maybe-send-control-message!.
            (maybe-send-control-message!* terminal-condition
                                          control 'close-channel! channel))))
      (define message-queue
        (make-message-queue handlers error-handler sender close-channel!))
      ;; The third argument is the channel options.
      (define channel (%make-channel server address 0 message-queue))
      ;; Let the control loop assign a channel number and notify the service.
      (maybe-send-control-message! server 'open-channel! channel)
      channel)

    ;; Make a message sender for the message queue of a channel.
    ;; @var{channel-promise} is a promise of the channel; it is only forced
    ;; when the sender is called.  The sender returns no values.
    (define (make-channel-sender channel-promise)
      (lambda (message-queue)
        (let ((channel (force channel-promise)))
          ;; Ask the main event loop to send messages.
          ;; We could use 'make-one-by-one' sender to ask the main event
          ;; loop to send them (one-by-one), but such ping-ponging seems
          ;; much slower than needed (unverified).
          (maybe-send-control-message! (channel-server channel)
                                       'send-channel-stuff!
                                       message-queue
                                       channel)
          ;; The #true or #false return value does not appear relevant here.
          (values))))

    ;; TODO: call when mq is closed, maybe unify closing the message queue
    ;; and the channel?
    (define (close-channel! channel)
      "Close the channel @var{channel}.  This is an asynchronous operation:
it does not have an immediate effect.  It is also an idempotent operation:
closing a channel twice does not have any additional effect.

Any messages buffered before the first call to @code{close-channel!} will
still be sent to the service (unless cancelled and until the
@code{disconnect!}).  If messages requested reliable transport, then CADET
will still retransmit lost messages even though the channel is closed or
closing.

For buffered messages not before (*) the call, it is unspecified whether they
will still be transmitted.

(*) Warning: in a concurrent setting, ‘after’ is not the same as ‘not before’."
      (assert (channel? channel))
      ;; The actual work happens in the control loop of the server that the
      ;; channel belongs to.
      (let ((server (channel-server channel)))
        (maybe-send-control-message! server 'close-channel! channel)))
    ;; Ports are not implemented yet: the three exported port procedures
    ;; are all bound to 'stub', which raises an error when called.
    (define port? stub)
    (define open-port! stub)
    (define close-port! stub)))