2 Commits

Author SHA1 Message Date
Ingo Oppermann
5c7187e373 Add events to disk filesystem 2025-09-22 21:51:37 +02:00
Ingo Oppermann
882764dfe3 Emit list action 2025-09-22 21:44:22 +02:00
3 changed files with 73 additions and 46 deletions

View File

@@ -24,6 +24,7 @@ type FilesystemOperation struct {
} }
type FilesystemEvent struct { type FilesystemEvent struct {
Action string `json:"action"` Action string `json:"action"`
Name string `json:"name"` Name string `json:"name,omitempty"`
Names []string `json:"names,omitempty"`
} }

View File

@@ -354,8 +354,11 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
} }
func (h *FSHandler) ListFilesEvent(c echo.Context) error { func (h *FSHandler) ListFilesEvent(c echo.Context) error {
ticker := time.NewTicker(5 * time.Second) keepaliveTicker := time.NewTicker(5 * time.Second)
defer ticker.Stop() defer keepaliveTicker.Stop()
listTicker := time.NewTicker(30 * time.Second)
defer listTicker.Stop()
req := c.Request() req := c.Request()
reqctx := req.Context() reqctx := req.Context()
@@ -384,50 +387,46 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
done := make(chan error, 1) done := make(chan error, 1)
if contentType == "text/event-stream" { createList := func() api.FilesystemEvent {
res.Write([]byte(":keepalive\n\n")) files := h.FS.Filesystem.List("/", fs.ListOptions{})
res.Flush() event := api.FilesystemEvent{
Action: "list",
for { Names: make([]string, 0, len(files)),
select { }
case err := <-done: for _, file := range files {
return err event.Names = append(event.Names, file.Name())
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
res.Flush()
case e := <-evts:
res.Write([]byte("event: " + e.Action + "\ndata: "))
if err := enc.Encode(e); err != nil {
done <- err
}
res.Write([]byte("\n"))
res.Flush()
}
} }
} else {
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
for { return event
select { }
case err := <-done:
return err if err := enc.Encode(createList()); err != nil {
case <-reqctx.Done(): done <- err
done <- nil }
case <-ticker.C: res.Flush()
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush() for {
case e := <-evts: select {
if err := enc.Encode(api.FilesystemEvent{ case err := <-done:
Action: e.Action, return err
Name: e.Name, case <-reqctx.Done():
}); err != nil { done <- nil
done <- err case <-keepaliveTicker.C:
} res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush() res.Flush()
case <-listTicker.C:
if err := enc.Encode(createList()); err != nil {
done <- err
} }
res.Flush()
case e := <-evts:
if err := enc.Encode(api.FilesystemEvent{
Action: e.Action,
Name: e.Name,
}); err != nil {
done <- err
}
res.Flush()
} }
} }
} }

View File

@@ -139,6 +139,8 @@ type diskFilesystem struct {
// Logger from the config // Logger from the config
logger log.Logger logger log.Logger
events *EventWriter
} }
// NewDiskFilesystem returns a new filesystem that is backed by the disk filesystem. // NewDiskFilesystem returns a new filesystem that is backed by the disk filesystem.
@@ -172,6 +174,8 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) {
fs.logger = log.New("") fs.logger = log.New("")
} }
fs.events = NewEventWriter()
return fs, nil return fs, nil
} }
@@ -214,6 +218,8 @@ func NewRootedDiskFilesystem(config RootedDiskConfig) (Filesystem, error) {
fs.logger = log.New("") fs.logger = log.New("")
} }
fs.events = NewEventWriter()
return fs, nil return fs, nil
} }
@@ -362,6 +368,12 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int
fs.lastSizeCheck = time.Time{} fs.lastSizeCheck = time.Time{}
if replace {
fs.events.Publish(Event{Action: "update", Name: path})
} else {
fs.events.Publish(Event{Action: "create", Name: path})
}
return size, !replace, nil return size, !replace, nil
} }
@@ -425,6 +437,8 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in
fs.lastSizeCheck = time.Time{} fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "update", Name: path})
return size, nil return size, nil
} }
@@ -447,6 +461,8 @@ func (fs *diskFilesystem) rename(src, dst string) error {
// First try to rename the file // First try to rename the file
if err := os.Rename(src, dst); err == nil { if err := os.Rename(src, dst); err == nil {
fs.events.Publish(Event{Action: "remove", Name: src})
fs.events.Publish(Event{Action: "create", Name: dst})
return nil return nil
} }
@@ -456,11 +472,15 @@ func (fs *diskFilesystem) rename(src, dst string) error {
return fmt.Errorf("failed to copy files: %w", err) return fmt.Errorf("failed to copy files: %w", err)
} }
fs.events.Publish(Event{Action: "create", Name: dst})
if err := os.Remove(src); err != nil { if err := os.Remove(src); err != nil {
os.Remove(dst) os.Remove(dst)
return fmt.Errorf("failed to remove source file: %w", err) return fmt.Errorf("failed to remove source file: %w", err)
} }
fs.events.Publish(Event{Action: "remove", Name: src})
return nil return nil
} }
@@ -500,6 +520,8 @@ func (fs *diskFilesystem) copy(src, dst string) error {
fs.lastSizeCheck = time.Time{} fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "create", Name: dst})
return nil return nil
} }
@@ -552,6 +574,8 @@ func (fs *diskFilesystem) Remove(path string) int64 {
fs.lastSizeCheck = time.Time{} fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "remove", Name: path})
return size return size
} }
@@ -618,6 +642,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
if err := os.Remove(path); err == nil { if err := os.Remove(path); err == nil {
files = append(files, name) files = append(files, name)
size += info.Size() size += info.Size()
fs.events.Publish(Event{Action: "remove", Name: path})
} }
}) })
@@ -768,5 +793,7 @@ func (fs *diskFilesystem) cleanPath(path string) string {
} }
func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) { func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem") ch, cancel := fs.events.Subscribe()
return ch, cancel, nil
} }