diff --git a/http/api/filesystems.go b/http/api/filesystems.go index da2395d4..2d8be806 100644 --- a/http/api/filesystems.go +++ b/http/api/filesystems.go @@ -22,3 +22,8 @@ type FilesystemOperation struct { Target string `json:"target"` RateLimit uint64 `json:"bandwidth_limit_kbit"` // kbit/s } + +type FilesystemEvent struct { + Action string `json:"action"` + Name string `json:"name"` +} diff --git a/http/fs/cluster.go b/http/fs/cluster.go index fba458d7..83531132 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -35,21 +35,21 @@ func NewClusterFS(name string, fs fs.Filesystem, proxy *node.Manager) Filesystem return f } -func (fs *filesystem) Open(path string) fs.File { +func (cfs *filesystem) Open(path string) fs.File { // Check if the file is locally available //if file := fs.Filesystem.Open(path); file != nil { // return file //} // Check if the file is available in the cluster - size, lastModified, err := fs.proxy.FilesystemGetFileInfo(fs.name, path) + size, lastModified, err := cfs.proxy.FilesystemGetFileInfo(cfs.name, path) if err != nil { return nil } file := &file{ getFile: func(offset int64) (io.ReadCloser, error) { - return fs.proxy.FilesystemGetFile(fs.name, path, offset) + return cfs.proxy.FilesystemGetFile(cfs.name, path, offset) }, name: path, size: size, diff --git a/http/handler/filesystem.go b/http/handler/filesystem.go index 1bffccf5..feb509d9 100644 --- a/http/handler/filesystem.go +++ b/http/handler/filesystem.go @@ -1,6 +1,7 @@ package handler import ( + "encoding/json" "errors" "fmt" "io" @@ -265,6 +266,11 @@ func (h *FSHandler) ListFiles(c echo.Context) error { sortby := util.DefaultQuery(c, "sort", "none") order := util.DefaultQuery(c, "order", "asc") + accept := c.Request().Header.Get(echo.HeaderAccept) + if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") { + return h.ListFilesEvent(c) + } + path := "/" if len(pattern) != 0 { @@ -300,7 +306,7 @@ func (h *FSHandler) ListFiles(c echo.Context) error { if len(modifiedEnd) != 0 { if x, err := strconv.ParseInt(modifiedEnd, 10, 64); err != nil { - return api.Err(http.StatusBadRequest, "", "lastmode_end: %s", err.Error()) + return api.Err(http.StatusBadRequest, "", "lastmod_end: %s", err.Error()) } else { t := time.Unix(x+1, 0) options.ModifiedEnd = &t @@ -347,6 +353,85 @@ func (h *FSHandler) ListFiles(c echo.Context) error { return c.JSON(http.StatusOK, fileinfos) } +func (h *FSHandler) ListFilesEvent(c echo.Context) error { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + req := c.Request() + reqctx := req.Context() + + contentType := "text/event-stream" + accept := req.Header.Get(echo.HeaderAccept) + if strings.Contains(accept, "application/x-json-stream") { + contentType = "application/x-json-stream" + } + + evts, cancel, err := h.FS.Filesystem.Events() + if err != nil { + return api.Err(http.StatusNotImplemented, "", "events are not implemented for this filesystem") + } + defer cancel() + + res := c.Response() + + res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") + res.Header().Set(echo.HeaderCacheControl, "no-store") + res.Header().Set(echo.HeaderConnection, "close") + res.WriteHeader(http.StatusOK) + + enc := json.NewEncoder(res) + enc.SetIndent("", "") + + done := make(chan error, 1) + + if contentType == "text/event-stream" { + res.Write([]byte(":keepalive\n\n")) + res.Flush() + + for { + select { + case err := <-done: + return err + case <-reqctx.Done(): + done <- nil + case <-ticker.C: + res.Write([]byte(":keepalive\n\n")) + res.Flush() + case e := <-evts: + res.Write([]byte("event: " + e.Action + "\ndata: ")) + if err := enc.Encode(e); err != nil { + done <- err + } + res.Write([]byte("\n")) + res.Flush() + } + } + } else { + res.Write([]byte("{\"action\": \"keepalive\"}\n")) + res.Flush() + + for { + select { + case err := <-done: + return err + case <-reqctx.Done(): + done <- nil + case <-ticker.C: + res.Write([]byte("{\"action\": \"keepalive\"}\n")) + res.Flush() + case e := <-evts: + if err := enc.Encode(api.FilesystemEvent{ + Action: e.Action, + Name: e.Name, + }); err != nil { + done <- err + } + res.Flush() + } + } + } +} + // From: github.com/golang/go/net/http/fs.go@7dc9fcb // errNoOverlap is returned by serveContent's parseRange if first-byte-pos of diff --git a/io/fs/disk.go b/io/fs/disk.go index b9f76b77..e205e9d3 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -766,3 +766,7 @@ func (fs *diskFilesystem) cleanPath(path string) string { return filepath.Join(fs.root, filepath.Clean(path)) } + +func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) { + return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem") +} diff --git a/io/fs/event.go b/io/fs/event.go index cadf08f5..b8eeb791 100644 --- a/io/fs/event.go +++ b/io/fs/event.go @@ -22,10 +22,6 @@ func (e Event) clone() Event { type EventsCancelFunc func() -type EventFilesystem interface { - Events() (<-chan Event, EventsCancelFunc) -} - type EventWriter struct { publisher chan Event publisherClosed bool diff --git a/io/fs/fs.go b/io/fs/fs.go index 973be194..ed10a4bd 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -84,6 +84,8 @@ type ReadFilesystem interface { // on success, the result is an absolute path. On non-disk filesystems. Only the mere existence // of that file is verfied. In case the file is not found, the error ErrNotExist will be returned. LookPath(file string) (string, error) + + Events() (<-chan Event, EventsCancelFunc, error) } type WriteFilesystem interface { diff --git a/io/fs/mem.go b/io/fs/mem.go index 0a028c6c..4e8acfe8 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -948,6 +948,8 @@ func (fs *memFilesystem) cleanPath(path string) string { return filepath.Join("/", filepath.Clean(path)) } -func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc) { - return fs.events.Subscribe() +func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc, error) { + ch, cancel := fs.events.Subscribe() + + return ch, cancel, nil } diff --git a/io/fs/s3.go b/io/fs/s3.go index e70b1079..3585deab 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -686,6 +686,10 @@ func (fs *s3Filesystem) cleanPath(path string) string { return filepath.Join("/", filepath.Clean(path))[1:] } +func (fs *s3Filesystem) Events() (<-chan Event, EventsCancelFunc, error) { + return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem") +} + type s3FileInfo struct { name string size int64