2 Commits

Author SHA1 Message Date
Ingo Oppermann
5c7187e373 Add events to disk filesystem 2025-09-22 21:51:37 +02:00
Ingo Oppermann
882764dfe3 Emit list action 2025-09-22 21:44:22 +02:00
3 changed files with 73 additions and 46 deletions

View File

@@ -24,6 +24,7 @@ type FilesystemOperation struct {
}
type FilesystemEvent struct {
Action string `json:"action"`
Name string `json:"name"`
Action string `json:"action"`
Name string `json:"name,omitempty"`
Names []string `json:"names,omitempty"`
}

View File

@@ -354,8 +354,11 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
}
func (h *FSHandler) ListFilesEvent(c echo.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
keepaliveTicker := time.NewTicker(5 * time.Second)
defer keepaliveTicker.Stop()
listTicker := time.NewTicker(30 * time.Second)
defer listTicker.Stop()
req := c.Request()
reqctx := req.Context()
@@ -384,50 +387,46 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
done := make(chan error, 1)
if contentType == "text/event-stream" {
res.Write([]byte(":keepalive\n\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
res.Flush()
case e := <-evts:
res.Write([]byte("event: " + e.Action + "\ndata: "))
if err := enc.Encode(e); err != nil {
done <- err
}
res.Write([]byte("\n"))
res.Flush()
}
createList := func() api.FilesystemEvent {
files := h.FS.Filesystem.List("/", fs.ListOptions{})
event := api.FilesystemEvent{
Action: "list",
Names: make([]string, 0, len(files)),
}
for _, file := range files {
event.Names = append(event.Names, file.Name())
}
} else {
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
case e := <-evts:
if err := enc.Encode(api.FilesystemEvent{
Action: e.Action,
Name: e.Name,
}); err != nil {
done <- err
}
res.Flush()
return event
}
if err := enc.Encode(createList()); err != nil {
done <- err
}
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-keepaliveTicker.C:
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
case <-listTicker.C:
if err := enc.Encode(createList()); err != nil {
done <- err
}
res.Flush()
case e := <-evts:
if err := enc.Encode(api.FilesystemEvent{
Action: e.Action,
Name: e.Name,
}); err != nil {
done <- err
}
res.Flush()
}
}
}

View File

@@ -139,6 +139,8 @@ type diskFilesystem struct {
// Logger from the config
logger log.Logger
events *EventWriter
}
// NewDiskFilesystem returns a new filesystem that is backed by the disk filesystem.
@@ -172,6 +174,8 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) {
fs.logger = log.New("")
}
fs.events = NewEventWriter()
return fs, nil
}
@@ -214,6 +218,8 @@ func NewRootedDiskFilesystem(config RootedDiskConfig) (Filesystem, error) {
fs.logger = log.New("")
}
fs.events = NewEventWriter()
return fs, nil
}
@@ -362,6 +368,12 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int
fs.lastSizeCheck = time.Time{}
if replace {
fs.events.Publish(Event{Action: "update", Name: path})
} else {
fs.events.Publish(Event{Action: "create", Name: path})
}
return size, !replace, nil
}
@@ -425,6 +437,8 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in
fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "update", Name: path})
return size, nil
}
@@ -447,6 +461,8 @@ func (fs *diskFilesystem) rename(src, dst string) error {
// First try to rename the file
if err := os.Rename(src, dst); err == nil {
fs.events.Publish(Event{Action: "remove", Name: src})
fs.events.Publish(Event{Action: "create", Name: dst})
return nil
}
@@ -456,11 +472,15 @@ func (fs *diskFilesystem) rename(src, dst string) error {
return fmt.Errorf("failed to copy files: %w", err)
}
fs.events.Publish(Event{Action: "create", Name: dst})
if err := os.Remove(src); err != nil {
os.Remove(dst)
return fmt.Errorf("failed to remove source file: %w", err)
}
fs.events.Publish(Event{Action: "remove", Name: src})
return nil
}
@@ -500,6 +520,8 @@ func (fs *diskFilesystem) copy(src, dst string) error {
fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "create", Name: dst})
return nil
}
@@ -552,6 +574,8 @@ func (fs *diskFilesystem) Remove(path string) int64 {
fs.lastSizeCheck = time.Time{}
fs.events.Publish(Event{Action: "remove", Name: path})
return size
}
@@ -618,6 +642,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
if err := os.Remove(path); err == nil {
files = append(files, name)
size += info.Size()
fs.events.Publish(Event{Action: "remove", Name: path})
}
})
@@ -768,5 +793,7 @@ func (fs *diskFilesystem) cleanPath(path string) string {
}
func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem")
ch, cancel := fs.events.Subscribe()
return ch, cancel, nil
}