mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Compare commits
2 Commits
bbf40b6e50
...
5c7187e373
Author | SHA1 | Date | |
---|---|---|---|
![]() |
5c7187e373 | ||
![]() |
882764dfe3 |
@@ -25,5 +25,6 @@ type FilesystemOperation struct {
|
||||
|
||||
type FilesystemEvent struct {
|
||||
Action string `json:"action"`
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Names []string `json:"names,omitempty"`
|
||||
}
|
||||
|
@@ -354,8 +354,11 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
|
||||
}
|
||||
|
||||
func (h *FSHandler) ListFilesEvent(c echo.Context) error {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
keepaliveTicker := time.NewTicker(5 * time.Second)
|
||||
defer keepaliveTicker.Stop()
|
||||
|
||||
listTicker := time.NewTicker(30 * time.Second)
|
||||
defer listTicker.Stop()
|
||||
|
||||
req := c.Request()
|
||||
reqctx := req.Context()
|
||||
@@ -384,30 +387,22 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
|
||||
|
||||
done := make(chan error, 1)
|
||||
|
||||
if contentType == "text/event-stream" {
|
||||
res.Write([]byte(":keepalive\n\n"))
|
||||
res.Flush()
|
||||
createList := func() api.FilesystemEvent {
|
||||
files := h.FS.Filesystem.List("/", fs.ListOptions{})
|
||||
event := api.FilesystemEvent{
|
||||
Action: "list",
|
||||
Names: make([]string, 0, len(files)),
|
||||
}
|
||||
for _, file := range files {
|
||||
event.Names = append(event.Names, file.Name())
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <-done:
|
||||
return err
|
||||
case <-reqctx.Done():
|
||||
done <- nil
|
||||
case <-ticker.C:
|
||||
res.Write([]byte(":keepalive\n\n"))
|
||||
res.Flush()
|
||||
case e := <-evts:
|
||||
res.Write([]byte("event: " + e.Action + "\ndata: "))
|
||||
if err := enc.Encode(e); err != nil {
|
||||
return event
|
||||
}
|
||||
|
||||
if err := enc.Encode(createList()); err != nil {
|
||||
done <- err
|
||||
}
|
||||
res.Write([]byte("\n"))
|
||||
res.Flush()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
|
||||
res.Flush()
|
||||
|
||||
for {
|
||||
@@ -416,9 +411,14 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
|
||||
return err
|
||||
case <-reqctx.Done():
|
||||
done <- nil
|
||||
case <-ticker.C:
|
||||
case <-keepaliveTicker.C:
|
||||
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
|
||||
res.Flush()
|
||||
case <-listTicker.C:
|
||||
if err := enc.Encode(createList()); err != nil {
|
||||
done <- err
|
||||
}
|
||||
res.Flush()
|
||||
case e := <-evts:
|
||||
if err := enc.Encode(api.FilesystemEvent{
|
||||
Action: e.Action,
|
||||
@@ -429,7 +429,6 @@ func (h *FSHandler) ListFilesEvent(c echo.Context) error {
|
||||
res.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// From: github.com/golang/go/net/http/fs.go@7dc9fcb
|
||||
|
@@ -139,6 +139,8 @@ type diskFilesystem struct {
|
||||
|
||||
// Logger from the config
|
||||
logger log.Logger
|
||||
|
||||
events *EventWriter
|
||||
}
|
||||
|
||||
// NewDiskFilesystem returns a new filesystem that is backed by the disk filesystem.
|
||||
@@ -172,6 +174,8 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) {
|
||||
fs.logger = log.New("")
|
||||
}
|
||||
|
||||
fs.events = NewEventWriter()
|
||||
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
@@ -214,6 +218,8 @@ func NewRootedDiskFilesystem(config RootedDiskConfig) (Filesystem, error) {
|
||||
fs.logger = log.New("")
|
||||
}
|
||||
|
||||
fs.events = NewEventWriter()
|
||||
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
@@ -362,6 +368,12 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int
|
||||
|
||||
fs.lastSizeCheck = time.Time{}
|
||||
|
||||
if replace {
|
||||
fs.events.Publish(Event{Action: "update", Name: path})
|
||||
} else {
|
||||
fs.events.Publish(Event{Action: "create", Name: path})
|
||||
}
|
||||
|
||||
return size, !replace, nil
|
||||
}
|
||||
|
||||
@@ -425,6 +437,8 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in
|
||||
|
||||
fs.lastSizeCheck = time.Time{}
|
||||
|
||||
fs.events.Publish(Event{Action: "update", Name: path})
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
@@ -447,6 +461,8 @@ func (fs *diskFilesystem) rename(src, dst string) error {
|
||||
|
||||
// First try to rename the file
|
||||
if err := os.Rename(src, dst); err == nil {
|
||||
fs.events.Publish(Event{Action: "remove", Name: src})
|
||||
fs.events.Publish(Event{Action: "create", Name: dst})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -456,11 +472,15 @@ func (fs *diskFilesystem) rename(src, dst string) error {
|
||||
return fmt.Errorf("failed to copy files: %w", err)
|
||||
}
|
||||
|
||||
fs.events.Publish(Event{Action: "create", Name: dst})
|
||||
|
||||
if err := os.Remove(src); err != nil {
|
||||
os.Remove(dst)
|
||||
return fmt.Errorf("failed to remove source file: %w", err)
|
||||
}
|
||||
|
||||
fs.events.Publish(Event{Action: "remove", Name: src})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -500,6 +520,8 @@ func (fs *diskFilesystem) copy(src, dst string) error {
|
||||
|
||||
fs.lastSizeCheck = time.Time{}
|
||||
|
||||
fs.events.Publish(Event{Action: "create", Name: dst})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -552,6 +574,8 @@ func (fs *diskFilesystem) Remove(path string) int64 {
|
||||
|
||||
fs.lastSizeCheck = time.Time{}
|
||||
|
||||
fs.events.Publish(Event{Action: "remove", Name: path})
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
@@ -618,6 +642,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
|
||||
if err := os.Remove(path); err == nil {
|
||||
files = append(files, name)
|
||||
size += info.Size()
|
||||
fs.events.Publish(Event{Action: "remove", Name: path})
|
||||
}
|
||||
})
|
||||
|
||||
@@ -768,5 +793,7 @@ func (fs *diskFilesystem) cleanPath(path string) string {
|
||||
}
|
||||
|
||||
func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
|
||||
return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem")
|
||||
ch, cancel := fs.events.Subscribe()
|
||||
|
||||
return ch, cancel, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user