diff --git a/app/import/main_test.go b/app/import/main_test.go index 4f132eb0..305110f9 100644 --- a/app/import/main_test.go +++ b/app/import/main_test.go @@ -15,6 +15,7 @@ func TestImport(t *testing.T) { require.NoError(t, err) memfs.WriteFileReader("/mime.types", strings.NewReader("foobar")) + memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("foobar")) configstore, err := store.NewJSON(memfs, "/config.json", nil) require.NoError(t, err) diff --git a/config/config.go b/config/config.go index 1a0e92fa..b6eab656 100644 --- a/config/config.go +++ b/config/config.go @@ -228,7 +228,7 @@ func (d *Config) init() { d.vars.Register(value.NewStringList(&d.SRT.Log.Topics, []string{}, ","), "srt.log.topics", "CORE_SRT_LOG_TOPICS", nil, "List of topics to log", false, false) // FFmpeg - d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg"), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) + d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) d.vars.Register(value.NewInt64(&d.FFmpeg.MaxProcesses, 0), "ffmpeg.max_processes", "CORE_FFMPEG_MAXPROCESSES", nil, "Max. allowed simultaneously running ffmpeg instances, 0 for unlimited", false, false) d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Input.Allow, []string{}, " "), "ffmpeg.access.input.allow", "CORE_FFMPEG_ACCESS_INPUT_ALLOW", nil, "List of allowed expression to match against the input addresses", false, false) d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Input.Block, []string{}, " "), "ffmpeg.access.input.block", "CORE_FFMPEG_ACCESS_INPUT_BLOCK", nil, "List of blocked expression to match against the input addresses", false, false) diff --git a/config/config_test.go b/config/config_test.go index c3420436..132857fe 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -57,12 +57,17 @@ func TestConfigCopy(t *testing.T) { } func TestValidateDefault(t *testing.T) { - fs, _ := fs.NewMemFilesystem(fs.MemConfig{}) + fs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + size, fresh, err := fs.WriteFileReader("./mime.types", strings.NewReader("xxxxx")) require.Equal(t, int64(5), size) require.Equal(t, true, fresh) require.NoError(t, err) + _, _, err = fs.WriteFileReader("/bin/ffmpeg", strings.NewReader("xxxxx")) + require.NoError(t, err) + cfg := New(fs) cfg.Validate(true) diff --git a/config/v1/config.go b/config/v1/config.go index 31dabce3..022edfe9 100644 --- a/config/v1/config.go +++ b/config/v1/config.go @@ -195,7 +195,7 @@ func (d *Config) init() { d.vars.Register(value.NewStringList(&d.SRT.Log.Topics, []string{}, ","), "srt.log.topics", "CORE_SRT_LOG_TOPICS", nil, "List of topics to log", false, false) // FFmpeg - d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg"), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) + d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) d.vars.Register(value.NewInt64(&d.FFmpeg.MaxProcesses, 0), "ffmpeg.max_processes", "CORE_FFMPEG_MAXPROCESSES", nil, "Max. allowed simultaneously running ffmpeg instances, 0 for unlimited", false, false) d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Input.Allow, []string{}, " "), "ffmpeg.access.input.allow", "CORE_FFMPEG_ACCESS_INPUT_ALLOW", nil, "List of allowed expression to match against the input addresses", false, false) d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Input.Block, []string{}, " "), "ffmpeg.access.input.block", "CORE_FFMPEG_ACCESS_INPUT_BLOCK", nil, "List of blocked expression to match against the input addresses", false, false) diff --git a/config/v2/config.go b/config/v2/config.go index 1bbe9ed0..e1bfb0cb 100644 --- a/config/v2/config.go +++ b/config/v2/config.go @@ -196,7 +196,7 @@ func (d *Config) init() { d.vars.Register(value.NewStringList(&d.SRT.Log.Topics, []string{}, ","), "srt.log.topics", "CORE_SRT_LOG_TOPICS", nil, "List of topics to log", false, false) // FFmpeg - d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg"), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) + d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) d.vars.Register(value.NewInt64(&d.FFmpeg.MaxProcesses, 0), "ffmpeg.max_processes", "CORE_FFMPEG_MAXPROCESSES", nil, "Max. allowed simultaneously running ffmpeg instances, 0 for unlimited", false, false) d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Input.Allow, []string{}, " "), "ffmpeg.access.input.allow", "CORE_FFMPEG_ACCESS_INPUT_ALLOW", nil, "List of allowed expression to match against the input addresses", false, false) d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Input.Block, []string{}, " "), "ffmpeg.access.input.block", "CORE_FFMPEG_ACCESS_INPUT_BLOCK", nil, "List of blocked expression to match against the input addresses", false, false) diff --git a/config/value/os.go b/config/value/os.go index 0080bfb8..6f57c1b3 100644 --- a/config/value/os.go +++ b/config/value/os.go @@ -2,7 +2,6 @@ package value import ( "fmt" - "os/exec" "path/filepath" "strings" @@ -115,27 +114,35 @@ func (u *Dir) IsEmpty() bool { // executable -type Exec string +type Exec struct { + p *string + fs fs.Filesystem +} + +func NewExec(p *string, val string, fs fs.Filesystem) *Exec { + v := &Exec{ + p: p, + fs: fs, + } -func NewExec(p *string, val string) *Exec { *p = val - return (*Exec)(p) + return v } func (u *Exec) Set(val string) error { - *u = Exec(val) + *u.p = val return nil } func (u *Exec) String() string { - return string(*u) + return *u.p } func (u *Exec) Validate() error { - val := string(*u) + val := *u.p - _, err := exec.LookPath(val) + _, err := u.fs.LookPath(val) if err != nil { return fmt.Errorf("%s not found or is not executable", val) } @@ -144,7 +151,7 @@ func (u *Exec) Validate() error { } func (u *Exec) IsEmpty() bool { - return len(string(*u)) == 0 + return len(*u.p) == 0 } // regular file diff --git a/config/value/os_test.go b/config/value/os_test.go index dd01317a..1706ba94 100644 --- a/config/value/os_test.go +++ b/config/value/os_test.go @@ -3,9 +3,124 @@ package value import ( "testing" + "github.com/datarhei/core/v16/io/fs" "github.com/stretchr/testify/require" ) +func TestMustDirValue(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + _, err = memfs.Stat("/foobar") + require.Error(t, err) + + var x string + + val := NewMustDir(&x, "./foobar", memfs) + + require.Equal(t, "./foobar", val.String()) + require.NoError(t, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + info, err := memfs.Stat("/foobar") + require.NoError(t, err) + require.True(t, info.IsDir()) + + x = "/bar/foo" + + require.Equal(t, "/bar/foo", val.String()) + + _, err = memfs.Stat("/bar/foo") + require.Error(t, err) + + require.NoError(t, val.Validate()) + + info, err = memfs.Stat("/bar/foo") + require.NoError(t, err) + require.True(t, info.IsDir()) + + memfs.WriteFile("/foo/bar", []byte("hello")) + + val.Set("/foo/bar") + + require.Error(t, val.Validate()) +} + +func TestDirValue(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + var x string + + val := NewDir(&x, "/foobar", memfs) + + require.Equal(t, "/foobar", val.String()) + require.Error(t, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + err = memfs.MkdirAll("/foobar", 0755) + require.NoError(t, err) + + require.NoError(t, val.Validate()) + + _, _, err = memfs.WriteFile("/foo/bar", []byte("hello")) + require.NoError(t, err) + + val.Set("/foo/bar") + + require.Error(t, val.Validate()) +} + +func TestFileValue(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + var x string + + val := NewFile(&x, "/foobar", memfs) + + require.Equal(t, "/foobar", val.String()) + require.Error(t, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + _, _, err = memfs.WriteFile("/foobar", []byte("hello")) + require.NoError(t, err) + + require.NoError(t, val.Validate()) + + err = memfs.MkdirAll("/foo/bar", 0755) + require.NoError(t, err) + + val.Set("/foo/bar") + + require.Error(t, val.Validate()) +} + +func TestExecValue(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + var x string + + val := NewExec(&x, "/foobar", memfs) + + require.Equal(t, "/foobar", val.String()) + require.Error(t, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + _, _, err = memfs.WriteFile("/foobar", []byte("hello")) + require.NoError(t, err) + + require.NoError(t, val.Validate()) + + err = memfs.MkdirAll("/foo/bar", 0755) + require.NoError(t, err) + + val.Set("/foo/bar") + + require.Error(t, val.Validate()) +} + func TestAbsolutePathValue(t *testing.T) { var x string diff --git a/http/handler/api/config_test.go b/http/handler/api/config_test.go index 9815018d..0410eaf2 100644 --- a/http/handler/api/config_test.go +++ b/http/handler/api/config_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "net/http" + "strings" "testing" "github.com/datarhei/core/v16/config" @@ -21,6 +22,12 @@ func getDummyConfigRouter(t *testing.T) (*echo.Echo, store.Store) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) + _, _, err = memfs.WriteFileReader("./mime.types", strings.NewReader("xxxxx")) + require.NoError(t, err) + + _, _, err = memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("xxxxx")) + require.NoError(t, err) + config, err := store.NewJSON(memfs, "/config.json", nil) require.NoError(t, err) @@ -43,10 +50,13 @@ func TestConfigGet(t *testing.T) { func TestConfigSetConflict(t *testing.T) { router, _ := getDummyConfigRouter(t) + cfg := config.New(nil) + cfg.Storage.MimeTypes = "/path/to/mime.types" + var data bytes.Buffer encoder := json.NewEncoder(&data) - encoder.Encode(config.New(nil)) + encoder.Encode(cfg) mock.Request(t, http.StatusConflict, router, "PUT", "/", &data) } @@ -63,10 +73,8 @@ func TestConfigSet(t *testing.T) { // Setting a new v3 config cfg := config.New(nil) - cfg.FFmpeg.Binary = "true" cfg.DB.Dir = "." cfg.Storage.Disk.Dir = "." - cfg.Storage.MimeTypes = "" cfg.Storage.Disk.Cache.Types.Allow = []string{".aaa"} cfg.Storage.Disk.Cache.Types.Block = []string{".zzz"} cfg.Host.Name = []string{"foobar.com"} @@ -84,10 +92,8 @@ func TestConfigSet(t *testing.T) { // Setting a complete v1 config cfgv1 := v1.New(nil) - cfgv1.FFmpeg.Binary = "true" cfgv1.DB.Dir = "." cfgv1.Storage.Disk.Dir = "." - cfgv1.Storage.MimeTypes = "" cfgv1.Storage.Disk.Cache.Types = []string{".bbb"} cfgv1.Host.Name = []string{"foobar.com"} diff --git a/io/fs/disk.go b/io/fs/disk.go index 0e60a960..88352c72 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -553,6 +553,51 @@ func (fs *diskFilesystem) List(path, pattern string) []FileInfo { return files } +func (fs *diskFilesystem) LookPath(file string) (string, error) { + if strings.Contains(file, "/") { + file = fs.cleanPath(file) + err := fs.findExecutable(file) + if err == nil { + return file, nil + } + return "", os.ErrNotExist + } + path := os.Getenv("PATH") + for _, dir := range filepath.SplitList(path) { + if dir == "" { + // Unix shell semantics: path element "" means "." + dir = "." + } + path := filepath.Join(dir, file) + path = fs.cleanPath(path) + if err := fs.findExecutable(path); err == nil { + if !filepath.IsAbs(path) { + return path, os.ErrNotExist + } + return path, nil + } + } + return "", os.ErrNotExist +} + +func (fs *diskFilesystem) findExecutable(file string) error { + d, err := fs.Stat(file) + if err != nil { + return err + } + + m := d.Mode() + if m.IsDir() { + return fmt.Errorf("is a directory") + } + + if m&0111 != 0 { + return nil + } + + return os.ErrPermission +} + func (fs *diskFilesystem) walk(path string, walkfn func(path string, info os.FileInfo)) { filepath.Walk(path, func(path string, info os.FileInfo, err error) error { if err != nil { diff --git a/io/fs/fs.go b/io/fs/fs.go index d8b8553b..9f3b8661 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -66,6 +66,12 @@ type ReadFilesystem interface { // List lists all files that are currently on the filesystem. List(path, pattern string) []FileInfo + + // LookPath searches for an executable named file in the directories named by the PATH environment + // variable. If file contains a slash, it is tried directly and the PATH is not consulted. Otherwise, + // on success, the result is an absolute path. On non-disk filesystems. Only the mere existence + // of that file is verfied. + LookPath(file string) (string, error) } type WriteFilesystem interface { diff --git a/io/fs/mem.go b/io/fs/mem.go index 3939c43a..a75eb932 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -697,6 +697,39 @@ func (fs *memFilesystem) List(path, pattern string) []FileInfo { return files } +func (fs *memFilesystem) LookPath(file string) (string, error) { + if strings.Contains(file, "/") { + file = fs.cleanPath(file) + info, err := fs.Stat(file) + if err == nil { + if !info.Mode().IsRegular() { + return file, os.ErrNotExist + } + return file, nil + } + return "", os.ErrNotExist + } + path := os.Getenv("PATH") + for _, dir := range filepath.SplitList(path) { + if dir == "" { + // Unix shell semantics: path element "" means "." + dir = "." + } + path := filepath.Join(dir, file) + path = fs.cleanPath(path) + if info, err := fs.Stat(path); err == nil { + if !filepath.IsAbs(path) { + return path, os.ErrNotExist + } + if !info.Mode().IsRegular() { + return path, os.ErrNotExist + } + return path, nil + } + } + return "", os.ErrNotExist +} + func (fs *memFilesystem) cleanPath(path string) string { if !filepath.IsAbs(path) { path = filepath.Join("/", path) diff --git a/io/fs/s3.go b/io/fs/s3.go index c16cc15f..22c66d05 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -511,6 +511,39 @@ func (fs *s3Filesystem) List(path, pattern string) []FileInfo { return files } +func (fs *s3Filesystem) LookPath(file string) (string, error) { + if strings.Contains(file, "/") { + file = fs.cleanPath(file) + info, err := fs.Stat(file) + if err == nil { + if !info.Mode().IsRegular() { + return file, os.ErrNotExist + } + return file, nil + } + return "", os.ErrNotExist + } + path := os.Getenv("PATH") + for _, dir := range filepath.SplitList(path) { + if dir == "" { + // Unix shell semantics: path element "" means "." + dir = "." + } + path := filepath.Join(dir, file) + path = fs.cleanPath(path) + if info, err := fs.Stat(path); err == nil { + if !filepath.IsAbs(path) { + return path, os.ErrNotExist + } + if !info.Mode().IsRegular() { + return path, os.ErrNotExist + } + return path, nil + } + } + return "", os.ErrNotExist +} + func (fs *s3Filesystem) isDir(path string) bool { if !strings.HasSuffix(path, "/") { path = path + "/" diff --git a/process/process.go b/process/process.go index 3c927e2b..4bfcb4b4 100644 --- a/process/process.go +++ b/process/process.go @@ -192,6 +192,7 @@ type process struct { onStart func() onExit func() onStateChange func(from, to string) + lock sync.Mutex } limits Limiter } @@ -588,6 +589,7 @@ func (p *process) stop(wait bool) error { if wait { wg.Add(1) + p.callbacks.lock.Lock() if p.callbacks.onExit == nil { p.callbacks.onExit = func() { wg.Done() @@ -601,6 +603,7 @@ func (p *process) stop(wait bool) error { p.callbacks.onExit = cb } } + p.callbacks.lock.Unlock() } var err error @@ -829,10 +832,12 @@ func (p *process) waiter() { // Reset the parser stats p.parser.ResetStats() - // Call the onStop callback + // Call the onExit callback + p.callbacks.lock.Lock() if p.callbacks.onExit != nil { go p.callbacks.onExit() } + p.callbacks.lock.Unlock() p.order.lock.Lock() defer p.order.lock.Unlock()