Use abstract filesystem for stores

This commit is contained in:
Ingo Oppermann
2023-02-01 16:09:20 +01:00
parent 49b16f44a8
commit 2a3288ffd0
316 changed files with 13512 additions and 17601 deletions

View File

@@ -34,7 +34,7 @@ import (
"github.com/datarhei/core/v16/restream"
restreamapp "github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/store"
restreamstore "github.com/datarhei/core/v16/restream/store"
"github.com/datarhei/core/v16/rtmp"
"github.com/datarhei/core/v16/service"
"github.com/datarhei/core/v16/session"
@@ -153,7 +153,8 @@ func (a *api) Reload() error {
logger := log.New("Core").WithOutput(log.NewConsoleWriter(a.log.writer, log.Lwarn, true))
store, err := configstore.NewJSON(a.config.path, func() {
rootfs, _ := fs.NewDiskFilesystem(fs.DiskConfig{})
store, err := configstore.NewJSON(rootfs, a.config.path, func() {
a.errorChan <- ErrConfigReload
})
if err != nil {
@@ -295,7 +296,13 @@ func (a *api) start() error {
}
if cfg.Sessions.Persist {
sessionConfig.PersistDir = filepath.Join(cfg.DB.Dir, "sessions")
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: filepath.Join(cfg.DB.Dir, "sessions"),
})
if err != nil {
return fmt.Errorf("unable to create filesystem for persisting sessions: %w", err)
}
sessionConfig.PersistFS = fs
}
sessions, err := session.New(sessionConfig)
@@ -374,11 +381,9 @@ func (a *api) start() error {
a.sessions = sessions
}
diskfs, err := fs.NewDiskFilesystem(fs.DiskConfig{
Name: "disk",
Dir: cfg.Storage.Disk.Dir,
Size: cfg.Storage.Disk.Size * 1024 * 1024,
Logger: a.log.logger.core.WithComponent("FS"),
diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: cfg.Storage.Disk.Dir,
Logger: a.log.logger.core.WithComponent("DiskFS"),
})
if err != nil {
return fmt.Errorf("disk filesystem: %w", err)
@@ -403,21 +408,27 @@ func (a *api) start() error {
}
if a.memfs == nil {
memfs := fs.NewMemFilesystem(fs.MemConfig{
Name: "mem",
Base: baseMemFS.String(),
Size: cfg.Storage.Memory.Size * 1024 * 1024,
Purge: cfg.Storage.Memory.Purge,
Logger: a.log.logger.core.WithComponent("FS"),
memfs, _ := fs.NewMemFilesystem(fs.MemConfig{
Logger: a.log.logger.core.WithComponent("MemFS"),
})
a.memfs = memfs
memfs.SetMetadata("base", baseMemFS.String())
sizedfs, _ := fs.NewSizedFilesystem(memfs, cfg.Storage.Memory.Size*1024*1024, cfg.Storage.Memory.Purge)
a.memfs = sizedfs
} else {
a.memfs.Rebase(baseMemFS.String())
a.memfs.Resize(cfg.Storage.Memory.Size * 1024 * 1024)
a.memfs.SetMetadata("base", baseMemFS.String())
if sizedfs, ok := a.memfs.(fs.SizedFilesystem); ok {
sizedfs.Resize(cfg.Storage.Memory.Size * 1024 * 1024)
}
}
for _, s3 := range cfg.Storage.S3 {
if _, ok := a.s3fs[s3.Name]; ok {
return fmt.Errorf("the name '%s' for a s3 filesystem is already in use", s3.Name)
}
baseS3FS := url.URL{
Scheme: "http",
Path: s3.Mountpoint,
@@ -436,7 +447,6 @@ func (a *api) start() error {
s3fs, err := fs.NewS3Filesystem(fs.S3Config{
Name: s3.Name,
Base: baseS3FS.String(),
Endpoint: s3.Endpoint,
AccessKeyID: s3.AccessKeyID,
SecretAccessKey: s3.SecretAccessKey,
@@ -449,9 +459,7 @@ func (a *api) start() error {
return fmt.Errorf("s3 filesystem (%s): %w", s3.Name, err)
}
if _, ok := a.s3fs[s3.Name]; ok {
return fmt.Errorf("the name '%s' for a filesystem is already in use", s3.Name)
}
s3fs.SetMetadata("base", baseS3FS.String())
a.s3fs[s3.Name] = s3fs
}
@@ -495,23 +503,23 @@ func (a *api) start() error {
{
a.replacer.RegisterTemplateFunc("diskfs", func(config *restreamapp.Config, section string) string {
return a.diskfs.Base()
return a.diskfs.Metadata("base")
}, nil)
a.replacer.RegisterTemplateFunc("fs:disk", func(config *restreamapp.Config, section string) string {
return a.diskfs.Base()
return a.diskfs.Metadata("base")
}, nil)
a.replacer.RegisterTemplateFunc("memfs", func(config *restreamapp.Config, section string) string {
return a.memfs.Base()
return a.memfs.Metadata("base")
}, nil)
a.replacer.RegisterTemplateFunc("fs:mem", func(config *restreamapp.Config, section string) string {
return a.memfs.Base()
return a.memfs.Metadata("base")
}, nil)
for name, s3 := range a.s3fs {
a.replacer.RegisterTemplate("fs:"+name, s3.Base(), nil)
a.replacer.RegisterTemplate("fs:"+name, s3.Metadata("base"), nil)
}
a.replacer.RegisterTemplateFunc("rtmp", func(config *restreamapp.Config, section string) string {
@@ -567,11 +575,24 @@ func (a *api) start() error {
filesystems = append(filesystems, fs)
}
store := store.NewJSONStore(store.JSONConfig{
Filepath: cfg.DB.Dir + "/db.json",
FFVersion: a.ffmpeg.Skills().FFmpeg.Version,
Logger: a.log.logger.core.WithComponent("ProcessStore"),
})
var store restreamstore.Store = nil
{
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: cfg.DB.Dir,
})
if err != nil {
return err
}
store, err = restreamstore.NewJSON(restreamstore.JSONConfig{
Filesystem: fs,
Filepath: "/db.json",
Logger: a.log.logger.core.WithComponent("ProcessStore"),
})
if err != nil {
return err
}
}
restream, err := restream.New(restream.Config{
ID: cfg.ID,
@@ -645,8 +666,8 @@ func (a *api) start() error {
metrics.Register(monitor.NewCPUCollector())
metrics.Register(monitor.NewMemCollector())
metrics.Register(monitor.NewNetCollector())
metrics.Register(monitor.NewDiskCollector(a.diskfs.Base()))
metrics.Register(monitor.NewFilesystemCollector("diskfs", diskfs))
metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base")))
metrics.Register(monitor.NewFilesystemCollector("diskfs", a.diskfs))
metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs))
for name, fs := range a.s3fs {
metrics.Register(monitor.NewFilesystemCollector(name, fs))
@@ -1395,7 +1416,7 @@ func (a *api) Destroy() {
// Free the MemFS
if a.memfs != nil {
a.memfs.DeleteAll()
a.memfs.RemoveAll()
a.memfs = nil
}
}

View File

@@ -9,6 +9,7 @@ import (
cfgvars "github.com/datarhei/core/v16/config/vars"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/io/file"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/store"
@@ -24,7 +25,9 @@ func main() {
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
configstore, err := cfgstore.NewJSON(configfile, nil)
diskfs, _ := fs.NewDiskFilesystem(fs.DiskConfig{})
configstore, err := cfgstore.NewJSON(diskfs, configfile, nil)
if err != nil {
logger.Error().WithError(err).Log("Loading configuration failed")
os.Exit(1)
@@ -117,9 +120,12 @@ func doMigration(logger log.Logger, configstore cfgstore.Store) error {
logger.Info().WithField("backup", backupFilepath).Log("Backup created")
// Load the existing DB
datastore := store.NewJSONStore(store.JSONConfig{
datastore, err := store.NewJSON(store.JSONConfig{
Filepath: cfg.DB.Dir + "/db.json",
})
if err != nil {
return err
}
data, err := datastore.Load()
if err != nil {

View File

@@ -17,6 +17,7 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream"
"github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/store"
@@ -495,14 +496,14 @@ type importConfigAudio struct {
sampling string
}
func importV1(path string, cfg importConfig) (store.StoreData, error) {
func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.StoreData, error) {
if len(cfg.id) == 0 {
cfg.id = uuid.New().String()
}
r := store.NewStoreData()
jsondata, err := os.ReadFile(path)
jsondata, err := fs.ReadFile(path)
if err != nil {
return r, fmt.Errorf("failed to read data from %s: %w", path, err)
}
@@ -1417,9 +1418,19 @@ func probeInput(binary string, config app.Config) app.Probe {
return app.Probe{}
}
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
store, err := store.NewJSON(store.JSONConfig{
Filesystem: dummyfs,
Filepath: "/",
Logger: nil,
})
if err != nil {
return app.Probe{}
}
rs, err := restream.New(restream.Config{
FFmpeg: ffmpeg,
Store: store.NewDummyStore(store.DummyConfig{}),
Store: store,
})
if err != nil {
return app.Probe{}

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream/store"
"github.com/stretchr/testify/require"
@@ -36,8 +37,13 @@ import (
var id string = "4186b095-7f0a-4e94-8c3d-f17459ab252f"
func testV1Import(t *testing.T, v1Fixture, v4Fixture string, config importConfig) {
diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: ".",
})
require.NoError(t, err)
// Import v1 database
v4, err := importV1(v1Fixture, config)
v4, err := importV1(diskfs, v1Fixture, config)
require.Equal(t, nil, err)
// Reset variants
@@ -50,7 +56,7 @@ func testV1Import(t *testing.T, v1Fixture, v4Fixture string, config importConfig
require.Equal(t, nil, err)
// Read the wanted result
wantdatav4, err := os.ReadFile(v4Fixture)
wantdatav4, err := diskfs.ReadFile(v4Fixture)
require.Equal(t, nil, err)
var wantv4 store.StoreData

View File

@@ -6,6 +6,7 @@ import (
cfgstore "github.com/datarhei/core/v16/config/store"
cfgvars "github.com/datarhei/core/v16/config/vars"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/store"
@@ -17,18 +18,24 @@ func main() {
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
configstore, err := cfgstore.NewJSON(configfile, nil)
diskfs, err := fs.NewDiskFilesystem(fs.DiskConfig{})
if err != nil {
logger.Error().WithError(err).Log("Access disk filesystem failed")
os.Exit(1)
}
configstore, err := cfgstore.NewJSON(diskfs, configfile, nil)
if err != nil {
logger.Error().WithError(err).Log("Loading configuration failed")
os.Exit(1)
}
if err := doImport(logger, configstore); err != nil {
if err := doImport(logger, diskfs, configstore); err != nil {
os.Exit(1)
}
}
func doImport(logger log.Logger, configstore cfgstore.Store) error {
func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) error {
if logger == nil {
logger = log.New("")
}
@@ -67,23 +74,27 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
logger = logger.WithField("database", v1filename)
if _, err := os.Stat(v1filename); err != nil {
if _, err := fs.Stat(v1filename); err != nil {
if os.IsNotExist(err) {
logger.Info().Log("Database doesn't exist and nothing will be imported")
return nil
}
logger.Error().WithError(err).Log("Checking for v1 database")
return fmt.Errorf("checking for v1 database: %w", err)
}
logger.Info().Log("Found database")
// Load an existing DB
datastore := store.NewJSONStore(store.JSONConfig{
Filepath: cfg.DB.Dir + "/db.json",
datastore, err := store.NewJSON(store.JSONConfig{
Filesystem: fs,
Filepath: cfg.DB.Dir + "/db.json",
})
if err != nil {
logger.Error().WithError(err).Log("Creating datastore for new database failed")
return fmt.Errorf("creating datastore for new database failed: %w", err)
}
data, err := datastore.Load()
if err != nil {
@@ -105,7 +116,7 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
importConfig.binary = cfg.FFmpeg.Binary
// Rewrite the old database to the new database
r, err := importV1(v1filename, importConfig)
r, err := importV1(fs, v1filename, importConfig)
if err != nil {
logger.Error().WithError(err).Log("Importing database failed")
return fmt.Errorf("importing database failed: %w", err)

View File

@@ -1,20 +1,29 @@
package main
import (
"strings"
"testing"
"github.com/datarhei/core/v16/config/store"
"github.com/datarhei/core/v16/io/fs"
"github.com/stretchr/testify/require"
)
func TestImport(t *testing.T) {
configstore := store.NewDummy()
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
memfs.WriteFileReader("/mime.types", strings.NewReader("foobar"))
configstore, err := store.NewJSON(memfs, "/config.json", nil)
require.NoError(t, err)
cfg := configstore.Get()
err := configstore.Set(cfg)
err = configstore.Set(cfg)
require.NoError(t, err)
err = doImport(nil, configstore)
err = doImport(nil, memfs, configstore)
require.NoError(t, err)
}