2 Commits

Author          SHA1        Message                                                      Date
Ingo Oppermann  bcc3c9aaa2  Simplify updating cleanup rules                              2025-07-18 12:22:42 +02:00
Ingo Oppermann  2d491caa52  Prevent file purging equal patterns when updating process   2025-07-18 11:02:15 +02:00
3 changed files with 216 additions and 57 deletions

View File

@@ -704,6 +704,8 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro
}
func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
+ patterns := map[string][]rfs.Pattern{}
for _, output := range config.Output {
for _, c := range output.Cleanup {
name, path, found := strings.Cut(c.Pattern, ":")
@@ -717,37 +719,40 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
}
// Support legacy names
if name == "diskfs" {
switch name {
case "diskfs":
name = "disk"
} else if name == "memfs" {
case "memfs":
name = "mem"
}
p := patterns[name]
p = append(p, rfs.Pattern{
Pattern: path,
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
})
patterns[name] = p
}
}
for name, p := range patterns {
for _, fs := range r.fs.list {
if fs.Name() != name {
continue
}
pattern := rfs.Pattern{
Pattern: path,
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
}
fs.SetCleanup(id.String(), []rfs.Pattern{
pattern,
})
fs.UpdateCleanup(id.String(), p)
break
}
}
}
}
func (r *restream) unsetCleanup(id app.ProcessID) {
for _, fs := range r.fs.list {
- fs.UnsetCleanup(id.String())
+ fs.UpdateCleanup(id.String(), nil)
}
}
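
The grouping above starts from the raw cleanup rules in a process config. For reference, a minimal, self-contained sketch of how one rule string is split into a filesystem name and a glob; the sample rule string is invented, only the name:path shape and the diskfs/memfs legacy mapping come from setCleanup above.

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Hypothetical rule as it might appear in a process output config.
        rule := "memfs:/foobar_*.ts"

        name, path, found := strings.Cut(rule, ":")
        if !found {
            return // rules without a "name:" prefix are skipped
        }

        // Map the legacy filesystem names, as setCleanup does.
        switch name {
        case "diskfs":
            name = "disk"
        case "memfs":
            name = "mem"
        }

        fmt.Println(name, path) // mem /foobar_*.ts
    }
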
@@ -1218,16 +1223,16 @@ func (r *restream) updateProcess(task *task, config *app.Config) error {
t.ImportMetadata(task.ExportMetadata())
r.unsetPlayoutPorts(task)
- r.unsetCleanup(task.ID())
r.tasks.LoadAndStore(tid, t)
- // set filesystem cleanup rules
+ // Set the filesystem cleanup rules
r.setCleanup(tid, t.config)
t.Restore()
if !tid.Equal(task.ID()) {
+ r.unsetCleanup(task.ID())
r.tasks.LoadAndDelete(task.ID())
}
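
The moved unsetCleanup call is the point of the second commit: previously every update unset and re-registered the cleanup rules, which could purge files of PurgeOnDelete rules even when the rules had not changed; now the rules for the process ID are replaced in place, and the old ID is only unset when the ID actually changed. A toy, self-contained model of that replace-and-diff behaviour (simplified names, not the real restream/fs types, and ignoring the PurgeOnDelete flag):

    package main

    import "fmt"

    // cleaner is a stand-in for the per-filesystem cleanup bookkeeping.
    type cleaner struct {
        rules map[string][]string
    }

    // update mimics UpdateCleanup: replace the rules for id and "purge" only
    // the rules that disappeared.
    func (c *cleaner) update(id string, newRules []string) {
        for _, old := range c.rules[id] {
            keep := false
            for _, n := range newRules {
                if old == n {
                    keep = true
                    break
                }
            }
            if !keep {
                fmt.Println("purge files matching", old)
            }
        }

        if len(newRules) == 0 {
            delete(c.rules, id)
            return
        }
        c.rules[id] = newRules
    }

    func main() {
        c := &cleaner{rules: map[string][]string{}}

        c.update("proc", []string{"/*.ts", "/*.m3u8"}) // initial rules, nothing purged
        c.update("proc", []string{"/*.ts", "/*.m3u8"}) // unchanged update, nothing purged
        c.update("proc", []string{"/*.ts"})            // drops /*.m3u8, purges only that
        c.update("proc", nil)                          // the UpdateCleanup(id, nil) case: purge the rest
    }
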
@@ -1450,11 +1455,10 @@ func (r *restream) reloadProcess(task *task) error {
t.ImportMetadata(task.ExportMetadata())
r.unsetPlayoutPorts(task)
- r.unsetCleanup(task.ID())
r.tasks.LoadAndStore(tid, t)
- // set filesystem cleanup rules
+ // Set the filesystem cleanup rules
r.setCleanup(tid, t.config)
t.Restore()

View File

@@ -11,6 +11,7 @@ import (
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/slices"
)
type Config struct {
@@ -27,14 +28,19 @@ type Pattern struct {
PurgeOnDelete bool
}
+ func (p Pattern) Equal(other Pattern) error {
+ if p.Pattern == other.Pattern {
+ return nil
+ }
+ return fmt.Errorf("not equal")
+ }
type Filesystem interface {
fs.Filesystem
- // SetCleanup
- SetCleanup(id string, patterns []Pattern)
- // UnsetCleanup
- UnsetCleanup(id string)
+ // UpdateCleanup
+ UpdateCleanup(id string, patterns []Pattern)
// Start
Start()
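
Note that Pattern.Equal above compares only the glob string, so two rules that differ in MaxFiles, MaxFileAge, or PurgeOnDelete count as the same pattern and do not trigger a purge on update. A small sketch with a reduced copy of the type (field types simplified for illustration):

    package main

    import "fmt"

    // pattern is a reduced copy of the Pattern type above, just for this example.
    type pattern struct {
        Pattern  string
        MaxFiles uint
    }

    func (p pattern) Equal(other pattern) error {
        if p.Pattern == other.Pattern {
            return nil
        }
        return fmt.Errorf("not equal")
    }

    func main() {
        a := pattern{Pattern: "/*.ts", MaxFiles: 3}
        b := pattern{Pattern: "/*.ts", MaxFiles: 42} // same glob, different limit

        // Equal ignores everything but the glob, so raising MaxFiles on an
        // existing rule is treated as the same pattern and purges nothing.
        fmt.Println(a.Equal(b) == nil) // true
    }
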
@@ -108,11 +114,7 @@ func (rfs *filesystem) Stop() {
})
}
- func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
- if len(patterns) == 0 {
- return
- }
+ func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern {
for i, p := range patterns {
g, err := glob.Compile(p.Pattern, '/')
if err != nil {
@@ -121,7 +123,38 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
p.compiledPattern = g
patterns[i] = p
}
+ return patterns
+ }
+ func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) {
+ newPatterns = rfs.compilePatterns(newPatterns)
+ rfs.cleanupLock.Lock()
+ defer rfs.cleanupLock.Unlock()
+ currentPatterns := rfs.cleanupPatterns[id]
+ delete(rfs.cleanupPatterns, id)
+ onlyCurrent, onlyNew := slices.DiffEqualer(currentPatterns, newPatterns)
+ for _, p := range newPatterns {
+ found := false
+ for _, x := range onlyNew {
+ if p.Equal(x) == nil {
+ found = true
+ break
+ }
+ }
+ if !found {
+ rfs.logger.Debug().WithFields(log.Fields{
+ "id": id,
+ "pattern": p.Pattern,
+ "max_files": p.MaxFiles,
+ "max_file_age": p.MaxFileAge.Seconds(),
+ }).Log("Keep pattern")
+ } else {
rfs.logger.Debug().WithFields(log.Fields{
"id": id,
"pattern": p.Pattern,
@@ -129,23 +162,20 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
"max_file_age": p.MaxFileAge.Seconds(),
}).Log("Add pattern")
}
}
- rfs.cleanupLock.Lock()
- defer rfs.cleanupLock.Unlock()
+ rfs.cleanupPatterns[id] = newPatterns
- rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...)
}
+ for _, p := range onlyCurrent {
+ rfs.logger.Debug().WithFields(log.Fields{
+ "id": id,
+ "pattern": p.Pattern,
+ "max_files": p.MaxFiles,
+ "max_file_age": p.MaxFileAge.Seconds(),
+ }).Log("Remove pattern")
+ }
- func (rfs *filesystem) UnsetCleanup(id string) {
- rfs.logger.Debug().WithField("id", id).Log("Remove pattern group")
- rfs.cleanupLock.Lock()
- defer rfs.cleanupLock.Unlock()
- patterns := rfs.cleanupPatterns[id]
- delete(rfs.cleanupPatterns, id)
- rfs.purge(patterns)
+ rfs.purge(onlyCurrent)
}
func (rfs *filesystem) cleanup() {
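
slices.DiffEqualer comes from the project's own slices helper package (imported above); its implementation is not part of this diff, so the following is only a plausible re-implementation for illustration. Given two slices whose elements carry an Equal(other) error method, it returns the elements found only in the first slice (here: the patterns that get logged as "Remove pattern" and purged) and only in the second (the ones logged as "Add pattern").

    package main

    import "fmt"

    // equaler mirrors how Pattern.Equal is used above: a nil error means "equal".
    type equaler[T any] interface {
        Equal(other T) error
    }

    // diffEqualer is a hypothetical stand-in for slices.DiffEqualer; the real
    // helper lives in github.com/datarhei/core/v16/slices and may differ.
    func diffEqualer[T equaler[T]](a, b []T) (onlyA, onlyB []T) {
        contains := func(list []T, v T) bool {
            for _, x := range list {
                if v.Equal(x) == nil {
                    return true
                }
            }
            return false
        }

        for _, x := range a {
            if !contains(b, x) {
                onlyA = append(onlyA, x)
            }
        }
        for _, y := range b {
            if !contains(a, y) {
                onlyB = append(onlyB, y)
            }
        }

        return onlyA, onlyB
    }

    type rule string

    func (r rule) Equal(other rule) error {
        if r == other {
            return nil
        }
        return fmt.Errorf("not equal")
    }

    func main() {
        current := []rule{"/*.ts", "/*.m3u8"}
        next := []rule{"/*.m3u8", "/*.mp4"}

        onlyCurrent, onlyNew := diffEqualer(current, next)
        fmt.Println(onlyCurrent, onlyNew) // [/*.ts] [/*.mp4]
    }
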
@@ -219,10 +249,14 @@ func (rfs *filesystem) purge(patterns []Pattern) int64 {
continue
}
- _, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
+ files, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
Pattern: pattern.Pattern,
})
+ for _, file := range files {
+ rfs.logger.Debug().WithField("path", file).Log("Purged file")
+ }
nfilesTotal += nfiles
}

View File

@@ -12,6 +12,57 @@ import (
"github.com/stretchr/testify/require"
)
func TestUpdateCleanup(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
clean, err := New(Config{
FS: memfs,
Interval: time.Second,
})
require.NoError(t, err)
cleanfs := clean.(*filesystem)
cleanfs.Start()
patterns := []Pattern{
{
Pattern: "/*.ts",
MaxFiles: 3,
MaxFileAge: 0,
},
}
cleanfs.UpdateCleanup("foobar", patterns)
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
patterns = append(patterns, Pattern{
Pattern: "/*.m3u8",
MaxFiles: 5,
MaxFileAge: 0,
})
cleanfs.UpdateCleanup("foobar", patterns)
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
patterns[0].MaxFiles = 42
cleanfs.UpdateCleanup("foobar", patterns)
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
cleanfs.UpdateCleanup("foobar", patterns[1:])
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:])
cleanfs.UpdateCleanup("foobar", nil)
require.Empty(t, cleanfs.cleanupPatterns["foobar"])
}
func TestMaxFiles(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
@@ -24,7 +75,7 @@ func TestMaxFiles(t *testing.T) {
cleanfs.Start()
- cleanfs.SetCleanup("foobar", []Pattern{
+ cleanfs.UpdateCleanup("foobar", []Pattern{
{
Pattern: "/*.ts",
MaxFiles: 3,
@@ -73,7 +124,7 @@ func TestMaxAge(t *testing.T) {
cleanfs.Start()
- cleanfs.SetCleanup("foobar", []Pattern{
+ cleanfs.UpdateCleanup("foobar", []Pattern{
{
Pattern: "/*.ts",
MaxFiles: 0,
@@ -122,7 +173,7 @@ func TestUnsetCleanup(t *testing.T) {
cleanfs.Start()
- cleanfs.SetCleanup("foobar", []Pattern{
+ cleanfs.UpdateCleanup("foobar", []Pattern{
{
Pattern: "/*.ts",
MaxFiles: 3,
@@ -156,7 +207,7 @@ func TestUnsetCleanup(t *testing.T) {
return true
}, 3*time.Second, time.Second)
- cleanfs.UnsetCleanup("foobar")
+ cleanfs.UpdateCleanup("foobar", nil)
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
@@ -179,6 +230,76 @@ func TestUnsetCleanup(t *testing.T) {
cleanfs.Stop()
}
func TestPurge(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
cleanfs, err := New(Config{
FS: memfs,
Interval: time.Second,
})
require.NoError(t, err)
cleanfs.Start()
cleanfs.UpdateCleanup("foobar", []Pattern{
{
Pattern: "/*.ts",
MaxFiles: 3,
MaxFileAge: 0,
PurgeOnDelete: true,
},
})
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1)
require.Eventually(t, func() bool {
return cleanfs.Files() == 3
}, 3*time.Second, time.Second)
cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1)
require.Eventually(t, func() bool {
if cleanfs.Files() != 3 {
return false
}
names := []string{}
for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
names = append(names, f.Name())
}
require.ElementsMatch(t, []string{"/chunk_1.ts", "/chunk_2.ts", "/chunk_3.ts"}, names)
return true
}, 3*time.Second, time.Second)
cleanfs.UpdateCleanup("foobar", nil)
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
require.Eventually(t, func() bool {
if cleanfs.Files() != 1 {
return false
}
names := []string{}
for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
names = append(names, f.Name())
}
require.ElementsMatch(t, []string{"/chunk_4.ts"}, names)
return true
}, 3*time.Second, time.Second)
cleanfs.Stop()
}
func BenchmarkCleanup(b *testing.B) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(b, err)
@@ -229,7 +350,7 @@ func BenchmarkCleanup(b *testing.B) {
},
}
- cleanfs.SetCleanup(id, patterns)
+ cleanfs.UpdateCleanup(id, patterns)
ids[i] = id
}
@@ -308,7 +429,7 @@ func BenchmarkPurge(b *testing.B) {
},
}
- cleanfs.SetCleanup(id, patterns)
+ cleanfs.UpdateCleanup(id, patterns)
ids[i] = id
}