diff --git a/app/api/api.go b/app/api/api.go index 7d734fe6..6a7f65cb 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -1179,7 +1179,7 @@ func (a *api) start(ctx context.Context) error { var store restreamstore.Store = nil - { + if !cfg.Cluster.Enable { fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{ Root: cfg.DB.Dir, }) diff --git a/restream/restream.go b/restream/restream.go index 97dab80f..8825a12c 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -26,7 +26,6 @@ import ( "github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/rewrite" "github.com/datarhei/core/v16/restream/store" - jsonstore "github.com/datarhei/core/v16/restream/store/json" "github.com/Masterminds/semver/v3" ) @@ -144,22 +143,26 @@ func New(config Config) (Restreamer, error) { replace: config.Replace, rewrite: config.Rewrite, logger: config.Logger, + tasks: map[app.ProcessID]*task{}, + metadata: map[string]interface{}{}, } if r.logger == nil { r.logger = log.New("") } - if r.store == nil { - dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) - s, err := jsonstore.New(jsonstore.Config{ - Filesystem: dummyfs, - }) - if err != nil { - return nil, err + /* + if r.store == nil { + dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) + s, err := jsonstore.New(jsonstore.Config{ + Filesystem: dummyfs, + }) + if err != nil { + return nil, err + } + r.store = s } - r.store = s - } + */ if len(config.Filesystems) == 0 { return nil, fmt.Errorf("at least one filesystem must be provided") @@ -371,7 +374,12 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources } func (r *restream) load() error { + if r.store == nil { + return nil + } + data, err := r.store.Load() + if err != nil { return err } @@ -380,6 +388,7 @@ func (r *restream) load() error { skills := r.ffmpeg.Skills() ffversion := skills.FFmpeg.Version + if v, err := semver.NewVersion(ffversion); err == nil { // Remove the patch level for the constraint ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor()) @@ -418,6 +427,7 @@ func (r *restream) load() error { // Now that all tasks are defined and all placeholders are // replaced, we can resolve references and validate the // inputs and outputs. + for _, t := range tasks { t := t @@ -509,6 +519,10 @@ func (r *restream) load() error { } func (r *restream) save() { + if r.store == nil { + return + } + data := store.NewData() for tid, t := range r.tasks {