From 4261e07def81e7c3eb183b9d5c4059a2e9c53759 Mon Sep 17 00:00:00 2001 From: Bernd Fix Date: Sat, 11 Jun 2022 20:21:38 +0200 Subject: File/block caching strategy improved. --- src/gnunet/config/config.go | 33 +++- src/gnunet/config/gnunet-config.json | 25 ++- src/gnunet/service/dht/dhtstore_test.go | 19 ++- src/gnunet/service/dht/module.go | 4 - src/gnunet/service/store.go | 293 +++++++++++++++++++++++--------- 5 files changed, 266 insertions(+), 108 deletions(-) (limited to 'src') diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go index a92bdd6..b4d2840 100644 --- a/src/gnunet/config/config.go +++ b/src/gnunet/config/config.go @@ -93,15 +93,34 @@ type GNSConfig struct { MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution } +//---------------------------------------------------------------------- +// Generic parameter configuration (handle any key/value settings) +//---------------------------------------------------------------------- + +// ParameterConfig handle arbitrary values for a key strings. This necessary +// e.g. in the 'Storage' configuration, as custom storage implementations +// require different sets of parameters. +type ParameterConfig map[string]any + +// Get a parameter value with given type 'V' +func GetParam[V any](params ParameterConfig, key string) (i V, ok bool) { + var v any + if v, ok = params[key]; ok { + if i, ok = v.(V); ok { + return + } + } + return +} + //---------------------------------------------------------------------- // DHT configuration //---------------------------------------------------------------------- // DHTConfig contains parameters for the distributed hash table (DHT) type DHTConfig struct { - Service *ServiceConfig `json:"service"` // socket for DHT service - Storage string `json:"storage"` // filesystem storage location - Cache string `json:"cache"` // key/value cache + Service *ServiceConfig `json:"service"` // socket for DHT service + Storage ParameterConfig `json:"storage"` // filesystem storage location } //---------------------------------------------------------------------- @@ -110,8 +129,8 @@ type DHTConfig struct { // NamecacheConfig contains parameters for the local name cache type NamecacheConfig struct { - Service *ServiceConfig `json:"service"` // socket for Namecache service - Storage string `json:"storage"` // key/value cache + Service *ServiceConfig `json:"service"` // socket for Namecache service + Storage ParameterConfig `json:"storage"` // key/value cache } //---------------------------------------------------------------------- @@ -120,8 +139,8 @@ type NamecacheConfig struct { // RevocationConfig contains parameters for the key revocation service type RevocationConfig struct { - Service *ServiceConfig `json:"service"` // socket for Revocation service - Storage string `json:"storage"` // persistance mechanism for revocation data + Service *ServiceConfig `json:"service"` // socket for Revocation service + Storage ParameterConfig `json:"storage"` // persistance mechanism for revocation data } //---------------------------------------------------------------------- diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json index 7927cda..167bfa0 100644 --- a/src/gnunet/config/gnunet-config.json +++ b/src/gnunet/config/gnunet-config.json @@ -11,7 +11,7 @@ { "id": "test", "network": "ip+udp", - "address": "upnp:192.168.178.1", + "address": "upnp:192.168.134.1", "port": 6666, "ttl": 86400 } @@ -28,8 +28,12 @@ "perm": "0770" } }, - "storage": "dht_file_store+/var/lib/gnunet/dht/store", - "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" + "storage": { + "mode": "file", + "cache": false, + "path": "/var/lib/gnunet/dht/store", + "maxGB": 10 + } }, "gns": { "service": { @@ -48,7 +52,13 @@ "perm": "0770" } }, - "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" + "storage": { + "mode": "file", + "cache": true, + "path": "/var/lib/gnunet/namecache", + "num": 1000, + "expire": 43200 + } }, "revocation": { "service": { @@ -57,7 +67,12 @@ "perm": "0770" } }, - "storage": "redis:localhost:6397::15" + "storage": { + "mode": "redis", + "addr": "localhost:6397", + "passwd": "", + "id": 15 + } }, "rpc": { "endpoint": "tcp:127.0.0.1:80" diff --git a/src/gnunet/service/dht/dhtstore_test.go b/src/gnunet/service/dht/dhtstore_test.go index 3cb8080..d9fc1d0 100644 --- a/src/gnunet/service/dht/dhtstore_test.go +++ b/src/gnunet/service/dht/dhtstore_test.go @@ -20,9 +20,10 @@ package dht import ( "encoding/hex" + "gnunet/config" "gnunet/crypto" "gnunet/service" - blocks "gnunet/service/dht/blocks" + "gnunet/service/dht/blocks" "math/rand" "testing" ) @@ -37,8 +38,15 @@ const ( // each block from storage and checks for matching hash. func TestDHTFilesStore(t *testing.T) { + // test configuration + cfg := make(config.ParameterConfig) + cfg["mode"] = "file" + cfg["cache"] = false + cfg["path"] = "/var/lib/gnunet/dht/store" + cfg["maxGB"] = 10 + // create file store - fs, err := service.NewFileCache("/var/lib/gnunet/dht/cache", "100") + fs, err := service.NewFileStore(cfg) if err != nil { t.Fatal(err) } @@ -48,33 +56,30 @@ func TestDHTFilesStore(t *testing.T) { // First round: save blocks for i := 0; i < fsNumBlocks; i++ { // generate random block - size := 20 // 1024 + rand.Intn(62000) + size := 1024 + rand.Intn(62000) buf := make([]byte, size) rand.Read(buf) val := blocks.NewGenericBlock(buf) // generate associated key k := crypto.Hash(buf).Bits key := blocks.NewGenericQuery(k) - t.Logf("> %d: %s -- %s", i, hex.EncodeToString(k), hex.EncodeToString(buf)) // store block if err := fs.Put(key, val); err != nil { t.Fatal(err) } - // remember key keys = append(keys, key) } // Second round: retrieve blocks and check - for i, key := range keys { + for _, key := range keys { // get block val, err := fs.Get(key) if err != nil { t.Fatal(err) } buf := val.Data() - t.Logf("< %d: %s -- %s", i, hex.EncodeToString(key.Key().Bits), hex.EncodeToString(buf)) // re-create key k := crypto.Hash(buf) diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go index 34bf020..612d0ec 100644 --- a/src/gnunet/service/dht/module.go +++ b/src/gnunet/service/dht/module.go @@ -57,10 +57,6 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module, err error) { if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil { return } - // create cache handler - if cache, err = service.NewDHTStore(config.Cfg.DHT.Cache); err != nil { - return - } // create routing table rt := NewRoutingTable(NewPeerAddress(c.PeerID())) diff --git a/src/gnunet/service/store.go b/src/gnunet/service/store.go index c599c54..5de5415 100644 --- a/src/gnunet/service/store.go +++ b/src/gnunet/service/store.go @@ -22,17 +22,21 @@ import ( "context" "database/sql" "encoding/binary" + "encoding/gob" "encoding/hex" "errors" "fmt" + "gnunet/config" "gnunet/crypto" "gnunet/service/dht/blocks" "gnunet/util" + "io" "io/ioutil" "os" - "strconv" - "strings" + "sort" + "sync" + "github.com/bfix/gospel/logger" redis "github.com/go-redis/redis/v8" ) @@ -46,22 +50,20 @@ var ( //------------------------------------------------------------ // Generic storage interface. Can be used for persistent or // transient (caching) storage of key/value data. -// One set of methods (Get/Put) work on DHT queries and blocks, -// the other set (GetS, PutS) work on key/value strings. -// Each custom implementation can decide which sets to support. //------------------------------------------------------------ // Store is a key/value storage where the type of the key is either // a SHA512 hash value or a string and the value is either a DHT -// block or a string. +// block or a string. It is possiblle to mix any key/value types, +// but not used in this implementation. type Store[K, V any] interface { - // Put block into storage under given key + // Put value into storage under given key Put(key K, val V) error - // Get block with given key from storage + // Get value with given key from storage Get(key K) (V, error) - // List all store queries + // List all store keys List() ([]K, error) } @@ -78,22 +80,18 @@ type KVStore Store[string, string] //------------------------------------------------------------ // NewDHTStore creates a new storage handler with given spec // for use with DHT queries and blocks -func NewDHTStore(spec string) (DHTStore, error) { - specs := strings.Split(spec, ":") - if len(specs) < 2 { +func NewDHTStore(spec config.ParameterConfig) (DHTStore, error) { + // get the mode parameter + mode, ok := config.GetParam[string](spec, "mode") + if !ok { return nil, ErrStoreInvalidSpec } - switch specs[0] { + switch mode { //------------------------------------------------------------------ // File-base storage //------------------------------------------------------------------ - case "file_store": - return NewFileStore(specs[1]) - case "file_cache": - if len(specs) < 3 { - return nil, ErrStoreInvalidSpec - } - return NewFileCache(specs[1], specs[2]) + case "file": + return NewFileStore(spec) } return nil, ErrStoreUnknown } @@ -101,29 +99,24 @@ func NewDHTStore(spec string) (DHTStore, error) { //------------------------------------------------------------ // NewKVStore creates a new storage handler with given spec // for use with key/value string pairs. -func NewKVStore(spec string) (KVStore, error) { - specs := strings.SplitN(spec, ":", 2) - if len(specs) < 2 { +func NewKVStore(spec config.ParameterConfig) (KVStore, error) { + // get the mode parameter + mode, ok := config.GetParam[string](spec, "mode") + if !ok { return nil, ErrStoreInvalidSpec } - switch specs[0] { + switch mode { //-------------------------------------------------------------- // Redis service //-------------------------------------------------------------- case "redis": - if len(specs) < 4 { - return nil, ErrStoreInvalidSpec - } - return NewRedisStore(specs[1], specs[2], specs[3]) + return NewRedisStore(spec) //-------------------------------------------------------------- // SQL database service //-------------------------------------------------------------- case "sql": - if len(specs) < 4 { - return nil, ErrStoreInvalidSpec - } - return NewSQLStore(specs[1]) + return NewSQLStore(spec) } return nil, errors.New("unknown storage mechanism") } @@ -132,63 +125,121 @@ func NewKVStore(spec string) (KVStore, error) { // Filesystem-based storage //------------------------------------------------------------ +// FileHeader is the layout of a file managed by the storage handler. +// On start-up the file store recreates the list of file entries from +// traversing the actual filesystem. This is done in the background. +type FileHeader struct { + key string // storage key + size uint64 // size of file + btype uint16 // block type + stored util.AbsoluteTime // time added to store + expires util.AbsoluteTime // expiration time + lastUsed util.AbsoluteTime // time last used + usedCount uint64 // usage count +} + // FileStore implements a filesystem-based storage mechanism for // DHT queries and blocks. type FileStore struct { - path string // storage path - cached []*crypto.HashCode // list of cached entries (key) - wrPos int // write position in cyclic list + path string // storage path + cache bool // storage works as cache + args config.ParameterConfig // arguments / settings + + totalSize uint64 // total storage size (logical, not physical) + files map[string]*FileHeader // list of file headers + wrPos int // write position in cyclic list + mtx sync.Mutex // serialize operations (prune) } // NewFileStore instantiates a new file storage. -func NewFileStore(path string) (DHTStore, error) { - // create file store - return &FileStore{ - path: path, - }, nil -} - -// NewFileCache instantiates a new file-based cache. -func NewFileCache(path, param string) (DHTStore, error) { +func NewFileStore(spec config.ParameterConfig) (DHTStore, error) { + // get path parameter + path, ok := config.GetParam[string](spec, "path") + if !ok { + return nil, ErrStoreInvalidSpec + } + isCache, ok := config.GetParam[bool](spec, "cache") + if !ok { + isCache = false + } // remove old cache content - os.RemoveAll(path) + if isCache { + os.RemoveAll(path) + } + // create file store handler + fs := &FileStore{ + path: path, + args: spec, + cache: isCache, + files: make(map[string]*FileHeader), + } + // load file header list + if !isCache { + if fp, err := os.Open(path + "/files.db"); err == nil { + dec := gob.NewDecoder(fp) + for { + hdr := new(FileHeader) + if dec.Decode(hdr) != nil { + if err != io.EOF { + return nil, err + } + break + } + fs.files[hdr.key] = hdr + fs.totalSize += hdr.size + } + fp.Close() + } + } + return fs, nil +} - // get number of cache entries - num, err := strconv.ParseUint(param, 10, 32) - if err != nil { - return nil, err +// Close file storage. write metadata to file +func (s *FileStore) Close() (err error) { + if !s.cache { + if fp, err := os.Create(s.path + "/files.db"); err == nil { + defer fp.Close() + enc := gob.NewEncoder(fp) + for _, hdr := range s.files { + if err = enc.Encode(hdr); err != nil { + break + } + } + } } - // create file store - return &FileStore{ - path: path, - cached: make([]*crypto.HashCode, num), - wrPos: 0, - }, nil + return } // Put block into storage under given key func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { + // check for free space + if s.cache { + // caching is limited by explicit number of files + num, ok := config.GetParam[int](s.args, "num") + if !ok { + num = 100 + } + if len(s.files) >= num { + // make space for at least one new entry + s.prune(1) + } + } else { + // normal storage is limited by quota (default: 10GB) + max, ok := config.GetParam[int](s.args, "maxGB") + if !ok { + max = 10 + } + if int(s.totalSize>>30) > max { + // drop a significant number of blocks + s.prune(20) + } + } // get query parameters for entry - var ( - btype uint16 // block type - expire util.AbsoluteTime // block expiration - ) + var btype uint16 // block type query.Get("blkType", &btype) + var expire util.AbsoluteTime // block expiration query.Get("expire", &expire) - // are we in caching mode? - if s.cached != nil { - // release previous block if defined - if key := s.cached[s.wrPos]; key != nil { - // get path and filename from key - path, fname := s.expandPath(key) - if err = os.Remove(path + "/" + fname); err != nil { - return - } - // free slot - s.cached[s.wrPos] = nil - } - } // get path and filename from key path, fname := s.expandPath(query.Key()) // make sure the path exists @@ -197,6 +248,7 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { } // write to file for storage var fp *os.File + var fpSize int if fp, err = os.Create(path + "/" + fname); err == nil { defer fp.Close() // write block data @@ -206,11 +258,18 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { } } } - // update cache list - if s.cached != nil { - s.cached[s.wrPos] = query.Key() - s.wrPos = (s.wrPos + 1) % len(s.cached) + // add header to internal list + now := util.AbsoluteTimeNow() + hdr := &FileHeader{ + key: hex.EncodeToString(query.Key().Bits), + size: uint64(fpSize), + btype: btype, + expires: expire, + stored: now, + lastUsed: now, + usedCount: 1, } + s.files[hdr.key] = hdr return } @@ -258,6 +317,53 @@ func (s *FileStore) expandPath(key *crypto.HashCode) (string, string) { return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] } +// Prune list of file headers so we drop at least n entries. +// returns number of removed entries. +func (s *FileStore) prune(n int) (del int) { + // get list of headers; remove expired entries on the fly + list := make([]*FileHeader, 0) + for key, hdr := range s.files { + // remove expired entry + if hdr.expires.Expired() { + s.dropFile(key) + del++ + } + // append to list + list = append(list, hdr) + } + // check if we are already done. + if del >= n { + return + } + // sort list by decending rate "(lifetime * size) / usedCount" + sort.Slice(list, func(i, j int) bool { + ri := (list[i].stored.Elapsed().Val * list[i].size) / list[i].usedCount + rj := (list[j].stored.Elapsed().Val * list[j].size) / list[j].usedCount + return ri > rj + }) + // remove from start of list until prune limit is reached + for _, hdr := range list { + s.dropFile(hdr.key) + del++ + if del == n { + break + } + } + return +} + +// drop file removes a file from the internal list and the physical storage. +func (s *FileStore) dropFile(key string) { + // remove for internal list + delete(s.files, key) + // remove from filesystem + path := fmt.Sprintf("%s/%s/%s/%s", s.path, key[:2], key[2:4], key[4:]) + if err := os.Remove(path); err != nil { + logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) + return + } +} + //------------------------------------------------------------ // Redis: only use for caching purposes on key/value strings //------------------------------------------------------------ @@ -269,16 +375,28 @@ type RedisStore struct { } // NewRedisStore creates a Redis service client instance. -func NewRedisStore(addr, passwd, db string) (s KVStore, err error) { - kvs := new(RedisStore) - if kvs.db, err = strconv.Atoi(db); err != nil { - err = ErrStoreInvalidSpec - return +func NewRedisStore(spec config.ParameterConfig) (s KVStore, err error) { + // get connection parameters + addr, ok := config.GetParam[string](spec, "addr") + if !ok { + return nil, ErrStoreInvalidSpec + } + passwd, ok := config.GetParam[string](spec, "passwd") + if !ok { + passwd = "" } + db, ok := config.GetParam[int](spec, "db") + if !ok { + return nil, ErrStoreInvalidSpec + } + + // create new Redis store + kvs := new(RedisStore) + kvs.db = db kvs.client = redis.NewClient(&redis.Options{ Addr: addr, Password: passwd, - DB: kvs.db, + DB: db, }) if kvs.client == nil { err = ErrStoreNotAvailable @@ -328,11 +446,17 @@ type SQLStore struct { } // NewSQLStore creates a new SQL-based key/value store. -func NewSQLStore(spec string) (s KVStore, err error) { +func NewSQLStore(spec config.ParameterConfig) (s KVStore, err error) { + // get connection parameters + connect, ok := config.GetParam[string](spec, "connect") + if !ok { + return nil, ErrStoreInvalidSpec + } + // create SQL store kvs := new(SQLStore) // connect to SQL database - kvs.db, err = util.DbPool.Connect(spec) + kvs.db, err = util.DbPool.Connect(connect) if err != nil { return nil, err } @@ -343,7 +467,6 @@ func NewSQLStore(spec string) (s KVStore, err error) { return nil, ErrStoreNotAvailable } return kvs, nil - } // Put a key/value pair into the store -- cgit v1.2.3