mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Prevent file purging equal patterns when updating process
This commit is contained in:
@@ -704,6 +704,8 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro
|
||||
}
|
||||
|
||||
func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
|
||||
patterns := map[string][]rfs.Pattern{}
|
||||
|
||||
for _, output := range config.Output {
|
||||
for _, c := range output.Cleanup {
|
||||
name, path, found := strings.Cut(c.Pattern, ":")
|
||||
@@ -717,37 +719,40 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
|
||||
}
|
||||
|
||||
// Support legacy names
|
||||
if name == "diskfs" {
|
||||
switch name {
|
||||
case "diskfs":
|
||||
name = "disk"
|
||||
} else if name == "memfs" {
|
||||
case "memfs":
|
||||
name = "mem"
|
||||
}
|
||||
|
||||
for _, fs := range r.fs.list {
|
||||
if fs.Name() != name {
|
||||
continue
|
||||
}
|
||||
p := patterns[name]
|
||||
p = append(p, rfs.Pattern{
|
||||
Pattern: path,
|
||||
MaxFiles: c.MaxFiles,
|
||||
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
|
||||
PurgeOnDelete: c.PurgeOnDelete,
|
||||
})
|
||||
patterns[name] = p
|
||||
}
|
||||
}
|
||||
|
||||
pattern := rfs.Pattern{
|
||||
Pattern: path,
|
||||
MaxFiles: c.MaxFiles,
|
||||
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
|
||||
PurgeOnDelete: c.PurgeOnDelete,
|
||||
}
|
||||
|
||||
fs.SetCleanup(id.String(), []rfs.Pattern{
|
||||
pattern,
|
||||
})
|
||||
|
||||
break
|
||||
for name, p := range patterns {
|
||||
for _, fs := range r.fs.list {
|
||||
if fs.Name() != name {
|
||||
continue
|
||||
}
|
||||
|
||||
fs.UpdateCleanup(id.String(), p)
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *restream) unsetCleanup(id app.ProcessID) {
|
||||
for _, fs := range r.fs.list {
|
||||
fs.UnsetCleanup(id.String())
|
||||
fs.UpdateCleanup(id.String(), nil)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1218,16 +1223,16 @@ func (r *restream) updateProcess(task *task, config *app.Config) error {
|
||||
t.ImportMetadata(task.ExportMetadata())
|
||||
|
||||
r.unsetPlayoutPorts(task)
|
||||
r.unsetCleanup(task.ID())
|
||||
|
||||
r.tasks.LoadAndStore(tid, t)
|
||||
|
||||
// set filesystem cleanup rules
|
||||
// Set the filesystem cleanup rules
|
||||
r.setCleanup(tid, t.config)
|
||||
|
||||
t.Restore()
|
||||
|
||||
if !tid.Equal(task.ID()) {
|
||||
r.unsetCleanup(task.ID())
|
||||
r.tasks.LoadAndDelete(task.ID())
|
||||
}
|
||||
|
||||
@@ -1450,11 +1455,10 @@ func (r *restream) reloadProcess(task *task) error {
|
||||
t.ImportMetadata(task.ExportMetadata())
|
||||
|
||||
r.unsetPlayoutPorts(task)
|
||||
r.unsetCleanup(task.ID())
|
||||
|
||||
r.tasks.LoadAndStore(tid, t)
|
||||
|
||||
// set filesystem cleanup rules
|
||||
// Set the filesystem cleanup rules
|
||||
r.setCleanup(tid, t.config)
|
||||
|
||||
t.Restore()
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/datarhei/core/v16/glob"
|
||||
"github.com/datarhei/core/v16/io/fs"
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/datarhei/core/v16/slices"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
@@ -27,14 +28,19 @@ type Pattern struct {
|
||||
PurgeOnDelete bool
|
||||
}
|
||||
|
||||
func (p Pattern) Equal(other Pattern) error {
|
||||
if p.Pattern == other.Pattern {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("not euqal")
|
||||
}
|
||||
|
||||
type Filesystem interface {
|
||||
fs.Filesystem
|
||||
|
||||
// SetCleanup
|
||||
SetCleanup(id string, patterns []Pattern)
|
||||
|
||||
// UnsetCleanup
|
||||
UnsetCleanup(id string)
|
||||
// UpdateCleanup
|
||||
UpdateCleanup(id string, patterns []Pattern)
|
||||
|
||||
// Start
|
||||
Start()
|
||||
@@ -108,11 +114,7 @@ func (rfs *filesystem) Stop() {
|
||||
})
|
||||
}
|
||||
|
||||
func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
if len(patterns) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern {
|
||||
for i, p := range patterns {
|
||||
g, err := glob.Compile(p.Pattern, '/')
|
||||
if err != nil {
|
||||
@@ -121,7 +123,46 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
|
||||
p.compiledPattern = g
|
||||
patterns[i] = p
|
||||
}
|
||||
|
||||
return patterns
|
||||
}
|
||||
|
||||
func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) {
|
||||
rfs.logger.Debug().WithField("id", id).Log("Update pattern group")
|
||||
|
||||
newPatterns = rfs.compilePatterns(newPatterns)
|
||||
|
||||
rfs.cleanupLock.Lock()
|
||||
defer rfs.cleanupLock.Unlock()
|
||||
|
||||
currentPatterns := rfs.cleanupPatterns[id]
|
||||
delete(rfs.cleanupPatterns, id)
|
||||
|
||||
onlyCurrent, onlyNew := slices.DiffEqualer(currentPatterns, newPatterns)
|
||||
|
||||
patterns := []Pattern{}
|
||||
|
||||
for _, p := range currentPatterns {
|
||||
found := false
|
||||
for _, x := range onlyCurrent {
|
||||
if p.Equal(x) == nil {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
patterns = append(patterns, p)
|
||||
rfs.logger.Debug().WithFields(log.Fields{
|
||||
"id": id,
|
||||
"pattern": p.Pattern,
|
||||
"max_files": p.MaxFiles,
|
||||
"max_file_age": p.MaxFileAge.Seconds(),
|
||||
}).Log("Keep pattern")
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range onlyNew {
|
||||
rfs.logger.Debug().WithFields(log.Fields{
|
||||
"id": id,
|
||||
"pattern": p.Pattern,
|
||||
@@ -130,22 +171,20 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
}).Log("Add pattern")
|
||||
}
|
||||
|
||||
rfs.cleanupLock.Lock()
|
||||
defer rfs.cleanupLock.Unlock()
|
||||
patterns = append(patterns, onlyNew...)
|
||||
|
||||
rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...)
|
||||
}
|
||||
rfs.cleanupPatterns[id] = patterns
|
||||
|
||||
func (rfs *filesystem) UnsetCleanup(id string) {
|
||||
rfs.logger.Debug().WithField("id", id).Log("Remove pattern group")
|
||||
for _, p := range onlyCurrent {
|
||||
rfs.logger.Debug().WithFields(log.Fields{
|
||||
"id": id,
|
||||
"pattern": p.Pattern,
|
||||
"max_files": p.MaxFiles,
|
||||
"max_file_age": p.MaxFileAge.Seconds(),
|
||||
}).Log("Remove pattern")
|
||||
}
|
||||
|
||||
rfs.cleanupLock.Lock()
|
||||
defer rfs.cleanupLock.Unlock()
|
||||
|
||||
patterns := rfs.cleanupPatterns[id]
|
||||
delete(rfs.cleanupPatterns, id)
|
||||
|
||||
rfs.purge(patterns)
|
||||
rfs.purge(onlyCurrent)
|
||||
}
|
||||
|
||||
func (rfs *filesystem) cleanup() {
|
||||
|
@@ -12,6 +12,51 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUpdateCleanup(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
clean, err := New(Config{
|
||||
FS: memfs,
|
||||
Interval: time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
cleanfs := clean.(*filesystem)
|
||||
|
||||
cleanfs.Start()
|
||||
|
||||
patterns := []Pattern{
|
||||
{
|
||||
Pattern: "/*.ts",
|
||||
MaxFiles: 3,
|
||||
MaxFileAge: 0,
|
||||
},
|
||||
}
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns)
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||
|
||||
patterns = append(patterns, Pattern{
|
||||
Pattern: "/*.m3u8",
|
||||
MaxFiles: 5,
|
||||
MaxFileAge: 0,
|
||||
})
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns)
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns[1:])
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:])
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", nil)
|
||||
|
||||
require.Empty(t, cleanfs.cleanupPatterns["foobar"])
|
||||
}
|
||||
|
||||
func TestMaxFiles(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
@@ -24,7 +69,7 @@ func TestMaxFiles(t *testing.T) {
|
||||
|
||||
cleanfs.Start()
|
||||
|
||||
cleanfs.SetCleanup("foobar", []Pattern{
|
||||
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||
{
|
||||
Pattern: "/*.ts",
|
||||
MaxFiles: 3,
|
||||
@@ -73,7 +118,7 @@ func TestMaxAge(t *testing.T) {
|
||||
|
||||
cleanfs.Start()
|
||||
|
||||
cleanfs.SetCleanup("foobar", []Pattern{
|
||||
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||
{
|
||||
Pattern: "/*.ts",
|
||||
MaxFiles: 0,
|
||||
@@ -122,7 +167,7 @@ func TestUnsetCleanup(t *testing.T) {
|
||||
|
||||
cleanfs.Start()
|
||||
|
||||
cleanfs.SetCleanup("foobar", []Pattern{
|
||||
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||
{
|
||||
Pattern: "/*.ts",
|
||||
MaxFiles: 3,
|
||||
@@ -156,7 +201,7 @@ func TestUnsetCleanup(t *testing.T) {
|
||||
return true
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
cleanfs.UnsetCleanup("foobar")
|
||||
cleanfs.UpdateCleanup("foobar", nil)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
||||
|
||||
@@ -179,6 +224,76 @@ func TestUnsetCleanup(t *testing.T) {
|
||||
cleanfs.Stop()
|
||||
}
|
||||
|
||||
func TestPurge(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
cleanfs, err := New(Config{
|
||||
FS: memfs,
|
||||
Interval: time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
cleanfs.Start()
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", []Pattern{
|
||||
{
|
||||
Pattern: "/*.ts",
|
||||
MaxFiles: 3,
|
||||
MaxFileAge: 0,
|
||||
PurgeOnDelete: true,
|
||||
},
|
||||
})
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
|
||||
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
|
||||
cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return cleanfs.Files() == 3
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
if cleanfs.Files() != 3 {
|
||||
return false
|
||||
}
|
||||
|
||||
names := []string{}
|
||||
|
||||
for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
|
||||
names = append(names, f.Name())
|
||||
}
|
||||
|
||||
require.ElementsMatch(t, []string{"/chunk_1.ts", "/chunk_2.ts", "/chunk_3.ts"}, names)
|
||||
|
||||
return true
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", nil)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
if cleanfs.Files() != 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
names := []string{}
|
||||
|
||||
for _, f := range cleanfs.List("/", fs.ListOptions{Pattern: "/*.ts"}) {
|
||||
names = append(names, f.Name())
|
||||
}
|
||||
|
||||
require.ElementsMatch(t, []string{"/chunk_4.ts"}, names)
|
||||
|
||||
return true
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
cleanfs.Stop()
|
||||
}
|
||||
|
||||
func BenchmarkCleanup(b *testing.B) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(b, err)
|
||||
@@ -229,7 +344,7 @@ func BenchmarkCleanup(b *testing.B) {
|
||||
},
|
||||
}
|
||||
|
||||
cleanfs.SetCleanup(id, patterns)
|
||||
cleanfs.UpdateCleanup(id, patterns)
|
||||
|
||||
ids[i] = id
|
||||
}
|
||||
@@ -308,7 +423,7 @@ func BenchmarkPurge(b *testing.B) {
|
||||
},
|
||||
}
|
||||
|
||||
cleanfs.SetCleanup(id, patterns)
|
||||
cleanfs.UpdateCleanup(id, patterns)
|
||||
|
||||
ids[i] = id
|
||||
}
|
||||
|
Reference in New Issue
Block a user