Add proxying memfs files

Ingo Oppermann
2022-08-04 16:43:19 +02:00
parent fe889aa4e2
commit c31fd657be
7 changed files with 177 additions and 22 deletions


@@ -3,7 +3,9 @@ package cluster
 import (
     "context"
     "fmt"
+    "path/filepath"
     "sync"
+    "time"

     "github.com/datarhei/core/v16/log"
 )
@@ -14,6 +16,7 @@ type Cluster interface {
     ListNodes() []NodeReader
     GetNode(id string) (NodeReader, error)
     Stop()
+    GetFile(path string) (string, error)
 }

 type ClusterConfig struct {
@@ -21,9 +24,10 @@ type ClusterConfig struct {
 }

 type cluster struct {
     nodes    map[string]*node

     idfiles  map[string][]string
+    idupdate map[string]time.Time
     fileid   map[string]string

     updates chan NodeState
@@ -36,11 +40,12 @@ type cluster struct {
 func New(config ClusterConfig) (Cluster, error) {
     c := &cluster{
         nodes:    map[string]*node{},
         idfiles:  map[string][]string{},
+        idupdate: map[string]time.Time{},
         fileid:   map[string]string{},
         updates:  make(chan NodeState, 64),
         logger:   config.Logger,
     }

     if c.logger == nil {
@@ -56,7 +61,11 @@ func New(config ClusterConfig) (Cluster, error) {
             case <-ctx.Done():
                 return
             case state := <-c.updates:
-                c.logger.Info().WithField("node", state.ID).WithField("state", state.State).Log("got news from node")
+                c.logger.Debug().WithFields(log.Fields{
+                    "node":  state.ID,
+                    "state": state.State,
+                    "files": len(state.Files),
+                }).Log("got update")

                 c.lock.Lock()
@@ -66,6 +75,7 @@ func New(config ClusterConfig) (Cluster, error) {
                     delete(c.fileid, file)
                 }
                 delete(c.idfiles, state.ID)
+                delete(c.idupdate, state.ID)

                 if state.State == "connected" {
                     // Add files
@@ -73,6 +83,7 @@ func New(config ClusterConfig) (Cluster, error) {
                         c.fileid[file] = state.ID
                     }
                     c.idfiles[state.ID] = files
+                    c.idupdate[state.ID] = state.LastUpdate
                 }

                 c.lock.Unlock()
@@ -153,8 +164,44 @@ func (c *cluster) GetNode(id string) (NodeReader, error) {
     node, ok := c.nodes[id]
     if !ok {
-        return nil, fmt.Errorf("no such node")
+        return nil, fmt.Errorf("node not found")
     }

     return node, nil
 }
+
+func (c *cluster) GetFile(path string) (string, error) {
+    c.lock.RLock()
+    defer c.lock.RUnlock()
+
+    c.logger.Debug().WithField("path", path).Log("opening")
+
+    id, ok := c.fileid[path]
+    if !ok {
+        c.logger.Debug().WithField("path", path).Log("not found")
+        return "", fmt.Errorf("file not found")
+    }
+
+    ts, ok := c.idupdate[id]
+    if !ok {
+        c.logger.Debug().WithField("path", path).Log("no age information found")
+        return "", fmt.Errorf("file not found")
+    }
+
+    if time.Since(ts) > 2*time.Second {
+        c.logger.Debug().WithField("path", path).Log("file too old")
+        return "", fmt.Errorf("file not found")
+    }
+
+    node, ok := c.nodes[id]
+    if !ok {
+        c.logger.Debug().WithField("path", path).Log("unknown node")
+        return "", fmt.Errorf("file not found")
+    }
+
+    url := node.Address() + "/" + filepath.Join("memfs", path)
+
+    c.logger.Debug().WithField("url", url).Log("file cluster url")
+
+    return url, nil
+}
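For orientation, a minimal sketch (not part of this commit) of how the new GetFile method is meant to be used: resolve a memfs path to the URL of the node that last reported it, then fetch that URL over HTTP. The variable c and the path are placeholders.

    // c is assumed to be a cluster.Cluster created with New(); the path is hypothetical.
    url, err := c.GetFile("/foo/bar.ts")
    if err != nil {
        // no node has reported this file within the last 2 seconds
        return err
    }

    // url is the reporting node's address plus "/memfs/<path>"
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()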


@@ -15,9 +15,10 @@ type NodeReader interface {
 }

 type NodeState struct {
     ID         string
     State      string
     Files      []string
+    LastUpdate time.Time
 }

 type nodeState string
@@ -110,7 +111,8 @@ func (n *node) State() NodeState {
     defer n.lock.RUnlock()

     state := NodeState{
         ID:         n.peer.ID(),
+        LastUpdate: n.lastUpdate,
     }

     if n.state == stateDisconnected || time.Since(n.lastUpdate) > 2*time.Second {
@@ -131,6 +133,8 @@ func (n *node) stop() {
 func (n *node) files() {
     files, err := n.peer.MemFSList("name", "asc")

+    n.lastUpdate = time.Now()
+
     if err != nil {
         n.fileList = nil
         n.state = stateDisconnected
@@ -145,7 +149,5 @@ func (n *node) files() {
         n.fileList[i] = file.Name
     }

-    n.lastUpdate = time.Now()
-
     return
 }
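The timestamp is now taken at the start of every poll, so LastUpdate reflects the most recent attempt to list the node's memfs rather than only the last successful one. A rough sketch (an assumption, mirroring the 2-second window GetFile applies above) of how a consumer of NodeState would interpret it:

    // isFresh is a hypothetical helper; the cluster itself applies this
    // window internally via its idupdate map when resolving files.
    func isFresh(state cluster.NodeState) bool {
        return time.Since(state.LastUpdate) <= 2*time.Second
    }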


@@ -7,8 +7,10 @@ type ClusterNodeConfig struct {
 }

 type ClusterNode struct {
     Address    string `json:"address"`
-    State      string `json:"state"`
+    ID         string `json:"id"`
+    LastUpdate int64  `json:"last_update"`
+    State      string `json:"state"`
 }

 type ClusterNodeFiles []string
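Purely for illustration (all values are made up), a node resource now serializes with the new id and last_update fields:

    // Hypothetical values, only to show the JSON shape of the new fields.
    node := api.ClusterNode{
        Address:    "http://10.0.0.2:8080",
        ID:         "node-2",
        LastUpdate: 1659624199,
        State:      "connected",
    }
    // Marshals to:
    // {"address":"http://10.0.0.2:8080","id":"node-2","last_update":1659624199,"state":"connected"}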

http/fs/cluster.go (new file)

@@ -0,0 +1,93 @@
+package fs
+
+import (
+    "io"
+    "net/http"
+    "time"
+
+    "github.com/datarhei/core/v16/cluster"
+    "github.com/datarhei/core/v16/io/fs"
+)
+
+type Filesystem interface {
+    fs.Filesystem
+}
+
+type filesystem struct {
+    fs.Filesystem
+
+    cluster cluster.Cluster
+}
+
+func NewClusterFS(what string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem {
+    f := &filesystem{
+        Filesystem: fs,
+        cluster:    cluster,
+    }
+
+    return f
+}
+
+func (fs *filesystem) Open(path string) fs.File {
+    // Check if the file is locally available
+    if file := fs.Filesystem.Open(path); file != nil {
+        return file
+    }
+
+    // Check if the file is available in the cluster
+    url, err := fs.cluster.GetFile(path)
+    if err != nil {
+        return nil
+    }
+
+    req, err := http.NewRequest("GET", url, nil)
+    if err != nil {
+        return nil
+    }
+
+    client := &http.Client{
+        Timeout: 15 * time.Second,
+    }
+
+    resp, err := client.Do(req)
+    if err != nil {
+        return nil
+    }
+
+    file := &file{
+        ReadCloser: resp.Body,
+        name:       path,
+    }
+
+    return file
+}
+
+type file struct {
+    io.ReadCloser
+
+    name string
+}
+
+func (f *file) Name() string {
+    return f.name
+}
+
+func (f *file) Stat() (fs.FileInfo, error) {
+    return f, nil
+}
+
+func (f *file) Size() int64 {
+    return 0
+}
+
+func (f *file) ModTime() time.Time {
+    return time.Now()
+}
+
+func (f *file) IsLink() (string, bool) {
+    return "", false
+}
+
+func (f *file) IsDir() bool {
+    return false
+}
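A brief usage sketch (not part of the commit) of how this proxying filesystem wraps a local memfs: local hits are served directly, misses are fetched from whichever cluster node last reported the file. memFilesystem, clusterInstance and the path are placeholder names, and the first argument stands in for the "TODO" label used in the server wiring below.

    // Sketch only: memFilesystem (fs.Filesystem) and clusterInstance
    // (cluster.Cluster) are assumed to exist elsewhere.
    proxyFS := clusterfs.NewClusterFS("mem", memFilesystem, clusterInstance)

    f := proxyFS.Open("/live/stream.m3u8")
    if f == nil {
        // neither the local memfs nor any cluster node has the file
        return
    }
    defer f.Close()

    // Note: proxied files report Size() == 0 and ModTime() == time.Now().
    io.Copy(io.Discard, f)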


@@ -89,8 +89,10 @@ func (h *ClusterHandler) GetNode(c echo.Context) error {
     state := peer.State()

     node := api.ClusterNode{
         Address:    peer.Address(),
-        State:      state.State,
+        ID:         state.ID,
+        LastUpdate: state.LastUpdate.Unix(),
+        State:      state.State,
     }

     return c.JSON(http.StatusOK, node)


@@ -47,7 +47,10 @@ func (h *MemFSHandler) GetFile(c echo.Context) error {
     defer file.Close()

-    stat, _ := file.Stat()
+    stat, err := file.Stat()
+    if err != nil {
+        return api.Err(http.StatusInternalServerError, "File.Stat() failed", "%s", err)
+    }

     c.Response().Header().Set("Last-Modified", stat.ModTime().UTC().Format("Mon, 02 Jan 2006 15:04:05 GMT"))


@@ -36,6 +36,7 @@ import (
     "github.com/datarhei/core/v16/config"
     "github.com/datarhei/core/v16/http/cache"
     "github.com/datarhei/core/v16/http/errorhandler"
+    clusterfs "github.com/datarhei/core/v16/http/fs"
     "github.com/datarhei/core/v16/http/graph/resolver"
     "github.com/datarhei/core/v16/http/handler"
     api "github.com/datarhei/core/v16/http/handler/api"
@@ -227,8 +228,13 @@ func NewServer(config Config) (Server, error) {
             config.MemFS.Filesystem,
         )

+        filesystem := config.MemFS.Filesystem
+        if config.Cluster != nil {
+            filesystem = clusterfs.NewClusterFS("TODO", filesystem, config.Cluster)
+        }
+
         s.handler.memfs = handler.NewMemFS(
-            config.MemFS.Filesystem,
+            filesystem,
         )
     }