aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxime Devos <maximedevos@telenet.be>2021-09-20 12:28:52 +0200
committerMaxime Devos <maximedevos@telenet.be>2021-09-21 13:28:03 +0200
commit008e48e11c24b9a82e725c55e00c7e0279e162af (patch)
treef79cbcf594e3d7375f96e972ebafe7d5fa6583be
parent7279d5d258b608471c6343b36213a4d680b5381e (diff)
downloadgnunet-scheme-008e48e11c24b9a82e725c55e00c7e0279e162af.tar.gz
gnunet-scheme-008e48e11c24b9a82e725c55e00c7e0279e162af.zip
dht/client: Send PUT messages.
* gnu/gnunet/dht/client.scm (<server>)[new-put-operationzs,new-put-operations-trigger]: New record type. (<put>): New record type. (canonical-block-type): Extract from ... (start-get!): ... here. (put!): New procedure. (connect): Set new '<put>' fields and 'reconnect' arguments. (reconnect)[error-handler]: Pass new arguments when reconnecting. (process-new-put-operations): New fiber. * examples/web.scm: Insert and retrieve something.
-rw-r--r--examples/web.scm13
-rw-r--r--gnu/gnunet/dht/client.scm89
2 files changed, 91 insertions, 11 deletions
diff --git a/examples/web.scm b/examples/web.scm
index be4fa7c..dde6446 100644
--- a/examples/web.scm
+++ b/examples/web.scm
@@ -6,6 +6,10 @@
6;; without any warranty. 6;; without any warranty.
7 7
8(use-modules (fibers) 8(use-modules (fibers)
9 (rnrs bytevectors)
10 (gnu extractor enum)
11 (gnu gnunet block)
12 (gnu gnunet utils bv-slice)
9 (gnu gnunet config db) 13 (gnu gnunet config db)
10 (gnu gnunet config fs) 14 (gnu gnunet config fs)
11 (rnrs hashtables) 15 (rnrs hashtables)
@@ -36,6 +40,15 @@
36 (define server (open-server impl `(#:port 8089))) 40 (define server (open-server impl `(#:port 8089)))
37 (define (url-handler* request body) 41 (define (url-handler* request body)
38 (url-handler nse-server request body)) 42 (url-handler nse-server request body))
43 ;; TODO: Form to start GET and PUT requests?
44 ;; For now, hard code the data to insert.
45 (dht:put! dht-server
46 (symbol-value block-type block:test)
47 (bv-slice/read-write (make-bytevector 64))
48 (bv-slice/read-write #vu8(#xde #xad #xbe #xef)))
49 (dht:start-get! dht-server
50 (symbol-value block-type block:test)
51 (bv-slice/read-write (make-bytevector 64)) pk)
39 (let loop () 52 (let loop ()
40 (let-values (((client request body) 53 (let-values (((client request body)
41 (read-client impl server))) 54 (read-client impl server)))
diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm
index 0a03ad1..25aa880 100644
--- a/gnu/gnunet/dht/client.scm
+++ b/gnu/gnunet/dht/client.scm
@@ -63,7 +63,8 @@
63 (only (gnu gnunet netstruct syntactic) 63 (only (gnu gnunet netstruct syntactic)
64 read% sizeof set%! select) 64 read% sizeof set%! select)
65 (only (gnu gnunet utils bv-slice) 65 (only (gnu gnunet utils bv-slice)
66 slice-length slice/read-only make-slice/read-write slice-copy!) 66 slice-length slice/read-only make-slice/read-write slice-copy!
67 slice-slice)
67 (only (rnrs base) 68 (only (rnrs base)
68 and >= = quote * + - define begin ... let* 69 and >= = quote * + - define begin ... let*
69 quote case else values apply let cond if > 70 quote case else values apply let cond if >
@@ -87,6 +88,13 @@
87 ;; responsible for processing the new get operations. 88 ;; responsible for processing the new get operations.
88 (immutable new-get-operaton-trigger 89 (immutable new-get-operaton-trigger
89 server-new-get-operation-trigger) 90 server-new-get-operation-trigger)
91 ;; Hash table from new <put> to #true. These put operations
92 ;; are not yet sent to the service, and not yet queued for
93 ;; sending.
94 (immutable new-put-operations
95 server-new-put-operations)
96 (immutable new-put-operation-trigger
97 server-new-put-operation-trigger)
90 ;; Atomic box holding an unsigned 64-bit integer. 98 ;; Atomic box holding an unsigned 64-bit integer.
91 (immutable next-unique-id/box server-next-unique-id/box))) 99 (immutable next-unique-id/box server-next-unique-id/box)))
92 100
@@ -100,6 +108,12 @@
100 (immutable type get:type) 108 (immutable type get:type)
101 (immutable options get:options))) 109 (immutable options get:options)))
102 110
111 (define-record-type (<put> %make-put put?)
112 (fields (immutable server put:server)
113 (immutable inserted put:inserted) ; thunk
114 ;; bytevector slice (/:msg:dht:client:put)
115 (immutable message put:message)))
116
103 (define (send-get! mq get) 117 (define (send-get! mq get)
104 "Send a GET message for @var{get}." 118 "Send a GET message for @var{get}."
105 (pk 'new get) 119 (pk 'new get)
@@ -134,6 +148,17 @@
134 expected 148 expected
135 (loop actual))))) 149 (loop actual)))))
136 150
151 (define (canonical-block-type type)
152 "Return the numeric value of the block type @var{type}
153(a @code{block-type?} or in-bounds integer)."
154 (cond ((integer? type)
155 (unless (and (<= 0 type (- (expt 2 32) 1)))
156 (error "block type out of bounds"))
157 type)
158 (#t
159 (assert (block-type? type))
160 (value->index type))))
161
137 (define* (start-get! server type key found 162 (define* (start-get! server type key found
138 #:key (desired-replication-level 3)) 163 #:key (desired-replication-level 3))
139 "Perform an asynchronous GET operation on the DHT, and return a handle 164 "Perform an asynchronous GET operation on the DHT, and return a handle
@@ -141,26 +166,48 @@ to control the GET operation. Search for a block of type @var{type} (a
141@code{block-type} or its numeric value) and key @var{key}, a readable bytevector 166@code{block-type} or its numeric value) and key @var{key}, a readable bytevector
142slice. Call @var{found} on every search result." 167slice. Call @var{found} on every search result."
143 ;; TODO: options, xquery ... 168 ;; TODO: options, xquery ...
144 (define canonical-type
145 (cond ((integer? type)
146 (unless (and (<= 0 type (- (expt 2 32) 1)))
147 (error "block type out of bounds")
148 type))
149 (#t
150 (assert (block-type? type))
151 (value->index type))))
152 (unless (= (slice-length key) (sizeof /hashcode:512 '())) 169 (unless (= (slice-length key) (sizeof /hashcode:512 '()))
153 (error "length of key incorrect")) 170 (error "length of key incorrect"))
154 (define handle (%make-get server found (slice/read-only key) 171 (define handle (%make-get server found (slice/read-only key)
155 (fresh-id server) 172 (fresh-id server)
156 desired-replication-level 173 desired-replication-level
157 type 174 (canonical-block-type type)
158 0)) ; TODO 175 0)) ; TODO
159 (hashq-set! (server-new-get-operations server) handle #t) 176 (hashq-set! (server-new-get-operations server) handle #t)
160 ;; Asynchronuously process the new get request. 177 ;; Asynchronuously process the new get request.
161 (trigger-condition! (server-new-get-operation-trigger server)) 178 (trigger-condition! (server-new-get-operation-trigger server))
162 handle) 179 handle)
163 180
181 (define* (put! server type key data #:key (desired-replication-level 3)
182 (confirmed values))
183 "Perform an asynchronous PUT operation on the DHT, inserting @var{data}
184(a readable bytevector slice) under @var{key} (a readable bytevector slice
185holding a @code{/hashcode:512}). The block type is @var{type} (a
186@code{block-type} or its numeric value).
187
188TODO expiration, replication, confirm ..."
189 ;; Prepare the message to send.
190 (define put-message
191 (make-slice/read-write (+ (sizeof /:msg:dht:client:put '())
192 (slice-length data))))
193 (define meta (slice-slice put-message 0
194 (sizeof /:msg:dht:client:put '())))
195 (set%! /:msg:dht:client:put '(header type) meta
196 (value->index (symbol-value message-type msg:dht:client:put)))
197 (set%! /:msg:dht:client:put '(header size) meta (slice-length put-message))
198 (set%! /:msg:dht:client:put '(type) meta (pk 'can (canonical-block-type type)))
199 (set%! /:msg:dht:client:put '(option) meta 0) ; TODO
200 (set%! /:msg:dht:client:put '(desired-replication-level) meta
201 desired-replication-level)
202 (set%! /:msg:dht:client:put '(expiration) meta 0) ; TODO
203 ;; Copy data to insert into the DHT.
204 (slice-copy! data
205 (slice-slice put-message (sizeof /:msg:dht:client:put '())))
206 (define handle (%make-put server confirmed put-message))
207 (hashq-set! (server-new-put-operations server) handle #t)
208 (trigger-condition! (server-new-put-operation-trigger server))
209 handle)
210
164 (define-syntax-rule (well-formed?/path-length slice type (field ...) compare) 211 (define-syntax-rule (well-formed?/path-length slice type (field ...) compare)
165 "Verify the TYPE message in @var{slice}, which has @var{field ...} ... 212 "Verify the TYPE message in @var{slice}, which has @var{field ...} ...
166(e.g. one or more of get-path-length or put-path-length) and corresponding 213(e.g. one or more of get-path-length or put-path-length) and corresponding
@@ -193,15 +240,20 @@ even if not connected. This is an idempotent operation."
193 (define request-close-condition (make-condition)) 240 (define request-close-condition (make-condition))
194 (define new-get-operation-trigger (make-repeated-condition)) 241 (define new-get-operation-trigger (make-repeated-condition))
195 (define new-get-operations (make-hash-table)) 242 (define new-get-operations (make-hash-table))
243 (define new-put-operation-trigger (make-repeated-condition))
244 (define new-put-operations (make-hash-table))
196 (reconnect new-get-operations new-get-operation-trigger 245 (reconnect new-get-operations new-get-operation-trigger
246 new-put-operations new-put-operation-trigger
197 request-close?/box request-close-condition config 247 request-close?/box request-close-condition config
198 #:spawn spawn) 248 #:spawn spawn)
199 (%make-server request-close?/box request-close-condition 249 (%make-server request-close?/box request-close-condition
200 new-get-operations new-get-operation-trigger 250 new-get-operations new-get-operation-trigger
251 new-put-operations new-put-operation-trigger
201 ;; Any ‘small’ exact natural number will do. 252 ;; Any ‘small’ exact natural number will do.
202 (make-atomic-box 0))) 253 (make-atomic-box 0)))
203 254
204 (define* (reconnect new-get-operations new-get-operation-trigger 255 (define* (reconnect new-get-operations new-get-operation-trigger
256 new-put-operations new-put-operation-trigger
205 request-close?/box request-close-condition config 257 request-close?/box request-close-condition config
206 #:key (spawn spawn-fiber) 258 #:key (spawn spawn-fiber)
207 #:rest rest) 259 #:rest rest)
@@ -253,7 +305,9 @@ even if not connected. This is an idempotent operation."
253 ((input:regular-end-of-file input:premature-end-of-file) 305 ((input:regular-end-of-file input:premature-end-of-file)
254 (signal-condition! mq-closed) 306 (signal-condition! mq-closed)
255 (unless (atomic-box-ref request-close?/box) 307 (unless (atomic-box-ref request-close?/box)
256 (apply reconnect new-get-operations new-get-operation-trigger 308 (apply reconnect
309 new-get-operations new-get-operation-trigger
310 new-put-operations new-put-operation-trigger
257 request-close?/box request-close-condition 311 request-close?/box request-close-condition
258 config rest))) 312 config rest)))
259 ((connection:interrupted) 313 ((connection:interrupted)
@@ -282,9 +336,22 @@ to the DHT service."
282 ;; TODO reconnection, closing queues and cancelling get operations, 336 ;; TODO reconnection, closing queues and cancelling get operations,
283 ;; processing answers ... 337 ;; processing answers ...
284 (process-new-get-operations)) 338 (process-new-get-operations))
339 ;; TODO: remove duplication with process-new-get-operations
340 (define (process-new-put-operations)
341 (await-trigger! new-put-operation-trigger)
342 ;; Extract the latest new put operations
343 (define new (hash-map->list (lambda (put _) put) new-put-operations))
344 ;; And remove them from the hash table
345 (for-each (lambda (put) (hashq-remove! new-put-operations put)) new)
346 ;; and (asynchronuously) sent the PUT message
347 (for-each (lambda (put) (send-message! mq (put:message put))) new)
348 ;; TODO notify-sent callbacks, closing queues, cancelling put operations,
349 ;; processing answers ...
350 (process-new-put-operations))
285 (define mq (connect/fibers config "dht" handlers error-handler 351 (define mq (connect/fibers config "dht" handlers error-handler
286 #:spawn spawn)) 352 #:spawn spawn))
287 (spawn request-close-handler) 353 (spawn request-close-handler)
288 (spawn process-new-get-operations) 354 (spawn process-new-get-operations)
355 (spawn process-new-put-operations)
289 ;; TODO: use new-get-operations 356 ;; TODO: use new-get-operations
290 'todo))) 357 'todo)))