diff --git a/cluster/cluster.go b/cluster/cluster.go index a536b499..7fe27e70 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1317,6 +1317,7 @@ type ClusterNodeResources struct { CPULimit float64 // Defined CPU load limit, 0-100*ncpu Mem uint64 // Currently used memory in bytes MemLimit uint64 // Defined memory limit in bytes + Error error } type ClusterNode struct { @@ -1419,6 +1420,7 @@ func (c *cluster) About() (ClusterAbout, error) { CPULimit: nodeAbout.Resources.CPULimit, Mem: nodeAbout.Resources.Mem, MemLimit: nodeAbout.Resources.MemLimit, + Error: nodeAbout.Resources.Error, }, } diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index b34630a8..262247cb 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -37,12 +37,12 @@ type Node interface { } type NodeReader interface { - Ping() (time.Duration, error) About() NodeAbout Version() NodeVersion Resources() NodeResources - Files() NodeFiles + FileList(storage, pattern string) ([]clientapi.FileInfo, error) + ProxyFileList() NodeFiles GetURL(prefix, path string) (*url.URL, error) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) @@ -67,6 +67,7 @@ type NodeResources struct { CPULimit float64 // Defined CPU load limit, 0-100*ncpu Mem uint64 // Currently used memory in bytes MemLimit uint64 // Defined memory limit in bytes + Error error // Last error } type NodeAbout struct { @@ -112,6 +113,7 @@ type node struct { peerErr error peerLock sync.RWMutex peerWg sync.WaitGroup + peerAbout clientapi.About disconnect context.CancelFunc lastContact time.Time @@ -123,6 +125,7 @@ type node struct { cpuLimit float64 mem uint64 memLimit uint64 + err error } config *config.Config @@ -154,6 +157,7 @@ func NewNode(id, address string, config *config.Config) Node { n.resources.cpuLimit = 100 n.resources.mem = 0 n.resources.memLimit = 0 + n.resources.err = fmt.Errorf("not initialized") ctx, cancel := context.WithCancel(context.Background()) n.disconnect = cancel @@ -340,18 +344,18 @@ func (n *node) Disconnect() { } func (n *node) pingPeer(ctx context.Context, wg *sync.WaitGroup) { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() defer wg.Done() for { select { case <-ticker.C: - // Ping - latency, err := n.Ping() + about, latency, err := n.AboutPeer() n.peerLock.Lock() n.peerErr = err + n.peerAbout = about n.peerLock.Unlock() n.stateLock.Lock() @@ -391,10 +395,6 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { }, }) - n.peerLock.Lock() - n.peerErr = err - n.peerLock.Unlock() - if err != nil { n.stateLock.Lock() n.resources.throttling = true @@ -403,7 +403,7 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { n.resources.cpuLimit = 100 n.resources.mem = 0 n.resources.memLimit = 0 - n.state = stateDisconnected + n.resources.err = err n.stateLock.Unlock() continue @@ -453,7 +453,7 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { n.resources.mem = 0 n.resources.memLimit = 0 } - n.lastContact = time.Now() + n.resources.err = nil n.stateLock.Unlock() case <-ctx.Done(): return @@ -461,19 +461,6 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { } } -func (n *node) Ping() (time.Duration, error) { - n.peerLock.RLock() - defer n.peerLock.RUnlock() - - if n.peer == nil { - return 0, ErrNoPeer - } - - latency, err := n.peer.Ping() - - return latency, err -} - func (n *node) Metrics(query clientapi.MetricsQuery) (clientapi.MetricsResponse, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -485,41 +472,32 @@ func (n *node) Metrics(query clientapi.MetricsQuery) (clientapi.MetricsResponse, return n.peer.Metrics(query) } -func (n *node) AboutPeer() (clientapi.About, error) { +func (n *node) AboutPeer() (clientapi.About, time.Duration, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() if n.peer == nil { - return clientapi.About{}, ErrNoPeer + return clientapi.About{}, 0, ErrNoPeer } - return n.peer.About(false) + start := time.Now() + + about, err := n.peer.About(false) + + return about, time.Since(start), err } func (n *node) About() NodeAbout { - about, err := n.AboutPeer() - - n.stateLock.RLock() - defer n.stateLock.RUnlock() - - if err != nil { - return NodeAbout{ - ID: n.id, - Address: n.address, - State: stateDisconnected.String(), - Error: err, - LastContact: n.lastContact, - Resources: NodeResources{ - IsThrottling: true, - NCPU: 1, - }, - } - } - - createdAt, err := time.Parse(time.RFC3339, about.CreatedAt) + n.peerLock.Lock() + createdAt, err := time.Parse(time.RFC3339, n.peerAbout.CreatedAt) if err != nil { createdAt = time.Now() } + name := n.peerAbout.Name + n.peerLock.Unlock() + + n.stateLock.RLock() + defer n.stateLock.RUnlock() state := n.state if time.Since(n.lastContact) > 3*time.Second { @@ -528,7 +506,7 @@ func (n *node) About() NodeAbout { nodeAbout := NodeAbout{ ID: n.id, - Name: about.Name, + Name: name, Address: n.address, State: state.String(), Error: n.peerErr, @@ -543,6 +521,7 @@ func (n *node) About() NodeAbout { CPULimit: n.resources.cpuLimit, Mem: n.resources.mem, MemLimit: n.resources.memLimit, + Error: n.resources.err, }, } @@ -563,29 +542,27 @@ func (n *node) Resources() NodeResources { } func (n *node) Version() NodeVersion { - about, err := n.AboutPeer() - if err != nil { - return NodeVersion{} - } + n.peerLock.Lock() + defer n.peerLock.Unlock() - build, err := time.Parse(time.RFC3339, about.Version.Build) + build, err := time.Parse(time.RFC3339, n.peerAbout.Version.Build) if err != nil { build = time.Time{} } version := NodeVersion{ - Number: about.Version.Number, - Commit: about.Version.Commit, - Branch: about.Version.Branch, + Number: n.peerAbout.Version.Number, + Commit: n.peerAbout.Version.Commit, + Branch: n.peerAbout.Version.Branch, Build: build, - Arch: about.Version.Arch, - Compiler: about.Version.Compiler, + Arch: n.peerAbout.Version.Arch, + Compiler: n.peerAbout.Version.Compiler, } return version } -func (n *node) Files() NodeFiles { +func (n *node) ProxyFileList() NodeFiles { id := n.About().ID files := NodeFiles{ @@ -732,6 +709,17 @@ func (n *node) files() []string { return filesList } +func (n *node) FileList(storage, pattern string) ([]clientapi.FileInfo, error) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + return nil, ErrNoPeer + } + + return n.peer.FilesystemList(storage, pattern, "", "") +} + func cloneURL(src *url.URL) *url.URL { dst := &url.URL{ Scheme: src.Scheme, diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index e9f36f0f..b2459a64 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -37,10 +37,13 @@ type ProxyReader interface { FindNodeFromProcess(id app.ProcessID) (string, error) Resources() map[string]NodeResources + ListProcesses(ProcessListOptions) []clientapi.Process ListProxyProcesses() []Process ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) + ListFiles(storage, patter string) []clientapi.FileInfo + GetURL(prefix, path string) (*url.URL, error) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) GetFileInfo(prefix, path string) (int64, time.Time, error) @@ -343,6 +346,49 @@ func (p *proxy) getNodeForFile(prefix, path string) (Node, error) { return p.GetNode(id) } +func (p *proxy) ListFiles(storage, pattern string) []clientapi.FileInfo { + filesChan := make(chan []clientapi.FileInfo, 64) + filesList := []clientapi.FileInfo{} + + wgList := sync.WaitGroup{} + wgList.Add(1) + + go func() { + defer wgList.Done() + + for list := range filesChan { + filesList = append(filesList, list...) + } + }() + + wg := sync.WaitGroup{} + + p.nodesLock.RLock() + for _, node := range p.nodes { + wg.Add(1) + + go func(node Node, p chan<- []clientapi.FileInfo) { + defer wg.Done() + + files, err := node.FileList(storage, pattern) + if err != nil { + return + } + + p <- files + }(node, filesChan) + } + p.nodesLock.RUnlock() + + wg.Wait() + + close(filesChan) + + wgList.Wait() + + return filesList +} + type Process struct { NodeID string Order string diff --git a/http/api/cluster.go b/http/api/cluster.go index d35df4d5..8cdc878c 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -36,6 +36,7 @@ type ClusterNodeResources struct { CPULimit float64 `json:"cpu_limit"` // percent 0-100*npcu Mem uint64 `json:"memory_used_bytes"` // bytes MemLimit uint64 `json:"memory_limit_bytes"` // bytes + Error string `json:"error"` } type ClusterRaft struct { diff --git a/http/api/filesystems.go b/http/api/filesystems.go index f78749ad..da2395d4 100644 --- a/http/api/filesystems.go +++ b/http/api/filesystems.go @@ -5,6 +5,7 @@ type FileInfo struct { Name string `json:"name" jsonschema:"minLength=1"` Size int64 `json:"size_bytes" jsonschema:"minimum=0" format:"int64"` LastMod int64 `json:"last_modified" jsonschema:"minimum=0" format:"int64"` + CoreID string `json:"core_id,omitempty"` } // FilesystemInfo represents information about a filesystem diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index eeee6740..133ad6a2 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -122,6 +122,10 @@ func (h *ClusterHandler) marshalClusterNode(node cluster.ClusterNode) api.Cluste n.Core.Error = node.Core.Error.Error() } + if node.Resources.Error != nil { + n.Resources.Error = node.Resources.Error.Error() + } + return n } diff --git a/http/handler/api/cluster_fs.go b/http/handler/api/cluster_fs.go new file mode 100644 index 00000000..7a0b9226 --- /dev/null +++ b/http/handler/api/cluster_fs.go @@ -0,0 +1,78 @@ +package api + +import ( + "net/http" + "strconv" + "time" + + "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/handler/util" + "github.com/datarhei/core/v16/io/fs" + + "github.com/labstack/echo/v4" +) + +// ListFiles lists all files on a filesystem +// @Summary List all files on a filesystem +// @Description List all files on a filesystem. The listing can be ordered by name, size, or date of last modification in ascending or descending order. +// @Tags v16.?.? +// @ID cluster-3-list-files +// @Produce json +// @Param storage path string true "Name of the filesystem" +// @Param glob query string false "glob pattern for file names" +// @Param size_min query int64 false "minimal size of files" +// @Param size_max query int64 false "maximal size of files" +// @Param lastmod_start query int64 false "minimal last modification time" +// @Param lastmod_end query int64 false "maximal last modification time" +// @Param sort query string false "none, name, size, lastmod" +// @Param order query string false "asc, desc" +// @Success 200 {array} api.FileInfo +// @Success 500 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/fs/{storage} [get] +func (h *ClusterHandler) ListFiles(c echo.Context) error { + //name := util.PathParam(c, "storage") + pattern := util.DefaultQuery(c, "glob", "") + sizeMin := util.DefaultQuery(c, "size_min", "0") + sizeMax := util.DefaultQuery(c, "size_max", "0") + modifiedStart := util.DefaultQuery(c, "lastmod_start", "") + modifiedEnd := util.DefaultQuery(c, "lastmod_end", "") + //sortby := util.DefaultQuery(c, "sort", "none") + //order := util.DefaultQuery(c, "order", "asc") + + options := fs.ListOptions{ + Pattern: pattern, + } + + if x, err := strconv.ParseInt(sizeMin, 10, 64); err != nil { + return api.Err(http.StatusBadRequest, "", "size_min: %s", err.Error()) + } else { + options.SizeMin = x + } + + if x, err := strconv.ParseInt(sizeMax, 10, 64); err != nil { + return api.Err(http.StatusBadRequest, "", "size_max: %s", err.Error()) + } else { + options.SizeMax = x + } + + if len(modifiedStart) != 0 { + if x, err := strconv.ParseInt(modifiedStart, 10, 64); err != nil { + return api.Err(http.StatusBadRequest, "", "lastmod_start: %s", err.Error()) + } else { + t := time.Unix(x, 0) + options.ModifiedStart = &t + } + } + + if len(modifiedEnd) != 0 { + if x, err := strconv.ParseInt(modifiedEnd, 10, 64); err != nil { + return api.Err(http.StatusBadRequest, "", "lastmode_end: %s", err.Error()) + } else { + t := time.Unix(x+1, 0) + options.ModifiedEnd = &t + } + } + + return api.Err(http.StatusNotImplemented, "", "not implemented") +} diff --git a/http/handler/api/cluster_node.go b/http/handler/api/cluster_node.go index 312b835b..c6c78d57 100644 --- a/http/handler/api/cluster_node.go +++ b/http/handler/api/cluster_node.go @@ -118,7 +118,7 @@ func (h *ClusterHandler) GetNodeFiles(c echo.Context) error { Files: make(map[string][]string), } - peerFiles := peer.Files() + peerFiles := peer.ProxyFileList() files.LastUpdate = peerFiles.LastUpdate.Unix() diff --git a/http/server.go b/http/server.go index 1e7e61e1..e770f247 100644 --- a/http/server.go +++ b/http/server.go @@ -722,6 +722,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/node/:id/process", s.v3handler.cluster.ListNodeProcesses) v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion) + v3.GET("/cluster/fs/:storage", s.v3handler.cluster.ListFiles) + if !s.readOnly { v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership) v3.PUT("/cluster/leave", s.v3handler.cluster.Leave)