mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Compare commits
2 Commits
ea108a011f
...
bcc3c9aaa2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
bcc3c9aaa2 | ||
![]() |
2d491caa52 |
@@ -704,6 +704,8 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
|
func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
|
||||||
|
patterns := map[string][]rfs.Pattern{}
|
||||||
|
|
||||||
for _, output := range config.Output {
|
for _, output := range config.Output {
|
||||||
for _, c := range output.Cleanup {
|
for _, c := range output.Cleanup {
|
||||||
name, path, found := strings.Cut(c.Pattern, ":")
|
name, path, found := strings.Cut(c.Pattern, ":")
|
||||||
@@ -717,37 +719,40 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Support legacy names
|
// Support legacy names
|
||||||
if name == "diskfs" {
|
switch name {
|
||||||
|
case "diskfs":
|
||||||
name = "disk"
|
name = "disk"
|
||||||
} else if name == "memfs" {
|
case "memfs":
|
||||||
name = "mem"
|
name = "mem"
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fs := range r.fs.list {
|
p := patterns[name]
|
||||||
if fs.Name() != name {
|
p = append(p, rfs.Pattern{
|
||||||
continue
|
Pattern: path,
|
||||||
}
|
MaxFiles: c.MaxFiles,
|
||||||
|
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
|
||||||
|
PurgeOnDelete: c.PurgeOnDelete,
|
||||||
|
})
|
||||||
|
patterns[name] = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pattern := rfs.Pattern{
|
for name, p := range patterns {
|
||||||
Pattern: path,
|
for _, fs := range r.fs.list {
|
||||||
MaxFiles: c.MaxFiles,
|
if fs.Name() != name {
|
||||||
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
|
continue
|
||||||
PurgeOnDelete: c.PurgeOnDelete,
|
|
||||||
}
|
|
||||||
|
|
||||||
fs.SetCleanup(id.String(), []rfs.Pattern{
|
|
||||||
pattern,
|
|
||||||
})
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fs.UpdateCleanup(id.String(), p)
|
||||||
|
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *restream) unsetCleanup(id app.ProcessID) {
|
func (r *restream) unsetCleanup(id app.ProcessID) {
|
||||||
for _, fs := range r.fs.list {
|
for _, fs := range r.fs.list {
|
||||||
fs.UnsetCleanup(id.String())
|
fs.UpdateCleanup(id.String(), nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1218,16 +1223,16 @@ func (r *restream) updateProcess(task *task, config *app.Config) error {
|
|||||||
t.ImportMetadata(task.ExportMetadata())
|
t.ImportMetadata(task.ExportMetadata())
|
||||||
|
|
||||||
r.unsetPlayoutPorts(task)
|
r.unsetPlayoutPorts(task)
|
||||||
r.unsetCleanup(task.ID())
|
|
||||||
|
|
||||||
r.tasks.LoadAndStore(tid, t)
|
r.tasks.LoadAndStore(tid, t)
|
||||||
|
|
||||||
// set filesystem cleanup rules
|
// Set the filesystem cleanup rules
|
||||||
r.setCleanup(tid, t.config)
|
r.setCleanup(tid, t.config)
|
||||||
|
|
||||||
t.Restore()
|
t.Restore()
|
||||||
|
|
||||||
if !tid.Equal(task.ID()) {
|
if !tid.Equal(task.ID()) {
|
||||||
|
r.unsetCleanup(task.ID())
|
||||||
r.tasks.LoadAndDelete(task.ID())
|
r.tasks.LoadAndDelete(task.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1450,11 +1455,10 @@ func (r *restream) reloadProcess(task *task) error {
|
|||||||
t.ImportMetadata(task.ExportMetadata())
|
t.ImportMetadata(task.ExportMetadata())
|
||||||
|
|
||||||
r.unsetPlayoutPorts(task)
|
r.unsetPlayoutPorts(task)
|
||||||
r.unsetCleanup(task.ID())
|
|
||||||
|
|
||||||
r.tasks.LoadAndStore(tid, t)
|
r.tasks.LoadAndStore(tid, t)
|
||||||
|
|
||||||
// set filesystem cleanup rules
|
// Set the filesystem cleanup rules
|
||||||
r.setCleanup(tid, t.config)
|
r.setCleanup(tid, t.config)
|
||||||
|
|
||||||
t.Restore()
|
t.Restore()
|
||||||
|
@@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/datarhei/core/v16/glob"
|
"github.com/datarhei/core/v16/glob"
|
||||||
"github.com/datarhei/core/v16/io/fs"
|
"github.com/datarhei/core/v16/io/fs"
|
||||||
"github.com/datarhei/core/v16/log"
|
"github.com/datarhei/core/v16/log"
|
||||||
|
"github.com/datarhei/core/v16/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@@ -27,14 +28,19 @@ type Pattern struct {
|
|||||||
PurgeOnDelete bool
|
PurgeOnDelete bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p Pattern) Equal(other Pattern) error {
|
||||||
|
if p.Pattern == other.Pattern {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("not euqal")
|
||||||
|
}
|
||||||
|
|
||||||
type Filesystem interface {
|
type Filesystem interface {
|
||||||
fs.Filesystem
|
fs.Filesystem
|
||||||
|
|
||||||
// SetCleanup
|
// UpdateCleanup
|
||||||
SetCleanup(id string, patterns []Pattern)
|
UpdateCleanup(id string, patterns []Pattern)
|
||||||
|
|
||||||
// UnsetCleanup
|
|
||||||
UnsetCleanup(id string)
|
|
||||||
|
|
||||||
// Start
|
// Start
|
||||||
Start()
|
Start()
|
||||||
@@ -108,11 +114,7 @@ func (rfs *filesystem) Stop() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern {
|
||||||
if len(patterns) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, p := range patterns {
|
for i, p := range patterns {
|
||||||
g, err := glob.Compile(p.Pattern, '/')
|
g, err := glob.Compile(p.Pattern, '/')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -121,31 +123,59 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
|||||||
|
|
||||||
p.compiledPattern = g
|
p.compiledPattern = g
|
||||||
patterns[i] = p
|
patterns[i] = p
|
||||||
|
}
|
||||||
|
|
||||||
|
return patterns
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) {
|
||||||
|
newPatterns = rfs.compilePatterns(newPatterns)
|
||||||
|
|
||||||
|
rfs.cleanupLock.Lock()
|
||||||
|
defer rfs.cleanupLock.Unlock()
|
||||||
|
|
||||||
|
currentPatterns := rfs.cleanupPatterns[id]
|
||||||
|
delete(rfs.cleanupPatterns, id)
|
||||||
|
|
||||||
|
onlyCurrent, onlyNew := slices.DiffEqualer(currentPatterns, newPatterns)
|
||||||
|
|
||||||
|
for _, p := range newPatterns {
|
||||||
|
found := false
|
||||||
|
for _, x := range onlyNew {
|
||||||
|
if p.Equal(x) == nil {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
rfs.logger.Debug().WithFields(log.Fields{
|
||||||
|
"id": id,
|
||||||
|
"pattern": p.Pattern,
|
||||||
|
"max_files": p.MaxFiles,
|
||||||
|
"max_file_age": p.MaxFileAge.Seconds(),
|
||||||
|
}).Log("Keep pattern")
|
||||||
|
} else {
|
||||||
|
rfs.logger.Debug().WithFields(log.Fields{
|
||||||
|
"id": id,
|
||||||
|
"pattern": p.Pattern,
|
||||||
|
"max_files": p.MaxFiles,
|
||||||
|
"max_file_age": p.MaxFileAge.Seconds(),
|
||||||
|
}).Log("Add pattern")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rfs.cleanupPatterns[id] = newPatterns
|
||||||
|
|
||||||
|
for _, p := range onlyCurrent {
|
||||||
rfs.logger.Debug().WithFields(log.Fields{
|
rfs.logger.Debug().WithFields(log.Fields{
|
||||||
"id": id,
|
"id": id,
|
||||||
"pattern": p.Pattern,
|
"pattern": p.Pattern,
|
||||||
"max_files": p.MaxFiles,
|
"max_files": p.MaxFiles,
|
||||||
"max_file_age": p.MaxFileAge.Seconds(),
|
"max_file_age": p.MaxFileAge.Seconds(),
|
||||||
}).Log("Add pattern")
|
}).Log("Remove pattern")
|
||||||
}
|
}
|
||||||
|
|
||||||
rfs.cleanupLock.Lock()
|
rfs.purge(onlyCurrent)
|
||||||
defer rfs.cleanupLock.Unlock()
|
|
||||||
|
|
||||||
rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rfs *filesystem) UnsetCleanup(id string) {
|
|
||||||
rfs.logger.Debug().WithField("id", id).Log("Remove pattern group")
|
|
||||||
|
|
||||||
rfs.cleanupLock.Lock()
|
|
||||||
defer rfs.cleanupLock.Unlock()
|
|
||||||
|
|
||||||
patterns := rfs.cleanupPatterns[id]
|
|
||||||
delete(rfs.cleanupPatterns, id)
|
|
||||||
|
|
||||||
rfs.purge(patterns)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rfs *filesystem) cleanup() {
|
func (rfs *filesystem) cleanup() {
|
||||||
@@ -219,10 +249,14 @@ func (rfs *filesystem) purge(patterns []Pattern) int64 {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
_, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
|
files, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
|
||||||
Pattern: pattern.Pattern,
|
Pattern: pattern.Pattern,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
for _, file := range files {
|
||||||
|
rfs.logger.Debug().WithField("path", file).Log("Purged file")
|
||||||
|
}
|
||||||
|
|
||||||
nfilesTotal += nfiles
|
nfilesTotal += nfiles
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -12,6 +12,57 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestUpdateCleanup(t *testing.T) {
|
||||||
|
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
clean, err := New(Config{
|
||||||
|
FS: memfs,
|
||||||
|
Interval: time.Second,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
cleanfs := clean.(*filesystem)
|
||||||
|
|
||||||
|
cleanfs.Start()
|
||||||
|
|
||||||
|
patterns := []Pattern{
|
||||||
|
{
|
||||||
|
Pattern: "/*.ts",
|
||||||
|
MaxFiles: 3,
|
||||||
|
MaxFileAge: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", patterns)
|
||||||
|
|
||||||
|
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||||
|
|
||||||
|
patterns = append(patterns, Pattern{
|
||||||
|
Pattern: "/*.m3u8",
|
||||||
|
MaxFiles: 5,
|
||||||
|
MaxFileAge: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", patterns)
|
||||||
|
|
||||||
|
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||||
|
|
||||||
|
patterns[0].MaxFiles = 42
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", patterns)
|
||||||
|
|
||||||
|
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", patterns[1:])
|
||||||
|
|
||||||
|
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:])
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", nil)
|
||||||
|
|
||||||
|
require.Empty(t, cleanfs.cleanupPatterns["foobar"])
|
||||||
|
}
|
||||||
|
|
||||||
func TestMaxFiles(t *testing.T) {
|
func TestMaxFiles(t *testing.T) {
|
||||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -24,7 +75,7 @@ func TestMaxFiles(t *testing.T) {
|
|||||||
|
|
||||||
cleanfs.Start()
|
cleanfs.Start()
|
||||||
|
|
||||||
cleanfs.SetCleanup("foobar", []Pattern{
|
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||||
{
|
{
|
||||||
Pattern: "/*.ts",
|
Pattern: "/*.ts",
|
||||||
MaxFiles: 3,
|
MaxFiles: 3,
|
||||||
@@ -73,7 +124,7 @@ func TestMaxAge(t *testing.T) {
|
|||||||
|
|
||||||
cleanfs.Start()
|
cleanfs.Start()
|
||||||
|
|
||||||
cleanfs.SetCleanup("foobar", []Pattern{
|
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||||
{
|
{
|
||||||
Pattern: "/*.ts",
|
Pattern: "/*.ts",
|
||||||
MaxFiles: 0,
|
MaxFiles: 0,
|
||||||
@@ -122,7 +173,7 @@ func TestUnsetCleanup(t *testing.T) {
|
|||||||
|
|
||||||
cleanfs.Start()
|
cleanfs.Start()
|
||||||
|
|
||||||
cleanfs.SetCleanup("foobar", []Pattern{
|
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||||
{
|
{
|
||||||
Pattern: "/*.ts",
|
Pattern: "/*.ts",
|
||||||
MaxFiles: 3,
|
MaxFiles: 3,
|
||||||
@@ -156,7 +207,7 @@ func TestUnsetCleanup(t *testing.T) {
|
|||||||
return true
|
return true
|
||||||
}, 3*time.Second, time.Second)
|
}, 3*time.Second, time.Second)
|
||||||
|
|
||||||
cleanfs.UnsetCleanup("foobar")
|
cleanfs.UpdateCleanup("foobar", nil)
|
||||||
|
|
||||||
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
||||||
|
|
||||||
@@ -179,6 +230,76 @@ func TestUnsetCleanup(t *testing.T) {
|
|||||||
cleanfs.Stop()
|
cleanfs.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPurge(t *testing.T) {
|
||||||
|
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
cleanfs, err := New(Config{
|
||||||
|
FS: memfs,
|
||||||
|
Interval: time.Second,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
cleanfs.Start()
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||||
|
{
|
||||||
|
Pattern: "/*.ts",
|
||||||
|
MaxFiles: 3,
|
||||||
|
MaxFileAge: 0,
|
||||||
|
PurgeOnDelete: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
|
||||||
|
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
|
||||||
|
cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1)
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
return cleanfs.Files() == 3
|
||||||
|
}, 3*time.Second, time.Second)
|
||||||
|
|
||||||
|
cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1)
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
if cleanfs.Files() != 3 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
names := []string{}
|
||||||
|
|
||||||
|
for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
|
||||||
|
names = append(names, f.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
require.ElementsMatch(t, []string{"/chunk_1.ts", "/chunk_2.ts", "/chunk_3.ts"}, names)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}, 3*time.Second, time.Second)
|
||||||
|
|
||||||
|
cleanfs.UpdateCleanup("foobar", nil)
|
||||||
|
|
||||||
|
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
if cleanfs.Files() != 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
names := []string{}
|
||||||
|
|
||||||
|
for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
|
||||||
|
names = append(names, f.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
require.ElementsMatch(t, []string{"/chunk_4.ts"}, names)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}, 3*time.Second, time.Second)
|
||||||
|
|
||||||
|
cleanfs.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkCleanup(b *testing.B) {
|
func BenchmarkCleanup(b *testing.B) {
|
||||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
@@ -229,7 +350,7 @@ func BenchmarkCleanup(b *testing.B) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanfs.SetCleanup(id, patterns)
|
cleanfs.UpdateCleanup(id, patterns)
|
||||||
|
|
||||||
ids[i] = id
|
ids[i] = id
|
||||||
}
|
}
|
||||||
@@ -308,7 +429,7 @@ func BenchmarkPurge(b *testing.B) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanfs.SetCleanup(id, patterns)
|
cleanfs.UpdateCleanup(id, patterns)
|
||||||
|
|
||||||
ids[i] = id
|
ids[i] = id
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user