Add timestamp to fs events, allow to provide a glob pattern

This commit is contained in:
Ingo Oppermann
2025-09-23 12:50:23 +02:00
parent 5c7187e373
commit 86437171f3
5 changed files with 81 additions and 38 deletions

View File

@@ -24,7 +24,8 @@ type FilesystemOperation struct {
}
type FilesystemEvent struct {
Action string `json:"action"`
Name string `json:"name,omitempty"`
Names []string `json:"names,omitempty"`
Action string `json:"action"`
Name string `json:"name,omitempty"`
Names []string `json:"names,omitempty"`
Timestamp int64 `json:"ts"`
}

View File

@@ -258,6 +258,11 @@ func (h *FSHandler) DeleteFiles(c echo.Context) error {
}
func (h *FSHandler) ListFiles(c echo.Context) error {
accept := c.Request().Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") {
return h.ListFilesEvent(c)
}
pattern := util.DefaultQuery(c, "glob", "")
sizeMin := util.DefaultQuery(c, "size_min", "0")
sizeMax := util.DefaultQuery(c, "size_max", "0")
@@ -266,11 +271,6 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
sortby := util.DefaultQuery(c, "sort", "none")
order := util.DefaultQuery(c, "order", "asc")
accept := c.Request().Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") {
return h.ListFilesEvent(c)
}
path := "/"
if len(pattern) != 0 {
@@ -354,6 +354,30 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
}
func (h *FSHandler) ListFilesEvent(c echo.Context) error {
pattern := util.DefaultQuery(c, "glob", "")
path := "/"
if len(pattern) != 0 {
prefix := glob.Prefix(pattern)
index := strings.LastIndex(prefix, "/")
path = prefix[:index+1]
}
var compiledPattern glob.Glob = nil
if len(pattern) != 0 {
var err error
compiledPattern, err = glob.Compile(pattern, '/')
if err != nil {
return api.Err(http.StatusBadRequest, "", "invalid pattern: %w", err)
}
}
options := fs.ListOptions{
Pattern: pattern,
}
keepaliveTicker := time.NewTicker(5 * time.Second)
defer keepaliveTicker.Stop()
@@ -388,10 +412,11 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
done := make(chan error, 1)
createList := func() api.FilesystemEvent {
files := h.FS.Filesystem.List("/", fs.ListOptions{})
files := h.FS.Filesystem.List(path, options)
event := api.FilesystemEvent{
Action: "list",
Names: make([]string, 0, len(files)),
Action: "list",
Names: make([]string, 0, len(files)),
Timestamp: time.Now().UnixMilli(),
}
for _, file := range files {
event.Names = append(event.Names, file.Name())
@@ -420,9 +445,15 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
}
res.Flush()
case e := <-evts:
if compiledPattern != nil {
if !compiledPattern.Match(e.Name) {
continue
}
}
if err := enc.Encode(api.FilesystemEvent{
Action: e.Action,
Name: e.Name,
Action: e.Action,
Name: e.Name,
Timestamp: e.Timestamp.UnixMilli(),
}); err != nil {
done <- err
}

View File

@@ -369,9 +369,9 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int
fs.lastSizeCheck = time.Time{}
if replace {
fs.events.Publish(Event{Action: "update", Name: path})
fs.events.Publish(NewEvent("update", path))
} else {
fs.events.Publish(Event{Action: "create", Name: path})
fs.events.Publish(NewEvent("create", path))
}
return size, !replace, nil
@@ -437,7 +437,7 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in
fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "update", Name: path})
fs.events.Publish(NewEvent("update", path))
return size, nil
}
@@ -461,8 +461,8 @@ func (fs *diskFilesystem) rename(src, dst string) error {
// First try to rename the file
if err := os.Rename(src, dst); err == nil {
fs.events.Publish(Event{Action: "remove", Name: src})
fs.events.Publish(Event{Action: "create", Name: dst})
fs.events.Publish(NewEvent("remove", src))
fs.events.Publish(NewEvent("create", dst))
return nil
}
@@ -472,14 +472,14 @@ func (fs *diskFilesystem) rename(src, dst string) error {
return fmt.Errorf("failed to copy files: %w", err)
}
fs.events.Publish(Event{Action: "create", Name: dst})
fs.events.Publish(NewEvent("create", dst))
if err := os.Remove(src); err != nil {
os.Remove(dst)
return fmt.Errorf("failed to remove source file: %w", err)
}
fs.events.Publish(Event{Action: "remove", Name: src})
fs.events.Publish(NewEvent("remove", src))
return nil
}
@@ -520,7 +520,7 @@ func (fs *diskFilesystem) copy(src, dst string) error {
fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "create", Name: dst})
fs.events.Publish(NewEvent("create", dst))
return nil
}
@@ -574,7 +574,7 @@ func (fs *diskFilesystem) Remove(path string) int64 {
fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "remove", Name: path})
fs.events.Publish(NewEvent("remove", path))
return size
}
@@ -642,7 +642,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
if err := os.Remove(path); err == nil {
files = append(files, name)
size += info.Size()
fs.events.Publish(Event{Action: "remove", Name: path})
fs.events.Publish(NewEvent("remove", path))
}
})

View File

@@ -4,19 +4,30 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/lithammer/shortuuid/v4"
)
type Event struct {
Action string
Name string
Action string
Name string
Timestamp time.Time
}
func NewEvent(action, name string) Event {
return Event{
Action: action,
Name: name,
Timestamp: time.Now(),
}
}
func (e Event) clone() Event {
return Event{
Action: e.Action,
Name: e.Name,
Action: e.Action,
Name: e.Name,
Timestamp: e.Timestamp,
}
}

View File

@@ -466,10 +466,10 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
})
if replace {
fs.events.Publish(Event{Action: "update", Name: newFile.name})
fs.events.Publish(NewEvent("update", newFile.name))
logger.Debug().Log("Replaced file")
} else {
fs.events.Publish(Event{Action: "create", Name: newFile.name})
fs.events.Publish(NewEvent("create", newFile.name))
logger.Debug().Log("Added file")
}
@@ -526,7 +526,7 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int
"size_bytes": fs.currentSize,
}).Log("Appended to file")
fs.events.Publish(Event{Action: "update", Name: file.name})
fs.events.Publish(NewEvent("update", file.name))
return size, nil
}
@@ -564,7 +564,7 @@ func (fs *memFilesystem) Purge(size int64) int64 {
"size_bytes": fs.currentSize,
}).Debug().Log("Purged file")
fs.events.Publish(Event{Action: "remove", Name: f.name})
fs.events.Publish(NewEvent("remove", f.name))
if size <= 0 {
break
@@ -609,20 +609,20 @@ func (fs *memFilesystem) Rename(src, dst string) error {
dstFile, replace := fs.storage.Store(dst, srcFile)
fs.storage.Delete(src)
fs.events.Publish(Event{Action: "remove", Name: src})
fs.events.Publish(NewEvent("remove", src))
fs.dirs.Remove(src)
if !replace {
fs.dirs.Add(dst)
fs.events.Publish(Event{Action: "create", Name: dst})
fs.events.Publish(NewEvent("create", dst))
}
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
fs.events.Publish(Event{Action: "update", Name: dst})
fs.events.Publish(NewEvent("update", dst))
dstFile.Close()
@@ -660,14 +660,14 @@ func (fs *memFilesystem) Copy(src, dst string) error {
if !replace {
fs.dirs.Add(dst)
fs.events.Publish(Event{Action: "create", Name: dst})
fs.events.Publish(NewEvent("create", dst))
}
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
fs.events.Publish(Event{Action: "update", Name: dst})
fs.events.Publish(NewEvent("update", dst))
replacedFile.Close()
fs.currentSize -= replacedFile.size
}
@@ -754,7 +754,7 @@ func (fs *memFilesystem) remove(path string) int64 {
"size_bytes": fs.currentSize,
}).Debug().Log("Removed file")
fs.events.Publish(Event{Action: "remove", Name: file.name})
fs.events.Publish(NewEvent("remove", file.name))
return file.size
}
@@ -830,7 +830,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string,
file.Close()
fs.events.Publish(Event{Action: "remove", Name: file.name})
fs.events.Publish(NewEvent("remove", file.name))
}
fs.sizeLock.Lock()