Disable locally persisting DB in cluster mode

This commit is contained in:
Ingo Oppermann
2024-07-16 14:01:31 +02:00
parent 3d78122053
commit 96f7d8030c
2 changed files with 25 additions and 11 deletions

View File

@@ -1179,7 +1179,7 @@ func (a *api) start(ctx context.Context) error {
var store restreamstore.Store = nil var store restreamstore.Store = nil
{ if !cfg.Cluster.Enable {
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{ fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: cfg.DB.Dir, Root: cfg.DB.Dir,
}) })

View File

@@ -26,7 +26,6 @@ import (
"github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/rewrite" "github.com/datarhei/core/v16/restream/rewrite"
"github.com/datarhei/core/v16/restream/store" "github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
) )
@@ -144,12 +143,15 @@ func New(config Config) (Restreamer, error) {
replace: config.Replace, replace: config.Replace,
rewrite: config.Rewrite, rewrite: config.Rewrite,
logger: config.Logger, logger: config.Logger,
tasks: map[app.ProcessID]*task{},
metadata: map[string]interface{}{},
} }
if r.logger == nil { if r.logger == nil {
r.logger = log.New("") r.logger = log.New("")
} }
/*
if r.store == nil { if r.store == nil {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
s, err := jsonstore.New(jsonstore.Config{ s, err := jsonstore.New(jsonstore.Config{
@@ -160,6 +162,7 @@ func New(config Config) (Restreamer, error) {
} }
r.store = s r.store = s
} }
*/
if len(config.Filesystems) == 0 { if len(config.Filesystems) == 0 {
return nil, fmt.Errorf("at least one filesystem must be provided") return nil, fmt.Errorf("at least one filesystem must be provided")
@@ -371,7 +374,12 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
} }
func (r *restream) load() error { func (r *restream) load() error {
if r.store == nil {
return nil
}
data, err := r.store.Load() data, err := r.store.Load()
if err != nil { if err != nil {
return err return err
} }
@@ -380,6 +388,7 @@ func (r *restream) load() error {
skills := r.ffmpeg.Skills() skills := r.ffmpeg.Skills()
ffversion := skills.FFmpeg.Version ffversion := skills.FFmpeg.Version
if v, err := semver.NewVersion(ffversion); err == nil { if v, err := semver.NewVersion(ffversion); err == nil {
// Remove the patch level for the constraint // Remove the patch level for the constraint
ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor()) ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor())
@@ -418,6 +427,7 @@ func (r *restream) load() error {
// Now that all tasks are defined and all placeholders are // Now that all tasks are defined and all placeholders are
// replaced, we can resolve references and validate the // replaced, we can resolve references and validate the
// inputs and outputs. // inputs and outputs.
for _, t := range tasks { for _, t := range tasks {
t := t t := t
@@ -509,6 +519,10 @@ func (r *restream) load() error {
} }
func (r *restream) save() { func (r *restream) save() {
if r.store == nil {
return
}
data := store.NewData() data := store.NewData()
for tid, t := range r.tasks { for tid, t := range r.tasks {