diff options
author | Bernd Fix <brf@hoi-polloi.org> | 2022-06-11 20:21:38 +0200 |
---|---|---|
committer | Bernd Fix <brf@hoi-polloi.org> | 2022-06-11 20:21:38 +0200 |
commit | 4261e07def81e7c3eb183b9d5c4059a2e9c53759 (patch) | |
tree | 81334bb74e14305dd5274b671d8e351defdfa242 /src | |
parent | 8f8feaf176e62f14a6d449c0a2fb6f0ca76b22b8 (diff) | |
download | gnunet-go-4261e07def81e7c3eb183b9d5c4059a2e9c53759.tar.gz gnunet-go-4261e07def81e7c3eb183b9d5c4059a2e9c53759.zip |
File/block caching strategy improved.v0.1.27
Diffstat (limited to 'src')
-rw-r--r-- | src/gnunet/config/config.go | 33 | ||||
-rw-r--r-- | src/gnunet/config/gnunet-config.json | 25 | ||||
-rw-r--r-- | src/gnunet/service/dht/dhtstore_test.go | 19 | ||||
-rw-r--r-- | src/gnunet/service/dht/module.go | 4 | ||||
-rw-r--r-- | src/gnunet/service/store.go | 293 |
5 files changed, 266 insertions, 108 deletions
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go index a92bdd6..b4d2840 100644 --- a/src/gnunet/config/config.go +++ b/src/gnunet/config/config.go | |||
@@ -94,14 +94,33 @@ type GNSConfig struct { | |||
94 | } | 94 | } |
95 | 95 | ||
96 | //---------------------------------------------------------------------- | 96 | //---------------------------------------------------------------------- |
97 | // Generic parameter configuration (handle any key/value settings) | ||
98 | //---------------------------------------------------------------------- | ||
99 | |||
100 | // ParameterConfig handle arbitrary values for a key strings. This necessary | ||
101 | // e.g. in the 'Storage' configuration, as custom storage implementations | ||
102 | // require different sets of parameters. | ||
103 | type ParameterConfig map[string]any | ||
104 | |||
105 | // Get a parameter value with given type 'V' | ||
106 | func GetParam[V any](params ParameterConfig, key string) (i V, ok bool) { | ||
107 | var v any | ||
108 | if v, ok = params[key]; ok { | ||
109 | if i, ok = v.(V); ok { | ||
110 | return | ||
111 | } | ||
112 | } | ||
113 | return | ||
114 | } | ||
115 | |||
116 | //---------------------------------------------------------------------- | ||
97 | // DHT configuration | 117 | // DHT configuration |
98 | //---------------------------------------------------------------------- | 118 | //---------------------------------------------------------------------- |
99 | 119 | ||
100 | // DHTConfig contains parameters for the distributed hash table (DHT) | 120 | // DHTConfig contains parameters for the distributed hash table (DHT) |
101 | type DHTConfig struct { | 121 | type DHTConfig struct { |
102 | Service *ServiceConfig `json:"service"` // socket for DHT service | 122 | Service *ServiceConfig `json:"service"` // socket for DHT service |
103 | Storage string `json:"storage"` // filesystem storage location | 123 | Storage ParameterConfig `json:"storage"` // filesystem storage location |
104 | Cache string `json:"cache"` // key/value cache | ||
105 | } | 124 | } |
106 | 125 | ||
107 | //---------------------------------------------------------------------- | 126 | //---------------------------------------------------------------------- |
@@ -110,8 +129,8 @@ type DHTConfig struct { | |||
110 | 129 | ||
111 | // NamecacheConfig contains parameters for the local name cache | 130 | // NamecacheConfig contains parameters for the local name cache |
112 | type NamecacheConfig struct { | 131 | type NamecacheConfig struct { |
113 | Service *ServiceConfig `json:"service"` // socket for Namecache service | 132 | Service *ServiceConfig `json:"service"` // socket for Namecache service |
114 | Storage string `json:"storage"` // key/value cache | 133 | Storage ParameterConfig `json:"storage"` // key/value cache |
115 | } | 134 | } |
116 | 135 | ||
117 | //---------------------------------------------------------------------- | 136 | //---------------------------------------------------------------------- |
@@ -120,8 +139,8 @@ type NamecacheConfig struct { | |||
120 | 139 | ||
121 | // RevocationConfig contains parameters for the key revocation service | 140 | // RevocationConfig contains parameters for the key revocation service |
122 | type RevocationConfig struct { | 141 | type RevocationConfig struct { |
123 | Service *ServiceConfig `json:"service"` // socket for Revocation service | 142 | Service *ServiceConfig `json:"service"` // socket for Revocation service |
124 | Storage string `json:"storage"` // persistance mechanism for revocation data | 143 | Storage ParameterConfig `json:"storage"` // persistance mechanism for revocation data |
125 | } | 144 | } |
126 | 145 | ||
127 | //---------------------------------------------------------------------- | 146 | //---------------------------------------------------------------------- |
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json index 7927cda..167bfa0 100644 --- a/src/gnunet/config/gnunet-config.json +++ b/src/gnunet/config/gnunet-config.json | |||
@@ -11,7 +11,7 @@ | |||
11 | { | 11 | { |
12 | "id": "test", | 12 | "id": "test", |
13 | "network": "ip+udp", | 13 | "network": "ip+udp", |
14 | "address": "upnp:192.168.178.1", | 14 | "address": "upnp:192.168.134.1", |
15 | "port": 6666, | 15 | "port": 6666, |
16 | "ttl": 86400 | 16 | "ttl": 86400 |
17 | } | 17 | } |
@@ -28,8 +28,12 @@ | |||
28 | "perm": "0770" | 28 | "perm": "0770" |
29 | } | 29 | } |
30 | }, | 30 | }, |
31 | "storage": "dht_file_store+/var/lib/gnunet/dht/store", | 31 | "storage": { |
32 | "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" | 32 | "mode": "file", |
33 | "cache": false, | ||
34 | "path": "/var/lib/gnunet/dht/store", | ||
35 | "maxGB": 10 | ||
36 | } | ||
33 | }, | 37 | }, |
34 | "gns": { | 38 | "gns": { |
35 | "service": { | 39 | "service": { |
@@ -48,7 +52,13 @@ | |||
48 | "perm": "0770" | 52 | "perm": "0770" |
49 | } | 53 | } |
50 | }, | 54 | }, |
51 | "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" | 55 | "storage": { |
56 | "mode": "file", | ||
57 | "cache": true, | ||
58 | "path": "/var/lib/gnunet/namecache", | ||
59 | "num": 1000, | ||
60 | "expire": 43200 | ||
61 | } | ||
52 | }, | 62 | }, |
53 | "revocation": { | 63 | "revocation": { |
54 | "service": { | 64 | "service": { |
@@ -57,7 +67,12 @@ | |||
57 | "perm": "0770" | 67 | "perm": "0770" |
58 | } | 68 | } |
59 | }, | 69 | }, |
60 | "storage": "redis:localhost:6397::15" | 70 | "storage": { |
71 | "mode": "redis", | ||
72 | "addr": "localhost:6397", | ||
73 | "passwd": "", | ||
74 | "id": 15 | ||
75 | } | ||
61 | }, | 76 | }, |
62 | "rpc": { | 77 | "rpc": { |
63 | "endpoint": "tcp:127.0.0.1:80" | 78 | "endpoint": "tcp:127.0.0.1:80" |
diff --git a/src/gnunet/service/dht/dhtstore_test.go b/src/gnunet/service/dht/dhtstore_test.go index 3cb8080..d9fc1d0 100644 --- a/src/gnunet/service/dht/dhtstore_test.go +++ b/src/gnunet/service/dht/dhtstore_test.go | |||
@@ -20,9 +20,10 @@ package dht | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "encoding/hex" | 22 | "encoding/hex" |
23 | "gnunet/config" | ||
23 | "gnunet/crypto" | 24 | "gnunet/crypto" |
24 | "gnunet/service" | 25 | "gnunet/service" |
25 | blocks "gnunet/service/dht/blocks" | 26 | "gnunet/service/dht/blocks" |
26 | "math/rand" | 27 | "math/rand" |
27 | "testing" | 28 | "testing" |
28 | ) | 29 | ) |
@@ -37,8 +38,15 @@ const ( | |||
37 | // each block from storage and checks for matching hash. | 38 | // each block from storage and checks for matching hash. |
38 | func TestDHTFilesStore(t *testing.T) { | 39 | func TestDHTFilesStore(t *testing.T) { |
39 | 40 | ||
41 | // test configuration | ||
42 | cfg := make(config.ParameterConfig) | ||
43 | cfg["mode"] = "file" | ||
44 | cfg["cache"] = false | ||
45 | cfg["path"] = "/var/lib/gnunet/dht/store" | ||
46 | cfg["maxGB"] = 10 | ||
47 | |||
40 | // create file store | 48 | // create file store |
41 | fs, err := service.NewFileCache("/var/lib/gnunet/dht/cache", "100") | 49 | fs, err := service.NewFileStore(cfg) |
42 | if err != nil { | 50 | if err != nil { |
43 | t.Fatal(err) | 51 | t.Fatal(err) |
44 | } | 52 | } |
@@ -48,33 +56,30 @@ func TestDHTFilesStore(t *testing.T) { | |||
48 | // First round: save blocks | 56 | // First round: save blocks |
49 | for i := 0; i < fsNumBlocks; i++ { | 57 | for i := 0; i < fsNumBlocks; i++ { |
50 | // generate random block | 58 | // generate random block |
51 | size := 20 // 1024 + rand.Intn(62000) | 59 | size := 1024 + rand.Intn(62000) |
52 | buf := make([]byte, size) | 60 | buf := make([]byte, size) |
53 | rand.Read(buf) | 61 | rand.Read(buf) |
54 | val := blocks.NewGenericBlock(buf) | 62 | val := blocks.NewGenericBlock(buf) |
55 | // generate associated key | 63 | // generate associated key |
56 | k := crypto.Hash(buf).Bits | 64 | k := crypto.Hash(buf).Bits |
57 | key := blocks.NewGenericQuery(k) | 65 | key := blocks.NewGenericQuery(k) |
58 | t.Logf("> %d: %s -- %s", i, hex.EncodeToString(k), hex.EncodeToString(buf)) | ||
59 | 66 | ||
60 | // store block | 67 | // store block |
61 | if err := fs.Put(key, val); err != nil { | 68 | if err := fs.Put(key, val); err != nil { |
62 | t.Fatal(err) | 69 | t.Fatal(err) |
63 | } | 70 | } |
64 | |||
65 | // remember key | 71 | // remember key |
66 | keys = append(keys, key) | 72 | keys = append(keys, key) |
67 | } | 73 | } |
68 | 74 | ||
69 | // Second round: retrieve blocks and check | 75 | // Second round: retrieve blocks and check |
70 | for i, key := range keys { | 76 | for _, key := range keys { |
71 | // get block | 77 | // get block |
72 | val, err := fs.Get(key) | 78 | val, err := fs.Get(key) |
73 | if err != nil { | 79 | if err != nil { |
74 | t.Fatal(err) | 80 | t.Fatal(err) |
75 | } | 81 | } |
76 | buf := val.Data() | 82 | buf := val.Data() |
77 | t.Logf("< %d: %s -- %s", i, hex.EncodeToString(key.Key().Bits), hex.EncodeToString(buf)) | ||
78 | 83 | ||
79 | // re-create key | 84 | // re-create key |
80 | k := crypto.Hash(buf) | 85 | k := crypto.Hash(buf) |
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go index 34bf020..612d0ec 100644 --- a/src/gnunet/service/dht/module.go +++ b/src/gnunet/service/dht/module.go | |||
@@ -57,10 +57,6 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module, err error) { | |||
57 | if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil { | 57 | if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil { |
58 | return | 58 | return |
59 | } | 59 | } |
60 | // create cache handler | ||
61 | if cache, err = service.NewDHTStore(config.Cfg.DHT.Cache); err != nil { | ||
62 | return | ||
63 | } | ||
64 | // create routing table | 60 | // create routing table |
65 | rt := NewRoutingTable(NewPeerAddress(c.PeerID())) | 61 | rt := NewRoutingTable(NewPeerAddress(c.PeerID())) |
66 | 62 | ||
diff --git a/src/gnunet/service/store.go b/src/gnunet/service/store.go index c599c54..5de5415 100644 --- a/src/gnunet/service/store.go +++ b/src/gnunet/service/store.go | |||
@@ -22,17 +22,21 @@ import ( | |||
22 | "context" | 22 | "context" |
23 | "database/sql" | 23 | "database/sql" |
24 | "encoding/binary" | 24 | "encoding/binary" |
25 | "encoding/gob" | ||
25 | "encoding/hex" | 26 | "encoding/hex" |
26 | "errors" | 27 | "errors" |
27 | "fmt" | 28 | "fmt" |
29 | "gnunet/config" | ||
28 | "gnunet/crypto" | 30 | "gnunet/crypto" |
29 | "gnunet/service/dht/blocks" | 31 | "gnunet/service/dht/blocks" |
30 | "gnunet/util" | 32 | "gnunet/util" |
33 | "io" | ||
31 | "io/ioutil" | 34 | "io/ioutil" |
32 | "os" | 35 | "os" |
33 | "strconv" | 36 | "sort" |
34 | "strings" | 37 | "sync" |
35 | 38 | ||
39 | "github.com/bfix/gospel/logger" | ||
36 | redis "github.com/go-redis/redis/v8" | 40 | redis "github.com/go-redis/redis/v8" |
37 | ) | 41 | ) |
38 | 42 | ||
@@ -46,22 +50,20 @@ var ( | |||
46 | //------------------------------------------------------------ | 50 | //------------------------------------------------------------ |
47 | // Generic storage interface. Can be used for persistent or | 51 | // Generic storage interface. Can be used for persistent or |
48 | // transient (caching) storage of key/value data. | 52 | // transient (caching) storage of key/value data. |
49 | // One set of methods (Get/Put) work on DHT queries and blocks, | ||
50 | // the other set (GetS, PutS) work on key/value strings. | ||
51 | // Each custom implementation can decide which sets to support. | ||
52 | //------------------------------------------------------------ | 53 | //------------------------------------------------------------ |
53 | 54 | ||
54 | // Store is a key/value storage where the type of the key is either | 55 | // Store is a key/value storage where the type of the key is either |
55 | // a SHA512 hash value or a string and the value is either a DHT | 56 | // a SHA512 hash value or a string and the value is either a DHT |
56 | // block or a string. | 57 | // block or a string. It is possiblle to mix any key/value types, |
58 | // but not used in this implementation. | ||
57 | type Store[K, V any] interface { | 59 | type Store[K, V any] interface { |
58 | // Put block into storage under given key | 60 | // Put value into storage under given key |
59 | Put(key K, val V) error | 61 | Put(key K, val V) error |
60 | 62 | ||
61 | // Get block with given key from storage | 63 | // Get value with given key from storage |
62 | Get(key K) (V, error) | 64 | Get(key K) (V, error) |
63 | 65 | ||
64 | // List all store queries | 66 | // List all store keys |
65 | List() ([]K, error) | 67 | List() ([]K, error) |
66 | } | 68 | } |
67 | 69 | ||
@@ -78,22 +80,18 @@ type KVStore Store[string, string] | |||
78 | //------------------------------------------------------------ | 80 | //------------------------------------------------------------ |
79 | // NewDHTStore creates a new storage handler with given spec | 81 | // NewDHTStore creates a new storage handler with given spec |
80 | // for use with DHT queries and blocks | 82 | // for use with DHT queries and blocks |
81 | func NewDHTStore(spec string) (DHTStore, error) { | 83 | func NewDHTStore(spec config.ParameterConfig) (DHTStore, error) { |
82 | specs := strings.Split(spec, ":") | 84 | // get the mode parameter |
83 | if len(specs) < 2 { | 85 | mode, ok := config.GetParam[string](spec, "mode") |
86 | if !ok { | ||
84 | return nil, ErrStoreInvalidSpec | 87 | return nil, ErrStoreInvalidSpec |
85 | } | 88 | } |
86 | switch specs[0] { | 89 | switch mode { |
87 | //------------------------------------------------------------------ | 90 | //------------------------------------------------------------------ |
88 | // File-base storage | 91 | // File-base storage |
89 | //------------------------------------------------------------------ | 92 | //------------------------------------------------------------------ |
90 | case "file_store": | 93 | case "file": |
91 | return NewFileStore(specs[1]) | 94 | return NewFileStore(spec) |
92 | case "file_cache": | ||
93 | if len(specs) < 3 { | ||
94 | return nil, ErrStoreInvalidSpec | ||
95 | } | ||
96 | return NewFileCache(specs[1], specs[2]) | ||
97 | } | 95 | } |
98 | return nil, ErrStoreUnknown | 96 | return nil, ErrStoreUnknown |
99 | } | 97 | } |
@@ -101,29 +99,24 @@ func NewDHTStore(spec string) (DHTStore, error) { | |||
101 | //------------------------------------------------------------ | 99 | //------------------------------------------------------------ |
102 | // NewKVStore creates a new storage handler with given spec | 100 | // NewKVStore creates a new storage handler with given spec |
103 | // for use with key/value string pairs. | 101 | // for use with key/value string pairs. |
104 | func NewKVStore(spec string) (KVStore, error) { | 102 | func NewKVStore(spec config.ParameterConfig) (KVStore, error) { |
105 | specs := strings.SplitN(spec, ":", 2) | 103 | // get the mode parameter |
106 | if len(specs) < 2 { | 104 | mode, ok := config.GetParam[string](spec, "mode") |
105 | if !ok { | ||
107 | return nil, ErrStoreInvalidSpec | 106 | return nil, ErrStoreInvalidSpec |
108 | } | 107 | } |
109 | switch specs[0] { | 108 | switch mode { |
110 | //-------------------------------------------------------------- | 109 | //-------------------------------------------------------------- |
111 | // Redis service | 110 | // Redis service |
112 | //-------------------------------------------------------------- | 111 | //-------------------------------------------------------------- |
113 | case "redis": | 112 | case "redis": |
114 | if len(specs) < 4 { | 113 | return NewRedisStore(spec) |
115 | return nil, ErrStoreInvalidSpec | ||
116 | } | ||
117 | return NewRedisStore(specs[1], specs[2], specs[3]) | ||
118 | 114 | ||
119 | //-------------------------------------------------------------- | 115 | //-------------------------------------------------------------- |
120 | // SQL database service | 116 | // SQL database service |
121 | //-------------------------------------------------------------- | 117 | //-------------------------------------------------------------- |
122 | case "sql": | 118 | case "sql": |
123 | if len(specs) < 4 { | 119 | return NewSQLStore(spec) |
124 | return nil, ErrStoreInvalidSpec | ||
125 | } | ||
126 | return NewSQLStore(specs[1]) | ||
127 | } | 120 | } |
128 | return nil, errors.New("unknown storage mechanism") | 121 | return nil, errors.New("unknown storage mechanism") |
129 | } | 122 | } |
@@ -132,63 +125,121 @@ func NewKVStore(spec string) (KVStore, error) { | |||
132 | // Filesystem-based storage | 125 | // Filesystem-based storage |
133 | //------------------------------------------------------------ | 126 | //------------------------------------------------------------ |
134 | 127 | ||
128 | // FileHeader is the layout of a file managed by the storage handler. | ||
129 | // On start-up the file store recreates the list of file entries from | ||
130 | // traversing the actual filesystem. This is done in the background. | ||
131 | type FileHeader struct { | ||
132 | key string // storage key | ||
133 | size uint64 // size of file | ||
134 | btype uint16 // block type | ||
135 | stored util.AbsoluteTime // time added to store | ||
136 | expires util.AbsoluteTime // expiration time | ||
137 | lastUsed util.AbsoluteTime // time last used | ||
138 | usedCount uint64 // usage count | ||
139 | } | ||
140 | |||
135 | // FileStore implements a filesystem-based storage mechanism for | 141 | // FileStore implements a filesystem-based storage mechanism for |
136 | // DHT queries and blocks. | 142 | // DHT queries and blocks. |
137 | type FileStore struct { | 143 | type FileStore struct { |
138 | path string // storage path | 144 | path string // storage path |
139 | cached []*crypto.HashCode // list of cached entries (key) | 145 | cache bool // storage works as cache |
140 | wrPos int // write position in cyclic list | 146 | args config.ParameterConfig // arguments / settings |
147 | |||
148 | totalSize uint64 // total storage size (logical, not physical) | ||
149 | files map[string]*FileHeader // list of file headers | ||
150 | wrPos int // write position in cyclic list | ||
151 | mtx sync.Mutex // serialize operations (prune) | ||
141 | } | 152 | } |
142 | 153 | ||
143 | // NewFileStore instantiates a new file storage. | 154 | // NewFileStore instantiates a new file storage. |
144 | func NewFileStore(path string) (DHTStore, error) { | 155 | func NewFileStore(spec config.ParameterConfig) (DHTStore, error) { |
145 | // create file store | 156 | // get path parameter |
146 | return &FileStore{ | 157 | path, ok := config.GetParam[string](spec, "path") |
147 | path: path, | 158 | if !ok { |
148 | }, nil | 159 | return nil, ErrStoreInvalidSpec |
149 | } | 160 | } |
150 | 161 | isCache, ok := config.GetParam[bool](spec, "cache") | |
151 | // NewFileCache instantiates a new file-based cache. | 162 | if !ok { |
152 | func NewFileCache(path, param string) (DHTStore, error) { | 163 | isCache = false |
164 | } | ||
153 | // remove old cache content | 165 | // remove old cache content |
154 | os.RemoveAll(path) | 166 | if isCache { |
167 | os.RemoveAll(path) | ||
168 | } | ||
169 | // create file store handler | ||
170 | fs := &FileStore{ | ||
171 | path: path, | ||
172 | args: spec, | ||
173 | cache: isCache, | ||
174 | files: make(map[string]*FileHeader), | ||
175 | } | ||
176 | // load file header list | ||
177 | if !isCache { | ||
178 | if fp, err := os.Open(path + "/files.db"); err == nil { | ||
179 | dec := gob.NewDecoder(fp) | ||
180 | for { | ||
181 | hdr := new(FileHeader) | ||
182 | if dec.Decode(hdr) != nil { | ||
183 | if err != io.EOF { | ||
184 | return nil, err | ||
185 | } | ||
186 | break | ||
187 | } | ||
188 | fs.files[hdr.key] = hdr | ||
189 | fs.totalSize += hdr.size | ||
190 | } | ||
191 | fp.Close() | ||
192 | } | ||
193 | } | ||
194 | return fs, nil | ||
195 | } | ||
155 | 196 | ||
156 | // get number of cache entries | 197 | // Close file storage. write metadata to file |
157 | num, err := strconv.ParseUint(param, 10, 32) | 198 | func (s *FileStore) Close() (err error) { |
158 | if err != nil { | 199 | if !s.cache { |
159 | return nil, err | 200 | if fp, err := os.Create(s.path + "/files.db"); err == nil { |
201 | defer fp.Close() | ||
202 | enc := gob.NewEncoder(fp) | ||
203 | for _, hdr := range s.files { | ||
204 | if err = enc.Encode(hdr); err != nil { | ||
205 | break | ||
206 | } | ||
207 | } | ||
208 | } | ||
160 | } | 209 | } |
161 | // create file store | 210 | return |
162 | return &FileStore{ | ||
163 | path: path, | ||
164 | cached: make([]*crypto.HashCode, num), | ||
165 | wrPos: 0, | ||
166 | }, nil | ||
167 | } | 211 | } |
168 | 212 | ||
169 | // Put block into storage under given key | 213 | // Put block into storage under given key |
170 | func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { | 214 | func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { |
215 | // check for free space | ||
216 | if s.cache { | ||
217 | // caching is limited by explicit number of files | ||
218 | num, ok := config.GetParam[int](s.args, "num") | ||
219 | if !ok { | ||
220 | num = 100 | ||
221 | } | ||
222 | if len(s.files) >= num { | ||
223 | // make space for at least one new entry | ||
224 | s.prune(1) | ||
225 | } | ||
226 | } else { | ||
227 | // normal storage is limited by quota (default: 10GB) | ||
228 | max, ok := config.GetParam[int](s.args, "maxGB") | ||
229 | if !ok { | ||
230 | max = 10 | ||
231 | } | ||
232 | if int(s.totalSize>>30) > max { | ||
233 | // drop a significant number of blocks | ||
234 | s.prune(20) | ||
235 | } | ||
236 | } | ||
171 | // get query parameters for entry | 237 | // get query parameters for entry |
172 | var ( | 238 | var btype uint16 // block type |
173 | btype uint16 // block type | ||
174 | expire util.AbsoluteTime // block expiration | ||
175 | ) | ||
176 | query.Get("blkType", &btype) | 239 | query.Get("blkType", &btype) |
240 | var expire util.AbsoluteTime // block expiration | ||
177 | query.Get("expire", &expire) | 241 | query.Get("expire", &expire) |
178 | 242 | ||
179 | // are we in caching mode? | ||
180 | if s.cached != nil { | ||
181 | // release previous block if defined | ||
182 | if key := s.cached[s.wrPos]; key != nil { | ||
183 | // get path and filename from key | ||
184 | path, fname := s.expandPath(key) | ||
185 | if err = os.Remove(path + "/" + fname); err != nil { | ||
186 | return | ||
187 | } | ||
188 | // free slot | ||
189 | s.cached[s.wrPos] = nil | ||
190 | } | ||
191 | } | ||
192 | // get path and filename from key | 243 | // get path and filename from key |
193 | path, fname := s.expandPath(query.Key()) | 244 | path, fname := s.expandPath(query.Key()) |
194 | // make sure the path exists | 245 | // make sure the path exists |
@@ -197,6 +248,7 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { | |||
197 | } | 248 | } |
198 | // write to file for storage | 249 | // write to file for storage |
199 | var fp *os.File | 250 | var fp *os.File |
251 | var fpSize int | ||
200 | if fp, err = os.Create(path + "/" + fname); err == nil { | 252 | if fp, err = os.Create(path + "/" + fname); err == nil { |
201 | defer fp.Close() | 253 | defer fp.Close() |
202 | // write block data | 254 | // write block data |
@@ -206,11 +258,18 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { | |||
206 | } | 258 | } |
207 | } | 259 | } |
208 | } | 260 | } |
209 | // update cache list | 261 | // add header to internal list |
210 | if s.cached != nil { | 262 | now := util.AbsoluteTimeNow() |
211 | s.cached[s.wrPos] = query.Key() | 263 | hdr := &FileHeader{ |
212 | s.wrPos = (s.wrPos + 1) % len(s.cached) | 264 | key: hex.EncodeToString(query.Key().Bits), |
265 | size: uint64(fpSize), | ||
266 | btype: btype, | ||
267 | expires: expire, | ||
268 | stored: now, | ||
269 | lastUsed: now, | ||
270 | usedCount: 1, | ||
213 | } | 271 | } |
272 | s.files[hdr.key] = hdr | ||
214 | return | 273 | return |
215 | } | 274 | } |
216 | 275 | ||
@@ -258,6 +317,53 @@ func (s *FileStore) expandPath(key *crypto.HashCode) (string, string) { | |||
258 | return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] | 317 | return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] |
259 | } | 318 | } |
260 | 319 | ||
320 | // Prune list of file headers so we drop at least n entries. | ||
321 | // returns number of removed entries. | ||
322 | func (s *FileStore) prune(n int) (del int) { | ||
323 | // get list of headers; remove expired entries on the fly | ||
324 | list := make([]*FileHeader, 0) | ||
325 | for key, hdr := range s.files { | ||
326 | // remove expired entry | ||
327 | if hdr.expires.Expired() { | ||
328 | s.dropFile(key) | ||
329 | del++ | ||
330 | } | ||
331 | // append to list | ||
332 | list = append(list, hdr) | ||
333 | } | ||
334 | // check if we are already done. | ||
335 | if del >= n { | ||
336 | return | ||
337 | } | ||
338 | // sort list by decending rate "(lifetime * size) / usedCount" | ||
339 | sort.Slice(list, func(i, j int) bool { | ||
340 | ri := (list[i].stored.Elapsed().Val * list[i].size) / list[i].usedCount | ||
341 | rj := (list[j].stored.Elapsed().Val * list[j].size) / list[j].usedCount | ||
342 | return ri > rj | ||
343 | }) | ||
344 | // remove from start of list until prune limit is reached | ||
345 | for _, hdr := range list { | ||
346 | s.dropFile(hdr.key) | ||
347 | del++ | ||
348 | if del == n { | ||
349 | break | ||
350 | } | ||
351 | } | ||
352 | return | ||
353 | } | ||
354 | |||
355 | // drop file removes a file from the internal list and the physical storage. | ||
356 | func (s *FileStore) dropFile(key string) { | ||
357 | // remove for internal list | ||
358 | delete(s.files, key) | ||
359 | // remove from filesystem | ||
360 | path := fmt.Sprintf("%s/%s/%s/%s", s.path, key[:2], key[2:4], key[4:]) | ||
361 | if err := os.Remove(path); err != nil { | ||
362 | logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) | ||
363 | return | ||
364 | } | ||
365 | } | ||
366 | |||
261 | //------------------------------------------------------------ | 367 | //------------------------------------------------------------ |
262 | // Redis: only use for caching purposes on key/value strings | 368 | // Redis: only use for caching purposes on key/value strings |
263 | //------------------------------------------------------------ | 369 | //------------------------------------------------------------ |
@@ -269,16 +375,28 @@ type RedisStore struct { | |||
269 | } | 375 | } |
270 | 376 | ||
271 | // NewRedisStore creates a Redis service client instance. | 377 | // NewRedisStore creates a Redis service client instance. |
272 | func NewRedisStore(addr, passwd, db string) (s KVStore, err error) { | 378 | func NewRedisStore(spec config.ParameterConfig) (s KVStore, err error) { |
273 | kvs := new(RedisStore) | 379 | // get connection parameters |
274 | if kvs.db, err = strconv.Atoi(db); err != nil { | 380 | addr, ok := config.GetParam[string](spec, "addr") |
275 | err = ErrStoreInvalidSpec | 381 | if !ok { |
276 | return | 382 | return nil, ErrStoreInvalidSpec |
383 | } | ||
384 | passwd, ok := config.GetParam[string](spec, "passwd") | ||
385 | if !ok { | ||
386 | passwd = "" | ||
277 | } | 387 | } |
388 | db, ok := config.GetParam[int](spec, "db") | ||
389 | if !ok { | ||
390 | return nil, ErrStoreInvalidSpec | ||
391 | } | ||
392 | |||
393 | // create new Redis store | ||
394 | kvs := new(RedisStore) | ||
395 | kvs.db = db | ||
278 | kvs.client = redis.NewClient(&redis.Options{ | 396 | kvs.client = redis.NewClient(&redis.Options{ |
279 | Addr: addr, | 397 | Addr: addr, |
280 | Password: passwd, | 398 | Password: passwd, |
281 | DB: kvs.db, | 399 | DB: db, |
282 | }) | 400 | }) |
283 | if kvs.client == nil { | 401 | if kvs.client == nil { |
284 | err = ErrStoreNotAvailable | 402 | err = ErrStoreNotAvailable |
@@ -328,11 +446,17 @@ type SQLStore struct { | |||
328 | } | 446 | } |
329 | 447 | ||
330 | // NewSQLStore creates a new SQL-based key/value store. | 448 | // NewSQLStore creates a new SQL-based key/value store. |
331 | func NewSQLStore(spec string) (s KVStore, err error) { | 449 | func NewSQLStore(spec config.ParameterConfig) (s KVStore, err error) { |
450 | // get connection parameters | ||
451 | connect, ok := config.GetParam[string](spec, "connect") | ||
452 | if !ok { | ||
453 | return nil, ErrStoreInvalidSpec | ||
454 | } | ||
455 | // create SQL store | ||
332 | kvs := new(SQLStore) | 456 | kvs := new(SQLStore) |
333 | 457 | ||
334 | // connect to SQL database | 458 | // connect to SQL database |
335 | kvs.db, err = util.DbPool.Connect(spec) | 459 | kvs.db, err = util.DbPool.Connect(connect) |
336 | if err != nil { | 460 | if err != nil { |
337 | return nil, err | 461 | return nil, err |
338 | } | 462 | } |
@@ -343,7 +467,6 @@ func NewSQLStore(spec string) (s KVStore, err error) { | |||
343 | return nil, ErrStoreNotAvailable | 467 | return nil, ErrStoreNotAvailable |
344 | } | 468 | } |
345 | return kvs, nil | 469 | return kvs, nil |
346 | |||
347 | } | 470 | } |
348 | 471 | ||
349 | // Put a key/value pair into the store | 472 | // Put a key/value pair into the store |