Add filesystem event handler

This commit is contained in:
Ingo Oppermann
2025-09-22 17:10:05 +02:00
parent 062204eca9
commit bbf40b6e50
8 changed files with 108 additions and 10 deletions

View File

@@ -22,3 +22,8 @@ type FilesystemOperation struct {
Target string `json:"target"` Target string `json:"target"`
RateLimit uint64 `json:"bandwidth_limit_kbit"` // kbit/s RateLimit uint64 `json:"bandwidth_limit_kbit"` // kbit/s
} }
type FilesystemEvent struct {
Action string `json:"action"`
Name string `json:"name"`
}

View File

@@ -35,21 +35,21 @@ func NewClusterFS(name string, fs fs.Filesystem, proxy *node.Manager) Filesystem
return f return f
} }
func (fs *filesystem) Open(path string) fs.File { func (cfs *filesystem) Open(path string) fs.File {
// Check if the file is locally available // Check if the file is locally available
//if file := fs.Filesystem.Open(path); file != nil { //if file := fs.Filesystem.Open(path); file != nil {
// return file // return file
//} //}
// Check if the file is available in the cluster // Check if the file is available in the cluster
size, lastModified, err := fs.proxy.FilesystemGetFileInfo(fs.name, path) size, lastModified, err := cfs.proxy.FilesystemGetFileInfo(cfs.name, path)
if err != nil { if err != nil {
return nil return nil
} }
file := &file{ file := &file{
getFile: func(offset int64) (io.ReadCloser, error) { getFile: func(offset int64) (io.ReadCloser, error) {
return fs.proxy.FilesystemGetFile(fs.name, path, offset) return cfs.proxy.FilesystemGetFile(cfs.name, path, offset)
}, },
name: path, name: path,
size: size, size: size,

View File

@@ -1,6 +1,7 @@
package handler package handler
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@@ -265,6 +266,11 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
sortby := util.DefaultQuery(c, "sort", "none") sortby := util.DefaultQuery(c, "sort", "none")
order := util.DefaultQuery(c, "order", "asc") order := util.DefaultQuery(c, "order", "asc")
accept := c.Request().Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") {
return h.ListFilesEvent(c)
}
path := "/" path := "/"
if len(pattern) != 0 { if len(pattern) != 0 {
@@ -300,7 +306,7 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
if len(modifiedEnd) != 0 { if len(modifiedEnd) != 0 {
if x, err := strconv.ParseInt(modifiedEnd, 10, 64); err != nil { if x, err := strconv.ParseInt(modifiedEnd, 10, 64); err != nil {
return api.Err(http.StatusBadRequest, "", "lastmode_end: %s", err.Error()) return api.Err(http.StatusBadRequest, "", "lastmod_end: %s", err.Error())
} else { } else {
t := time.Unix(x+1, 0) t := time.Unix(x+1, 0)
options.ModifiedEnd = &t options.ModifiedEnd = &t
@@ -347,6 +353,85 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
return c.JSON(http.StatusOK, fileinfos) return c.JSON(http.StatusOK, fileinfos)
} }
func (h *FSHandler) ListFilesEvent(c echo.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
req := c.Request()
reqctx := req.Context()
contentType := "text/event-stream"
accept := req.Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") {
contentType = "application/x-json-stream"
}
evts, cancel, err := h.FS.Filesystem.Events()
if err != nil {
return api.Err(http.StatusNotImplemented, "", "events are not implemented for this filesystem")
}
defer cancel()
res := c.Response()
res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
res.Header().Set(echo.HeaderCacheControl, "no-store")
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
if contentType == "text/event-stream" {
res.Write([]byte(":keepalive\n\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
res.Flush()
case e := <-evts:
res.Write([]byte("event: " + e.Action + "\ndata: "))
if err := enc.Encode(e); err != nil {
done <- err
}
res.Write([]byte("\n"))
res.Flush()
}
}
} else {
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
case e := <-evts:
if err := enc.Encode(api.FilesystemEvent{
Action: e.Action,
Name: e.Name,
}); err != nil {
done <- err
}
res.Flush()
}
}
}
}
// From: github.com/golang/go/net/http/fs.go@7dc9fcb // From: github.com/golang/go/net/http/fs.go@7dc9fcb
// errNoOverlap is returned by serveContent's parseRange if first-byte-pos of // errNoOverlap is returned by serveContent's parseRange if first-byte-pos of

View File

@@ -766,3 +766,7 @@ func (fs *diskFilesystem) cleanPath(path string) string {
return filepath.Join(fs.root, filepath.Clean(path)) return filepath.Join(fs.root, filepath.Clean(path))
} }
func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem")
}

View File

@@ -22,10 +22,6 @@ func (e Event) clone() Event {
type EventsCancelFunc func() type EventsCancelFunc func()
type EventFilesystem interface {
Events() (<-chan Event, EventsCancelFunc)
}
type EventWriter struct { type EventWriter struct {
publisher chan Event publisher chan Event
publisherClosed bool publisherClosed bool

View File

@@ -84,6 +84,8 @@ type ReadFilesystem interface {
// on success, the result is an absolute path. On non-disk filesystems. Only the mere existence // on success, the result is an absolute path. On non-disk filesystems. Only the mere existence
// of that file is verfied. In case the file is not found, the error ErrNotExist will be returned. // of that file is verfied. In case the file is not found, the error ErrNotExist will be returned.
LookPath(file string) (string, error) LookPath(file string) (string, error)
Events() (<-chan Event, EventsCancelFunc, error)
} }
type WriteFilesystem interface { type WriteFilesystem interface {

View File

@@ -948,6 +948,8 @@ func (fs *memFilesystem) cleanPath(path string) string {
return filepath.Join("/", filepath.Clean(path)) return filepath.Join("/", filepath.Clean(path))
} }
func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc) { func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
return fs.events.Subscribe() ch, cancel := fs.events.Subscribe()
return ch, cancel, nil
} }

View File

@@ -686,6 +686,10 @@ func (fs *s3Filesystem) cleanPath(path string) string {
return filepath.Join("/", filepath.Clean(path))[1:] return filepath.Join("/", filepath.Clean(path))[1:]
} }
func (fs *s3Filesystem) Events() (<-chan Event, EventsCancelFunc, error) {
return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem")
}
type s3FileInfo struct { type s3FileInfo struct {
name string name string
size int64 size int64