diff --git a/restream/core.go b/restream/core.go index cb9ad4c6..2a2dd0e9 100644 --- a/restream/core.go +++ b/restream/core.go @@ -704,6 +704,8 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro } func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { + patterns := map[string][]rfs.Pattern{} + for _, output := range config.Output { for _, c := range output.Cleanup { name, path, found := strings.Cut(c.Pattern, ":") @@ -717,37 +719,40 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { } // Support legacy names - if name == "diskfs" { + switch name { + case "diskfs": name = "disk" - } else if name == "memfs" { + case "memfs": name = "mem" } - for _, fs := range r.fs.list { - if fs.Name() != name { - continue - } + p := patterns[name] + p = append(p, rfs.Pattern{ + Pattern: path, + MaxFiles: c.MaxFiles, + MaxFileAge: time.Duration(c.MaxFileAge) * time.Second, + PurgeOnDelete: c.PurgeOnDelete, + }) + patterns[name] = p + } + } - pattern := rfs.Pattern{ - Pattern: path, - MaxFiles: c.MaxFiles, - MaxFileAge: time.Duration(c.MaxFileAge) * time.Second, - PurgeOnDelete: c.PurgeOnDelete, - } - - fs.SetCleanup(id.String(), []rfs.Pattern{ - pattern, - }) - - break + for name, p := range patterns { + for _, fs := range r.fs.list { + if fs.Name() != name { + continue } + + fs.UpdateCleanup(id.String(), p) + + break } } } func (r *restream) unsetCleanup(id app.ProcessID) { for _, fs := range r.fs.list { - fs.UnsetCleanup(id.String()) + fs.UpdateCleanup(id.String(), nil) } } @@ -1218,16 +1223,16 @@ func (r *restream) updateProcess(task *task, config *app.Config) error { t.ImportMetadata(task.ExportMetadata()) r.unsetPlayoutPorts(task) - r.unsetCleanup(task.ID()) r.tasks.LoadAndStore(tid, t) - // set filesystem cleanup rules + // Set the filesystem cleanup rules r.setCleanup(tid, t.config) t.Restore() if !tid.Equal(task.ID()) { + r.unsetCleanup(task.ID()) r.tasks.LoadAndDelete(task.ID()) } @@ -1450,11 +1455,10 @@ func (r *restream) reloadProcess(task *task) error { t.ImportMetadata(task.ExportMetadata()) r.unsetPlayoutPorts(task) - r.unsetCleanup(task.ID()) r.tasks.LoadAndStore(tid, t) - // set filesystem cleanup rules + // Set the filesystem cleanup rules r.setCleanup(tid, t.config) t.Restore() diff --git a/restream/fs/fs.go b/restream/fs/fs.go index 348051d8..54894dd8 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -11,6 +11,7 @@ import ( "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/slices" ) type Config struct { @@ -27,14 +28,19 @@ type Pattern struct { PurgeOnDelete bool } +func (p Pattern) Equal(other Pattern) error { + if p.Pattern == other.Pattern { + return nil + } + + return fmt.Errorf("not euqal") +} + type Filesystem interface { fs.Filesystem - // SetCleanup - SetCleanup(id string, patterns []Pattern) - - // UnsetCleanup - UnsetCleanup(id string) + // UpdateCleanup + UpdateCleanup(id string, patterns []Pattern) // Start Start() @@ -108,11 +114,7 @@ func (rfs *filesystem) Stop() { }) } -func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) { - if len(patterns) == 0 { - return - } - +func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern { for i, p := range patterns { g, err := glob.Compile(p.Pattern, '/') if err != nil { @@ -121,7 +123,46 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) { p.compiledPattern = g patterns[i] = p + } + return patterns +} + +func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) { + rfs.logger.Debug().WithField("id", id).Log("Update pattern group") + + newPatterns = rfs.compilePatterns(newPatterns) + + rfs.cleanupLock.Lock() + defer rfs.cleanupLock.Unlock() + + currentPatterns := rfs.cleanupPatterns[id] + delete(rfs.cleanupPatterns, id) + + onlyCurrent, onlyNew := slices.DiffEqualer(currentPatterns, newPatterns) + + patterns := []Pattern{} + + for _, p := range currentPatterns { + found := false + for _, x := range onlyCurrent { + if p.Equal(x) == nil { + found = true + break + } + } + if !found { + patterns = append(patterns, p) + rfs.logger.Debug().WithFields(log.Fields{ + "id": id, + "pattern": p.Pattern, + "max_files": p.MaxFiles, + "max_file_age": p.MaxFileAge.Seconds(), + }).Log("Keep pattern") + } + } + + for _, p := range onlyNew { rfs.logger.Debug().WithFields(log.Fields{ "id": id, "pattern": p.Pattern, @@ -130,22 +171,20 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) { }).Log("Add pattern") } - rfs.cleanupLock.Lock() - defer rfs.cleanupLock.Unlock() + patterns = append(patterns, onlyNew...) - rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...) -} + rfs.cleanupPatterns[id] = patterns -func (rfs *filesystem) UnsetCleanup(id string) { - rfs.logger.Debug().WithField("id", id).Log("Remove pattern group") + for _, p := range onlyCurrent { + rfs.logger.Debug().WithFields(log.Fields{ + "id": id, + "pattern": p.Pattern, + "max_files": p.MaxFiles, + "max_file_age": p.MaxFileAge.Seconds(), + }).Log("Remove pattern") + } - rfs.cleanupLock.Lock() - defer rfs.cleanupLock.Unlock() - - patterns := rfs.cleanupPatterns[id] - delete(rfs.cleanupPatterns, id) - - rfs.purge(patterns) + rfs.purge(onlyCurrent) } func (rfs *filesystem) cleanup() { diff --git a/restream/fs/fs_test.go b/restream/fs/fs_test.go index 46d3a964..6d936680 100644 --- a/restream/fs/fs_test.go +++ b/restream/fs/fs_test.go @@ -12,6 +12,51 @@ import ( "github.com/stretchr/testify/require" ) +func TestUpdateCleanup(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + clean, err := New(Config{ + FS: memfs, + Interval: time.Second, + }) + require.NoError(t, err) + + cleanfs := clean.(*filesystem) + + cleanfs.Start() + + patterns := []Pattern{ + { + Pattern: "/*.ts", + MaxFiles: 3, + MaxFileAge: 0, + }, + } + + cleanfs.UpdateCleanup("foobar", patterns) + + require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) + + patterns = append(patterns, Pattern{ + Pattern: "/*.m3u8", + MaxFiles: 5, + MaxFileAge: 0, + }) + + cleanfs.UpdateCleanup("foobar", patterns) + + require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) + + cleanfs.UpdateCleanup("foobar", patterns[1:]) + + require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:]) + + cleanfs.UpdateCleanup("foobar", nil) + + require.Empty(t, cleanfs.cleanupPatterns["foobar"]) +} + func TestMaxFiles(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) @@ -24,7 +69,7 @@ func TestMaxFiles(t *testing.T) { cleanfs.Start() - cleanfs.SetCleanup("foobar", []Pattern{ + cleanfs.UpdateCleanup("foobar", []Pattern{ { Pattern: "/*.ts", MaxFiles: 3, @@ -73,7 +118,7 @@ func TestMaxAge(t *testing.T) { cleanfs.Start() - cleanfs.SetCleanup("foobar", []Pattern{ + cleanfs.UpdateCleanup("foobar", []Pattern{ { Pattern: "/*.ts", MaxFiles: 0, @@ -122,7 +167,7 @@ func TestUnsetCleanup(t *testing.T) { cleanfs.Start() - cleanfs.SetCleanup("foobar", []Pattern{ + cleanfs.UpdateCleanup("foobar", []Pattern{ { Pattern: "/*.ts", MaxFiles: 3, @@ -156,7 +201,7 @@ func TestUnsetCleanup(t *testing.T) { return true }, 3*time.Second, time.Second) - cleanfs.UnsetCleanup("foobar") + cleanfs.UpdateCleanup("foobar", nil) cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) @@ -179,6 +224,76 @@ func TestUnsetCleanup(t *testing.T) { cleanfs.Stop() } +func TestPurge(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + cleanfs, err := New(Config{ + FS: memfs, + Interval: time.Second, + }) + require.NoError(t, err) + + cleanfs.Start() + + cleanfs.UpdateCleanup("foobar", []Pattern{ + { + Pattern: "/*.ts", + MaxFiles: 3, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + }) + + cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) + cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) + cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1) + + require.Eventually(t, func() bool { + return cleanfs.Files() == 3 + }, 3*time.Second, time.Second) + + cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1) + + require.Eventually(t, func() bool { + if cleanfs.Files() != 3 { + return false + } + + names := []string{} + + for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) { + names = append(names, f.Name()) + } + + require.ElementsMatch(t, []string{"/chunk_1.ts", "/chunk_2.ts", "/chunk_3.ts"}, names) + + return true + }, 3*time.Second, time.Second) + + cleanfs.UpdateCleanup("foobar", nil) + + cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) + + require.Eventually(t, func() bool { + if cleanfs.Files() != 1 { + return false + } + + names := []string{} + + for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) { + names = append(names, f.Name()) + } + + require.ElementsMatch(t, []string{"/chunk_4.ts"}, names) + + return true + }, 3*time.Second, time.Second) + + cleanfs.Stop() +} + func BenchmarkCleanup(b *testing.B) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(b, err) @@ -229,7 +344,7 @@ func BenchmarkCleanup(b *testing.B) { }, } - cleanfs.SetCleanup(id, patterns) + cleanfs.UpdateCleanup(id, patterns) ids[i] = id } @@ -308,7 +423,7 @@ func BenchmarkPurge(b *testing.B) { }, } - cleanfs.SetCleanup(id, patterns) + cleanfs.UpdateCleanup(id, patterns) ids[i] = id }