aboutsummaryrefslogtreecommitdiff
path: root/gnu/gnunet/dht/client.scm
blob: 3874c32fe83180ed9b09ef4db80eaa300f0df672 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
;#!r6rs
;; This file is part of GNUnet
;; Copyright (C) 2004-2013, 2016, 2021-2023 GNUnet e.V.
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later

;; Author: Christian Grothoff
;; Author: Nathan Evans
;; ^^ TODO: not visible yet, but once more parts are ported ...
;; Author: Maxime Devos (Scheme port)
(define-library (gnu gnunet dht client)
  (export %effective-minimum-replication-level
	  %effective-maximum-replication-level
	  %minimum-replication-level
	  %maximum-replication-level
	  bound-replication-level

	  ;; Non-interactive data structures
	  %max-datum-value-length
	  &overly-large-datum make-overly-large-datum overly-large-datum?
	  overly-large-datum-type overly-large-datum-length

	  ;; The condition type is defined as '&overly-large-paths' (with an
	  ;; ampersand, like the other condition types), so that is the name
	  ;; that has to be exported.
	  &overly-large-paths make-overly-large-paths overly-large-paths?
	  overly-large-paths-datum-length
	  overly-large-paths-get-path-length overly-large-paths-put-path-length

	  &malformed-path make-malformed-path malformed-path?
	  malformed-path-what malformed-path-size

	  make-datum make-datum/share datum? datum-type datum-key datum-value
	  datum-expiration datum=?
	  make-insertion make-insertion/share insertion? insertion->datum
	  insertion=?
	  insertion-desired-replication-level
	  make-query query? query-type query-key query-desired-replication-level
	  datum->search-result search-result? search-result->datum
	  search-result-get-path search-result-put-path

	  copy-datum copy-search-result copy-insertion copy-query

	  ;; Network message manipulation procedures
	  ;; (these belong to (gnu gnunet dht network)).
	  (rename (construct-client-get | construct-client-get|)
		  (construct-client-get-stop | construct-client-get-stop|)
		  (construct-client-put | construct-client-put|)
		  (construct-client-result | construct-client-result|)
		  (analyse-client-get | analyse-client-get|)
		  (analyse-client-get-stop | analyse-client-get-stop|)
		  (analyse-client-put | analyse-client-put|)
		  (analyse-client-result | analyse-client-result|))

	  (rename (server:dht? server?))

	  connect
	  disconnect!
	  put!
	  cancel-put!
	  start-get!
	  filter-get:known-results!
	  stop-get!
	  ;; Extended API: monitor
	  start-monitor!
	  stop-monitor!)
  (import (gnu extractor enum)
	  (gnu gnunet block)
	  (gnu gnunet hashcode struct)
	  (gnu gnunet hashcode)
	  (gnu gnunet mq)
	  (gnu gnunet mq handler)
	  (gnu gnunet mq envelope)
	  (only (gnu gnunet server)
		maybe-send-control-message! maybe-send-control-message!*
		maybe-ask* answer
		<server> server-terminal-condition server-control-channel
		make-disconnect! handle-control-message!
		loop:terminal-condition loop:control-channel
		run-loop spawn-server-loop)
	  (only (guile)
		define-syntax-rule define* lambda* error
		->bool and=> identity)
	  (only (ice-9 atomic)
		make-atomic-box)
	  (only (ice-9 match)
		match)
	  (only (ice-9 weak-vector)
		weak-vector weak-vector-ref weak-vector?)
	  (only (pfds bbtrees)
		bbtree-size bbtree-fold bbtree-set bbtree-contains?
		bbtree-delete make-bbtree bbtree-ref)
	  (only (gnu extractor enum)
		symbol-value)
	  (only (gnu gnunet concurrency lost-and-found)
		losable-lost-and-found)
	  (gnu gnunet dht struct)
	  (only (gnu gnunet message protocols)
		message-type)
	  (only (gnu gnunet netstruct syntactic)
		read% sizeof r% s% analyse define-analyser
		construct =>! =>slice! %sizeof)
	  (only (gnu gnunet utils bv-slice)
		slice-length slice/read-only make-slice/read-write slice-copy!
		slice-slice verify-slice-readable slice-copy/read-write
		slice-copy/read-only slice-contents-equal?)
	  (gnu gnunet utils hat-let)
	  (only (gnu gnunet utils records)
		define-record-type*)
	  (only (rnrs base)
		and < >= = quote * / + - define begin ... let*
		quote case else values apply let cond if > eq?
		<= expt assert exact? integer? lambda for-each
		not expt min max div-and-mod positive?
		vector cons append list =>)
	  (only (rnrs control)
		unless when)
	  (only (rnrs records syntactic)
		define-record-type)
	  (only (rnrs conditions)
		&error condition make-who-condition define-condition-type)
	  (only (rnrs exceptions)
		raise)
	  (only (srfi srfi-26)
		cut))
  (begin
    ;; The minimal and maximal replication levels the DHT service allows.
    ;; While the service won't reject replication levels outside this range,
    ;; it will clip them to within this range, so choosing replication levels
    ;; outside this range is useless.
    ;;
    ;; Also, GNUnet v0.15.3 and earlier has a bug where the DHT service can crash
    ;; if the replication level 0 is passed, see https://bugs.gnunet.org/view.php?id=7029.
    ;;
    ;; These values are based on the MINIMUM_REPLICATION_LEVEL and
    ;; MAXIMUM_REPLICATION_LEVEL values in src/dht/gnunet-service-dht_neighbours.c
    ;; of the C implementation.
    ;;
    ;; Two ranges are defined:
    ;;
    ;;  * %effective-{minimum,maximum}-replication-level: the range that
    ;;    'bound-replication-level' clamps its argument to.
    ;;  * %{minimum,maximum}-replication-level: the range of values that
    ;;    'bound-replication-level' accepts at all; anything outside it is
    ;;    an error.  NOTE(review): the upper bound presumably corresponds to
    ;;    an unsigned 32-bit field in the network messages -- confirm.
    (define %effective-minimum-replication-level 1)
    (define %effective-maximum-replication-level 16)
    (define %minimum-replication-level 0)
    (define %maximum-replication-level (- (expt 2 32) 1))

    ;; Used when constructing query and insertion objects.
    (define (bound-replication-level replication-level)
      "Clamp the replication level @var{replication-level} to the range the
DHT service actually honours.  @var{replication-level} must be a valid
replication level, i.e. lie between @code{%minimum-replication-level} and
@code{%maximum-replication-level} (inclusive); otherwise an error is raised."
      (if (<= %minimum-replication-level
	      replication-level
	      %maximum-replication-level)
	  ;; Clamp: first raise too-small values to the lower bound, then
	  ;; lower too-large values to the upper bound.
	  (min %effective-maximum-replication-level
	       (max %effective-minimum-replication-level replication-level))
	  (error "replication level is out of bounds")))

    (define (validate-key key)
      "Return @var{key} unchanged when it is a hashcode:512.  Otherwise, raise
an error."
      ;; Guard clause: reject anything that is not a hashcode:512.
      (unless (hashcode:512? key)
	(error "not a hashcode:512"))
      key)

    (define (validate-datum datum)
      "Return @var{datum} unchanged when it is a datum object.  Otherwise,
raise an error."
      ;; Guard clause: reject anything that is not a datum.
      (unless (datum? datum)
	(error "not a datum"))
      datum)

    ;; TODO: more-or-less copied from gnunet_util_lib.h
    ;;
    ;; Upper bound (in octets) on the size of a message; search results whose
    ;; hypothetical message would exceed it are rejected.
    ;; NOTE(review): 65535 presumably stems from a 16-bit size field in the
    ;; message header -- confirm.
    (define %max-message-size 65535)

    ;; The largest value (in octets) a datum may hold: whatever remains of a
    ;; maximally-sized message after the fixed-size /:msg:dht:client:put part.
    (define %max-datum-value-length
      (- %max-message-size (sizeof /:msg:dht:client:put '())))
    ;; Sanity check: the fixed-size part must itself fit in a message.
    (assert (<= 0 %max-datum-value-length))

    ;; A condition indicating that the value of a datum is longer than
    ;; %max-datum-value-length.  It is raised (together with a &who condition
    ;; naming 'make-datum') when the value of a datum is preprocessed.
    ;;
    ;; TODO: maybe check types in 'make-overly-large-datum'.
    ;; TODO: &error / &serious / &condition?
    (define-condition-type &overly-large-datum &error
      make-overly-large-datum
      overly-large-datum?
      ;; block type, as an (exact) integer
      (type   overly-large-datum-type)
      ;; length of the (overly large) value
      (length overly-large-datum-length))

    ;; A condition indicating that the combination of get path, put path
    ;; and datum is too long -- it is the combination that is too long,
    ;; not necessarily any part in particular.
    ;;
    ;; It is raised by 'datum->search-result', together with a &who condition.
    ;;
    ;; TODO: see &overly-large-datum
    (define-condition-type &overly-large-paths &error
      make-overly-large-paths
      overly-large-paths?
      ;; length of datum value (does not include the key or type)
      (datum-length overly-large-paths-datum-length)
      ;; The length (not the size!) of the get path, i.e. the number
      ;; of path elements.  Zero if there is no get path.
      (get-path-length overly-large-paths-get-path-length)
      ;; Likewise, for the put path.
      (put-path-length overly-large-paths-put-path-length))

    ;; The would-be get-path or put-path does not have the correct size
    ;; to be a path, i.e. its size is not a multiple of the size of a
    ;; /dht:path-element.  Raised by 'datum->search-result', together with
    ;; a &who condition.
    (define-condition-type &malformed-path &error
      make-malformed-path
      malformed-path?
      ;; the symbol 'get-path' or 'put-path'
      (what malformed-path-what)
      ;; size of the would-be path (in octets)
      (size malformed-path-size))

    ;; A key-value entry in the DHT.
    ;;
    ;; Two constructors are generated: '%make-datum/share' (#:constructor)
    ;; and '%make-datum' (#:constructor/copy).  They are wrapped by
    ;; 'make-datum/share' and 'make-datum', which add the optional
    ;; #:expiration keyword argument.
    ;; NOTE(review): presumably the /share constructor keeps a reference to
    ;; the slices it is passed, whereas the /copy constructor applies the
    ;; per-field #:copy procedure first -- confirm in
    ;; (gnu gnunet utils records).
    (define-record-type* (<datum> datum?)
      #:constructor %make-datum/share
      #:constructor/copy %make-datum
      #:copy (copy-datum
	      "Make a copy of the datum, such that modifications to the slices
in the original do not impact the copy.")
      #:equality datum=?
      ;; The block type.  Block type objects are normalised to their numeric
      ;; value by 'canonical-block-type', hence the comparison with '='.
      #:field (type #:copy identity
		    #:equality =
		    #:getter datum-type
		    #:preprocess canonical-block-type)
      ;; The key, a hashcode:512 (anything else is rejected by
      ;; 'validate-key').
      #:field (key #:copy copy-hashcode:512
		   #:equality hashcode:512=?
		   #:getter datum-key
		   #:preprocess validate-key)
      ;; The value, stored as a read-only bytevector slice of at most
      ;; %max-datum-value-length octets; longer values are rejected with
      ;; &overly-large-datum and a &who condition.
      ;; NOTE(review): the '=>' form presumably allows the preprocessing
      ;; expression to refer to other fields (here: 'type') -- confirm.
      #:field (value #:copy slice-copy/read-only
		     #:equality slice-contents-equal?
		     #:getter datum-value
		     #:preprocess
		     (=>
		      (if (<= (slice-length value) %max-datum-value-length)
			  (slice/read-only value)
			  (raise (condition
				  (make-who-condition 'make-datum)
				  (make-overly-large-datum
				   type (slice-length value)))))))
      ;; The expiration time, a number (compared with '=').  It is not
      ;; preprocessed or validated here.
      #:field (expiration #:copy identity
			  #:equality =
			  #:getter datum-expiration))

    ;; TODO default expiration
    (define* (make-datum type key value #:key (expiration 0))
      "Make a datum object of block type @var{type} (or its corresponding
numeric value), with key @var{key} (a hashcode:512), value @var{value} (a
readable bytevector slice) and expiring at @var{expiration} (TODO type, epoch).
The keyword argument @var{expiration} is optional and defaults to @code{0},
see ?.

Datums are @acronym{cisw} objects and as such the procedures
@code{datum-type}, @code{datum-key}, @code{datum-value},
@code{datum-expiration}, @code{datum?}, @code{make-datum},
@code{make-datum/share} and @code{datum=?} have the usual semantics.  The
length of @var{value} may be at most @code{%max-datum-value-length}.  If this
bound is exceeded, an appropriate @code{&overly-large-datum} and @code{&who}
condition is raised in the constructor."
      (%make-datum type key value expiration))

    ;; Variant of 'make-datum' that delegates to the #:constructor of <datum>
    ;; ('%make-datum/share') instead of the #:constructor/copy.  It accepts
    ;; the same arguments and performs the same validation.
    ;; NOTE(review): presumably the resulting datum shares the slices of its
    ;; arguments instead of copying them -- confirm.
    (define* (make-datum/share type key value #:key (expiration 0))
      (%make-datum/share type key value expiration))

    ;; A request to insert something in the DHT: a datum, together with the
    ;; replication level that is desired for it.
    ;;
    ;; As for <datum>, two constructors are generated ('%make-insertion' and
    ;; '%make-insertion/share'); they are wrapped by 'make-insertion' and
    ;; 'make-insertion/share'.
    (define-record-type* (<insertion> insertion?)
      #:copy (copy-insertion
	      "Make a copy of the insertion, such that modifications to the
slices in the old insertion do not impact the new insertion.")
      #:constructor/copy %make-insertion
      #:constructor %make-insertion/share
      #:equality insertion=?
      ;; The datum to insert; anything that is not a datum is rejected by
      ;; 'validate-datum'.
      #:field (datum #:copy copy-datum
		     #:equality datum=?
		     #:getter insertion->datum
		     #:preprocess validate-datum)
      ;; The desired replication level, validated and clamped by
      ;; 'bound-replication-level'.
      #:field (desired-replication-level
	       #:copy identity
	       #:equality =
	       #:getter insertion-desired-replication-level
	       #:preprocess bound-replication-level))

    (define* (make-insertion datum #:key (desired-replication-level 3)) ; TODO defaults
      "Make an insertion object for inserting the datum @var{datum},
desiring a replication level @var{desired-replication-level} (see ??).
The replication level defaults to @code{3} and is bounded with
@code{bound-replication-level}.

Insertions are @acronym{cisw} objects and as such the procedures
@code{insertion->datum}, @code{insertion-desired-replication-level},
@code{insertion?}, @code{make-insertion}, @code{make-insertion/share}
and @code{insertion=?} have the usual semantics."
      (%make-insertion datum desired-replication-level))
    ;; Variant of 'make-insertion' that delegates to the #:constructor of
    ;; <insertion> ('%make-insertion/share') instead of the
    ;; #:constructor/copy.  It accepts the same arguments.
    ;; NOTE(review): presumably the datum is shared instead of copied --
    ;; confirm.
    (define* (make-insertion/share datum #:key (desired-replication-level 3))
      (%make-insertion/share datum desired-replication-level))

    ;; A request to search for something in the DHT.  All fields are
    ;; normalised and validated by the constructor.
    (define-record-type (<query> make-query query?)
      (fields (immutable type query-type)
	      (immutable key query-key)
	      (immutable desired-replication-level query-desired-replication-level))
      (protocol
       (lambda (%make)
	 (lambda* (type key #:key (desired-replication-level 3))
	   "Make a query object for searching for a value of block type @var{type}
(or its corresponding numeric value), with key @var{key} (a hashcode:512), at
desired replication level @var{desired-replication-level}.

The numeric value of the block type, the key and the desired replication level
can be recovered with the accessors @code{query-type}, @code{query-key} and
@code{query-desired-replication-level}. It can be tested if an object is a
query object with the predicate @code{query?}."
	   ;; The key is validated but not copied; see 'copy-query' for
	   ;; making an independent copy.
	   (%make (canonical-block-type type)
		  (validate-key key)
		  (bound-replication-level desired-replication-level))))))

    (define (copy-query old)
      "Return a fresh query object equivalent to @var{old}.  The key is
copied, so later modifications to the slices in @var{old} do not affect the
new query object."
      (let ((type (query-type old))
	    ;; The key is the only field that needs a real copy.
	    (key (copy-hashcode:512 (query-key old)))
	    (level (query-desired-replication-level old)))
	(make-query type key #:desired-replication-level level)))

    ;; A datum that was found in the DHT, optionally together with the paths
    ;; it has taken.
    (define-record-type (<search-result> datum->search-result search-result?)
      (fields (immutable datum search-result->datum)
	      (immutable get-path search-result-get-path)
	      (immutable put-path search-result-put-path))
      (protocol
       (lambda (%make)
	 (lambda* (datum #:key (get-path #f) (put-path #f))
	   "Make a search result object for the datum @var{datum}. The datum can
be recovered with the accessor @code{search-result->datum}. It can be tested if
an object is a search result with the predicate @code{search-result?}. The
optional arguments @var{get-path} and @var{put-path}, when not false, are bytevector
slices consisting of a list of @code{/dht:path-element}.

The @var{get-path}, if any, is the path from the storage location to the
current peer. Conversely, the @var{put-path}, if any, is a path from the
peer that inserted the datum into the DHT to the storage location. The
@var{get-path} and @var{put-path} can be accessed with
@code{search-result-get-path} and @code{search-result-put-path} respectively.

When the datum, get path and put path together are too large, a
@code{&overly-large-paths} condition is raised. When the
bytevector slice length of @var{get-path} or @var{put-path} is not a
multiple of the size of a path element, then a @code{&malformed-path}
condition is raised."
	   ;; TODO: can a get-path exist without a put-path?
	   (let^ ((! (make-who)
		     (make-who-condition 'datum->search-result))
		  (! datum (validate-datum datum))
		  (!^ (verify-path what path)
		      "Test if @var{path} looks like a get path, put path or
falsehood. If it is false, return @code{#false}, @code{0} and @code{0}.
Otherwise, if it appears to be a valid path, return @var{path} as a readable
bytevector slice, the size of the path and the length of the path.  If
@var{path} is invalid, raise an appropriate exception."
		     ((? (not path)
			 (values #false 0 0))
		      ;; Verify the slice is readable, and make sure the
		      ;; 'what' field of the &missing-capabilities is
		      ;; precise -- we can rely on slice/read-only to
		      ;; perform capability checking, but then the 'what'
		      ;; field wouldn't be correct.
		      (_ (verify-slice-readable what path))
		      ;; Verify the path actually consists of an integral number
		      ;; of /dht:path-element structures.
		      (! size (slice-length path))
		      (<-- (length remainder)
			   (div-and-mod size (sizeof /dht:path-element '())))
		      (? (positive? remainder)
			 (raise (condition
				 (make-who)
				 (make-malformed-path what size)))))
		     ;; We could place an upper bound on the length of
		     ;; @var{path} here, but that's a bit useless because
		     ;; we will verify the total length (get-path + put-path)
		     ;; later anyway.
		     (values (slice/read-only path) size length))
		  ;; Verify both the get-path and the put-path (if any),
		  ;; remove writability and only keep readability.
		  (<-- (get-path get-path-size get-path-length)
		       (verify-path 'get-path get-path))
		  (<-- (put-path put-path-size put-path-length)
		       (verify-path 'put-path put-path))
		  ;; Make sure the get-path, put-path, datum and
		  ;; /:msg:dht:client:result header will fit in a GNUnet
		  ;; message.  TODO: maybe also consider other messages?
		  ;;
		  ;; The value of the datum is part of the message as well,
		  ;; so it has to be included in the sum -- otherwise, a
		  ;; large datum with paths that are small by themselves
		  ;; would slip through.
		  (! datum-length (slice-length (datum-value datum)))
		  (! hypothetical-message-size
		     (+ (sizeof /:msg:dht:client:result '())
			get-path-size put-path-size datum-length))
		  (? (> hypothetical-message-size %max-message-size)
		     (raise (condition
			     (make-who)
			     (make-overly-large-paths
			      datum-length
			      get-path-length put-path-length)))))
		 (%make datum get-path put-path))))))

    (define (copy-search-result old)
      "Return a fresh search result equivalent to @var{old}.  The datum and
the paths (if any) are copied, so later modifications to the slices in
@var{old} do not affect the new search result."
      (define (copy-path path)
	;; Paths are optional: an absent path (#false) stays absent.
	(and path (slice-copy/read-only path)))
      (datum->search-result
       (copy-datum (search-result->datum old))
       #:get-path (copy-path (search-result-get-path old))
       #:put-path (copy-path (search-result-put-path old))))

    

    ;;;
    ;;; Constructing and analysing network messages.
    ;;;
    ;;; These procedures are defined here instead of in (gnu gnunet dht network),
    ;;; but only to prevent cycles.
    ;;;

    (define* (construct-client-get query unique-id #:optional (options 0))
      "Create a new @code{/:msg:dht:client:get} message for the query object
 @var{query}, with @var{unique-id} as ‘unique id’ and @var{options} as options."
      ;; All fields are taken from QUERY, except for the unique id and the
      ;; options, which are passed separately.
      (construct /:msg:dht:client:get
        (=>! (header size) (%sizeof))
	(=>! (header type)
	     (value->index (symbol-value message-type msg:dht:client:get)))
	(=>! (options) options)
	(=>! (desired-replication-level)
	     (query-desired-replication-level query))
	(=>! (type) (query-type query))
	(=>slice! (key) (hashcode:512->slice (query-key query)))
	(=>! (unique-id) unique-id)))

    (define* (construct-client-get-stop key unique-id)
      "Create a new @code{/:msg:dht:client:get:stop} message for cancelling a
get request with @var{unique-id} as unique id and @var{key} as key.
@var{key} is a hashcode:512."
      (construct /:msg:dht:client:get:stop
        (=>! (header size) (%sizeof))
	(=>! (header type)
	     (value->index (symbol-value message-type msg:dht:client:get:stop)))
	;; The reserved field is always set to zero.
	(=>! (reserved) 0)
	(=>! (unique-id) unique-id)
	(=>slice! (key) (hashcode:512->slice key))))

    (define* (construct-client-put insertion #:optional (options 0))
      "Create a new @code{/:msg:dht:client:put} message for the insertion
object @var{insertion} with @var{options} as options.  The value of the
datum is copied into the tail of the message."
      (define datum (insertion->datum insertion))
      (construct /:msg:dht:client:put
        ;; The tail ('rest') has exactly the size of the value.
        #:tail (rest (slice-length (datum-value datum)))
	(=>! (header size) (%sizeof))
	(=>! (header type)
	     (value->index (symbol-value message-type msg:dht:client:put)))
	(=>! (type) (datum-type datum))
	;; Note: the field of the message is named 'option' (singular).
	(=>! (option) options)
	(=>! (desired-replication-level)
	     (insertion-desired-replication-level insertion))
	(=>! (expiration) (datum-expiration datum))
	;; Copy key-data pair to insert into the DHT.
	(=>slice! (key) (hashcode:512->slice (datum-key datum)))
	(slice-copy! (datum-value datum) rest)))

    (define (construct-client-result search-result unique-id)
      "Create a new @code{/:msg:dht:client:result} message for the search
result object @var{search-result}, with @var{unique-id} as ‘unique id’.

The tail of the message consists of the put path (if any), followed by the
get path (if any), followed by the value of the datum.  This is the layout
that @code{analyse-client-result} reads back."
      (let^ ((! datum (search-result->datum search-result))
	     (! get-path (search-result-get-path search-result))
	     (! put-path (search-result-put-path search-result))
	     (! value (datum-value datum))
	     (! size/path-element (sizeof /dht:path-element '()))
	     ;; Size in octets of an optional path; an absent path (#false)
	     ;; does not occupy any space.
	     (! (path-size path)
		(if path
		    (slice-length path)
		    0))
	     (! get-path-size (path-size get-path))
	     (! put-path-size (path-size put-path)))
	    (construct /:msg:dht:client:result
	      ;; The tail is sized in octets: both paths and the value.
	      ;; (Sizing it with the number of path elements would make the
	      ;; tail too small as soon as a path is present.)
	      #:tail (rest (+ put-path-size
			      get-path-size
			      (slice-length value)))
	      (=>! (header type)
		   (value->index
		    (symbol-value message-type msg:dht:client:result)))
	      (=>! (header size) (%sizeof))
	      (=>! (type) (datum-type datum))
	      ;; The length fields count path elements, not octets.  The
	      ;; search result constructor has verified that the sizes are
	      ;; multiples of the size of a path element.
	      (=>! (get-path-length) (/ get-path-size size/path-element))
	      (=>! (put-path-length) (/ put-path-size size/path-element))
	      (=>! (unique-id) unique-id)
	      (=>! (expiration) (datum-expiration datum))
	      (=>slice! (key) (hashcode:512->slice (datum-key datum)))
	      ;; Fill the tail: put path, get path, value (in that order).
	      (when put-path
		(slice-copy! put-path
			     (slice-slice rest 0 put-path-size)))
	      (when get-path
		(slice-copy! get-path
			     (slice-slice rest put-path-size get-path-size)))
	      (slice-copy! value
			   (slice-slice rest
					(+ put-path-size get-path-size))))))

    (define-analyser analyse-client-get /:msg:dht:client:get
      "Return the query object, the unique id and the options corresponding to
the @code{/:msg:dht:client:get} message @var{message}.  Xqueries are
currently unsupported."
      ;; The key of the query is made with the /share constructor, directly
      ;; from the key slice of the message.
      (values (make-query (r% type) (make-hashcode:512/share (s% key))
			  #:desired-replication-level
			  (r% desired-replication-level))
	      (r% unique-id)
	      (r% options)))

    (define-analyser analyse-client-get-stop /:msg:dht:client:get:stop
      "Return the unique id and the key corresponding to the
@code{/:msg:dht:client:get:stop} message @var{message}.  The key is returned
as a bytevector slice, not as a hashcode object."
      (values (r% unique-id) (s% key)))

    (define (analyse-client-put message)
      "Return the insertion object and options corresponding to the
@code{/:msg:dht:client:put} message @var{message}."
      ;; The message consists of a fixed-size part, followed by the value.
      (let* ((size/header (sizeof /:msg:dht:client:put '()))
	     (fixed-part (slice-slice message 0 size/header))
	     (payload (slice-slice message size/header)))
	(analyse /:msg:dht:client:put fixed-part
		 (values
		  ;; Nothing is copied: the /share constructors are used.
		  (make-insertion/share
		   (make-datum/share (r% type)
				     (make-hashcode:512/share (s% key))
				     payload
				     #:expiration (r% expiration))
		   #:desired-replication-level (r% desired-replication-level))
		  (r% option)))))

    (define (analyse-client-result message)
      "Return search result object and unique id for the
@code{/:msg:dht:client:result} message @var{message}."
      ;; Layout of the message: fixed-size part, put path, get path, value.
      (let* ((read-only (slice/read-only message))
	     (size/header (sizeof /:msg:dht:client:result '()))
	     (size/path-element (sizeof /dht:path-element '()))
	     (fixed-part (slice-slice read-only 0 size/header))
	     (tail (slice-slice read-only size/header)))
	(analyse
	 /:msg:dht:client:result
	 fixed-part
	 (let* (;; The message stores lengths (numbers of path elements);
		;; convert them to sizes in octets.
		(put-path-size (* size/path-element (r% put-path-length)))
		(get-path-size (* size/path-element (r% get-path-length)))
		(put-path (slice-slice tail 0 put-path-size))
		(get-path (slice-slice tail put-path-size get-path-size))
		;; Whatever follows the paths is the value.
		(value (slice-slice tail (+ put-path-size get-path-size)))
		(datum (make-datum/share (r% type)
					 (make-hashcode:512/share (s% key))
					 value
					 #:expiration (r% expiration))))
	   (values (datum->search-result datum
					 #:get-path get-path
					 #:put-path put-path)
		   (r% unique-id))))))
    

    ;; The DHT server object, as returned by 'connect'.  It extends <server>
    ;; with a counter for generating unique ids (see 'fresh-id').
    ;;
    ;; New operations are communicated to the main event loop
    ;; via the control channel, using 'maybe-send-control-message!'.
    ;; Operations must be put in id->operation-map before sending them
    ;; to the service!
    (define-record-type (<server:dht> make-server server:dht?)
      (parent <server>)
      ;; Atomic box holding an unsigned 64-bit integer.
      (fields (immutable next-unique-id/box server-next-unique-id/box))
      (protocol (lambda (%make)
		  (lambda ()
		    ;; (%make) constructs the <server> part, without any
		    ;; arguments; the counter starts at zero.
		    ((%make)
		     ;; Any ‘small’ natural number will do.
		     (make-atomic-box 0))))))

    ;; A search object, as returned by 'start-get!'.
    (define-record-type (<get> %make-get get?)
      (parent <losable>)
      (fields (immutable server get:server)
	      (immutable found get:iterator) ; procedure accepting <search-result>
	      (immutable query get:query) ; <query>
	      (immutable unique-id get:unique-id)
	      (immutable options get:options)
	      ;; TODO: test if non-lingering actually works.
	      ;;
	      ;; If #false, 'reconnect' does not keep a strong reference to the
	      ;; search object and 'reconnect' will automatically cancel the
	      ;; search when the search object becomes unreachable.
	      ;;
	      ;; If #true, the search will not be automatically cancelled;
	      ;; 'reconnect' keeps a strong reference.
	      (immutable linger? get:linger?))
      (protocol (lambda (%make)
		  (lambda (server found query unique-id options linger?)
		    ;; When not lingering, add this search object to the lost
		    ;; and found, such that it will eventually be cancelled.
		    ;; (When lingering, the parent constructor receives
		    ;; #false instead of a lost-and-found.)
		    ((%make (and (not linger?)
				 (losable-lost-and-found server)))
		     server found query unique-id options linger?)))))

    ;; A put object, as returned by 'put!'.  It holds the server, the
    ;; confirmation thunk and the message to send.
    (define-record-type (<put> %make-put put?)
      (fields (immutable server put:server)
	      (immutable inserted put:inserted) ; thunk
	      ;; bytevector slice (/:msg:dht:client:put)
	      (immutable message put:message)))

    (define (send-get! mq get)
      "Send, over the message queue @var{mq}, a GET message for the search
object @var{get}."
      (let ((query (get:query get))
	    (unique-id (get:unique-id get))
	    (options (get:options get)))
	(send-message! mq (construct-client-get query unique-id options))))

    (define (send-stop-get! mq get)
      "Send, over the message queue @var{mq}, a message for stopping the get
operation @var{get}."
      (let ((key (query-key (get:query get)))
	    (unique-id (get:unique-id get)))
	(send-message! mq (construct-client-get-stop key unique-id))))

    (define (fresh-id server)
      "Generate a fresh numeric ID to use for communication with @var{server}.
An error is raised when the counter of @var{server} would overflow."
      ;; The ‘next unique id’ is incremented with a compare-and-swap loop.
      ;; The GNUnet network structures limit the ‘unique id’ to being less
      ;; than (expt 2 64), so overflow has to be avoided.
      (%%bind-atomic-boxen
       ((next-unique-id (server-next-unique-id/box server) swap!))
       (let retry ((seen next-unique-id))
	 (let ((successor (+ 1 seen)))
	   ;; TODO(low-priority): handle overflow without errors
	   (when (> successor (- (expt 2 64) 1))
	     (error "you overflowed an 64-bit counter."))
	   (let ((found (swap! seen successor)))
	     (if (= seen found)
		 ;; The swap succeeded, so ‘seen’ is now ours.  (Always
		 ;; returning ‘successor’ instead would work too.)
		 seen
		 ;; Somebody else was faster; retry with the value they
		 ;; left behind.
		 (retry found)))))))

    (define (canonical-block-type type)
      "Return the numeric value of the block type @var{type}, which is either
a @code{block-type?} object or an exact integer that fits in 32 bits
(unsigned).  Out-of-bounds integers cause an error."
      (if (and (integer? type) (exact? type))
	  ;; Already numeric: only the bounds remain to be verified.
	  (begin
	    (unless (<= 0 type (- (expt 2 32) 1))
	      (error "block type out of bounds"))
	    type)
	  ;; Otherwise, it has to be a block type object.
	  (begin
	    (assert (block-type? type))
	    (value->index type))))

    (define* (start-get! server query found #:key (linger? #false))
      "Search for data matching @var{query} in the DHT. When a datum is found,
call the unary procedure @var{found} on the search result. It is possible to
find multiple data matching a query. In that case, @var{found} is called
multiple times. Searching happens asynchronously; to stop the search, a fresh
search object for controlling the search is returned.

The procedure @var{found} is run from the context of @var{server}. As such, if
@var{found} blocks, then all operations on @var{server} might block. As such,
it is recommended for @var{found} to do as little as possible by itself and
instead delegate any work to a separate fiber.

To avoid expensive copies, the implementation can choose to reuse internal
buffers for the slices passed to @var{found}, which could be overwritten after
the call to @var{found}. As such, it might be necessary to make a copy of the
search result, using @code{copy-search-result}.

When the boolean @var{linger?} is false (this is the default), the search is
automatically cancelled when the search object becomes unreachable according
to the GC."
      ;; TODO: options, xquery ...
      ;;
      ;; Validate the arguments before consuming a unique id.
      (when linger? (assert (eq? linger? #true)))
      (let* ((id (fresh-id server))
	     (options 0) ; TODO: allow setting some options
	     (handle (%make-get server found query id options
				(->bool linger?))))
	;; The event loop of the server does the actual work.
	(maybe-send-control-message! server 'start-get! handle)
	handle))

    (define (stop-get! search)
      "Cancel the get operation @var{search}.  This is an asynchronous operation;
it does not have an immediate effect.  This is an idempotent operation; cancelling
a search twice does not have any additional effect.  No values are returned."
      ;; NOTE(review): the control message is named 'stop-search!, whereas
      ;; 'start-get!' sends 'start-get! -- confirm this matches the name
      ;; expected by the control message handler.
      (maybe-send-control-message! (get:server search) 'stop-search! search)
      (values))

    (define* (put! server insertion #:key (confirmed values))
      "Perform the insertion @var{insertion}. When the datum has been inserted,
the thunk @var{confirmed} is called. A @emph{put object} is returned which can
be used to stop the insertion.

TODO actually call @var{confirmed}"
      (let* (;; The message is constructed up front.  TODO: options
	     (message (construct-client-put insertion))
	     (handle (%make-put server confirmed message)))
	;; TODO: see start-get!
	(maybe-send-control-message! server 'put! handle)
	handle))

    (define-syntax-rule (well-formed?/path-length slice type (field ...) compare)
      "Verify the TYPE message in @var{slice}, which has @var{field ...} ...
(e.g. one or more of get-path-length or put-path-length) and corresponding
/dht:path-element at the end of the message is well-formed -- i.e., check if the length
of @var{slice} corresponds to the size of @var{type} and the get-path-length and
put-path-length.

@var{compare} must be @code{=} if no additional payload follows, or @code{>=}
if an additional payload may follow.  The message type and the size in the
message header is assumed to be correct."
      ;; Warning: slice is evaluated multiple times!
      ;;
      ;; First make sure the fixed-size part is complete, such that the
      ;; length fields can be read at all.
      (and (>= (slice-length slice) (sizeof type '()))
	   (let* ((header (slice/read-only slice 0 (sizeof type '())))
		  ;; Number of octets that follow the fixed-size part.
		  (extra-size (- (slice-length slice) (sizeof type '())))
		  ;; Bind each length field to its value in the message.
		  (field (read% type '(field) header))
		  ...)
	     ;; The length fields count path elements; convert to octets.
	     (compare extra-size (* (+ field ...) (sizeof /dht:path-element '()))))))

    ;; The disconnection procedure for DHT server objects, as constructed by
    ;; 'make-disconnect!'.  The symbol identifies the service in error
    ;; messages; 'server:dht?' is the predicate for DHT server objects
    ;; (presumably used to validate the argument -- see 'make-disconnect!').
    (define disconnect!
      (make-disconnect! 'distributed-hash-table ; for error messages
			server:dht?))

    (define* (connect config #:key connected disconnected spawn
                      #:rest keyword-arguments)
      "Start connecting to the DHT service described by the configuration
@var{config} and return a DHT server object.  Connecting happens
asynchronously; when the connection has been established, the optional thunk
@var{connected} is called.  When the connection breaks, the optional thunk
@var{disconnected} is called and the client code automatically tries to
reconnect.  As a consequence, @var{connected} can be called again after
@var{disconnected}."
      ;; The keyword arguments are not used directly here: they are collected
      ;; in KEYWORD-ARGUMENTS and passed on to 'spawn-server-loop'.
      (let ((server (make-server))
            ;; Both maps (old-id->operation-map and id->operation-map) start
            ;; out empty.
            (initial-maps (list empty-bbtree empty-bbtree)))
        (apply spawn-server-loop server
               #:make-message-handlers make-message-handlers
               #:control-message-handler control-message-handler
               #:configuration config
               #:service-name "dht"
               #:initial-extra-loop-arguments initial-maps
               keyword-arguments)))

    ;; TODO: put in new module?
    (define (make-weak-reference object)
      "Make a reference to @var{object} that does not protect it from garbage
collection: a one-element weak vector holding @var{object}.  Use
@code{dereference} to access the object."
      (weak-vector object))
    (define (make-strong-reference object)
      "Make a strong reference to @var{object}, i.e. return @var{object} itself.
@var{object} may not be a weak vector, as @code{dereference} could then not
tell it apart from a weak reference."
      (assert (not (weak-vector? object)))
      object)
    (define (dereference reference)
      "Return the object @var{reference} refers to.  A strong reference is the
object itself; for a weak reference (a weak vector), the first element of the
weak vector is returned."
      (if (not (weak-vector? reference))
          reference
          (weak-vector-ref reference 0)))

    (define* (make-message-handlers loop _1 _2)
      "Construct the handlers for the messages that can be received from the
DHT service: the monitoring messages (get, get response and put) and the
client result message.  @var{loop} is the loop object whose control channel
is asked for the get operation a result belongs to.  The two remaining
arguments are ignored.

Only client results are actually handled; handling the monitoring messages
is not implemented yet."
      (define (request-search-result-iterator unique-id)
        "Ask @code{control} what is the iterator for the get operation with
unique id @var{unique-id}.  If there is no such get operation, or the get
operation is cancelled, return @code{#false} instead."
        ;; TODO: is the 'terminal-condition' case needed?
        (maybe-ask* (loop:terminal-condition loop) (loop:control-channel loop)
                    'request-search-result-iterator unique-id))
      (message-handlers
       (message-handler
        (type (symbol-value message-type msg:dht:monitor:get))
        ((interpose exp) exp)
        ((well-formed? slice)
         ;; The C implementation verifies that 'get-path-length' at most
         ;; (- (expt 2 16) 1), but this seems only to prevent integer overflow,
         ;; which cannot happen in Scheme due to the use of bignums.
         ;;
         ;; This message does _not_ have a payload, so use = instead of >=.
         ;;
         ;; The layout to verify against is the one of the monitor:get
         ;; message itself, not the one of monitor:get-response (which has
         ;; different fields and hence a different size).
         (well-formed?/path-length slice /:msg:dht:monitor:get
                                   (get-path-length) =))
        ;; Not implemented yet.
        ((handle! slice) ???))
       (message-handler
        (type (symbol-value message-type msg:dht:monitor:get-response))
        ((interpose exp) exp)
        ((well-formed? slice)
         ;; Payload follows, hence >= instead of =.
         (well-formed?/path-length slice /:msg:dht:monitor:get-response
                                   (get-path-length put-path-length) >=))
        ;; Not implemented yet.
        ((handle! slice) ???))
       (message-handler
        (type (symbol-value message-type msg:dht:monitor:put))
        ((interpose exp) exp)
        ((well-formed? slice)
         ;; Payload follows, hence >= instead of =.
         (well-formed?/path-length slice /:msg:dht:monitor:put
                                   (put-path-length) >=))
        ;; Not implemented yet.
        ((handle! slice) ???))
       (message-handler
        (type (symbol-value message-type msg:dht:client:result))
        ((interpose exp) exp)
        ((well-formed? slice)
         ;; Actual data follows, hence >= instead of =.
         (well-formed?/path-length slice /:msg:dht:client:result
                                   (get-path-length put-path-length) >=))
        ((handle! slice)
         ;; The DHT service found some data we were looking for.
         (let^ ((<-- (search-result unique-id)
                     ;; TODO: maybe verify the type and key?
                     (analyse-client-result slice))
                ;; Look up which get operation this result belongs to.
                (! handle (request-search-result-iterator unique-id))
                (? (not handle)
                   ;; Perhaps the search object became unreachable;
                   ;; 'process-stop-search' (see next commit) will be
                   ;; called soon to inform the DHT service.
                   (values))
                (? (get? handle)
                   ;; TODO might not be true once monitoring operations
                   ;; are supported.
                   ((get:iterator handle) search-result)))
               ;; TODO: wrong type (maybe a put handle?).
               TODO-error-reporting/2)))))

    (define (process-stop-search old-id->operation-map id->operation-map
                                 message-queue get)
      "Forget the get operation @var{get} and, if it was registered in
@var{id->operation-map}, send a message to stop it over @var{message-queue}.
Two values are returned: the updated @var{old-id->operation-map} and the
updated @var{id->operation-map}, both without an entry for the unique id of
@var{get}.  If @var{get} was not present in @var{id->operation-map}, no
message is sent."
      ;; TODO: tests!
      ;; TODO: cancel outstanding messages to the DHT services for this
      ;; get operation (including the request to start searching), if
      ;; any.
      (let* ((id (get:unique-id get))
             (remaining-old (bbtree-delete old-id->operation-map id)))
        (if (bbtree-contains? id->operation-map id)
            (let ((remaining (bbtree-delete id->operation-map id)))
              (send-stop-get! message-queue get)
              (values remaining-old remaining))
            ;; Not (or no longer) registered: only the old map can have
            ;; changed.
            (values remaining-old id->operation-map))))

    (define (control-message-handler message continue continue* message-queue
                                     loop old-id->operation-map
                                     id->operation-map)
      "Process the control message @var{message}.

@var{id->operation-map} is a bbtree mapping the unique ids of the registered
get operations to (weak or strong) references to them.
@var{old-id->operation-map} holds the entries that still need to be resent:
on reconnection, the current map is passed along as the old map, and on
@code{resend-old-operations!} the still-reachable entries are moved back and
their get messages are sent again over @var{message-queue}.

After processing, the loop is continued by calling @var{continue} with
@var{loop} and the (possibly updated) maps, or @var{continue*} with
additionally a control message to process next.  Control messages not
recognised here are passed to @code{handle-control-message!}."
      (define (continue/no-change)
        (continue loop old-id->operation-map id->operation-map))
      (define (k/reconnect!)
        ;; Self-check to make sure no information will be lost.
        (assert (= (bbtree-size old-id->operation-map) 0))
        ;; The current operations become the old operations, to be resent
        ;; later.
        (run-loop loop id->operation-map empty-bbtree))
      (match message
        (('start-get! get)
         ;; Register the new get operation, such that we remember
         ;; where to send responses to.
         (let ((id->operation-map
                (bbtree-set id->operation-map (get:unique-id get)
                            ((if (get:linger? get)
                                 make-strong-reference
                                 make-weak-reference) get))))
           ;; (Asynchronuously) send the GET message.
           (send-get! message-queue get)
           ;; Continue!
           (continue loop old-id->operation-map id->operation-map)))
        (('stop-search! get)
         (let^ ((<-- (old-id->operation-map id->operation-map)
                     (process-stop-search old-id->operation-map
                                          id->operation-map message-queue get)))
               (continue loop old-id->operation-map id->operation-map)))
        (('put! put)
         ;; Send the put operation to the DHT service.
         (send-message! message-queue (put:message put))
         (continue/no-change))
        ;; Send by @code{request-search-result-iterator}.
        (('request-search-result-iterator answer-box unique-id)
         (answer answer-box
                 ;; Pass #false as default value: without a default,
                 ;; 'bbtree-ref' raises an assertion violation for unknown
                 ;; ids, which can happen when a result arrives for a search
                 ;; that has already been stopped.  The answer is also #false
                 ;; when the weak reference is broken.
                 (and=> (bbtree-ref id->operation-map unique-id #false)
                        dereference))
         (continue/no-change))
        (('resend-old-operations!)
         ;; Restart old operations.  Only get operations need to be submitted
         ;; again.
         ;;
         ;; TODO: restarting monitoring operations
         (continue loop empty-bbtree
                   (bbtree-fold
                    (lambda (id reference id->operation-map)
                      (let^ ((! get (dereference reference))
                             ;; If the (weak) reference is broken, that means
                             ;; the operation is unreachable, so then there is
                             ;; no point to resending the get operation.
                             (? (not get) id->operation-map)
                             (! id->operation-map
                                (bbtree-set id->operation-map id reference)))
                            (send-get! message-queue get)
                            id->operation-map))
                    id->operation-map old-id->operation-map)))
        ;; Some handles became unreachable and can be cancelled.
        (('lost . lost)
         (let next ((lost lost) (old-id->operation-map old-id->operation-map)
                    (id->operation-map id->operation-map))
           (match lost
             (() (continue loop old-id->operation-map id->operation-map))
             ((object . rest)
              (match object
                ((? get? get)
                 (let^ ((<-- (old-id->operation-map id->operation-map)
                             (process-stop-search old-id->operation-map
                                                  id->operation-map
                                                  message-queue get)))
                       (next rest old-id->operation-map id->operation-map)))
                ;; The server object itself is unreachable: disconnect.
                ((? server:dht? server)
                 (continue* '(disconnect!) loop old-id->operation-map
                            id->operation-map)))))))
        ;; Everything else is not specific to the DHT.
        (rest (handle-control-message!
               rest message-queue (loop:terminal-condition loop)
               k/reconnect!))))

    ;; The empty bbtree, with keys ordered by '<'.  It is the initial value
    ;; of both id->operation maps (see 'connect') and the fresh map used by
    ;; 'control-message-handler' after a reconnection or a resend.
    (define empty-bbtree (make-bbtree <))))