From 86437171f36a37ceeba363874818f14fb3a5b85b Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 23 Sep 2025 12:50:23 +0200 Subject: [PATCH] Add timestamp to fs events, allow to provide a glob pattern --- http/api/filesystems.go | 7 +++--- http/handler/filesystem.go | 51 ++++++++++++++++++++++++++++++-------- io/fs/disk.go | 20 +++++++-------- io/fs/event.go | 19 +++++++++++--- io/fs/mem.go | 22 ++++++++-------- 5 files changed, 81 insertions(+), 38 deletions(-) diff --git a/http/api/filesystems.go b/http/api/filesystems.go index 62d7b24b..1b470b84 100644 --- a/http/api/filesystems.go +++ b/http/api/filesystems.go @@ -24,7 +24,8 @@ type FilesystemOperation struct { } type FilesystemEvent struct { - Action string `json:"action"` - Name string `json:"name,omitempty"` - Names []string `json:"names,omitempty"` + Action string `json:"action"` + Name string `json:"name,omitempty"` + Names []string `json:"names,omitempty"` + Timestamp int64 `json:"ts"` } diff --git a/http/handler/filesystem.go b/http/handler/filesystem.go index 60540e7a..3b7496e3 100644 --- a/http/handler/filesystem.go +++ b/http/handler/filesystem.go @@ -258,6 +258,11 @@ func (h *FSHandler) DeleteFiles(c echo.Context) error { } func (h *FSHandler) ListFiles(c echo.Context) error { + accept := c.Request().Header.Get(echo.HeaderAccept) + if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") { + return h.ListFilesEvent(c) + } + pattern := util.DefaultQuery(c, "glob", "") sizeMin := util.DefaultQuery(c, "size_min", "0") sizeMax := util.DefaultQuery(c, "size_max", "0") @@ -266,11 +271,6 @@ func (h *FSHandler) ListFiles(c echo.Context) error { sortby := util.DefaultQuery(c, "sort", "none") order := util.DefaultQuery(c, "order", "asc") - accept := c.Request().Header.Get(echo.HeaderAccept) - if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") { - return h.ListFilesEvent(c) - } - path := "/" if len(pattern) != 0 { @@ -354,6 +354,30 @@ func (h *FSHandler) ListFiles(c echo.Context) error { } func (h *FSHandler) ListFilesEvent(c echo.Context) error { + pattern := util.DefaultQuery(c, "glob", "") + + path := "/" + + if len(pattern) != 0 { + prefix := glob.Prefix(pattern) + index := strings.LastIndex(prefix, "/") + path = prefix[:index+1] + } + + var compiledPattern glob.Glob = nil + + if len(pattern) != 0 { + var err error + compiledPattern, err = glob.Compile(pattern, '/') + if err != nil { + return api.Err(http.StatusBadRequest, "", "invalid pattern: %w", err) + } + } + + options := fs.ListOptions{ + Pattern: pattern, + } + keepaliveTicker := time.NewTicker(5 * time.Second) defer keepaliveTicker.Stop() @@ -388,10 +412,11 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error { done := make(chan error, 1) createList := func() api.FilesystemEvent { - files := h.FS.Filesystem.List("/", fs.ListOptions{}) + files := h.FS.Filesystem.List(path, options) event := api.FilesystemEvent{ - Action: "list", - Names: make([]string, 0, len(files)), + Action: "list", + Names: make([]string, 0, len(files)), + Timestamp: time.Now().UnixMilli(), } for _, file := range files { event.Names = append(event.Names, file.Name()) @@ -420,9 +445,15 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error { } res.Flush() case e := <-evts: + if compiledPattern != nil { + if !compiledPattern.Match(e.Name) { + continue + } + } if err := enc.Encode(api.FilesystemEvent{ - Action: e.Action, - Name: e.Name, + Action: e.Action, + Name: e.Name, + Timestamp: e.Timestamp.UnixMilli(), }); err != nil { done <- err } diff --git a/io/fs/disk.go b/io/fs/disk.go index 5964ef51..12f60433 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -369,9 +369,9 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int fs.lastSizeCheck = time.Time{} if replace { - fs.events.Publish(Event{Action: "update", Name: path}) + fs.events.Publish(NewEvent("update", path)) } else { - fs.events.Publish(Event{Action: "create", Name: path}) + fs.events.Publish(NewEvent("create", path)) } return size, !replace, nil @@ -437,7 +437,7 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in fs.lastSizeCheck = time.Time{} - fs.events.Publish(Event{Action: "update", Name: path}) + fs.events.Publish(NewEvent("update", path)) return size, nil } @@ -461,8 +461,8 @@ func (fs *diskFilesystem) rename(src, dst string) error { // First try to rename the file if err := os.Rename(src, dst); err == nil { - fs.events.Publish(Event{Action: "remove", Name: src}) - fs.events.Publish(Event{Action: "create", Name: dst}) + fs.events.Publish(NewEvent("remove", src)) + fs.events.Publish(NewEvent("create", dst)) return nil } @@ -472,14 +472,14 @@ func (fs *diskFilesystem) rename(src, dst string) error { return fmt.Errorf("failed to copy files: %w", err) } - fs.events.Publish(Event{Action: "create", Name: dst}) + fs.events.Publish(NewEvent("create", dst)) if err := os.Remove(src); err != nil { os.Remove(dst) return fmt.Errorf("failed to remove source file: %w", err) } - fs.events.Publish(Event{Action: "remove", Name: src}) + fs.events.Publish(NewEvent("remove", src)) return nil } @@ -520,7 +520,7 @@ func (fs *diskFilesystem) copy(src, dst string) error { fs.lastSizeCheck = time.Time{} - fs.events.Publish(Event{Action: "create", Name: dst}) + fs.events.Publish(NewEvent("create", dst)) return nil } @@ -574,7 +574,7 @@ func (fs *diskFilesystem) Remove(path string) int64 { fs.lastSizeCheck = time.Time{} - fs.events.Publish(Event{Action: "remove", Name: path}) + fs.events.Publish(NewEvent("remove", path)) return size } @@ -642,7 +642,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string if err := os.Remove(path); err == nil { files = append(files, name) size += info.Size() - fs.events.Publish(Event{Action: "remove", Name: path}) + fs.events.Publish(NewEvent("remove", path)) } }) diff --git a/io/fs/event.go b/io/fs/event.go index b8eeb791..0956d4de 100644 --- a/io/fs/event.go +++ b/io/fs/event.go @@ -4,19 +4,30 @@ import ( "context" "fmt" "sync" + "time" "github.com/lithammer/shortuuid/v4" ) type Event struct { - Action string - Name string + Action string + Name string + Timestamp time.Time +} + +func NewEvent(action, name string) Event { + return Event{ + Action: action, + Name: name, + Timestamp: time.Now(), + } } func (e Event) clone() Event { return Event{ - Action: e.Action, - Name: e.Name, + Action: e.Action, + Name: e.Name, + Timestamp: e.Timestamp, } } diff --git a/io/fs/mem.go b/io/fs/mem.go index 4e8acfe8..fa40aea0 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -466,10 +466,10 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) }) if replace { - fs.events.Publish(Event{Action: "update", Name: newFile.name}) + fs.events.Publish(NewEvent("update", newFile.name)) logger.Debug().Log("Replaced file") } else { - fs.events.Publish(Event{Action: "create", Name: newFile.name}) + fs.events.Publish(NewEvent("create", newFile.name)) logger.Debug().Log("Added file") } @@ -526,7 +526,7 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int "size_bytes": fs.currentSize, }).Log("Appended to file") - fs.events.Publish(Event{Action: "update", Name: file.name}) + fs.events.Publish(NewEvent("update", file.name)) return size, nil } @@ -564,7 +564,7 @@ func (fs *memFilesystem) Purge(size int64) int64 { "size_bytes": fs.currentSize, }).Debug().Log("Purged file") - fs.events.Publish(Event{Action: "remove", Name: f.name}) + fs.events.Publish(NewEvent("remove", f.name)) if size <= 0 { break @@ -609,20 +609,20 @@ func (fs *memFilesystem) Rename(src, dst string) error { dstFile, replace := fs.storage.Store(dst, srcFile) fs.storage.Delete(src) - fs.events.Publish(Event{Action: "remove", Name: src}) + fs.events.Publish(NewEvent("remove", src)) fs.dirs.Remove(src) if !replace { fs.dirs.Add(dst) - fs.events.Publish(Event{Action: "create", Name: dst}) + fs.events.Publish(NewEvent("create", dst)) } fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { - fs.events.Publish(Event{Action: "update", Name: dst}) + fs.events.Publish(NewEvent("update", dst)) dstFile.Close() @@ -660,14 +660,14 @@ func (fs *memFilesystem) Copy(src, dst string) error { if !replace { fs.dirs.Add(dst) - fs.events.Publish(Event{Action: "create", Name: dst}) + fs.events.Publish(NewEvent("create", dst)) } fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { - fs.events.Publish(Event{Action: "update", Name: dst}) + fs.events.Publish(NewEvent("update", dst)) replacedFile.Close() fs.currentSize -= replacedFile.size } @@ -754,7 +754,7 @@ func (fs *memFilesystem) remove(path string) int64 { "size_bytes": fs.currentSize, }).Debug().Log("Removed file") - fs.events.Publish(Event{Action: "remove", Name: file.name}) + fs.events.Publish(NewEvent("remove", file.name)) return file.size } @@ -830,7 +830,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, file.Close() - fs.events.Publish(Event{Action: "remove", Name: file.name}) + fs.events.Publish(NewEvent("remove", file.name)) } fs.sizeLock.Lock()