diff options
author | Maxime Devos <maximedevos@telenet.be> | 2021-09-20 12:28:52 +0200 |
---|---|---|
committer | Maxime Devos <maximedevos@telenet.be> | 2021-09-21 13:28:03 +0200 |
commit | 008e48e11c24b9a82e725c55e00c7e0279e162af (patch) | |
tree | f79cbcf594e3d7375f96e972ebafe7d5fa6583be | |
parent | 7279d5d258b608471c6343b36213a4d680b5381e (diff) | |
download | gnunet-scheme-008e48e11c24b9a82e725c55e00c7e0279e162af.tar.gz gnunet-scheme-008e48e11c24b9a82e725c55e00c7e0279e162af.zip |
dht/client: Send PUT messages.
* gnu/gnunet/dht/client.scm
(<server>)[new-put-operationzs,new-put-operations-trigger]: New
record type.
(<put>): New record type.
(canonical-block-type): Extract from ...
(start-get!): ... here.
(put!): New procedure.
(connect): Set new '<put>' fields and 'reconnect' arguments.
(reconnect)[error-handler]: Pass new arguments when reconnecting.
(process-new-put-operations): New fiber.
* examples/web.scm: Insert and retrieve something.
-rw-r--r-- | examples/web.scm | 13 | ||||
-rw-r--r-- | gnu/gnunet/dht/client.scm | 89 |
2 files changed, 91 insertions, 11 deletions
diff --git a/examples/web.scm b/examples/web.scm index be4fa7c..dde6446 100644 --- a/examples/web.scm +++ b/examples/web.scm | |||
@@ -6,6 +6,10 @@ | |||
6 | ;; without any warranty. | 6 | ;; without any warranty. |
7 | 7 | ||
8 | (use-modules (fibers) | 8 | (use-modules (fibers) |
9 | (rnrs bytevectors) | ||
10 | (gnu extractor enum) | ||
11 | (gnu gnunet block) | ||
12 | (gnu gnunet utils bv-slice) | ||
9 | (gnu gnunet config db) | 13 | (gnu gnunet config db) |
10 | (gnu gnunet config fs) | 14 | (gnu gnunet config fs) |
11 | (rnrs hashtables) | 15 | (rnrs hashtables) |
@@ -36,6 +40,15 @@ | |||
36 | (define server (open-server impl `(#:port 8089))) | 40 | (define server (open-server impl `(#:port 8089))) |
37 | (define (url-handler* request body) | 41 | (define (url-handler* request body) |
38 | (url-handler nse-server request body)) | 42 | (url-handler nse-server request body)) |
43 | ;; TODO: Form to start GET and PUT requests? | ||
44 | ;; For now, hard code the data to insert. | ||
45 | (dht:put! dht-server | ||
46 | (symbol-value block-type block:test) | ||
47 | (bv-slice/read-write (make-bytevector 64)) | ||
48 | (bv-slice/read-write #vu8(#xde #xad #xbe #xef))) | ||
49 | (dht:start-get! dht-server | ||
50 | (symbol-value block-type block:test) | ||
51 | (bv-slice/read-write (make-bytevector 64)) pk) | ||
39 | (let loop () | 52 | (let loop () |
40 | (let-values (((client request body) | 53 | (let-values (((client request body) |
41 | (read-client impl server))) | 54 | (read-client impl server))) |
diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm index 0a03ad1..25aa880 100644 --- a/gnu/gnunet/dht/client.scm +++ b/gnu/gnunet/dht/client.scm | |||
@@ -63,7 +63,8 @@ | |||
63 | (only (gnu gnunet netstruct syntactic) | 63 | (only (gnu gnunet netstruct syntactic) |
64 | read% sizeof set%! select) | 64 | read% sizeof set%! select) |
65 | (only (gnu gnunet utils bv-slice) | 65 | (only (gnu gnunet utils bv-slice) |
66 | slice-length slice/read-only make-slice/read-write slice-copy!) | 66 | slice-length slice/read-only make-slice/read-write slice-copy! |
67 | slice-slice) | ||
67 | (only (rnrs base) | 68 | (only (rnrs base) |
68 | and >= = quote * + - define begin ... let* | 69 | and >= = quote * + - define begin ... let* |
69 | quote case else values apply let cond if > | 70 | quote case else values apply let cond if > |
@@ -87,6 +88,13 @@ | |||
87 | ;; responsible for processing the new get operations. | 88 | ;; responsible for processing the new get operations. |
88 | (immutable new-get-operaton-trigger | 89 | (immutable new-get-operaton-trigger |
89 | server-new-get-operation-trigger) | 90 | server-new-get-operation-trigger) |
91 | ;; Hash table from new <put> to #true. These put operations | ||
92 | ;; are not yet sent to the service, and not yet queued for | ||
93 | ;; sending. | ||
94 | (immutable new-put-operations | ||
95 | server-new-put-operations) | ||
96 | (immutable new-put-operation-trigger | ||
97 | server-new-put-operation-trigger) | ||
90 | ;; Atomic box holding an unsigned 64-bit integer. | 98 | ;; Atomic box holding an unsigned 64-bit integer. |
91 | (immutable next-unique-id/box server-next-unique-id/box))) | 99 | (immutable next-unique-id/box server-next-unique-id/box))) |
92 | 100 | ||
@@ -100,6 +108,12 @@ | |||
100 | (immutable type get:type) | 108 | (immutable type get:type) |
101 | (immutable options get:options))) | 109 | (immutable options get:options))) |
102 | 110 | ||
111 | (define-record-type (<put> %make-put put?) | ||
112 | (fields (immutable server put:server) | ||
113 | (immutable inserted put:inserted) ; thunk | ||
114 | ;; bytevector slice (/:msg:dht:client:put) | ||
115 | (immutable message put:message))) | ||
116 | |||
103 | (define (send-get! mq get) | 117 | (define (send-get! mq get) |
104 | "Send a GET message for @var{get}." | 118 | "Send a GET message for @var{get}." |
105 | (pk 'new get) | 119 | (pk 'new get) |
@@ -134,6 +148,17 @@ | |||
134 | expected | 148 | expected |
135 | (loop actual))))) | 149 | (loop actual))))) |
136 | 150 | ||
151 | (define (canonical-block-type type) | ||
152 | "Return the numeric value of the block type @var{type} | ||
153 | (a @code{block-type?} or in-bounds integer)." | ||
154 | (cond ((integer? type) | ||
155 | (unless (and (<= 0 type (- (expt 2 32) 1))) | ||
156 | (error "block type out of bounds")) | ||
157 | type) | ||
158 | (#t | ||
159 | (assert (block-type? type)) | ||
160 | (value->index type)))) | ||
161 | |||
137 | (define* (start-get! server type key found | 162 | (define* (start-get! server type key found |
138 | #:key (desired-replication-level 3)) | 163 | #:key (desired-replication-level 3)) |
139 | "Perform an asynchronous GET operation on the DHT, and return a handle | 164 | "Perform an asynchronous GET operation on the DHT, and return a handle |
@@ -141,26 +166,48 @@ to control the GET operation. Search for a block of type @var{type} (a | |||
141 | @code{block-type} or its numeric value) and key @var{key}, a readable bytevector | 166 | @code{block-type} or its numeric value) and key @var{key}, a readable bytevector |
142 | slice. Call @var{found} on every search result." | 167 | slice. Call @var{found} on every search result." |
143 | ;; TODO: options, xquery ... | 168 | ;; TODO: options, xquery ... |
144 | (define canonical-type | ||
145 | (cond ((integer? type) | ||
146 | (unless (and (<= 0 type (- (expt 2 32) 1))) | ||
147 | (error "block type out of bounds") | ||
148 | type)) | ||
149 | (#t | ||
150 | (assert (block-type? type)) | ||
151 | (value->index type)))) | ||
152 | (unless (= (slice-length key) (sizeof /hashcode:512 '())) | 169 | (unless (= (slice-length key) (sizeof /hashcode:512 '())) |
153 | (error "length of key incorrect")) | 170 | (error "length of key incorrect")) |
154 | (define handle (%make-get server found (slice/read-only key) | 171 | (define handle (%make-get server found (slice/read-only key) |
155 | (fresh-id server) | 172 | (fresh-id server) |
156 | desired-replication-level | 173 | desired-replication-level |
157 | type | 174 | (canonical-block-type type) |
158 | 0)) ; TODO | 175 | 0)) ; TODO |
159 | (hashq-set! (server-new-get-operations server) handle #t) | 176 | (hashq-set! (server-new-get-operations server) handle #t) |
160 | ;; Asynchronuously process the new get request. | 177 | ;; Asynchronuously process the new get request. |
161 | (trigger-condition! (server-new-get-operation-trigger server)) | 178 | (trigger-condition! (server-new-get-operation-trigger server)) |
162 | handle) | 179 | handle) |
163 | 180 | ||
181 | (define* (put! server type key data #:key (desired-replication-level 3) | ||
182 | (confirmed values)) | ||
183 | "Perform an asynchronous PUT operation on the DHT, inserting @var{data} | ||
184 | (a readable bytevector slice) under @var{key} (a readable bytevector slice | ||
185 | holding a @code{/hashcode:512}). The block type is @var{type} (a | ||
186 | @code{block-type} or its numeric value). | ||
187 | |||
188 | TODO expiration, replication, confirm ..." | ||
189 | ;; Prepare the message to send. | ||
190 | (define put-message | ||
191 | (make-slice/read-write (+ (sizeof /:msg:dht:client:put '()) | ||
192 | (slice-length data)))) | ||
193 | (define meta (slice-slice put-message 0 | ||
194 | (sizeof /:msg:dht:client:put '()))) | ||
195 | (set%! /:msg:dht:client:put '(header type) meta | ||
196 | (value->index (symbol-value message-type msg:dht:client:put))) | ||
197 | (set%! /:msg:dht:client:put '(header size) meta (slice-length put-message)) | ||
198 | (set%! /:msg:dht:client:put '(type) meta (pk 'can (canonical-block-type type))) | ||
199 | (set%! /:msg:dht:client:put '(option) meta 0) ; TODO | ||
200 | (set%! /:msg:dht:client:put '(desired-replication-level) meta | ||
201 | desired-replication-level) | ||
202 | (set%! /:msg:dht:client:put '(expiration) meta 0) ; TODO | ||
203 | ;; Copy data to insert into the DHT. | ||
204 | (slice-copy! data | ||
205 | (slice-slice put-message (sizeof /:msg:dht:client:put '()))) | ||
206 | (define handle (%make-put server confirmed put-message)) | ||
207 | (hashq-set! (server-new-put-operations server) handle #t) | ||
208 | (trigger-condition! (server-new-put-operation-trigger server)) | ||
209 | handle) | ||
210 | |||
164 | (define-syntax-rule (well-formed?/path-length slice type (field ...) compare) | 211 | (define-syntax-rule (well-formed?/path-length slice type (field ...) compare) |
165 | "Verify the TYPE message in @var{slice}, which has @var{field ...} ... | 212 | "Verify the TYPE message in @var{slice}, which has @var{field ...} ... |
166 | (e.g. one or more of get-path-length or put-path-length) and corresponding | 213 | (e.g. one or more of get-path-length or put-path-length) and corresponding |
@@ -193,15 +240,20 @@ even if not connected. This is an idempotent operation." | |||
193 | (define request-close-condition (make-condition)) | 240 | (define request-close-condition (make-condition)) |
194 | (define new-get-operation-trigger (make-repeated-condition)) | 241 | (define new-get-operation-trigger (make-repeated-condition)) |
195 | (define new-get-operations (make-hash-table)) | 242 | (define new-get-operations (make-hash-table)) |
243 | (define new-put-operation-trigger (make-repeated-condition)) | ||
244 | (define new-put-operations (make-hash-table)) | ||
196 | (reconnect new-get-operations new-get-operation-trigger | 245 | (reconnect new-get-operations new-get-operation-trigger |
246 | new-put-operations new-put-operation-trigger | ||
197 | request-close?/box request-close-condition config | 247 | request-close?/box request-close-condition config |
198 | #:spawn spawn) | 248 | #:spawn spawn) |
199 | (%make-server request-close?/box request-close-condition | 249 | (%make-server request-close?/box request-close-condition |
200 | new-get-operations new-get-operation-trigger | 250 | new-get-operations new-get-operation-trigger |
251 | new-put-operations new-put-operation-trigger | ||
201 | ;; Any ‘small’ exact natural number will do. | 252 | ;; Any ‘small’ exact natural number will do. |
202 | (make-atomic-box 0))) | 253 | (make-atomic-box 0))) |
203 | 254 | ||
204 | (define* (reconnect new-get-operations new-get-operation-trigger | 255 | (define* (reconnect new-get-operations new-get-operation-trigger |
256 | new-put-operations new-put-operation-trigger | ||
205 | request-close?/box request-close-condition config | 257 | request-close?/box request-close-condition config |
206 | #:key (spawn spawn-fiber) | 258 | #:key (spawn spawn-fiber) |
207 | #:rest rest) | 259 | #:rest rest) |
@@ -253,7 +305,9 @@ even if not connected. This is an idempotent operation." | |||
253 | ((input:regular-end-of-file input:premature-end-of-file) | 305 | ((input:regular-end-of-file input:premature-end-of-file) |
254 | (signal-condition! mq-closed) | 306 | (signal-condition! mq-closed) |
255 | (unless (atomic-box-ref request-close?/box) | 307 | (unless (atomic-box-ref request-close?/box) |
256 | (apply reconnect new-get-operations new-get-operation-trigger | 308 | (apply reconnect |
309 | new-get-operations new-get-operation-trigger | ||
310 | new-put-operations new-put-operation-trigger | ||
257 | request-close?/box request-close-condition | 311 | request-close?/box request-close-condition |
258 | config rest))) | 312 | config rest))) |
259 | ((connection:interrupted) | 313 | ((connection:interrupted) |
@@ -282,9 +336,22 @@ to the DHT service." | |||
282 | ;; TODO reconnection, closing queues and cancelling get operations, | 336 | ;; TODO reconnection, closing queues and cancelling get operations, |
283 | ;; processing answers ... | 337 | ;; processing answers ... |
284 | (process-new-get-operations)) | 338 | (process-new-get-operations)) |
339 | ;; TODO: remove duplication with process-new-get-operations | ||
340 | (define (process-new-put-operations) | ||
341 | (await-trigger! new-put-operation-trigger) | ||
342 | ;; Extract the latest new put operations | ||
343 | (define new (hash-map->list (lambda (put _) put) new-put-operations)) | ||
344 | ;; And remove them from the hash table | ||
345 | (for-each (lambda (put) (hashq-remove! new-put-operations put)) new) | ||
346 | ;; and (asynchronuously) sent the PUT message | ||
347 | (for-each (lambda (put) (send-message! mq (put:message put))) new) | ||
348 | ;; TODO notify-sent callbacks, closing queues, cancelling put operations, | ||
349 | ;; processing answers ... | ||
350 | (process-new-put-operations)) | ||
285 | (define mq (connect/fibers config "dht" handlers error-handler | 351 | (define mq (connect/fibers config "dht" handlers error-handler |
286 | #:spawn spawn)) | 352 | #:spawn spawn)) |
287 | (spawn request-close-handler) | 353 | (spawn request-close-handler) |
288 | (spawn process-new-get-operations) | 354 | (spawn process-new-get-operations) |
355 | (spawn process-new-put-operations) | ||
289 | ;; TODO: use new-get-operations | 356 | ;; TODO: use new-get-operations |
290 | 'todo))) | 357 | 'todo))) |