2 Commits

Author           SHA1         Message                                                      Date
Ingo Oppermann   bcc3c9aaa2   Simplify updating cleanup rules                              2025-07-18 12:22:42 +02:00
Ingo Oppermann   2d491caa52   Prevent file purging equal patterns when updating process   2025-07-18 11:02:15 +02:00
3 changed files with 216 additions and 57 deletions

View File

@@ -704,6 +704,8 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro
 }
 
 func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
+	patterns := map[string][]rfs.Pattern{}
+
 	for _, output := range config.Output {
 		for _, c := range output.Cleanup {
 			name, path, found := strings.Cut(c.Pattern, ":")
@@ -717,37 +719,40 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
 			}
 
 			// Support legacy names
-			if name == "diskfs" {
+			switch name {
+			case "diskfs":
 				name = "disk"
-			} else if name == "memfs" {
+			case "memfs":
 				name = "mem"
 			}
 
-			for _, fs := range r.fs.list {
-				if fs.Name() != name {
-					continue
-				}
-
-				pattern := rfs.Pattern{
-					Pattern:       path,
-					MaxFiles:      c.MaxFiles,
-					MaxFileAge:    time.Duration(c.MaxFileAge) * time.Second,
-					PurgeOnDelete: c.PurgeOnDelete,
-				}
-
-				fs.SetCleanup(id.String(), []rfs.Pattern{
-					pattern,
-				})
-
-				break
-			}
+			p := patterns[name]
+			p = append(p, rfs.Pattern{
+				Pattern:       path,
+				MaxFiles:      c.MaxFiles,
+				MaxFileAge:    time.Duration(c.MaxFileAge) * time.Second,
+				PurgeOnDelete: c.PurgeOnDelete,
+			})
+			patterns[name] = p
+		}
+	}
+
+	for name, p := range patterns {
+		for _, fs := range r.fs.list {
+			if fs.Name() != name {
+				continue
+			}
+
+			fs.UpdateCleanup(id.String(), p)
+
+			break
 		}
 	}
 }
 
 func (r *restream) unsetCleanup(id app.ProcessID) {
 	for _, fs := range r.fs.list {
-		fs.UnsetCleanup(id.String())
+		fs.UpdateCleanup(id.String(), nil)
 	}
 }
@@ -1218,16 +1223,16 @@ func (r *restream) updateProcess(task *task, config *app.Config) error {
 	t.ImportMetadata(task.ExportMetadata())
 
 	r.unsetPlayoutPorts(task)
-	r.unsetCleanup(task.ID())
 
 	r.tasks.LoadAndStore(tid, t)
 
-	// set filesystem cleanup rules
+	// Set the filesystem cleanup rules
 	r.setCleanup(tid, t.config)
 
 	t.Restore()
 
 	if !tid.Equal(task.ID()) {
+		r.unsetCleanup(task.ID())
 		r.tasks.LoadAndDelete(task.ID())
 	}
@@ -1450,11 +1455,10 @@ func (r *restream) reloadProcess(task *task) error {
 	t.ImportMetadata(task.ExportMetadata())
 
 	r.unsetPlayoutPorts(task)
-	r.unsetCleanup(task.ID())
 
 	r.tasks.LoadAndStore(tid, t)
 
-	// set filesystem cleanup rules
+	// Set the filesystem cleanup rules
 	r.setCleanup(tid, t.config)
 
 	t.Restore()
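The change to setCleanup above is that cleanup rules are no longer pushed to a filesystem one call per rule; they are first grouped by filesystem name and then handed over as one complete set per filesystem via UpdateCleanup, which is what lets the filesystem diff old against new rules. A minimal, self-contained sketch of that grouping idea, with simplified stand-in types rather than the actual app/rfs types:

// grouping.go: illustrative only; Pattern and rule are stand-ins for the
// real app.Config cleanup entries and rfs.Pattern.
package main

import "fmt"

type Pattern struct {
	Pattern  string
	MaxFiles uint
}

// rule mimics one cleanup entry of a process output: a target filesystem
// name plus the pattern that should be enforced on it.
type rule struct {
	fs      string
	pattern Pattern
}

func main() {
	rules := []rule{
		{fs: "disk", pattern: Pattern{Pattern: "/rec/*.mp4", MaxFiles: 10}},
		{fs: "mem", pattern: Pattern{Pattern: "/hls/*.ts", MaxFiles: 6}},
		{fs: "mem", pattern: Pattern{Pattern: "/hls/*.m3u8", MaxFiles: 1}},
	}

	// Group all patterns by filesystem name first ...
	patterns := map[string][]Pattern{}
	for _, r := range rules {
		patterns[r.fs] = append(patterns[r.fs], r.pattern)
	}

	// ... then hand each filesystem its complete rule set in a single call,
	// which is what allows UpdateCleanup to diff old vs. new patterns.
	for name, p := range patterns {
		fmt.Printf("UpdateCleanup(%q, %v)\n", name, p)
	}
}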

View File

@@ -11,6 +11,7 @@ import (
 	"github.com/datarhei/core/v16/glob"
 	"github.com/datarhei/core/v16/io/fs"
 	"github.com/datarhei/core/v16/log"
+	"github.com/datarhei/core/v16/slices"
 )
 
 type Config struct {
@@ -27,14 +28,19 @@ type Pattern struct {
 	PurgeOnDelete bool
 }
 
+func (p Pattern) Equal(other Pattern) error {
+	if p.Pattern == other.Pattern {
+		return nil
+	}
+
+	return fmt.Errorf("not equal")
+}
+
 type Filesystem interface {
 	fs.Filesystem
 
-	// SetCleanup
-	SetCleanup(id string, patterns []Pattern)
-
-	// UnsetCleanup
-	UnsetCleanup(id string)
+	// UpdateCleanup
+	UpdateCleanup(id string, patterns []Pattern)
 
 	// Start
 	Start()
@@ -108,11 +114,7 @@ func (rfs *filesystem) Stop() {
 	})
 }
 
-func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
-	if len(patterns) == 0 {
-		return
-	}
-
+func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern {
 	for i, p := range patterns {
 		g, err := glob.Compile(p.Pattern, '/')
 		if err != nil {
@@ -121,31 +123,59 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
 		p.compiledPattern = g
 		patterns[i] = p
-
-		rfs.logger.Debug().WithFields(log.Fields{
-			"id":           id,
-			"pattern":      p.Pattern,
-			"max_files":    p.MaxFiles,
-			"max_file_age": p.MaxFileAge.Seconds(),
-		}).Log("Add pattern")
 	}
 
-	rfs.cleanupLock.Lock()
-	defer rfs.cleanupLock.Unlock()
-
-	rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...)
+	return patterns
 }
 
-func (rfs *filesystem) UnsetCleanup(id string) {
-	rfs.logger.Debug().WithField("id", id).Log("Remove pattern group")
+func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) {
+	newPatterns = rfs.compilePatterns(newPatterns)
 
 	rfs.cleanupLock.Lock()
 	defer rfs.cleanupLock.Unlock()
 
-	patterns := rfs.cleanupPatterns[id]
+	currentPatterns := rfs.cleanupPatterns[id]
 	delete(rfs.cleanupPatterns, id)
 
-	rfs.purge(patterns)
+	onlyCurrent, onlyNew := slices.DiffEqualer(currentPatterns, newPatterns)
+
+	for _, p := range newPatterns {
+		found := false
+		for _, x := range onlyNew {
+			if p.Equal(x) == nil {
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			rfs.logger.Debug().WithFields(log.Fields{
+				"id":           id,
+				"pattern":      p.Pattern,
+				"max_files":    p.MaxFiles,
+				"max_file_age": p.MaxFileAge.Seconds(),
+			}).Log("Keep pattern")
+		} else {
+			rfs.logger.Debug().WithFields(log.Fields{
+				"id":           id,
+				"pattern":      p.Pattern,
+				"max_files":    p.MaxFiles,
+				"max_file_age": p.MaxFileAge.Seconds(),
+			}).Log("Add pattern")
+		}
+	}
+
+	rfs.cleanupPatterns[id] = newPatterns
+
+	for _, p := range onlyCurrent {
+		rfs.logger.Debug().WithFields(log.Fields{
+			"id":           id,
+			"pattern":      p.Pattern,
+			"max_files":    p.MaxFiles,
+			"max_file_age": p.MaxFileAge.Seconds(),
+		}).Log("Remove pattern")
+	}
+
+	rfs.purge(onlyCurrent)
 }
 
 func (rfs *filesystem) cleanup() {
@@ -219,10 +249,14 @@ func (rfs *filesystem) purge(patterns []Pattern) int64 {
 			continue
 		}
 
-		_, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
+		files, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
 			Pattern: pattern.Pattern,
 		})
 
+		for _, file := range files {
+			rfs.logger.Debug().WithField("path", file).Log("Purged file")
+		}
+
 		nfilesTotal += nfiles
 	}
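UpdateCleanup replaces SetCleanup/UnsetCleanup with a diff-based update: the stored pattern set for an id is compared against the incoming one, patterns present in both are kept (so their files survive a process update), patterns only in the stored set are removed and purged, and patterns only in the new set are added. Below is a self-contained sketch of that set logic under the assumption that equality is by glob string only, as in Pattern.Equal above; diffPatterns here is an illustrative stand-in, not the core's slices.DiffEqualer helper.

// diff.go: illustrative only; shows the intended keep/add/remove semantics.
package main

import "fmt"

type Pattern struct {
	Pattern  string
	MaxFiles uint
}

// Equal compares by glob string only, mirroring Pattern.Equal in the diff above.
func (p Pattern) Equal(other Pattern) bool {
	return p.Pattern == other.Pattern
}

// diffPatterns returns the patterns found only in current (to be removed and
// purged) and only in next (to be added). Patterns present in both sets are
// kept as-is, so their already collected files are not purged on an update.
func diffPatterns(current, next []Pattern) (onlyCurrent, onlyNext []Pattern) {
	for _, c := range current {
		found := false
		for _, n := range next {
			if c.Equal(n) {
				found = true
				break
			}
		}
		if !found {
			onlyCurrent = append(onlyCurrent, c)
		}
	}
	for _, n := range next {
		found := false
		for _, c := range current {
			if n.Equal(c) {
				found = true
				break
			}
		}
		if !found {
			onlyNext = append(onlyNext, n)
		}
	}
	return onlyCurrent, onlyNext
}

func main() {
	current := []Pattern{{Pattern: "/*.ts", MaxFiles: 3}, {Pattern: "/*.m3u8", MaxFiles: 1}}
	next := []Pattern{{Pattern: "/*.ts", MaxFiles: 5}} // same glob, changed limit: kept, not purged

	remove, add := diffPatterns(current, next)
	fmt.Println("remove:", remove) // only the /*.m3u8 pattern is removed and purged
	fmt.Println("add:   ", add)    // empty; /*.ts is kept even though MaxFiles changed
}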

View File

@@ -12,6 +12,57 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func TestUpdateCleanup(t *testing.T) {
+	memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
+	require.NoError(t, err)
+
+	clean, err := New(Config{
+		FS:       memfs,
+		Interval: time.Second,
+	})
+	require.NoError(t, err)
+
+	cleanfs := clean.(*filesystem)
+
+	cleanfs.Start()
+
+	patterns := []Pattern{
+		{
+			Pattern:    "/*.ts",
+			MaxFiles:   3,
+			MaxFileAge: 0,
+		},
+	}
+
+	cleanfs.UpdateCleanup("foobar", patterns)
+	require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
+
+	patterns = append(patterns, Pattern{
+		Pattern:    "/*.m3u8",
+		MaxFiles:   5,
+		MaxFileAge: 0,
+	})
+
+	cleanfs.UpdateCleanup("foobar", patterns)
+	require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
+
+	patterns[0].MaxFiles = 42
+
+	cleanfs.UpdateCleanup("foobar", patterns)
+	require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
+
+	cleanfs.UpdateCleanup("foobar", patterns[1:])
+	require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:])
+
+	cleanfs.UpdateCleanup("foobar", nil)
+	require.Empty(t, cleanfs.cleanupPatterns["foobar"])
+}
+
 func TestMaxFiles(t *testing.T) {
 	memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
 	require.NoError(t, err)
@@ -24,7 +75,7 @@ func TestMaxFiles(t *testing.T) {
 	cleanfs.Start()
 
-	cleanfs.SetCleanup("foobar", []Pattern{
+	cleanfs.UpdateCleanup("foobar", []Pattern{
 		{
 			Pattern:    "/*.ts",
 			MaxFiles:   3,
@@ -73,7 +124,7 @@ func TestMaxAge(t *testing.T) {
 	cleanfs.Start()
 
-	cleanfs.SetCleanup("foobar", []Pattern{
+	cleanfs.UpdateCleanup("foobar", []Pattern{
 		{
 			Pattern:    "/*.ts",
 			MaxFiles:   0,
@@ -122,7 +173,7 @@ func TestUnsetCleanup(t *testing.T) {
 	cleanfs.Start()
 
-	cleanfs.SetCleanup("foobar", []Pattern{
+	cleanfs.UpdateCleanup("foobar", []Pattern{
 		{
 			Pattern:    "/*.ts",
 			MaxFiles:   3,
@@ -156,7 +207,7 @@ func TestUnsetCleanup(t *testing.T) {
 		return true
 	}, 3*time.Second, time.Second)
 
-	cleanfs.UnsetCleanup("foobar")
+	cleanfs.UpdateCleanup("foobar", nil)
 
 	cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
@@ -179,6 +230,76 @@ func TestUnsetCleanup(t *testing.T) {
 	cleanfs.Stop()
 }
 
+func TestPurge(t *testing.T) {
+	memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
+	require.NoError(t, err)
+
+	cleanfs, err := New(Config{
+		FS:       memfs,
+		Interval: time.Second,
+	})
+	require.NoError(t, err)
+
+	cleanfs.Start()
+
+	cleanfs.UpdateCleanup("foobar", []Pattern{
+		{
+			Pattern:       "/*.ts",
+			MaxFiles:      3,
+			MaxFileAge:    0,
+			PurgeOnDelete: true,
+		},
+	})
+
+	cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
+	cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
+	cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1)
+
+	require.Eventually(t, func() bool {
+		return cleanfs.Files() == 3
+	}, 3*time.Second, time.Second)
+
+	cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1)
+
+	require.Eventually(t, func() bool {
+		if cleanfs.Files() != 3 {
+			return false
+		}
+
+		names := []string{}
+		for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
+			names = append(names, f.Name())
+		}
+
+		require.ElementsMatch(t, []string{"/chunk_1.ts", "/chunk_2.ts", "/chunk_3.ts"}, names)
+
+		return true
+	}, 3*time.Second, time.Second)
+
+	cleanfs.UpdateCleanup("foobar", nil)
+
+	cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
+
+	require.Eventually(t, func() bool {
+		if cleanfs.Files() != 1 {
+			return false
+		}
+
+		names := []string{}
+		for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
+			names = append(names, f.Name())
+		}
+
+		require.ElementsMatch(t, []string{"/chunk_4.ts"}, names)
+
+		return true
+	}, 3*time.Second, time.Second)
+
+	cleanfs.Stop()
+}
+
 func BenchmarkCleanup(b *testing.B) {
 	memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
 	require.NoError(b, err)
@@ -229,7 +350,7 @@ func BenchmarkCleanup(b *testing.B) {
 			},
 		}
 
-		cleanfs.SetCleanup(id, patterns)
+		cleanfs.UpdateCleanup(id, patterns)
 
 		ids[i] = id
 	}
@@ -308,7 +429,7 @@ func BenchmarkPurge(b *testing.B) {
 			},
 		}
 
-		cleanfs.SetCleanup(id, patterns)
+		cleanfs.UpdateCleanup(id, patterns)
 
 		ids[i] = id
 	}
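For callers and tests, the migration is mechanical: SetCleanup(id, patterns) becomes UpdateCleanup(id, patterns), and UnsetCleanup(id) becomes UpdateCleanup(id, nil). A compilable sketch of that mapping against a hypothetical, minimal interface (not the full rfs.Filesystem, which also embeds fs.Filesystem and has Start/Stop):

// migrate.go: illustrative only; memCleaner is a toy implementation used to
// show the call mapping, not the real restream/fs filesystem.
package main

import "fmt"

type Pattern struct {
	Pattern  string
	MaxFiles uint
}

type cleaner interface {
	UpdateCleanup(id string, patterns []Pattern)
}

// memCleaner just stores the pattern sets per id.
type memCleaner struct {
	patterns map[string][]Pattern
}

func (m *memCleaner) UpdateCleanup(id string, patterns []Pattern) {
	if len(patterns) == 0 {
		delete(m.patterns, id) // takes the role of the old UnsetCleanup(id)
		return
	}
	m.patterns[id] = patterns // takes the role of the old SetCleanup(id, patterns)
}

func main() {
	var c cleaner = &memCleaner{patterns: map[string][]Pattern{}}

	c.UpdateCleanup("foobar", []Pattern{{Pattern: "/*.ts", MaxFiles: 3}}) // was: SetCleanup
	c.UpdateCleanup("foobar", nil)                                        // was: UnsetCleanup

	fmt.Println("ok")
}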