diff --git a/cluster/cluster.go b/cluster/cluster.go index cad7d0af..910546fd 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,7 +3,9 @@ package cluster import ( "context" "fmt" + "path/filepath" "sync" + "time" "github.com/datarhei/core/v16/log" ) @@ -14,6 +16,7 @@ type Cluster interface { ListNodes() []NodeReader GetNode(id string) (NodeReader, error) Stop() + GetFile(path string) (string, error) } type ClusterConfig struct { @@ -21,9 +24,10 @@ type ClusterConfig struct { } type cluster struct { - nodes map[string]*node - idfiles map[string][]string - fileid map[string]string + nodes map[string]*node + idfiles map[string][]string + idupdate map[string]time.Time + fileid map[string]string updates chan NodeState @@ -36,11 +40,12 @@ type cluster struct { func New(config ClusterConfig) (Cluster, error) { c := &cluster{ - nodes: map[string]*node{}, - idfiles: map[string][]string{}, - fileid: map[string]string{}, - updates: make(chan NodeState, 64), - logger: config.Logger, + nodes: map[string]*node{}, + idfiles: map[string][]string{}, + idupdate: map[string]time.Time{}, + fileid: map[string]string{}, + updates: make(chan NodeState, 64), + logger: config.Logger, } if c.logger == nil { @@ -56,7 +61,11 @@ func New(config ClusterConfig) (Cluster, error) { case <-ctx.Done(): return case state := <-c.updates: - c.logger.Info().WithField("node", state.ID).WithField("state", state.State).Log("got news from node") + c.logger.Debug().WithFields(log.Fields{ + "node": state.ID, + "state": state.State, + "files": len(state.Files), + }).Log("got update") c.lock.Lock() @@ -66,6 +75,7 @@ func New(config ClusterConfig) (Cluster, error) { delete(c.fileid, file) } delete(c.idfiles, state.ID) + delete(c.idupdate, state.ID) if state.State == "connected" { // Add files @@ -73,6 +83,7 @@ func New(config ClusterConfig) (Cluster, error) { c.fileid[file] = state.ID } c.idfiles[state.ID] = files + c.idupdate[state.ID] = state.LastUpdate } c.lock.Unlock() @@ -153,8 +164,44 @@ func (c *cluster) GetNode(id string) (NodeReader, error) { node, ok := c.nodes[id] if !ok { - return nil, fmt.Errorf("no such node") + return nil, fmt.Errorf("node not found") } return node, nil } + +func (c *cluster) GetFile(path string) (string, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + c.logger.Debug().WithField("path", path).Log("opening") + + id, ok := c.fileid[path] + if !ok { + c.logger.Debug().WithField("path", path).Log("not found") + return "", fmt.Errorf("file not found") + } + + ts, ok := c.idupdate[id] + if !ok { + c.logger.Debug().WithField("path", path).Log("no age information found") + return "", fmt.Errorf("file not found") + } + + if time.Since(ts) > 2*time.Second { + c.logger.Debug().WithField("path", path).Log("file too old") + return "", fmt.Errorf("file not found") + } + + node, ok := c.nodes[id] + if !ok { + c.logger.Debug().WithField("path", path).Log("unknown node") + return "", fmt.Errorf("file not found") + } + + url := node.Address() + "/" + filepath.Join("memfs", path) + + c.logger.Debug().WithField("url", url).Log("file cluster url") + + return url, nil +} diff --git a/cluster/node.go b/cluster/node.go index 30c1143d..e1cf8a0a 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -15,9 +15,10 @@ type NodeReader interface { } type NodeState struct { - ID string - State string - Files []string + ID string + State string + Files []string + LastUpdate time.Time } type nodeState string @@ -110,7 +111,8 @@ func (n *node) State() NodeState { defer n.lock.RUnlock() state := NodeState{ - ID: n.peer.ID(), + ID: n.peer.ID(), + LastUpdate: n.lastUpdate, } if n.state == stateDisconnected || time.Since(n.lastUpdate) > 2*time.Second { @@ -131,6 +133,8 @@ func (n *node) stop() { func (n *node) files() { files, err := n.peer.MemFSList("name", "asc") + n.lastUpdate = time.Now() + if err != nil { n.fileList = nil n.state = stateDisconnected @@ -145,7 +149,5 @@ func (n *node) files() { n.fileList[i] = file.Name } - n.lastUpdate = time.Now() - return } diff --git a/http/api/cluster.go b/http/api/cluster.go index 4430d60f..71cba657 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -7,8 +7,10 @@ type ClusterNodeConfig struct { } type ClusterNode struct { - Address string `json:"address"` - State string `json:"state"` + Address string `json:"address"` + ID string `json:"id"` + LastUpdate int64 `json:"last_update"` + State string `json:"state"` } type ClusterNodeFiles []string diff --git a/http/fs/cluster.go b/http/fs/cluster.go new file mode 100644 index 00000000..10d1f1dc --- /dev/null +++ b/http/fs/cluster.go @@ -0,0 +1,93 @@ +package fs + +import ( + "io" + "net/http" + "time" + + "github.com/datarhei/core/v16/cluster" + "github.com/datarhei/core/v16/io/fs" +) + +type Filesystem interface { + fs.Filesystem +} + +type filesystem struct { + fs.Filesystem + + cluster cluster.Cluster +} + +func NewClusterFS(what string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem { + f := &filesystem{ + Filesystem: fs, + cluster: cluster, + } + + return f +} + +func (fs *filesystem) Open(path string) fs.File { + // Check if the file is locally available + if file := fs.Filesystem.Open(path); file != nil { + return file + } + + // Check if the file is available in the cluster + url, err := fs.cluster.GetFile(path) + if err != nil { + return nil + } + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil + } + + client := &http.Client{ + Timeout: 15 * time.Second, + } + + resp, err := client.Do(req) + if err != nil { + return nil + } + + file := &file{ + ReadCloser: resp.Body, + name: path, + } + + return file +} + +type file struct { + io.ReadCloser + + name string +} + +func (f *file) Name() string { + return f.name +} + +func (f *file) Stat() (fs.FileInfo, error) { + return f, nil +} + +func (f *file) Size() int64 { + return 0 +} + +func (f *file) ModTime() time.Time { + return time.Now() +} + +func (f *file) IsLink() (string, bool) { + return "", false +} + +func (f *file) IsDir() bool { + return false +} diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 2651c30e..8f17941e 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -89,8 +89,10 @@ func (h *ClusterHandler) GetNode(c echo.Context) error { state := peer.State() node := api.ClusterNode{ - Address: peer.Address(), - State: state.State, + Address: peer.Address(), + ID: state.ID, + LastUpdate: state.LastUpdate.Unix(), + State: state.State, } return c.JSON(http.StatusOK, node) diff --git a/http/handler/memfs.go b/http/handler/memfs.go index 1369a6dc..81632506 100644 --- a/http/handler/memfs.go +++ b/http/handler/memfs.go @@ -47,7 +47,10 @@ func (h *MemFSHandler) GetFile(c echo.Context) error { defer file.Close() - stat, _ := file.Stat() + stat, err := file.Stat() + if err != nil { + return api.Err(http.StatusInternalServerError, "File.Stat() failed", "%s", err) + } c.Response().Header().Set("Last-Modified", stat.ModTime().UTC().Format("Mon, 02 Jan 2006 15:04:05 GMT")) diff --git a/http/server.go b/http/server.go index 3d0eb8f1..07a8264a 100644 --- a/http/server.go +++ b/http/server.go @@ -36,6 +36,7 @@ import ( "github.com/datarhei/core/v16/config" "github.com/datarhei/core/v16/http/cache" "github.com/datarhei/core/v16/http/errorhandler" + clusterfs "github.com/datarhei/core/v16/http/fs" "github.com/datarhei/core/v16/http/graph/resolver" "github.com/datarhei/core/v16/http/handler" api "github.com/datarhei/core/v16/http/handler/api" @@ -227,8 +228,13 @@ func NewServer(config Config) (Server, error) { config.MemFS.Filesystem, ) + filesystem := config.MemFS.Filesystem + if config.Cluster != nil { + filesystem = clusterfs.NewClusterFS("TODO", filesystem, config.Cluster) + } + s.handler.memfs = handler.NewMemFS( - config.MemFS.Filesystem, + filesystem, ) }