author     Bernd Fix <brf@hoi-polloi.org>   2022-06-11 20:21:38 +0200
committer  Bernd Fix <brf@hoi-polloi.org>   2022-06-11 20:21:38 +0200
commit     4261e07def81e7c3eb183b9d5c4059a2e9c53759 (patch)
tree       81334bb74e14305dd5274b671d8e351defdfa242 /src
parent     8f8feaf176e62f14a6d449c0a2fb6f0ca76b22b8 (diff)
download   gnunet-go-4261e07def81e7c3eb183b9d5c4059a2e9c53759.tar.gz
           gnunet-go-4261e07def81e7c3eb183b9d5c4059a2e9c53759.zip
File/block caching strategy improved. (tag: v0.1.27)
Diffstat (limited to 'src')
-rw-r--r--   src/gnunet/config/config.go               33
-rw-r--r--   src/gnunet/config/gnunet-config.json      25
-rw-r--r--   src/gnunet/service/dht/dhtstore_test.go   19
-rw-r--r--   src/gnunet/service/dht/module.go           4
-rw-r--r--   src/gnunet/service/store.go              293
5 files changed, 266 insertions, 108 deletions
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index a92bdd6..b4d2840 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -94,14 +94,33 @@ type GNSConfig struct {
94} 94}
95 95
96//---------------------------------------------------------------------- 96//----------------------------------------------------------------------
97// Generic parameter configuration (handle any key/value settings)
98//----------------------------------------------------------------------
99
100// ParameterConfig handles arbitrary values for string keys. This is necessary
101// e.g. in the 'Storage' configuration, as custom storage implementations
102// require different sets of parameters.
103type ParameterConfig map[string]any
104
105// Get a parameter value with given type 'V'
106func GetParam[V any](params ParameterConfig, key string) (i V, ok bool) {
107 var v any
108 if v, ok = params[key]; ok {
109 if i, ok = v.(V); ok {
110 return
111 }
112 }
113 return
114}
115
116//----------------------------------------------------------------------
97// DHT configuration 117// DHT configuration
98//---------------------------------------------------------------------- 118//----------------------------------------------------------------------
99 119
100// DHTConfig contains parameters for the distributed hash table (DHT) 120// DHTConfig contains parameters for the distributed hash table (DHT)
101type DHTConfig struct { 121type DHTConfig struct {
102 Service *ServiceConfig `json:"service"` // socket for DHT service 122 Service *ServiceConfig `json:"service"` // socket for DHT service
103 Storage string `json:"storage"` // filesystem storage location 123 Storage ParameterConfig `json:"storage"` // filesystem storage location
104 Cache string `json:"cache"` // key/value cache
105} 124}
106 125
107//---------------------------------------------------------------------- 126//----------------------------------------------------------------------
@@ -110,8 +129,8 @@ type DHTConfig struct {
110 129
111// NamecacheConfig contains parameters for the local name cache 130// NamecacheConfig contains parameters for the local name cache
112type NamecacheConfig struct { 131type NamecacheConfig struct {
113 Service *ServiceConfig `json:"service"` // socket for Namecache service 132 Service *ServiceConfig `json:"service"` // socket for Namecache service
114 Storage string `json:"storage"` // key/value cache 133 Storage ParameterConfig `json:"storage"` // key/value cache
115} 134}
116 135
117//---------------------------------------------------------------------- 136//----------------------------------------------------------------------
@@ -120,8 +139,8 @@ type NamecacheConfig struct {
120 139
121// RevocationConfig contains parameters for the key revocation service 140// RevocationConfig contains parameters for the key revocation service
122type RevocationConfig struct { 141type RevocationConfig struct {
123 Service *ServiceConfig `json:"service"` // socket for Revocation service 142 Service *ServiceConfig `json:"service"` // socket for Revocation service
124 Storage string `json:"storage"` // persistance mechanism for revocation data 143 Storage ParameterConfig `json:"storage"` // persistance mechanism for revocation data
125} 144}
126 145
127//---------------------------------------------------------------------- 146//----------------------------------------------------------------------
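Usage sketch (illustrative, not part of the commit): reading typed values from a ParameterConfig with the new GetParam helper. The main/fmt scaffolding and the sample values are assumed; the keys mirror the DHT "storage" settings used further below.

    package main

    import (
    	"fmt"

    	"gnunet/config"
    )

    func main() {
    	// a parameter set like the DHT "storage" section provides
    	params := config.ParameterConfig{
    		"mode":  "file",
    		"path":  "/var/lib/gnunet/dht/store",
    		"maxGB": 10,
    	}
    	// typed read: ok reports that the key exists and has the requested type
    	if mode, ok := config.GetParam[string](params, "mode"); ok {
    		fmt.Println("storage mode:", mode)
    	}
    	// a missing key (or a type mismatch) simply yields ok == false
    	if _, ok := config.GetParam[int](params, "num"); !ok {
    		fmt.Println("parameter 'num' not set")
    	}
    }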
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json
index 7927cda..167bfa0 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -11,7 +11,7 @@
11 { 11 {
12 "id": "test", 12 "id": "test",
13 "network": "ip+udp", 13 "network": "ip+udp",
14 "address": "upnp:192.168.178.1", 14 "address": "upnp:192.168.134.1",
15 "port": 6666, 15 "port": 6666,
16 "ttl": 86400 16 "ttl": 86400
17 } 17 }
@@ -28,8 +28,12 @@
28 "perm": "0770" 28 "perm": "0770"
29 } 29 }
30 }, 30 },
31 "storage": "dht_file_store+/var/lib/gnunet/dht/store", 31 "storage": {
32 "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" 32 "mode": "file",
33 "cache": false,
34 "path": "/var/lib/gnunet/dht/store",
35 "maxGB": 10
36 }
33 }, 37 },
34 "gns": { 38 "gns": {
35 "service": { 39 "service": {
@@ -48,7 +52,13 @@
48 "perm": "0770" 52 "perm": "0770"
49 } 53 }
50 }, 54 },
51 "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" 55 "storage": {
56 "mode": "file",
57 "cache": true,
58 "path": "/var/lib/gnunet/namecache",
59 "num": 1000,
60 "expire": 43200
61 }
52 }, 62 },
53 "revocation": { 63 "revocation": {
54 "service": { 64 "service": {
@@ -57,7 +67,12 @@
57 "perm": "0770" 67 "perm": "0770"
58 } 68 }
59 }, 69 },
60 "storage": "redis:localhost:6397::15" 70 "storage": {
71 "mode": "redis",
72 "addr": "localhost:6397",
73 "passwd": "",
74 "id": 15
75 }
61 }, 76 },
62 "rpc": { 77 "rpc": {
63 "endpoint": "tcp:127.0.0.1:80" 78 "endpoint": "tcp:127.0.0.1:80"
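Illustrative sketch (assumed, not part of the commit): the per-service "storage" objects above decode naturally into config.ParameterConfig, since it is a plain map[string]any. The program below is an assumption for demonstration only.

    package main

    import (
    	"encoding/json"
    	"fmt"

    	"gnunet/config"
    )

    func main() {
    	// the DHT "storage" object from the configuration above
    	raw := []byte(`{"mode": "file", "cache": false, "path": "/var/lib/gnunet/dht/store", "maxGB": 10}`)

    	var storage config.ParameterConfig
    	if err := json.Unmarshal(raw, &storage); err != nil {
    		panic(err)
    	}
    	// string and bool parameters can be read back with GetParam
    	mode, _ := config.GetParam[string](storage, "mode")
    	isCache, _ := config.GetParam[bool](storage, "cache")
    	fmt.Println(mode, isCache)
    	// note: plain encoding/json decodes JSON numbers into float64 inside a map[string]any
    }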
diff --git a/src/gnunet/service/dht/dhtstore_test.go b/src/gnunet/service/dht/dhtstore_test.go
index 3cb8080..d9fc1d0 100644
--- a/src/gnunet/service/dht/dhtstore_test.go
+++ b/src/gnunet/service/dht/dhtstore_test.go
@@ -20,9 +20,10 @@ package dht
20 20
21import ( 21import (
22 "encoding/hex" 22 "encoding/hex"
23 "gnunet/config"
23 "gnunet/crypto" 24 "gnunet/crypto"
24 "gnunet/service" 25 "gnunet/service"
25 blocks "gnunet/service/dht/blocks" 26 "gnunet/service/dht/blocks"
26 "math/rand" 27 "math/rand"
27 "testing" 28 "testing"
28) 29)
@@ -37,8 +38,15 @@ const (
37// each block from storage and checks for matching hash. 38// each block from storage and checks for matching hash.
38func TestDHTFilesStore(t *testing.T) { 39func TestDHTFilesStore(t *testing.T) {
39 40
41 // test configuration
42 cfg := make(config.ParameterConfig)
43 cfg["mode"] = "file"
44 cfg["cache"] = false
45 cfg["path"] = "/var/lib/gnunet/dht/store"
46 cfg["maxGB"] = 10
47
40 // create file store 48 // create file store
41 fs, err := service.NewFileCache("/var/lib/gnunet/dht/cache", "100") 49 fs, err := service.NewFileStore(cfg)
42 if err != nil { 50 if err != nil {
43 t.Fatal(err) 51 t.Fatal(err)
44 } 52 }
@@ -48,33 +56,30 @@ func TestDHTFilesStore(t *testing.T) {
48 // First round: save blocks 56 // First round: save blocks
49 for i := 0; i < fsNumBlocks; i++ { 57 for i := 0; i < fsNumBlocks; i++ {
50 // generate random block 58 // generate random block
51 size := 20 // 1024 + rand.Intn(62000) 59 size := 1024 + rand.Intn(62000)
52 buf := make([]byte, size) 60 buf := make([]byte, size)
53 rand.Read(buf) 61 rand.Read(buf)
54 val := blocks.NewGenericBlock(buf) 62 val := blocks.NewGenericBlock(buf)
55 // generate associated key 63 // generate associated key
56 k := crypto.Hash(buf).Bits 64 k := crypto.Hash(buf).Bits
57 key := blocks.NewGenericQuery(k) 65 key := blocks.NewGenericQuery(k)
58 t.Logf("> %d: %s -- %s", i, hex.EncodeToString(k), hex.EncodeToString(buf))
59 66
60 // store block 67 // store block
61 if err := fs.Put(key, val); err != nil { 68 if err := fs.Put(key, val); err != nil {
62 t.Fatal(err) 69 t.Fatal(err)
63 } 70 }
64
65 // remember key 71 // remember key
66 keys = append(keys, key) 72 keys = append(keys, key)
67 } 73 }
68 74
69 // Second round: retrieve blocks and check 75 // Second round: retrieve blocks and check
70 for i, key := range keys { 76 for _, key := range keys {
71 // get block 77 // get block
72 val, err := fs.Get(key) 78 val, err := fs.Get(key)
73 if err != nil { 79 if err != nil {
74 t.Fatal(err) 80 t.Fatal(err)
75 } 81 }
76 buf := val.Data() 82 buf := val.Data()
77 t.Logf("< %d: %s -- %s", i, hex.EncodeToString(key.Key().Bits), hex.EncodeToString(buf))
78 83
79 // re-create key 84 // re-create key
80 k := crypto.Hash(buf) 85 k := crypto.Hash(buf)
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 34bf020..612d0ec 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -57,10 +57,6 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module, err error) {
57 if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil { 57 if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil {
58 return 58 return
59 } 59 }
60 // create cache handler
61 if cache, err = service.NewDHTStore(config.Cfg.DHT.Cache); err != nil {
62 return
63 }
64 // create routing table 60 // create routing table
65 rt := NewRoutingTable(NewPeerAddress(c.PeerID())) 61 rt := NewRoutingTable(NewPeerAddress(c.PeerID()))
66 62
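Illustrative sketch (assumed, not shown in this diff): with the unified parameter spec, a cache-flavoured file store could presumably be obtained through the same NewDHTStore entry point, using settings like those the namecache section declares; path and capacity below are hypothetical.

    package main

    import (
    	"log"

    	"gnunet/config"
    	"gnunet/service"
    )

    func main() {
    	// a cache-mode file store built through the unified NewDHTStore entry point
    	spec := config.ParameterConfig{
    		"mode":  "file",
    		"cache": true,                        // directory is wiped and reused as a cache
    		"path":  "/var/lib/gnunet/dht/cache", // hypothetical cache location
    		"num":   1000,                        // cache capacity in blocks
    	}
    	cache, err := service.NewDHTStore(spec)
    	if err != nil {
    		log.Fatal(err)
    	}
    	_ = cache
    }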
diff --git a/src/gnunet/service/store.go b/src/gnunet/service/store.go
index c599c54..5de5415 100644
--- a/src/gnunet/service/store.go
+++ b/src/gnunet/service/store.go
@@ -22,17 +22,21 @@ import (
22 "context" 22 "context"
23 "database/sql" 23 "database/sql"
24 "encoding/binary" 24 "encoding/binary"
25 "encoding/gob"
25 "encoding/hex" 26 "encoding/hex"
26 "errors" 27 "errors"
27 "fmt" 28 "fmt"
29 "gnunet/config"
28 "gnunet/crypto" 30 "gnunet/crypto"
29 "gnunet/service/dht/blocks" 31 "gnunet/service/dht/blocks"
30 "gnunet/util" 32 "gnunet/util"
33 "io"
31 "io/ioutil" 34 "io/ioutil"
32 "os" 35 "os"
33 "strconv" 36 "sort"
34 "strings" 37 "sync"
35 38
39 "github.com/bfix/gospel/logger"
36 redis "github.com/go-redis/redis/v8" 40 redis "github.com/go-redis/redis/v8"
37) 41)
38 42
@@ -46,22 +50,20 @@ var (
46//------------------------------------------------------------ 50//------------------------------------------------------------
47// Generic storage interface. Can be used for persistent or 51// Generic storage interface. Can be used for persistent or
48// transient (caching) storage of key/value data. 52// transient (caching) storage of key/value data.
49// One set of methods (Get/Put) work on DHT queries and blocks,
50// the other set (GetS, PutS) work on key/value strings.
51// Each custom implementation can decide which sets to support.
52//------------------------------------------------------------ 53//------------------------------------------------------------
53 54
54// Store is a key/value storage where the type of the key is either 55// Store is a key/value storage where the type of the key is either
55// a SHA512 hash value or a string and the value is either a DHT 56// a SHA512 hash value or a string and the value is either a DHT
56// block or a string. 57// block or a string. It is possiblle to mix any key/value types,
58// but not used in this implementation.
57type Store[K, V any] interface { 59type Store[K, V any] interface {
58 // Put block into storage under given key 60 // Put value into storage under given key
59 Put(key K, val V) error 61 Put(key K, val V) error
60 62
61 // Get block with given key from storage 63 // Get value with given key from storage
62 Get(key K) (V, error) 64 Get(key K) (V, error)
63 65
64 // List all store queries 66 // List all store keys
65 List() ([]K, error) 67 List() ([]K, error)
66} 68}
67 69
@@ -78,22 +80,18 @@ type KVStore Store[string, string]
78//------------------------------------------------------------ 80//------------------------------------------------------------
79// NewDHTStore creates a new storage handler with given spec 81// NewDHTStore creates a new storage handler with given spec
80// for use with DHT queries and blocks 82// for use with DHT queries and blocks
81func NewDHTStore(spec string) (DHTStore, error) { 83func NewDHTStore(spec config.ParameterConfig) (DHTStore, error) {
82 specs := strings.Split(spec, ":") 84 // get the mode parameter
83 if len(specs) < 2 { 85 mode, ok := config.GetParam[string](spec, "mode")
86 if !ok {
84 return nil, ErrStoreInvalidSpec 87 return nil, ErrStoreInvalidSpec
85 } 88 }
86 switch specs[0] { 89 switch mode {
87 //------------------------------------------------------------------ 90 //------------------------------------------------------------------
88 // File-base storage 91 // File-base storage
89 //------------------------------------------------------------------ 92 //------------------------------------------------------------------
90 case "file_store": 93 case "file":
91 return NewFileStore(specs[1]) 94 return NewFileStore(spec)
92 case "file_cache":
93 if len(specs) < 3 {
94 return nil, ErrStoreInvalidSpec
95 }
96 return NewFileCache(specs[1], specs[2])
97 } 95 }
98 return nil, ErrStoreUnknown 96 return nil, ErrStoreUnknown
99} 97}
@@ -101,29 +99,24 @@ func NewDHTStore(spec string) (DHTStore, error) {
101//------------------------------------------------------------ 99//------------------------------------------------------------
102// NewKVStore creates a new storage handler with given spec 100// NewKVStore creates a new storage handler with given spec
103// for use with key/value string pairs. 101// for use with key/value string pairs.
104func NewKVStore(spec string) (KVStore, error) { 102func NewKVStore(spec config.ParameterConfig) (KVStore, error) {
105 specs := strings.SplitN(spec, ":", 2) 103 // get the mode parameter
106 if len(specs) < 2 { 104 mode, ok := config.GetParam[string](spec, "mode")
105 if !ok {
107 return nil, ErrStoreInvalidSpec 106 return nil, ErrStoreInvalidSpec
108 } 107 }
109 switch specs[0] { 108 switch mode {
110 //-------------------------------------------------------------- 109 //--------------------------------------------------------------
111 // Redis service 110 // Redis service
112 //-------------------------------------------------------------- 111 //--------------------------------------------------------------
113 case "redis": 112 case "redis":
114 if len(specs) < 4 { 113 return NewRedisStore(spec)
115 return nil, ErrStoreInvalidSpec
116 }
117 return NewRedisStore(specs[1], specs[2], specs[3])
118 114
119 //-------------------------------------------------------------- 115 //--------------------------------------------------------------
120 // SQL database service 116 // SQL database service
121 //-------------------------------------------------------------- 117 //--------------------------------------------------------------
122 case "sql": 118 case "sql":
123 if len(specs) < 4 { 119 return NewSQLStore(spec)
124 return nil, ErrStoreInvalidSpec
125 }
126 return NewSQLStore(specs[1])
127 } 120 }
128 return nil, errors.New("unknown storage mechanism") 121 return nil, errors.New("unknown storage mechanism")
129} 122}
@@ -132,63 +125,121 @@ func NewKVStore(spec string) (KVStore, error) {
132// Filesystem-based storage 125// Filesystem-based storage
133//------------------------------------------------------------ 126//------------------------------------------------------------
134 127
128// FileHeader is the layout of a file managed by the storage handler.
129// On start-up the file store recreates the list of file entries from
130// traversing the actual filesystem. This is done in the background.
131type FileHeader struct {
132 key string // storage key
133 size uint64 // size of file
134 btype uint16 // block type
135 stored util.AbsoluteTime // time added to store
136 expires util.AbsoluteTime // expiration time
137 lastUsed util.AbsoluteTime // time last used
138 usedCount uint64 // usage count
139}
140
135// FileStore implements a filesystem-based storage mechanism for 141// FileStore implements a filesystem-based storage mechanism for
136// DHT queries and blocks. 142// DHT queries and blocks.
137type FileStore struct { 143type FileStore struct {
138 path string // storage path 144 path string // storage path
139 cached []*crypto.HashCode // list of cached entries (key) 145 cache bool // storage works as cache
140 wrPos int // write position in cyclic list 146 args config.ParameterConfig // arguments / settings
147
148 totalSize uint64 // total storage size (logical, not physical)
149 files map[string]*FileHeader // list of file headers
150 wrPos int // write position in cyclic list
151 mtx sync.Mutex // serialize operations (prune)
141} 152}
142 153
143// NewFileStore instantiates a new file storage. 154// NewFileStore instantiates a new file storage.
144func NewFileStore(path string) (DHTStore, error) { 155func NewFileStore(spec config.ParameterConfig) (DHTStore, error) {
145 // create file store 156 // get path parameter
146 return &FileStore{ 157 path, ok := config.GetParam[string](spec, "path")
147 path: path, 158 if !ok {
148 }, nil 159 return nil, ErrStoreInvalidSpec
149} 160 }
150 161 isCache, ok := config.GetParam[bool](spec, "cache")
151// NewFileCache instantiates a new file-based cache. 162 if !ok {
152func NewFileCache(path, param string) (DHTStore, error) { 163 isCache = false
164 }
153 // remove old cache content 165 // remove old cache content
154 os.RemoveAll(path) 166 if isCache {
167 os.RemoveAll(path)
168 }
169 // create file store handler
170 fs := &FileStore{
171 path: path,
172 args: spec,
173 cache: isCache,
174 files: make(map[string]*FileHeader),
175 }
176 // load file header list
177 if !isCache {
178 if fp, err := os.Open(path + "/files.db"); err == nil {
179 dec := gob.NewDecoder(fp)
180 for {
181 hdr := new(FileHeader)
182 if dec.Decode(hdr) != nil {
183 if err != io.EOF {
184 return nil, err
185 }
186 break
187 }
188 fs.files[hdr.key] = hdr
189 fs.totalSize += hdr.size
190 }
191 fp.Close()
192 }
193 }
194 return fs, nil
195}
155 196
156 // get number of cache entries 197// Close file storage. write metadata to file
157 num, err := strconv.ParseUint(param, 10, 32) 198func (s *FileStore) Close() (err error) {
158 if err != nil { 199 if !s.cache {
159 return nil, err 200 if fp, err := os.Create(s.path + "/files.db"); err == nil {
201 defer fp.Close()
202 enc := gob.NewEncoder(fp)
203 for _, hdr := range s.files {
204 if err = enc.Encode(hdr); err != nil {
205 break
206 }
207 }
208 }
160 } 209 }
161 // create file store 210 return
162 return &FileStore{
163 path: path,
164 cached: make([]*crypto.HashCode, num),
165 wrPos: 0,
166 }, nil
167} 211}
168 212
169// Put block into storage under given key 213// Put block into storage under given key
170func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { 214func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
215 // check for free space
216 if s.cache {
217 // caching is limited by explicit number of files
218 num, ok := config.GetParam[int](s.args, "num")
219 if !ok {
220 num = 100
221 }
222 if len(s.files) >= num {
223 // make space for at least one new entry
224 s.prune(1)
225 }
226 } else {
227 // normal storage is limited by quota (default: 10GB)
228 max, ok := config.GetParam[int](s.args, "maxGB")
229 if !ok {
230 max = 10
231 }
232 if int(s.totalSize>>30) > max {
233 // drop a significant number of blocks
234 s.prune(20)
235 }
236 }
171 // get query parameters for entry 237 // get query parameters for entry
172 var ( 238 var btype uint16 // block type
173 btype uint16 // block type
174 expire util.AbsoluteTime // block expiration
175 )
176 query.Get("blkType", &btype) 239 query.Get("blkType", &btype)
240 var expire util.AbsoluteTime // block expiration
177 query.Get("expire", &expire) 241 query.Get("expire", &expire)
178 242
179 // are we in caching mode?
180 if s.cached != nil {
181 // release previous block if defined
182 if key := s.cached[s.wrPos]; key != nil {
183 // get path and filename from key
184 path, fname := s.expandPath(key)
185 if err = os.Remove(path + "/" + fname); err != nil {
186 return
187 }
188 // free slot
189 s.cached[s.wrPos] = nil
190 }
191 }
192 // get path and filename from key 243 // get path and filename from key
193 path, fname := s.expandPath(query.Key()) 244 path, fname := s.expandPath(query.Key())
194 // make sure the path exists 245 // make sure the path exists
@@ -197,6 +248,7 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
197 } 248 }
198 // write to file for storage 249 // write to file for storage
199 var fp *os.File 250 var fp *os.File
251 var fpSize int
200 if fp, err = os.Create(path + "/" + fname); err == nil { 252 if fp, err = os.Create(path + "/" + fname); err == nil {
201 defer fp.Close() 253 defer fp.Close()
202 // write block data 254 // write block data
@@ -206,11 +258,18 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
206 } 258 }
207 } 259 }
208 } 260 }
209 // update cache list 261 // add header to internal list
210 if s.cached != nil { 262 now := util.AbsoluteTimeNow()
211 s.cached[s.wrPos] = query.Key() 263 hdr := &FileHeader{
212 s.wrPos = (s.wrPos + 1) % len(s.cached) 264 key: hex.EncodeToString(query.Key().Bits),
265 size: uint64(fpSize),
266 btype: btype,
267 expires: expire,
268 stored: now,
269 lastUsed: now,
270 usedCount: 1,
213 } 271 }
272 s.files[hdr.key] = hdr
214 return 273 return
215} 274}
216 275
@@ -258,6 +317,53 @@ func (s *FileStore) expandPath(key *crypto.HashCode) (string, string) {
258 return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] 317 return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:]
259} 318}
260 319
320// Prune list of file headers so we drop at least n entries.
321// returns number of removed entries.
322func (s *FileStore) prune(n int) (del int) {
323 // get list of headers; remove expired entries on the fly
324 list := make([]*FileHeader, 0)
325 for key, hdr := range s.files {
326 // remove expired entry
327 if hdr.expires.Expired() {
328 s.dropFile(key)
329 del++
330 }
331 // append to list
332 list = append(list, hdr)
333 }
334 // check if we are already done.
335 if del >= n {
336 return
337 }
338	// sort list by descending rate "(lifetime * size) / usedCount"
339 sort.Slice(list, func(i, j int) bool {
340 ri := (list[i].stored.Elapsed().Val * list[i].size) / list[i].usedCount
341 rj := (list[j].stored.Elapsed().Val * list[j].size) / list[j].usedCount
342 return ri > rj
343 })
344 // remove from start of list until prune limit is reached
345 for _, hdr := range list {
346 s.dropFile(hdr.key)
347 del++
348 if del == n {
349 break
350 }
351 }
352 return
353}
354
355// dropFile removes a file from the internal list and the physical storage.
356func (s *FileStore) dropFile(key string) {
357	// remove from internal list
358 delete(s.files, key)
359 // remove from filesystem
360 path := fmt.Sprintf("%s/%s/%s/%s", s.path, key[:2], key[2:4], key[4:])
361 if err := os.Remove(path); err != nil {
362 logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error())
363 return
364 }
365}
366
261//------------------------------------------------------------ 367//------------------------------------------------------------
262// Redis: only use for caching purposes on key/value strings 368// Redis: only use for caching purposes on key/value strings
263//------------------------------------------------------------ 369//------------------------------------------------------------
@@ -269,16 +375,28 @@ type RedisStore struct {
269} 375}
270 376
271// NewRedisStore creates a Redis service client instance. 377// NewRedisStore creates a Redis service client instance.
272func NewRedisStore(addr, passwd, db string) (s KVStore, err error) { 378func NewRedisStore(spec config.ParameterConfig) (s KVStore, err error) {
273 kvs := new(RedisStore) 379 // get connection parameters
274 if kvs.db, err = strconv.Atoi(db); err != nil { 380 addr, ok := config.GetParam[string](spec, "addr")
275 err = ErrStoreInvalidSpec 381 if !ok {
276 return 382 return nil, ErrStoreInvalidSpec
383 }
384 passwd, ok := config.GetParam[string](spec, "passwd")
385 if !ok {
386 passwd = ""
277 } 387 }
388 db, ok := config.GetParam[int](spec, "db")
389 if !ok {
390 return nil, ErrStoreInvalidSpec
391 }
392
393 // create new Redis store
394 kvs := new(RedisStore)
395 kvs.db = db
278 kvs.client = redis.NewClient(&redis.Options{ 396 kvs.client = redis.NewClient(&redis.Options{
279 Addr: addr, 397 Addr: addr,
280 Password: passwd, 398 Password: passwd,
281 DB: kvs.db, 399 DB: db,
282 }) 400 })
283 if kvs.client == nil { 401 if kvs.client == nil {
284 err = ErrStoreNotAvailable 402 err = ErrStoreNotAvailable
@@ -328,11 +446,17 @@ type SQLStore struct {
328} 446}
329 447
330// NewSQLStore creates a new SQL-based key/value store. 448// NewSQLStore creates a new SQL-based key/value store.
331func NewSQLStore(spec string) (s KVStore, err error) { 449func NewSQLStore(spec config.ParameterConfig) (s KVStore, err error) {
450 // get connection parameters
451 connect, ok := config.GetParam[string](spec, "connect")
452 if !ok {
453 return nil, ErrStoreInvalidSpec
454 }
455 // create SQL store
332 kvs := new(SQLStore) 456 kvs := new(SQLStore)
333 457
334 // connect to SQL database 458 // connect to SQL database
335 kvs.db, err = util.DbPool.Connect(spec) 459 kvs.db, err = util.DbPool.Connect(connect)
336 if err != nil { 460 if err != nil {
337 return nil, err 461 return nil, err
338 } 462 }
@@ -343,7 +467,6 @@ func NewSQLStore(spec string) (s KVStore, err error) {
343 return nil, ErrStoreNotAvailable 467 return nil, ErrStoreNotAvailable
344 } 468 }
345 return kvs, nil 469 return kvs, nil
346
347} 470}
348 471
349// Put a key/value pair into the store 472// Put a key/value pair into the store
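Usage sketch (illustrative, not part of the commit): constructing the Redis-backed key/value store from a parameter set matching what NewRedisStore reads ("addr", optional "passwd", integer "db"). The endpoint and key/value data are assumptions; KVStore operates on string keys and values per the Store[string, string] interface.

    package main

    import (
    	"log"

    	"gnunet/config"
    	"gnunet/service"
    )

    func main() {
    	// connection parameters as NewRedisStore reads them ("db" selects the Redis database)
    	spec := config.ParameterConfig{
    		"mode":   "redis",
    		"addr":   "localhost:6379", // hypothetical Redis endpoint
    		"passwd": "",
    		"db":     15,
    	}
    	kvs, err := service.NewKVStore(spec)
    	if err != nil {
    		log.Fatal(err)
    	}
    	// store and retrieve a key/value string pair
    	if err := kvs.Put("hello", "world"); err != nil {
    		log.Fatal(err)
    	}
    	val, err := kvs.Get("hello")
    	if err != nil {
    		log.Fatal(err)
    	}
    	log.Println(val)
    }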