Limit the S3 filesystem to put, get, and delete (no more listing). Reject S3 cleanup rules with wildcards.

This commit is contained in:
Ingo Oppermann
2025-07-23 13:51:41 +02:00
parent 36470072f4
commit 46810bf64d
7 changed files with 387 additions and 238 deletions

View File

@@ -48,6 +48,11 @@ func Prefix(pattern string) string {
return strings.Clone(pattern[:index])
}
// IsPattern reports whether pattern contains glob metacharacters
// ('*', '[' or '{') and therefore requires glob matching rather than
// a plain string comparison.
func IsPattern(pattern string) bool {
	return strings.ContainsAny(pattern, "*[{")
}
// Match returns whether the name matches the glob pattern, also considering
// one or several optional separators. An error is only returned if the pattern
// is invalid.

View File

@@ -22,3 +22,16 @@ func TestPatterns(t *testing.T) {
require.NoError(t, err)
require.True(t, ok)
}
// TestPrefix verifies that Prefix returns the whole path when no glob
// metacharacters are present, and only the literal leading portion
// (up to the first metacharacter's path segment) otherwise.
func TestPrefix(t *testing.T) {
	require.Equal(t, "/a/b/c/d", Prefix("/a/b/c/d"))
	require.Equal(t, "/a/b/", Prefix("/a/b/*/d"))
}
// TestIsPattern verifies that only paths containing glob
// metacharacters are reported as patterns.
func TestIsPattern(t *testing.T) {
	require.Equal(t, false, IsPattern("/a/b/c/d"))
	require.Equal(t, true, IsPattern("/a/b/*/d"))
}

View File

@@ -171,13 +171,26 @@ func testWriteFile(t *testing.T, fs Filesystem) {
cur, max := fs.Size()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
require.Equal(t, int64(-1), max)
data, err := fs.ReadFile("/foobar")
require.NoError(t, err)
require.Equal(t, []byte("xxxxx"), data)
} else {
require.Equal(t, int64(5), cur)
require.Equal(t, int64(-1), max)
}
cur = fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
} else {
require.Equal(t, int64(1), cur)
}
}
func testWriteFileSafe(t *testing.T, fs Filesystem) {
size, created, err := fs.WriteFileSafe("/foobar", []byte("xxxxx"))
@@ -186,6 +199,10 @@ func testWriteFileSafe(t *testing.T, fs Filesystem) {
require.Equal(t, int64(5), size)
require.Equal(t, true, created)
if _, ok := fs.(*s3Filesystem); ok {
return
}
cur, max := fs.Size()
require.Equal(t, int64(5), cur)
@@ -207,6 +224,17 @@ func testWriteFileReader(t *testing.T, fs Filesystem) {
cur, max := fs.Size()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
require.Equal(t, int64(-1), max)
data, err := fs.ReadFile("/foobar")
require.NoError(t, err)
require.Equal(t, []byte("xxxxx"), data)
return
}
require.Equal(t, int64(5), cur)
require.Equal(t, int64(-1), max)
@@ -266,6 +294,11 @@ func testFiles(t *testing.T, fs Filesystem) {
fs.WriteFileReader("/foobar.txt", strings.NewReader("bar"), -1)
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), fs.Files())
return
}
require.Equal(t, int64(1), fs.Files())
fs.MkdirAll("/path/to/foo", 0755)
@@ -288,12 +321,21 @@ func testReplace(t *testing.T, fs Filesystem) {
cur, max := fs.Size()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
require.Equal(t, int64(-1), max)
} else {
require.Equal(t, int64(5), cur)
require.Equal(t, int64(-1), max)
}
cur = fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
} else {
require.Equal(t, int64(1), cur)
}
data = strings.NewReader("yyy")
@@ -303,6 +345,13 @@ func testReplace(t *testing.T, fs Filesystem) {
require.Equal(t, int64(3), size)
require.Equal(t, false, created)
if _, ok := fs.(*s3Filesystem); ok {
data, err := fs.ReadFile("/foobar")
require.NoError(t, err)
require.Equal(t, []byte("yyy"), data)
return
}
cur, max = fs.Size()
require.Equal(t, int64(3), cur)
@@ -323,6 +372,12 @@ func testList(t *testing.T, fs Filesystem) {
cur, max := fs.Size()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
require.Equal(t, int64(-1), max)
return
}
require.Equal(t, int64(17), cur)
require.Equal(t, int64(-1), max)
@@ -357,7 +412,11 @@ func testListGlob(t *testing.T, fs Filesystem) {
cur := fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
} else {
require.Equal(t, int64(4), cur)
}
getNames := func(files []FileInfo) []string {
names := []string{}
@@ -367,6 +426,13 @@ func testListGlob(t *testing.T, fs Filesystem) {
return names
}
if _, ok := fs.(*s3Filesystem); ok {
files := getNames(fs.List("/", ListOptions{Pattern: "/foo*"}))
require.Equal(t, 0, len(files))
require.ElementsMatch(t, []string{}, files)
return
}
files := getNames(fs.List("/", ListOptions{Pattern: "/foo*"}))
require.Equal(t, 2, len(files))
require.ElementsMatch(t, []string{"/foobar1", "/foobar4"}, files)
@@ -389,6 +455,10 @@ func testListGlob(t *testing.T, fs Filesystem) {
}
func testListSize(t *testing.T, fs Filesystem) {
if _, ok := fs.(*s3Filesystem); ok {
return
}
fs.WriteFileReader("/a", strings.NewReader("a"), -1)
fs.WriteFileReader("/aa", strings.NewReader("aa"), -1)
fs.WriteFileReader("/aaa", strings.NewReader("aaa"), -1)
@@ -434,6 +504,11 @@ func testListModified(t *testing.T, fs Filesystem) {
cur := fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
return
}
require.Equal(t, int64(4), cur)
getNames := func(files []FileInfo) []string {
@@ -483,16 +558,29 @@ func testRemoveAll(t *testing.T, fs Filesystem) {
cur := fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
} else {
require.Equal(t, int64(4), cur)
}
_, size := fs.RemoveList("/", ListOptions{
Pattern: "",
})
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), size)
} else {
require.Equal(t, int64(12), size)
}
cur = fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
} else {
require.Equal(t, int64(0), cur)
}
}
func testRemoveList(t *testing.T, fs Filesystem) {
@@ -503,6 +591,11 @@ func testRemoveList(t *testing.T, fs Filesystem) {
cur := fs.Files()
if _, ok := fs.(*s3Filesystem); ok {
require.Equal(t, int64(0), cur)
return
}
require.Equal(t, int64(4), cur)
_, size := fs.RemoveList("/", ListOptions{
@@ -555,6 +648,10 @@ func testStatDir(t *testing.T, fs Filesystem) {
require.NotNil(t, info)
require.Equal(t, true, info.IsDir())
if _, ok := fs.(*s3Filesystem); ok {
return
}
fs.WriteFileReader("/these/are/some/directories/foobar", strings.NewReader("gduwotoxqb"), -1)
info, err = fs.Stat("/foobar")
@@ -593,6 +690,10 @@ func testStatDir(t *testing.T, fs Filesystem) {
}
func testMkdirAll(t *testing.T, fs Filesystem) {
if _, ok := fs.(*s3Filesystem); ok {
return
}
info, err := fs.Stat("/foo/bar/dir")
require.Error(t, err)
require.Nil(t, info)

View File

@@ -12,7 +12,6 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
"github.com/minio/minio-go/v7"
@@ -139,47 +138,11 @@ func (fs *s3Filesystem) SetMetadata(key, data string) {
}
func (fs *s3Filesystem) Size() (int64, int64) {
size := int64(0)
files := fs.List("/", ListOptions{})
for _, file := range files {
size += file.Size()
}
return size, -1
return 0, -1
}
func (fs *s3Filesystem) Files() int64 {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{
WithVersions: false,
WithMetadata: false,
Prefix: "",
Recursive: true,
MaxKeys: 0,
StartAfter: "",
UseV1: false,
})
nfiles := int64(0)
for object := range ch {
if object.Err != nil {
fs.logger.WithError(object.Err).Log("Listing object failed")
}
if strings.HasSuffix("/"+object.Key, "/"+fakeDirEntry) {
// Skip fake entries (see MkdirAll)
continue
}
nfiles++
}
return nfiles
return 0
}
func (fs *s3Filesystem) Symlink(oldname, newname string) error {
@@ -403,8 +366,7 @@ func (fs *s3Filesystem) Copy(src, dst string) error {
src = fs.cleanPath(src)
dst = fs.cleanPath(dst)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := context.Background()
_, err := fs.client.CopyObject(ctx, minio.CopyDestOptions{
Bucket: fs.bucket,
@@ -444,8 +406,7 @@ func (fs *s3Filesystem) MkdirAll(path string, perm os.FileMode) error {
func (fs *s3Filesystem) Remove(path string) int64 {
path = fs.cleanPath(path)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := context.Background()
stat, err := fs.client.StatObject(ctx, fs.bucket, path, minio.StatObjectOptions{})
if err != nil {
@@ -467,9 +428,13 @@ func (fs *s3Filesystem) Remove(path string) int64 {
}
func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string, int64) {
fs.logger.Warn().Log("Removing files with pattern is not supported")
return nil, 0
/*
path = fs.cleanPath(path)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var totalSize int64 = 0
@@ -495,6 +460,7 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string,
objectsCh := make(chan minio.ObjectInfo)
// Send the names of the objects that need to be removed to objectsCh
go func() {
defer close(objectsCh)
@@ -557,15 +523,21 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string,
for err := range fs.client.RemoveObjects(context.Background(), fs.bucket, objectsCh, minio.RemoveObjectsOptions{
GovernanceBypass: true,
}) {
fs.logger.WithError(err.Err).WithField("key", err.ObjectName).Log("Deleting object failed")
}
fs.logger.Debug().Log("Deleted all files")
return files, totalSize
*/
}
func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo {
fs.logger.Warn().Log("Listing files is not supported")
return nil
/*
path = fs.cleanPath(path)
var compiledPattern glob.Glob
@@ -587,7 +559,7 @@ func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo {
files := []FileInfo{}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{
@@ -652,6 +624,7 @@ func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo {
}
return files
*/
}
func (fs *s3Filesystem) LookPath(file string) (string, error) {
@@ -689,38 +662,18 @@ func (fs *s3Filesystem) LookPath(file string) (string, error) {
func (fs *s3Filesystem) isDir(path string) bool {
if !strings.HasSuffix(path, "/") {
path = path + "/"
path = path + "/" + fakeDirEntry
}
if path == "/" {
if path == "/"+fakeDirEntry {
return true
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{
WithVersions: false,
WithMetadata: false,
Prefix: path,
Recursive: true,
MaxKeys: 1,
StartAfter: "",
UseV1: false,
})
files := uint64(0)
for object := range ch {
if object.Err != nil {
fs.logger.WithError(object.Err).Log("Listing object failed")
continue
}
files++
}
return files > 0
_, err := fs.client.StatObject(ctx, fs.bucket, path, minio.StatObjectOptions{})
return err == nil
}
func (fs *s3Filesystem) cleanPath(path string) string {

View File

@@ -187,7 +187,8 @@ func (r *restream) Start() {
t.Restore()
// The filesystem cleanup rules can be set
r.setCleanup(id, t.config)
patterns, _ := r.compileCleanup(t.ResolvedConfig())
r.setCleanup(id, patterns)
return true
})
@@ -537,7 +538,12 @@ func (r *restream) AddProcess(config *app.Config) error {
}
// set filesystem cleanup rules
r.setCleanup(tid, t.config)
patterns, err := r.compileCleanup(t.ResolvedConfig())
if err != nil {
t.Destroy()
return err
}
r.setCleanup(tid, patterns)
err = t.Restore()
if err != nil {
@@ -703,14 +709,21 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro
}
}
func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
func (r *restream) compileCleanup(config *app.Config) (map[string][]rfs.Pattern, error) {
patterns := map[string][]rfs.Pattern{}
var err error = nil
logger := r.logger.WithFields(log.Fields{
"id": config.ID,
"domain": config.Domain,
})
for _, output := range config.Output {
for _, c := range output.Cleanup {
name, path, found := strings.Cut(c.Pattern, ":")
if !found {
r.logger.Warn().WithField("pattern", c.Pattern).Log("invalid pattern, no prefix")
logger.Warn().WithField("pattern", c.Pattern).Log("invalid pattern, no prefix")
err = fmt.Errorf("invalid pattern, no prefix: %s", c.Pattern)
continue
}
@@ -726,6 +739,38 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
name = "mem"
}
fstype := ""
for _, fs := range r.fs.list {
if fs.Name() != name {
continue
}
fstype = fs.Type()
break
}
if len(fstype) == 0 {
logger.Warn().WithField("pattern", c.Pattern).Log("no filesystem with the name '%s' found", name)
err = fmt.Errorf("no filesystem with the name '%s' found: %s", name, c.Pattern)
continue
}
if fstype == "s3" {
if glob.IsPattern(path) {
logger.Warn().WithField("pattern", c.Pattern).Log("wildcards are not allowed for s3 filesystems")
err = fmt.Errorf("wildcards are not allowed for s3 filesystems: %s", c.Pattern)
continue
}
if c.MaxFiles != 0 || c.MaxFileAge != 0 {
logger.Warn().WithField("pattern", c.Pattern).Log("cleanup filter rule are not allowed for s3 filesystems")
err = fmt.Errorf("cleanup filter rules are not allowed for s3 filesystems: %s", c.Pattern)
continue
}
}
p := patterns[name]
p = append(p, rfs.Pattern{
Pattern: path,
@@ -737,6 +782,10 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
}
}
return patterns, err
}
func (r *restream) setCleanup(id app.ProcessID, patterns map[string][]rfs.Pattern) {
for name, p := range patterns {
for _, fs := range r.fs.list {
if fs.Name() != name {
@@ -1191,6 +1240,11 @@ func (r *restream) updateProcess(task *task, config *app.Config, force bool) err
return err
}
cleanupPatterns, err := r.compileCleanup(t.ResolvedConfig())
if err != nil {
return err
}
tid := t.ID()
if !tid.Equal(task.ID()) {
@@ -1227,7 +1281,7 @@ func (r *restream) updateProcess(task *task, config *app.Config, force bool) err
r.tasks.LoadAndStore(tid, t)
// Set the filesystem cleanup rules
r.setCleanup(tid, t.config)
r.setCleanup(tid, cleanupPatterns)
t.Restore()
@@ -1426,6 +1480,11 @@ func (r *restream) ReloadProcess(id app.ProcessID) error {
}
func (r *restream) reloadProcess(task *task) error {
cleanupPatterns, err := r.compileCleanup(task.ResolvedConfig())
if err != nil {
return err
}
t, err := r.createTask(task.Config())
if err != nil {
return err
@@ -1457,7 +1516,7 @@ func (r *restream) reloadProcess(task *task) error {
r.tasks.LoadAndStore(tid, t)
// Set the filesystem cleanup rules
r.setCleanup(tid, t.config)
r.setCleanup(tid, cleanupPatterns)
t.Restore()

View File

@@ -1503,7 +1503,7 @@ func TestProcessReplacer(t *testing.T) {
},
Cleanup: []app.ConfigIOCleanup{
{
Pattern: "pattern_{outputid}_{processid}_{reference}_{rtmp,name=$outputid}",
Pattern: "foo:pattern_{outputid}_{processid}_{reference}_{rtmp,name=$outputid}",
MaxFiles: 0,
MaxFileAge: 0,
PurgeOnDelete: false,
@@ -1574,7 +1574,7 @@ func TestProcessReplacer(t *testing.T) {
},
Cleanup: []app.ConfigIOCleanup{
{
Pattern: "pattern_out_314159265359_refref_314159265359_refref_{rtmp,name=$outputid}",
Pattern: "foo:pattern_out_314159265359_refref_314159265359_refref_{rtmp,name=$outputid}",
MaxFiles: 0,
MaxFileAge: 0,
PurgeOnDelete: false,

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
@@ -181,6 +182,10 @@ func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern, purge boo
}
func (rfs *filesystem) cleanup() {
if rfs.Filesystem.Type() == "s3" {
return
}
rfs.cleanupLock.RLock()
nPatterns := len(rfs.cleanupPatterns)
rfs.cleanupLock.RUnlock()
@@ -251,7 +256,19 @@ func (rfs *filesystem) purge(patterns []Pattern) int64 {
continue
}
files, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
if len(pattern.Pattern) == 0 {
continue
}
if rfs.Filesystem.Type() == "s3" {
rfs.Filesystem.Remove(pattern.Pattern)
nfilesTotal++
} else {
prefix := glob.Prefix(pattern.Pattern)
index := strings.LastIndex(prefix, "/")
path := prefix[:index+1]
files, nfiles := rfs.Filesystem.RemoveList(path, fs.ListOptions{
Pattern: pattern.Pattern,
})
@@ -261,6 +278,7 @@ func (rfs *filesystem) purge(patterns []Pattern) int64 {
nfilesTotal += nfiles
}
}
return nfilesTotal
}