diff options
Diffstat (limited to 'src/gnunet/service/store.go')
-rw-r--r-- | src/gnunet/service/store.go | 293 |
1 files changed, 208 insertions, 85 deletions
diff --git a/src/gnunet/service/store.go b/src/gnunet/service/store.go index c599c54..5de5415 100644 --- a/src/gnunet/service/store.go +++ b/src/gnunet/service/store.go | |||
@@ -22,17 +22,21 @@ import ( | |||
22 | "context" | 22 | "context" |
23 | "database/sql" | 23 | "database/sql" |
24 | "encoding/binary" | 24 | "encoding/binary" |
25 | "encoding/gob" | ||
25 | "encoding/hex" | 26 | "encoding/hex" |
26 | "errors" | 27 | "errors" |
27 | "fmt" | 28 | "fmt" |
29 | "gnunet/config" | ||
28 | "gnunet/crypto" | 30 | "gnunet/crypto" |
29 | "gnunet/service/dht/blocks" | 31 | "gnunet/service/dht/blocks" |
30 | "gnunet/util" | 32 | "gnunet/util" |
33 | "io" | ||
31 | "io/ioutil" | 34 | "io/ioutil" |
32 | "os" | 35 | "os" |
33 | "strconv" | 36 | "sort" |
34 | "strings" | 37 | "sync" |
35 | 38 | ||
39 | "github.com/bfix/gospel/logger" | ||
36 | redis "github.com/go-redis/redis/v8" | 40 | redis "github.com/go-redis/redis/v8" |
37 | ) | 41 | ) |
38 | 42 | ||
@@ -46,22 +50,20 @@ var ( | |||
46 | //------------------------------------------------------------ | 50 | //------------------------------------------------------------ |
47 | // Generic storage interface. Can be used for persistent or | 51 | // Generic storage interface. Can be used for persistent or |
48 | // transient (caching) storage of key/value data. | 52 | // transient (caching) storage of key/value data. |
49 | // One set of methods (Get/Put) work on DHT queries and blocks, | ||
50 | // the other set (GetS, PutS) work on key/value strings. | ||
51 | // Each custom implementation can decide which sets to support. | ||
52 | //------------------------------------------------------------ | 53 | //------------------------------------------------------------ |
53 | 54 | ||
54 | // Store is a key/value storage where the type of the key is either | 55 | // Store is a key/value storage where the type of the key is either |
55 | // a SHA512 hash value or a string and the value is either a DHT | 56 | // a SHA512 hash value or a string and the value is either a DHT |
56 | // block or a string. | 57 | // block or a string. It is possiblle to mix any key/value types, |
58 | // but not used in this implementation. | ||
57 | type Store[K, V any] interface { | 59 | type Store[K, V any] interface { |
58 | // Put block into storage under given key | 60 | // Put value into storage under given key |
59 | Put(key K, val V) error | 61 | Put(key K, val V) error |
60 | 62 | ||
61 | // Get block with given key from storage | 63 | // Get value with given key from storage |
62 | Get(key K) (V, error) | 64 | Get(key K) (V, error) |
63 | 65 | ||
64 | // List all store queries | 66 | // List all store keys |
65 | List() ([]K, error) | 67 | List() ([]K, error) |
66 | } | 68 | } |
67 | 69 | ||
@@ -78,22 +80,18 @@ type KVStore Store[string, string] | |||
78 | //------------------------------------------------------------ | 80 | //------------------------------------------------------------ |
79 | // NewDHTStore creates a new storage handler with given spec | 81 | // NewDHTStore creates a new storage handler with given spec |
80 | // for use with DHT queries and blocks | 82 | // for use with DHT queries and blocks |
81 | func NewDHTStore(spec string) (DHTStore, error) { | 83 | func NewDHTStore(spec config.ParameterConfig) (DHTStore, error) { |
82 | specs := strings.Split(spec, ":") | 84 | // get the mode parameter |
83 | if len(specs) < 2 { | 85 | mode, ok := config.GetParam[string](spec, "mode") |
86 | if !ok { | ||
84 | return nil, ErrStoreInvalidSpec | 87 | return nil, ErrStoreInvalidSpec |
85 | } | 88 | } |
86 | switch specs[0] { | 89 | switch mode { |
87 | //------------------------------------------------------------------ | 90 | //------------------------------------------------------------------ |
88 | // File-base storage | 91 | // File-base storage |
89 | //------------------------------------------------------------------ | 92 | //------------------------------------------------------------------ |
90 | case "file_store": | 93 | case "file": |
91 | return NewFileStore(specs[1]) | 94 | return NewFileStore(spec) |
92 | case "file_cache": | ||
93 | if len(specs) < 3 { | ||
94 | return nil, ErrStoreInvalidSpec | ||
95 | } | ||
96 | return NewFileCache(specs[1], specs[2]) | ||
97 | } | 95 | } |
98 | return nil, ErrStoreUnknown | 96 | return nil, ErrStoreUnknown |
99 | } | 97 | } |
@@ -101,29 +99,24 @@ func NewDHTStore(spec string) (DHTStore, error) { | |||
101 | //------------------------------------------------------------ | 99 | //------------------------------------------------------------ |
102 | // NewKVStore creates a new storage handler with given spec | 100 | // NewKVStore creates a new storage handler with given spec |
103 | // for use with key/value string pairs. | 101 | // for use with key/value string pairs. |
104 | func NewKVStore(spec string) (KVStore, error) { | 102 | func NewKVStore(spec config.ParameterConfig) (KVStore, error) { |
105 | specs := strings.SplitN(spec, ":", 2) | 103 | // get the mode parameter |
106 | if len(specs) < 2 { | 104 | mode, ok := config.GetParam[string](spec, "mode") |
105 | if !ok { | ||
107 | return nil, ErrStoreInvalidSpec | 106 | return nil, ErrStoreInvalidSpec |
108 | } | 107 | } |
109 | switch specs[0] { | 108 | switch mode { |
110 | //-------------------------------------------------------------- | 109 | //-------------------------------------------------------------- |
111 | // Redis service | 110 | // Redis service |
112 | //-------------------------------------------------------------- | 111 | //-------------------------------------------------------------- |
113 | case "redis": | 112 | case "redis": |
114 | if len(specs) < 4 { | 113 | return NewRedisStore(spec) |
115 | return nil, ErrStoreInvalidSpec | ||
116 | } | ||
117 | return NewRedisStore(specs[1], specs[2], specs[3]) | ||
118 | 114 | ||
119 | //-------------------------------------------------------------- | 115 | //-------------------------------------------------------------- |
120 | // SQL database service | 116 | // SQL database service |
121 | //-------------------------------------------------------------- | 117 | //-------------------------------------------------------------- |
122 | case "sql": | 118 | case "sql": |
123 | if len(specs) < 4 { | 119 | return NewSQLStore(spec) |
124 | return nil, ErrStoreInvalidSpec | ||
125 | } | ||
126 | return NewSQLStore(specs[1]) | ||
127 | } | 120 | } |
128 | return nil, errors.New("unknown storage mechanism") | 121 | return nil, errors.New("unknown storage mechanism") |
129 | } | 122 | } |
@@ -132,63 +125,121 @@ func NewKVStore(spec string) (KVStore, error) { | |||
132 | // Filesystem-based storage | 125 | // Filesystem-based storage |
133 | //------------------------------------------------------------ | 126 | //------------------------------------------------------------ |
134 | 127 | ||
128 | // FileHeader is the layout of a file managed by the storage handler. | ||
129 | // On start-up the file store recreates the list of file entries from | ||
130 | // traversing the actual filesystem. This is done in the background. | ||
131 | type FileHeader struct { | ||
132 | key string // storage key | ||
133 | size uint64 // size of file | ||
134 | btype uint16 // block type | ||
135 | stored util.AbsoluteTime // time added to store | ||
136 | expires util.AbsoluteTime // expiration time | ||
137 | lastUsed util.AbsoluteTime // time last used | ||
138 | usedCount uint64 // usage count | ||
139 | } | ||
140 | |||
135 | // FileStore implements a filesystem-based storage mechanism for | 141 | // FileStore implements a filesystem-based storage mechanism for |
136 | // DHT queries and blocks. | 142 | // DHT queries and blocks. |
137 | type FileStore struct { | 143 | type FileStore struct { |
138 | path string // storage path | 144 | path string // storage path |
139 | cached []*crypto.HashCode // list of cached entries (key) | 145 | cache bool // storage works as cache |
140 | wrPos int // write position in cyclic list | 146 | args config.ParameterConfig // arguments / settings |
147 | |||
148 | totalSize uint64 // total storage size (logical, not physical) | ||
149 | files map[string]*FileHeader // list of file headers | ||
150 | wrPos int // write position in cyclic list | ||
151 | mtx sync.Mutex // serialize operations (prune) | ||
141 | } | 152 | } |
142 | 153 | ||
143 | // NewFileStore instantiates a new file storage. | 154 | // NewFileStore instantiates a new file storage. |
144 | func NewFileStore(path string) (DHTStore, error) { | 155 | func NewFileStore(spec config.ParameterConfig) (DHTStore, error) { |
145 | // create file store | 156 | // get path parameter |
146 | return &FileStore{ | 157 | path, ok := config.GetParam[string](spec, "path") |
147 | path: path, | 158 | if !ok { |
148 | }, nil | 159 | return nil, ErrStoreInvalidSpec |
149 | } | 160 | } |
150 | 161 | isCache, ok := config.GetParam[bool](spec, "cache") | |
151 | // NewFileCache instantiates a new file-based cache. | 162 | if !ok { |
152 | func NewFileCache(path, param string) (DHTStore, error) { | 163 | isCache = false |
164 | } | ||
153 | // remove old cache content | 165 | // remove old cache content |
154 | os.RemoveAll(path) | 166 | if isCache { |
167 | os.RemoveAll(path) | ||
168 | } | ||
169 | // create file store handler | ||
170 | fs := &FileStore{ | ||
171 | path: path, | ||
172 | args: spec, | ||
173 | cache: isCache, | ||
174 | files: make(map[string]*FileHeader), | ||
175 | } | ||
176 | // load file header list | ||
177 | if !isCache { | ||
178 | if fp, err := os.Open(path + "/files.db"); err == nil { | ||
179 | dec := gob.NewDecoder(fp) | ||
180 | for { | ||
181 | hdr := new(FileHeader) | ||
182 | if dec.Decode(hdr) != nil { | ||
183 | if err != io.EOF { | ||
184 | return nil, err | ||
185 | } | ||
186 | break | ||
187 | } | ||
188 | fs.files[hdr.key] = hdr | ||
189 | fs.totalSize += hdr.size | ||
190 | } | ||
191 | fp.Close() | ||
192 | } | ||
193 | } | ||
194 | return fs, nil | ||
195 | } | ||
155 | 196 | ||
156 | // get number of cache entries | 197 | // Close file storage. write metadata to file |
157 | num, err := strconv.ParseUint(param, 10, 32) | 198 | func (s *FileStore) Close() (err error) { |
158 | if err != nil { | 199 | if !s.cache { |
159 | return nil, err | 200 | if fp, err := os.Create(s.path + "/files.db"); err == nil { |
201 | defer fp.Close() | ||
202 | enc := gob.NewEncoder(fp) | ||
203 | for _, hdr := range s.files { | ||
204 | if err = enc.Encode(hdr); err != nil { | ||
205 | break | ||
206 | } | ||
207 | } | ||
208 | } | ||
160 | } | 209 | } |
161 | // create file store | 210 | return |
162 | return &FileStore{ | ||
163 | path: path, | ||
164 | cached: make([]*crypto.HashCode, num), | ||
165 | wrPos: 0, | ||
166 | }, nil | ||
167 | } | 211 | } |
168 | 212 | ||
169 | // Put block into storage under given key | 213 | // Put block into storage under given key |
170 | func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { | 214 | func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { |
215 | // check for free space | ||
216 | if s.cache { | ||
217 | // caching is limited by explicit number of files | ||
218 | num, ok := config.GetParam[int](s.args, "num") | ||
219 | if !ok { | ||
220 | num = 100 | ||
221 | } | ||
222 | if len(s.files) >= num { | ||
223 | // make space for at least one new entry | ||
224 | s.prune(1) | ||
225 | } | ||
226 | } else { | ||
227 | // normal storage is limited by quota (default: 10GB) | ||
228 | max, ok := config.GetParam[int](s.args, "maxGB") | ||
229 | if !ok { | ||
230 | max = 10 | ||
231 | } | ||
232 | if int(s.totalSize>>30) > max { | ||
233 | // drop a significant number of blocks | ||
234 | s.prune(20) | ||
235 | } | ||
236 | } | ||
171 | // get query parameters for entry | 237 | // get query parameters for entry |
172 | var ( | 238 | var btype uint16 // block type |
173 | btype uint16 // block type | ||
174 | expire util.AbsoluteTime // block expiration | ||
175 | ) | ||
176 | query.Get("blkType", &btype) | 239 | query.Get("blkType", &btype) |
240 | var expire util.AbsoluteTime // block expiration | ||
177 | query.Get("expire", &expire) | 241 | query.Get("expire", &expire) |
178 | 242 | ||
179 | // are we in caching mode? | ||
180 | if s.cached != nil { | ||
181 | // release previous block if defined | ||
182 | if key := s.cached[s.wrPos]; key != nil { | ||
183 | // get path and filename from key | ||
184 | path, fname := s.expandPath(key) | ||
185 | if err = os.Remove(path + "/" + fname); err != nil { | ||
186 | return | ||
187 | } | ||
188 | // free slot | ||
189 | s.cached[s.wrPos] = nil | ||
190 | } | ||
191 | } | ||
192 | // get path and filename from key | 243 | // get path and filename from key |
193 | path, fname := s.expandPath(query.Key()) | 244 | path, fname := s.expandPath(query.Key()) |
194 | // make sure the path exists | 245 | // make sure the path exists |
@@ -197,6 +248,7 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { | |||
197 | } | 248 | } |
198 | // write to file for storage | 249 | // write to file for storage |
199 | var fp *os.File | 250 | var fp *os.File |
251 | var fpSize int | ||
200 | if fp, err = os.Create(path + "/" + fname); err == nil { | 252 | if fp, err = os.Create(path + "/" + fname); err == nil { |
201 | defer fp.Close() | 253 | defer fp.Close() |
202 | // write block data | 254 | // write block data |
@@ -206,11 +258,18 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { | |||
206 | } | 258 | } |
207 | } | 259 | } |
208 | } | 260 | } |
209 | // update cache list | 261 | // add header to internal list |
210 | if s.cached != nil { | 262 | now := util.AbsoluteTimeNow() |
211 | s.cached[s.wrPos] = query.Key() | 263 | hdr := &FileHeader{ |
212 | s.wrPos = (s.wrPos + 1) % len(s.cached) | 264 | key: hex.EncodeToString(query.Key().Bits), |
265 | size: uint64(fpSize), | ||
266 | btype: btype, | ||
267 | expires: expire, | ||
268 | stored: now, | ||
269 | lastUsed: now, | ||
270 | usedCount: 1, | ||
213 | } | 271 | } |
272 | s.files[hdr.key] = hdr | ||
214 | return | 273 | return |
215 | } | 274 | } |
216 | 275 | ||
@@ -258,6 +317,53 @@ func (s *FileStore) expandPath(key *crypto.HashCode) (string, string) { | |||
258 | return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] | 317 | return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] |
259 | } | 318 | } |
260 | 319 | ||
320 | // Prune list of file headers so we drop at least n entries. | ||
321 | // returns number of removed entries. | ||
322 | func (s *FileStore) prune(n int) (del int) { | ||
323 | // get list of headers; remove expired entries on the fly | ||
324 | list := make([]*FileHeader, 0) | ||
325 | for key, hdr := range s.files { | ||
326 | // remove expired entry | ||
327 | if hdr.expires.Expired() { | ||
328 | s.dropFile(key) | ||
329 | del++ | ||
330 | } | ||
331 | // append to list | ||
332 | list = append(list, hdr) | ||
333 | } | ||
334 | // check if we are already done. | ||
335 | if del >= n { | ||
336 | return | ||
337 | } | ||
338 | // sort list by decending rate "(lifetime * size) / usedCount" | ||
339 | sort.Slice(list, func(i, j int) bool { | ||
340 | ri := (list[i].stored.Elapsed().Val * list[i].size) / list[i].usedCount | ||
341 | rj := (list[j].stored.Elapsed().Val * list[j].size) / list[j].usedCount | ||
342 | return ri > rj | ||
343 | }) | ||
344 | // remove from start of list until prune limit is reached | ||
345 | for _, hdr := range list { | ||
346 | s.dropFile(hdr.key) | ||
347 | del++ | ||
348 | if del == n { | ||
349 | break | ||
350 | } | ||
351 | } | ||
352 | return | ||
353 | } | ||
354 | |||
355 | // drop file removes a file from the internal list and the physical storage. | ||
356 | func (s *FileStore) dropFile(key string) { | ||
357 | // remove for internal list | ||
358 | delete(s.files, key) | ||
359 | // remove from filesystem | ||
360 | path := fmt.Sprintf("%s/%s/%s/%s", s.path, key[:2], key[2:4], key[4:]) | ||
361 | if err := os.Remove(path); err != nil { | ||
362 | logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) | ||
363 | return | ||
364 | } | ||
365 | } | ||
366 | |||
261 | //------------------------------------------------------------ | 367 | //------------------------------------------------------------ |
262 | // Redis: only use for caching purposes on key/value strings | 368 | // Redis: only use for caching purposes on key/value strings |
263 | //------------------------------------------------------------ | 369 | //------------------------------------------------------------ |
@@ -269,16 +375,28 @@ type RedisStore struct { | |||
269 | } | 375 | } |
270 | 376 | ||
271 | // NewRedisStore creates a Redis service client instance. | 377 | // NewRedisStore creates a Redis service client instance. |
272 | func NewRedisStore(addr, passwd, db string) (s KVStore, err error) { | 378 | func NewRedisStore(spec config.ParameterConfig) (s KVStore, err error) { |
273 | kvs := new(RedisStore) | 379 | // get connection parameters |
274 | if kvs.db, err = strconv.Atoi(db); err != nil { | 380 | addr, ok := config.GetParam[string](spec, "addr") |
275 | err = ErrStoreInvalidSpec | 381 | if !ok { |
276 | return | 382 | return nil, ErrStoreInvalidSpec |
383 | } | ||
384 | passwd, ok := config.GetParam[string](spec, "passwd") | ||
385 | if !ok { | ||
386 | passwd = "" | ||
277 | } | 387 | } |
388 | db, ok := config.GetParam[int](spec, "db") | ||
389 | if !ok { | ||
390 | return nil, ErrStoreInvalidSpec | ||
391 | } | ||
392 | |||
393 | // create new Redis store | ||
394 | kvs := new(RedisStore) | ||
395 | kvs.db = db | ||
278 | kvs.client = redis.NewClient(&redis.Options{ | 396 | kvs.client = redis.NewClient(&redis.Options{ |
279 | Addr: addr, | 397 | Addr: addr, |
280 | Password: passwd, | 398 | Password: passwd, |
281 | DB: kvs.db, | 399 | DB: db, |
282 | }) | 400 | }) |
283 | if kvs.client == nil { | 401 | if kvs.client == nil { |
284 | err = ErrStoreNotAvailable | 402 | err = ErrStoreNotAvailable |
@@ -328,11 +446,17 @@ type SQLStore struct { | |||
328 | } | 446 | } |
329 | 447 | ||
330 | // NewSQLStore creates a new SQL-based key/value store. | 448 | // NewSQLStore creates a new SQL-based key/value store. |
331 | func NewSQLStore(spec string) (s KVStore, err error) { | 449 | func NewSQLStore(spec config.ParameterConfig) (s KVStore, err error) { |
450 | // get connection parameters | ||
451 | connect, ok := config.GetParam[string](spec, "connect") | ||
452 | if !ok { | ||
453 | return nil, ErrStoreInvalidSpec | ||
454 | } | ||
455 | // create SQL store | ||
332 | kvs := new(SQLStore) | 456 | kvs := new(SQLStore) |
333 | 457 | ||
334 | // connect to SQL database | 458 | // connect to SQL database |
335 | kvs.db, err = util.DbPool.Connect(spec) | 459 | kvs.db, err = util.DbPool.Connect(connect) |
336 | if err != nil { | 460 | if err != nil { |
337 | return nil, err | 461 | return nil, err |
338 | } | 462 | } |
@@ -343,7 +467,6 @@ func NewSQLStore(spec string) (s KVStore, err error) { | |||
343 | return nil, ErrStoreNotAvailable | 467 | return nil, ErrStoreNotAvailable |
344 | } | 468 | } |
345 | return kvs, nil | 469 | return kvs, nil |
346 | |||
347 | } | 470 | } |
348 | 471 | ||
349 | // Put a key/value pair into the store | 472 | // Put a key/value pair into the store |