Allow proxied range requests, /cluster/process mimic /process

This commit is contained in:
Ingo Oppermann
2023-06-21 13:14:27 +02:00
parent a9b92e7f9a
commit b35ac9ccc3
32 changed files with 851 additions and 264 deletions

View File

@@ -698,7 +698,7 @@ const docTemplateClusterAPI = `{
"type": "integer" "type": "integer"
}, },
"logPatterns": { "logPatterns": {
"description": "will we interpreted as regualr expressions", "description": "will we interpreted as regular expressions",
"type": "array", "type": "array",
"items": { "items": {
"type": "string" "type": "string"

View File

@@ -690,7 +690,7 @@
"type": "integer" "type": "integer"
}, },
"logPatterns": { "logPatterns": {
"description": "will we interpreted as regualr expressions", "description": "will we interpreted as regular expressions",
"type": "array", "type": "array",
"items": { "items": {
"type": "string" "type": "string"

View File

@@ -37,7 +37,7 @@ definitions:
description: seconds description: seconds
type: integer type: integer
logPatterns: logPatterns:
description: will we interpreted as regualr expressions description: will we interpreted as regular expressions
items: items:
type: string type: string
type: array type: array

View File

@@ -514,7 +514,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
func (c *cluster) doSynchronize(emergency bool) { func (c *cluster) doSynchronize(emergency bool) {
wish := c.store.GetProcessNodeMap() wish := c.store.GetProcessNodeMap()
want := c.store.ListProcesses() want := c.store.ListProcesses()
have := c.proxy.ListProcesses() have := c.proxy.ListProxyProcesses()
nodes := c.proxy.ListNodes() nodes := c.proxy.ListNodes()
nodesMap := map[string]proxy.NodeAbout{} nodesMap := map[string]proxy.NodeAbout{}
@@ -547,7 +547,7 @@ func (c *cluster) doSynchronize(emergency bool) {
} }
func (c *cluster) doRebalance(emergency bool) { func (c *cluster) doRebalance(emergency bool) {
have := c.proxy.ListProcesses() have := c.proxy.ListProxyProcesses()
nodes := c.proxy.ListNodes() nodes := c.proxy.ListNodes()
nodesMap := map[string]proxy.NodeAbout{} nodesMap := map[string]proxy.NodeAbout{}

View File

@@ -29,7 +29,8 @@ type Node interface {
StopFiles() StopFiles()
GetURL(prefix, path string) (*url.URL, error) GetURL(prefix, path string) (*url.URL, error)
GetFile(prefix, path string) (io.ReadCloser, error) GetFile(prefix, path string, offset int64) (io.ReadCloser, error)
GetFileInfo(prefix, path string) (int64, time.Time, error)
AddProcess(config *app.Config, metadata map[string]interface{}) error AddProcess(config *app.Config, metadata map[string]interface{}) error
StartProcess(id app.ProcessID) error StartProcess(id app.ProcessID) error
@@ -48,7 +49,8 @@ type NodeReader interface {
Resources() NodeResources Resources() NodeResources
Files() NodeFiles Files() NodeFiles
ProcessList() ([]Process, error) ProcessList(ProcessListOptions) ([]clientapi.Process, error)
ProxyProcessList() ([]Process, error)
} }
type NodeFiles struct { type NodeFiles struct {
@@ -854,7 +856,7 @@ func (n *node) GetURL(prefix, resource string) (*url.URL, error) {
return u, nil return u, nil
} }
func (n *node) GetFile(prefix, path string) (io.ReadCloser, error) { func (n *node) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) {
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
@@ -862,12 +864,30 @@ func (n *node) GetFile(prefix, path string) (io.ReadCloser, error) {
return nil, fmt.Errorf("not connected") return nil, fmt.Errorf("not connected")
} }
return n.peer.FilesystemGetFile(prefix, path) return n.peer.FilesystemGetFileOffset(prefix, path, offset)
} }
func (n *node) ProcessList() ([]Process, error) { func (n *node) GetFileInfo(prefix, path string) (int64, time.Time, error) {
id := n.About().ID n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
return 0, time.Time{}, fmt.Errorf("not connected")
}
info, err := n.peer.FilesystemList(prefix, path, "", "")
if err != nil {
return 0, time.Time{}, fmt.Errorf("not found: %w", err)
}
if len(info) != 1 {
return 0, time.Time{}, fmt.Errorf("ambigous result")
}
return info[0].Size, time.Unix(info[0].LastMod, 0), nil
}
func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, error) {
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
@@ -875,22 +895,31 @@ func (n *node) ProcessList() ([]Process, error) {
return nil, fmt.Errorf("not connected") return nil, fmt.Errorf("not connected")
} }
list, err := n.peer.ProcessList(client.ProcessListOptions{ return n.peer.ProcessList(client.ProcessListOptions{
Filter: []string{ ID: options.ID,
"state", Filter: options.Filter,
"config", Domain: options.Domain,
"metadata", Reference: options.Reference,
}, IDPattern: options.IDPattern,
RefPattern: options.RefPattern,
OwnerPattern: options.OwnerPattern,
DomainPattern: options.DomainPattern,
}) })
}
func (n *node) ProxyProcessList() ([]Process, error) {
list, err := n.ProcessList(ProcessListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodeid := n.About().ID
processes := []Process{} processes := []Process{}
for _, p := range list { for _, p := range list {
process := Process{ process := Process{
NodeID: id, NodeID: nodeid,
Order: p.State.Order, Order: p.State.Order,
State: p.State.State, State: p.State.State,
Mem: p.State.Memory, Mem: p.State.Memory,

View File

@@ -12,6 +12,8 @@ import (
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/app"
clientapi "github.com/datarhei/core-client-go/v16/api"
) )
type Proxy interface { type Proxy interface {
@@ -35,10 +37,12 @@ type ProxyReader interface {
GetNode(id string) (NodeReader, error) GetNode(id string) (NodeReader, error)
Resources() map[string]NodeResources Resources() map[string]NodeResources
ListProcesses() []Process ListProcesses(ProcessListOptions) []clientapi.Process
ListProxyProcesses() []Process
GetURL(prefix, path string) (*url.URL, error) GetURL(prefix, path string) (*url.URL, error)
GetFile(prefix, path string) (io.ReadCloser, error) GetFile(prefix, path string, offset int64) (io.ReadCloser, error)
GetFileInfo(prefix, path string) (int64, time.Time, error)
} }
func NewNullProxyReader() ProxyReader { func NewNullProxyReader() ProxyReader {
@@ -73,12 +77,20 @@ func (p *proxyReader) Resources() map[string]NodeResources {
return p.proxy.Resources() return p.proxy.Resources()
} }
func (p *proxyReader) ListProcesses() []Process { func (p *proxyReader) ListProcesses(options ProcessListOptions) []clientapi.Process {
if p.proxy == nil { if p.proxy == nil {
return nil return nil
} }
return p.proxy.ListProcesses() return p.proxy.ListProcesses(options)
}
func (p *proxyReader) ListProxyProcesses() []Process {
if p.proxy == nil {
return nil
}
return p.proxy.ListProxyProcesses()
} }
func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) { func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) {
@@ -89,12 +101,20 @@ func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) {
return p.proxy.GetURL(prefix, path) return p.proxy.GetURL(prefix, path)
} }
func (p *proxyReader) GetFile(prefix, path string) (io.ReadCloser, error) { func (p *proxyReader) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) {
if p.proxy == nil { if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided") return nil, fmt.Errorf("no proxy provided")
} }
return p.proxy.GetFile(prefix, path) return p.proxy.GetFile(prefix, path, offset)
}
func (p *proxyReader) GetFileInfo(prefix, path string) (int64, time.Time, error) {
if p.proxy == nil {
return 0, time.Time{}, fmt.Errorf("no proxy provided")
}
return p.proxy.GetFileInfo(prefix, path)
} }
type ProxyConfig struct { type ProxyConfig struct {
@@ -380,39 +400,19 @@ func (p *proxy) GetURL(prefix, path string) (*url.URL, error) {
return url, nil return url, nil
} }
func (p *proxy) GetFile(prefix, path string) (io.ReadCloser, error) { func (p *proxy) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) {
p.lock.RLock()
defer p.lock.RUnlock()
logger := p.logger.WithFields(log.Fields{ logger := p.logger.WithFields(log.Fields{
"path": path, "path": path,
"prefix": prefix, "prefix": prefix,
}) })
id, ok := p.fileid[prefix+":"+path] node, err := p.getNodeForFile(prefix, path)
if !ok { if err != nil {
logger.Debug().Log("Not found") logger.Debug().WithError(err).Log("File not available")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
} }
ts, ok := p.idupdate[id] data, err := node.GetFile(prefix, path, offset)
if !ok {
logger.Debug().Log("No age information found")
return nil, fmt.Errorf("file not found")
}
if time.Since(ts) > 2*time.Second {
logger.Debug().Log("File too old")
return nil, fmt.Errorf("file not found")
}
node, ok := p.nodes[id]
if !ok {
logger.Debug().Log("Unknown node")
return nil, fmt.Errorf("file not found")
}
data, err := node.GetFile(prefix, path)
if err != nil { if err != nil {
logger.Debug().Log("Invalid path") logger.Debug().Log("Invalid path")
return nil, fmt.Errorf("file not found") return nil, fmt.Errorf("file not found")
@@ -423,6 +423,55 @@ func (p *proxy) GetFile(prefix, path string) (io.ReadCloser, error) {
return data, nil return data, nil
} }
func (p *proxy) GetFileInfo(prefix, path string) (int64, time.Time, error) {
logger := p.logger.WithFields(log.Fields{
"path": path,
"prefix": prefix,
})
node, err := p.getNodeForFile(prefix, path)
if err != nil {
logger.Debug().WithError(err).Log("File not available")
return 0, time.Time{}, fmt.Errorf("file not found")
}
size, lastModified, err := node.GetFileInfo(prefix, path)
if err != nil {
logger.Debug().Log("Invalid path")
return 0, time.Time{}, fmt.Errorf("file not found")
}
logger.Debug().Log("File cluster path")
return size, lastModified, nil
}
func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {
p.lock.RLock()
defer p.lock.RUnlock()
id, ok := p.fileid[prefix+":"+path]
if !ok {
return nil, fmt.Errorf("file not found")
}
ts, ok := p.idupdate[id]
if !ok {
return nil, fmt.Errorf("no age information found")
}
if time.Since(ts) > 2*time.Second {
return nil, fmt.Errorf("file too old")
}
node, ok := p.nodes[id]
if !ok {
return nil, fmt.Errorf("unknown node")
}
return node, nil
}
type Process struct { type Process struct {
NodeID string NodeID string
Order string Order string
@@ -435,7 +484,18 @@ type Process struct {
Metadata map[string]interface{} Metadata map[string]interface{}
} }
func (p *proxy) ListProcesses() []Process { type ProcessListOptions struct {
ID []string
Filter []string
Domain string
Reference string
IDPattern string
RefPattern string
OwnerPattern string
DomainPattern string
}
func (p *proxy) ListProxyProcesses() []Process {
processChan := make(chan Process, 64) processChan := make(chan Process, 64)
processList := []Process{} processList := []Process{}
@@ -459,7 +519,52 @@ func (p *proxy) ListProcesses() []Process {
go func(node Node, p chan<- Process) { go func(node Node, p chan<- Process) {
defer wg.Done() defer wg.Done()
processes, err := node.ProcessList() processes, err := node.ProxyProcessList()
if err != nil {
return
}
for _, process := range processes {
p <- process
}
}(node, processChan)
}
p.lock.RUnlock()
wg.Wait()
close(processChan)
wgList.Wait()
return processList
}
func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
processChan := make(chan clientapi.Process, 64)
processList := []clientapi.Process{}
wgList := sync.WaitGroup{}
wgList.Add(1)
go func() {
defer wgList.Done()
for process := range processChan {
processList = append(processList, process)
}
}()
wg := sync.WaitGroup{}
p.lock.RLock()
for _, node := range p.nodes {
wg.Add(1)
go func(node Node, p chan<- clientapi.Process) {
defer wg.Done()
processes, err := node.ProcessList(options)
if err != nil { if err != nil {
return return
} }

View File

@@ -838,6 +838,48 @@ const docTemplate = `{
"description": "Domain to act on", "description": "Domain to act on",
"name": "domain", "name": "domain",
"in": "query" "in": "query"
},
{
"type": "string",
"description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.",
"name": "filter",
"in": "query"
},
{
"type": "string",
"description": "Return only these process that have this reference value. If empty, the reference will be ignored.",
"name": "reference",
"in": "query"
},
{
"type": "string",
"description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.",
"name": "id",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "idpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "refpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "ownerpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "domainpattern",
"in": "query"
} }
], ],
"responses": { "responses": {
@@ -846,7 +888,7 @@ const docTemplate = `{
"schema": { "schema": {
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/definitions/api.ClusterProcess" "$ref": "#/definitions/api.Process"
} }
} }
}, },
@@ -928,6 +970,48 @@ const docTemplate = `{
"description": "Domain to act on", "description": "Domain to act on",
"name": "domain", "name": "domain",
"in": "query" "in": "query"
},
{
"type": "string",
"description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.",
"name": "filter",
"in": "query"
},
{
"type": "string",
"description": "Return only these process that have this reference value. If empty, the reference will be ignored.",
"name": "reference",
"in": "query"
},
{
"type": "string",
"description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.",
"name": "id",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "idpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "refpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "ownerpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "domainpattern",
"in": "query"
} }
], ],
"responses": { "responses": {
@@ -936,7 +1020,7 @@ const docTemplate = `{
"schema": { "schema": {
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/definitions/api.ClusterProcess" "$ref": "#/definitions/api.Process"
} }
} }
} }
@@ -994,6 +1078,63 @@ const docTemplate = `{
} }
}, },
"/api/v3/cluster/process/{id}": { "/api/v3/cluster/process/{id}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List a process by its ID. Use the filter parameter to specifiy the level of detail of the output.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List a process by its ID",
"operationId": "cluster-3-get-process",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain to act on",
"name": "domain",
"in": "query"
},
{
"type": "string",
"description": "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output",
"name": "filter",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.Process"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"put": { "put": {
"security": [ "security": [
{ {
@@ -4012,44 +4153,6 @@ const docTemplate = `{
} }
} }
}, },
"api.ClusterProcess": {
"type": "object",
"properties": {
"cpu": {
"description": "percent 0-100*ncpu",
"type": "number"
},
"domain": {
"type": "string"
},
"id": {
"type": "string"
},
"memory_bytes": {
"description": "bytes",
"type": "integer"
},
"node_id": {
"type": "string"
},
"order": {
"type": "string"
},
"owner": {
"type": "string"
},
"reference": {
"type": "string"
},
"runtime_seconds": {
"description": "seconds",
"type": "integer"
},
"state": {
"type": "string"
}
}
},
"api.ClusterRaft": { "api.ClusterRaft": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -830,6 +830,48 @@
"description": "Domain to act on", "description": "Domain to act on",
"name": "domain", "name": "domain",
"in": "query" "in": "query"
},
{
"type": "string",
"description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.",
"name": "filter",
"in": "query"
},
{
"type": "string",
"description": "Return only these process that have this reference value. If empty, the reference will be ignored.",
"name": "reference",
"in": "query"
},
{
"type": "string",
"description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.",
"name": "id",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "idpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "refpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "ownerpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "domainpattern",
"in": "query"
} }
], ],
"responses": { "responses": {
@@ -838,7 +880,7 @@
"schema": { "schema": {
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/definitions/api.ClusterProcess" "$ref": "#/definitions/api.Process"
} }
} }
}, },
@@ -920,6 +962,48 @@
"description": "Domain to act on", "description": "Domain to act on",
"name": "domain", "name": "domain",
"in": "query" "in": "query"
},
{
"type": "string",
"description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.",
"name": "filter",
"in": "query"
},
{
"type": "string",
"description": "Return only these process that have this reference value. If empty, the reference will be ignored.",
"name": "reference",
"in": "query"
},
{
"type": "string",
"description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.",
"name": "id",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "idpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "refpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "ownerpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.",
"name": "domainpattern",
"in": "query"
} }
], ],
"responses": { "responses": {
@@ -928,7 +1012,7 @@
"schema": { "schema": {
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/definitions/api.ClusterProcess" "$ref": "#/definitions/api.Process"
} }
} }
} }
@@ -986,6 +1070,63 @@
} }
}, },
"/api/v3/cluster/process/{id}": { "/api/v3/cluster/process/{id}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List a process by its ID. Use the filter parameter to specifiy the level of detail of the output.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List a process by its ID",
"operationId": "cluster-3-get-process",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain to act on",
"name": "domain",
"in": "query"
},
{
"type": "string",
"description": "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output",
"name": "filter",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.Process"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"put": { "put": {
"security": [ "security": [
{ {
@@ -4004,44 +4145,6 @@
} }
} }
}, },
"api.ClusterProcess": {
"type": "object",
"properties": {
"cpu": {
"description": "percent 0-100*ncpu",
"type": "number"
},
"domain": {
"type": "string"
},
"id": {
"type": "string"
},
"memory_bytes": {
"description": "bytes",
"type": "integer"
},
"node_id": {
"type": "string"
},
"order": {
"type": "string"
},
"owner": {
"type": "string"
},
"reference": {
"type": "string"
},
"runtime_seconds": {
"description": "seconds",
"type": "integer"
},
"state": {
"type": "string"
}
}
},
"api.ClusterRaft": { "api.ClusterRaft": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -142,32 +142,6 @@ definitions:
ncpu: ncpu:
type: number type: number
type: object type: object
api.ClusterProcess:
properties:
cpu:
description: percent 0-100*ncpu
type: number
domain:
type: string
id:
type: string
memory_bytes:
description: bytes
type: integer
node_id:
type: string
order:
type: string
owner:
type: string
reference:
type: string
runtime_seconds:
description: seconds
type: integer
state:
type: string
type: object
api.ClusterRaft: api.ClusterRaft:
properties: properties:
server: server:
@@ -2916,6 +2890,42 @@ paths:
in: query in: query
name: domain name: domain
type: string type: string
- description: Comma separated list of fields (config, state, report, metadata)
that will be part of the output. If empty, all fields will be part of the
output.
in: query
name: filter
type: string
- description: Return only these process that have this reference value. If
empty, the reference will be ignored.
in: query
name: reference
type: string
- description: Comma separated list of process ids to list. Overrides the reference.
If empty all IDs will be returned.
in: query
name: id
type: string
- description: Glob pattern for process IDs. If empty all IDs will be returned.
Intersected with results from other pattern matches.
in: query
name: idpattern
type: string
- description: Glob pattern for process references. If empty all IDs will be
returned. Intersected with results from other pattern matches.
in: query
name: refpattern
type: string
- description: Glob pattern for process owners. If empty all IDs will be returned.
Intersected with results from other pattern matches.
in: query
name: ownerpattern
type: string
- description: Glob pattern for process domain. If empty all IDs will be returned.
Intersected with results from other pattern matches.
in: query
name: domainpattern
type: string
produces: produces:
- application/json - application/json
responses: responses:
@@ -2923,7 +2933,7 @@ paths:
description: OK description: OK
schema: schema:
items: items:
$ref: '#/definitions/api.ClusterProcess' $ref: '#/definitions/api.Process'
type: array type: array
"404": "404":
description: Not Found description: Not Found
@@ -2973,6 +2983,42 @@ paths:
in: query in: query
name: domain name: domain
type: string type: string
- description: Comma separated list of fields (config, state, report, metadata)
that will be part of the output. If empty, all fields will be part of the
output.
in: query
name: filter
type: string
- description: Return only these process that have this reference value. If
empty, the reference will be ignored.
in: query
name: reference
type: string
- description: Comma separated list of process ids to list. Overrides the reference.
If empty all IDs will be returned.
in: query
name: id
type: string
- description: Glob pattern for process IDs. If empty all IDs will be returned.
Intersected with results from other pattern matches.
in: query
name: idpattern
type: string
- description: Glob pattern for process references. If empty all IDs will be
returned. Intersected with results from other pattern matches.
in: query
name: refpattern
type: string
- description: Glob pattern for process owners. If empty all IDs will be returned.
Intersected with results from other pattern matches.
in: query
name: ownerpattern
type: string
- description: Glob pattern for process domain. If empty all IDs will be returned.
Intersected with results from other pattern matches.
in: query
name: domainpattern
type: string
produces: produces:
- application/json - application/json
responses: responses:
@@ -2980,7 +3026,7 @@ paths:
description: OK description: OK
schema: schema:
items: items:
$ref: '#/definitions/api.ClusterProcess' $ref: '#/definitions/api.Process'
type: array type: array
security: security:
- ApiKeyAuth: [] - ApiKeyAuth: []
@@ -3049,6 +3095,45 @@ paths:
summary: Delete a process by its ID summary: Delete a process by its ID
tags: tags:
- v16.?.? - v16.?.?
get:
description: List a process by its ID. Use the filter parameter to specifiy
the level of detail of the output.
operationId: cluster-3-get-process
parameters:
- description: Process ID
in: path
name: id
required: true
type: string
- description: Domain to act on
in: query
name: domain
type: string
- description: Comma separated list of fields (config, state, report, metadata)
to be part of the output. If empty, all fields will be part of the output
in: query
name: filter
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.Process'
"403":
description: Forbidden
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: List a process by its ID
tags:
- v16.?.?
put: put:
consumes: consumes:
- application/json - application/json

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.18.0 github.com/caddyserver/certmagic v0.18.0
github.com/casbin/casbin/v2 v2.71.1 github.com/casbin/casbin/v2 v2.71.1
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c
github.com/datarhei/gosrt v0.5.2 github.com/datarhei/gosrt v0.5.2
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
github.com/fujiwara/shapeio v1.0.0 github.com/fujiwara/shapeio v1.0.0

10
go.sum
View File

@@ -52,6 +52,16 @@ github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e h1
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 h1:CILOzUB7CJGHtZHOxMJn+dN6rKzH29TOOOOep0AnFWM= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 h1:CILOzUB7CJGHtZHOxMJn+dN6rKzH29TOOOOep0AnFWM=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620190007-900b37caabbf h1:3/t2rLE/Vh51z7kJrT3WZYS+JKCQvZ1afdpLXLDi28o=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620190007-900b37caabbf/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620192304-1a2e44319306 h1:9CwCYAeo1rEccFDZ0yFkk4uHZ01rLdW6hARrZCU8ywg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620192304-1a2e44319306/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090506-6b04f0277e7a h1:aFlwQNcgyp7k8Vv8ESEvKwmKUI2KiiK53qCFaHYk5wY=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090506-6b04f0277e7a/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090732-c6ae9699cea6 h1:iS1ji9i8gmYJe+LhCxHnHcvC/ieftBwdteqNg2yay+k=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090732-c6ae9699cea6/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c h1:WUjP8x7hl1EsTbV5w8WEhK6c8t3uRqLUSn7+FFHDpJU=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg=
github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc= github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc=
github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw= github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw=
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=

View File

@@ -42,14 +42,18 @@ func (fs *filesystem) Open(path string) fs.File {
} }
// Check if the file is available in the cluster // Check if the file is available in the cluster
data, err := fs.proxy.GetFile(fs.name, path) size, lastModified, err := fs.proxy.GetFileInfo(fs.name, path)
if err != nil { if err != nil {
return nil return nil
} }
file := &file{ file := &file{
ReadCloser: data, getFile: func(offset int64) (io.ReadCloser, error) {
return fs.proxy.GetFile(fs.name, path, offset)
},
name: path, name: path,
size: size,
lastModiefied: lastModified,
} }
return file return file
@@ -58,11 +62,53 @@ func (fs *filesystem) Open(path string) fs.File {
type file struct { type file struct {
io.ReadCloser io.ReadCloser
getFile func(offset int64) (io.ReadCloser, error)
name string name string
size int64
lastModiefied time.Time
}
func (f *file) Read(p []byte) (int, error) {
if f.ReadCloser == nil {
data, err := f.getFile(0)
if err != nil {
return 0, err
}
f.ReadCloser = data
}
return f.ReadCloser.Read(p)
}
func (f *file) Close() error {
if f.ReadCloser == nil {
return nil
}
return f.ReadCloser.Close()
} }
func (f *file) Seek(offset int64, whence int) (int64, error) { func (f *file) Seek(offset int64, whence int) (int64, error) {
if whence != io.SeekStart {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
}
if f.ReadCloser != nil {
f.ReadCloser.Close()
f.ReadCloser = nil
}
if f.ReadCloser == nil {
data, err := f.getFile(offset)
if err != nil {
return 0, err
}
f.ReadCloser = data
}
return offset, nil
} }
func (f *file) Name() string { func (f *file) Name() string {
@@ -78,11 +124,11 @@ func (f *file) Mode() gofs.FileMode {
} }
func (f *file) Size() int64 { func (f *file) Size() int64 {
return 0 return f.size
} }
func (f *file) ModTime() time.Time { func (f *file) ModTime() time.Time {
return time.Now() return f.lastModiefied
} }
func (f *file) IsLink() (string, bool) { func (f *file) IsLink() (string, bool) {

View File

@@ -9,7 +9,6 @@ import (
"github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/cluster/proxy"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/iam" "github.com/datarhei/core/v16/iam"
@@ -18,6 +17,7 @@ import (
"github.com/datarhei/core/v16/restream" "github.com/datarhei/core/v16/restream"
"github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/app"
clientapi "github.com/datarhei/core-client-go/v16/api"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/lithammer/shortuuid/v4" "github.com/lithammer/shortuuid/v4"
) )
@@ -117,46 +117,101 @@ func (h *ClusterHandler) Leave(c echo.Context) error {
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
} }
// ListAllNodeProcesses returns the list of processes running on all nodes of the cluster // ListAllNodesProcesses returns the list of processes running on all nodes of the cluster
// @Summary List of processes in the cluster // @Summary List of processes in the cluster
// @Description List of processes in the cluster // @Description List of processes in the cluster
// @Tags v16.?.? // @Tags v16.?.?
// @ID cluster-3-list-all-node-processes // @ID cluster-3-list-all-node-processes
// @Produce json // @Produce json
// @Param domain query string false "Domain to act on" // @Param domain query string false "Domain to act on"
// @Success 200 {array} api.ClusterProcess // @Param filter query string false "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output."
// @Param reference query string false "Return only these process that have this reference value. If empty, the reference will be ignored."
// @Param id query string false "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned."
// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Param ownerpattern query string false "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Param domainpattern query string false "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Success 200 {array} api.Process
// @Security ApiKeyAuth // @Security ApiKeyAuth
// @Router /api/v3/cluster/process [get] // @Router /api/v3/cluster/process [get]
func (h *ClusterHandler) ListAllNodeProcesses(c echo.Context) error { func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "") ctxuser := util.DefaultContext(c, "user", "")
filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool {
return r == rune(',')
})
reference := util.DefaultQuery(c, "reference", "")
wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool {
return r == rune(',')
})
domain := util.DefaultQuery(c, "domain", "") domain := util.DefaultQuery(c, "domain", "")
idpattern := util.DefaultQuery(c, "idpattern", "")
refpattern := util.DefaultQuery(c, "refpattern", "")
ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
domainpattern := util.DefaultQuery(c, "domainpattern", "")
procs := h.proxy.ListProcesses() procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
ID: wantids,
Filter: filter,
Domain: domain,
Reference: reference,
IDPattern: idpattern,
RefPattern: refpattern,
OwnerPattern: ownerpattern,
DomainPattern: domainpattern,
})
processes := []api.ClusterProcess{} processes := []clientapi.Process{}
for _, p := range procs { for _, p := range procs {
if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") { if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") {
continue continue
} }
processes = append(processes, api.ClusterProcess{ processes = append(processes, p)
ID: p.Config.ID,
Owner: p.Config.Owner,
Domain: p.Config.Domain,
NodeID: p.NodeID,
Reference: p.Config.Reference,
Order: p.Order,
State: p.State,
CPU: json.ToNumber(p.CPU),
Memory: p.Mem,
Runtime: int64(p.Runtime.Seconds()),
})
} }
return c.JSON(http.StatusOK, processes) return c.JSON(http.StatusOK, processes)
} }
// GetAllNodesProcess returns the process with the given ID whereever it's running on the cluster
// @Summary List a process by its ID
// @Description List a process by its ID. Use the filter parameter to specifiy the level of detail of the output.
// @Tags v16.?.?
// @ID cluster-3-get-process
// @Produce json
// @Param id path string true "Process ID"
// @Param domain query string false "Domain to act on"
// @Param filter query string false "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output"
// @Success 200 {object} api.Process
// @Failure 403 {object} api.Error
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process/{id} [get]
func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
id := util.PathParam(c, "id")
filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool {
return r == rune(',')
})
domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(ctxuser, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
ID: []string{id},
Filter: filter,
Domain: domain,
})
if len(procs) == 0 {
return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id)
}
return c.JSON(http.StatusOK, procs[0])
}
// GetNodes returns the list of proxy nodes in the cluster // GetNodes returns the list of proxy nodes in the cluster
// @Summary List of proxy nodes in the cluster // @Summary List of proxy nodes in the cluster
// @Description List of proxy nodes in the cluster // @Description List of proxy nodes in the cluster
@@ -301,45 +356,61 @@ func (h *ClusterHandler) GetNodeFiles(c echo.Context) error {
// @Produce json // @Produce json
// @Param id path string true "Node ID" // @Param id path string true "Node ID"
// @Param domain query string false "Domain to act on" // @Param domain query string false "Domain to act on"
// @Success 200 {array} api.ClusterProcess // @Param filter query string false "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output."
// @Param reference query string false "Return only these process that have this reference value. If empty, the reference will be ignored."
// @Param id query string false "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned."
// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Param ownerpattern query string false "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Param domainpattern query string false "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches."
// @Success 200 {array} api.Process
// @Failure 404 {object} api.Error // @Failure 404 {object} api.Error
// @Failure 500 {object} api.Error // @Failure 500 {object} api.Error
// @Security ApiKeyAuth // @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/process [get] // @Router /api/v3/cluster/node/{id}/process [get]
func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
domain := util.DefaultQuery(c, "domain", "")
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
ctxuser := util.DefaultContext(c, "user", "")
filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool {
return r == rune(',')
})
reference := util.DefaultQuery(c, "reference", "")
wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool {
return r == rune(',')
})
domain := util.DefaultQuery(c, "domain", "")
idpattern := util.DefaultQuery(c, "idpattern", "")
refpattern := util.DefaultQuery(c, "refpattern", "")
ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
domainpattern := util.DefaultQuery(c, "domainpattern", "")
peer, err := h.proxy.GetNode(id) peer, err := h.proxy.GetNode(id)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Node not found", "%s", err) return api.Err(http.StatusNotFound, "Node not found", "%s", err)
} }
procs, err := peer.ProcessList() procs, err := peer.ProcessList(proxy.ProcessListOptions{
ID: wantids,
Filter: filter,
Domain: domain,
Reference: reference,
IDPattern: idpattern,
RefPattern: refpattern,
OwnerPattern: ownerpattern,
DomainPattern: domainpattern,
})
if err != nil { if err != nil {
return api.Err(http.StatusInternalServerError, "", "Node not connected: %s", err) return api.Err(http.StatusInternalServerError, "", "Node not available: %s", err)
} }
processes := []api.ClusterProcess{} processes := []clientapi.Process{}
for _, p := range procs { for _, p := range procs {
if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") { if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") {
continue continue
} }
processes = append(processes, api.ClusterProcess{ processes = append(processes, p)
ID: p.Config.ID,
Owner: p.Config.Owner,
Domain: p.Config.Domain,
NodeID: p.NodeID,
Reference: p.Config.Reference,
Order: p.Order,
State: p.State,
CPU: json.ToNumber(p.CPU),
Memory: p.Mem,
Runtime: int64(p.Runtime.Seconds()),
})
} }
return c.JSON(http.StatusOK, processes) return c.JSON(http.StatusOK, processes)

View File

@@ -124,7 +124,7 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
domain := util.DefaultQuery(c, "domain", "") domain := util.DefaultQuery(c, "domain", "")
idpattern := util.DefaultQuery(c, "idpattern", "") idpattern := util.DefaultQuery(c, "idpattern", "")
refpattern := util.DefaultQuery(c, "refpattern", "") refpattern := util.DefaultQuery(c, "refpattern", "")
ownerpattern := util.DefaultContext(c, "ownerpattern", "") ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
domainpattern := util.DefaultQuery(c, "domainpattern", "") domainpattern := util.DefaultQuery(c, "domainpattern", "")
preids := h.restream.GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern) preids := h.restream.GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern)
@@ -195,7 +195,7 @@ func (h *RestreamHandler) Get(c echo.Context) error {
p, err := h.getProcess(tid, filter) p, err := h.getProcess(tid, filter)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", err.Error())
} }
return c.JSON(http.StatusOK, p) return c.JSON(http.StatusOK, p)

View File

@@ -82,7 +82,7 @@ func (h *FSHandler) GetFile(c echo.Context) error {
return c.Blob(http.StatusOK, "application/data", nil) return c.Blob(http.StatusOK, "application/data", nil)
} }
var streamFile io.Reader = file var streamFile io.ReadCloser = file
status := http.StatusOK status := http.StatusOK
ifRange := c.Request().Header.Get("If-Range") ifRange := c.Request().Header.Get("If-Range")
@@ -101,7 +101,7 @@ func (h *FSHandler) GetFile(c echo.Context) error {
if len(byteRange) != 0 { if len(byteRange) != 0 {
ranges, err := parseRange(byteRange, stat.Size()) ranges, err := parseRange(byteRange, stat.Size())
if err != nil { if err != nil {
return api.Err(http.StatusRequestedRangeNotSatisfiable, "") return api.Err(http.StatusRequestedRangeNotSatisfiable, "", "%s", err.Error())
} }
if len(ranges) > 1 { if len(ranges) > 1 {
@@ -111,7 +111,7 @@ func (h *FSHandler) GetFile(c echo.Context) error {
if len(ranges) == 1 { if len(ranges) == 1 {
_, err := file.Seek(ranges[0].start, io.SeekStart) _, err := file.Seek(ranges[0].start, io.SeekStart)
if err != nil { if err != nil {
return api.Err(http.StatusRequestedRangeNotSatisfiable, "") return api.Err(http.StatusRequestedRangeNotSatisfiable, "", "%s", err.Error())
} }
c.Response().Header().Set("Content-Range", ranges[0].contentRange(stat.Size())) c.Response().Header().Set("Content-Range", ranges[0].contentRange(stat.Size()))
@@ -331,7 +331,7 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
} }
type limitReader struct { type limitReader struct {
r io.Reader r io.ReadCloser
size int size int
} }
@@ -348,14 +348,27 @@ func (l *limitReader) Read(p []byte) (int, error) {
i, err := l.r.Read(p) i, err := l.r.Read(p)
if err != nil { if err != nil {
l.r.Close()
return i, err return i, err
} }
l.size -= i l.size -= i
if l.size == 0 {
l.r.Close()
}
return i, nil return i, nil
} }
func (l *limitReader) Close() error {
if l.r != nil {
l.r.Close()
}
return nil
}
// From: github.com/golang/go/net/http/fs.go@7dc9fcb // From: github.com/golang/go/net/http/fs.go@7dc9fcb
// errNoOverlap is returned by serveContent's parseRange if first-byte-pos of // errNoOverlap is returned by serveContent's parseRange if first-byte-pos of

View File

@@ -683,7 +683,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity) v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity)
v3.GET("/cluster/iam/policies", s.v3handler.cluster.ListPolicies) v3.GET("/cluster/iam/policies", s.v3handler.cluster.ListPolicies)
v3.GET("/cluster/process", s.v3handler.cluster.ListAllNodeProcesses) v3.GET("/cluster/process", s.v3handler.cluster.ListAllNodesProcesses)
v3.GET("/cluster/process/:id", s.v3handler.cluster.GetAllNodesProcess)
v3.GET("/cluster/node", s.v3handler.cluster.GetNodes) v3.GET("/cluster/node", s.v3handler.cluster.GetNodes)
v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode) v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)

View File

@@ -78,7 +78,7 @@ type Config struct {
StaleTimeout uint64 // seconds StaleTimeout uint64 // seconds
Timeout uint64 // seconds Timeout uint64 // seconds
Scheduler string // crontab pattern or RFC3339 timestamp Scheduler string // crontab pattern or RFC3339 timestamp
LogPatterns []string // will we interpreted as regualr expressions LogPatterns []string // will we interpreted as regular expressions
LimitCPU float64 // percent LimitCPU float64 // percent
LimitMemory uint64 // bytes LimitMemory uint64 // bytes
LimitWaitFor uint64 // seconds LimitWaitFor uint64 // seconds

View File

@@ -342,7 +342,6 @@ func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, confi
// Only log session that have been active // Only log session that have been active
logger.Info().Log("Closed") logger.Info().Log("Closed")
logger.Debug().Log("Closed")
c.lock.history.Lock() c.lock.history.Lock()
@@ -557,9 +556,10 @@ func (c *collector) Activate(id string) bool {
return false return false
} }
c.lock.session.RLock() c.lock.session.Lock()
defer c.lock.session.Unlock()
sess, ok := c.sessions[id] sess, ok := c.sessions[id]
c.lock.session.RUnlock()
if !ok { if !ok {
return false return false
@@ -571,7 +571,6 @@ func (c *collector) Activate(id string) bool {
c.totalSessions++ c.totalSessions++
sess.logger.Info().Log("Active") sess.logger.Info().Log("Active")
sess.logger.Debug().Log("Active")
return true return true
} }

View File

@@ -64,6 +64,7 @@ type RestClient interface {
FilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/fs/{name} FilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/fs/{name}
FilesystemHasFile(name, path string) bool // HEAD /v3/fs/{name}/{path} FilesystemHasFile(name, path string) bool // HEAD /v3/fs/{name}/{path}
FilesystemGetFile(name, path string) (io.ReadCloser, error) // GET /v3/fs/{name}/{path} FilesystemGetFile(name, path string) (io.ReadCloser, error) // GET /v3/fs/{name}/{path}
FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) // GET /v3/fs/{name}/{path}
FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path} FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path}
FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path} FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path}
@@ -483,7 +484,7 @@ func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) {
return resp.StatusCode, resp.Body, nil return resp.StatusCode, resp.Body, nil
} }
func (r *restclient) stream(method, path string, query *url.Values, contentType string, data io.Reader) (io.ReadCloser, error) { func (r *restclient) stream(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) (io.ReadCloser, error) {
if err := r.checkVersion(method, r.prefix+path); err != nil { if err := r.checkVersion(method, r.prefix+path); err != nil {
return nil, err return nil, err
} }
@@ -498,6 +499,10 @@ func (r *restclient) stream(method, path string, query *url.Values, contentType
return nil, err return nil, err
} }
if header != nil {
req.Header = header.Clone()
}
if method == "POST" || method == "PUT" { if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", contentType) req.Header.Add("Content-Type", contentType)
} }
@@ -553,8 +558,8 @@ func (r *restclient) stream(method, path string, query *url.Values, contentType
return body, nil return body, nil
} }
func (r *restclient) call(method, path string, query *url.Values, contentType string, data io.Reader) ([]byte, error) { func (r *restclient) call(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) ([]byte, error) {
body, err := r.stream(method, path, query, contentType, data) body, err := r.stream(method, path, query, header, contentType, data)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -16,7 +16,7 @@ type configVersion struct {
func (r *restclient) Config() (int64, api.Config, error) { func (r *restclient) Config() (int64, api.Config, error) {
version := configVersion{} version := configVersion{}
data, err := r.call("GET", "/v3/config", nil, "", nil) data, err := r.call("GET", "/v3/config", nil, nil, "", nil)
if err != nil { if err != nil {
return 0, api.Config{}, err return 0, api.Config{}, err
} }
@@ -69,7 +69,7 @@ func (r *restclient) ConfigSet(config interface{}) error {
e := json.NewEncoder(&buf) e := json.NewEncoder(&buf)
e.Encode(config) e.Encode(config)
_, err := r.call("PUT", "/v3/config", nil, "application/json", &buf) _, err := r.call("PUT", "/v3/config", nil, nil, "application/json", &buf)
if e, ok := err.(api.Error); ok { if e, ok := err.(api.Error); ok {
if e.Code == 409 { if e.Code == 409 {
@@ -85,7 +85,7 @@ func (r *restclient) ConfigSet(config interface{}) error {
} }
func (r *restclient) ConfigReload() error { func (r *restclient) ConfigReload() error {
_, err := r.call("GET", "/v3/config/reload", nil, "", nil) _, err := r.call("GET", "/v3/config/reload", nil, nil, "", nil)
return err return err
} }

View File

@@ -3,8 +3,10 @@ package coreclient
import ( import (
"encoding/json" "encoding/json"
"io" "io"
"net/http"
"net/url" "net/url"
"path/filepath" "path/filepath"
"strconv"
"github.com/datarhei/core-client-go/v16/api" "github.com/datarhei/core-client-go/v16/api"
) )
@@ -28,7 +30,7 @@ func (r *restclient) FilesystemList(name, pattern, sort, order string) ([]api.Fi
query.Set("sort", sort) query.Set("sort", sort)
query.Set("order", order) query.Set("order", order)
data, err := r.call("GET", "/v3/fs/"+url.PathEscape(name), query, "", nil) data, err := r.call("GET", "/v3/fs/"+url.PathEscape(name), query, nil, "", nil)
if err != nil { if err != nil {
return files, err return files, err
} }
@@ -43,17 +45,28 @@ func (r *restclient) FilesystemHasFile(name, path string) bool {
path = "/" + path path = "/" + path
} }
_, err := r.call("HEAD", "/v3/fs/"+url.PathEscape(name)+path, nil, "", nil) _, err := r.call("HEAD", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "", nil)
return err == nil return err == nil
} }
func (r *restclient) FilesystemGetFile(name, path string) (io.ReadCloser, error) { func (r *restclient) FilesystemGetFile(name, path string) (io.ReadCloser, error) {
return r.FilesystemGetFileOffset(name, path, 0)
}
func (r *restclient) FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) {
if !filepath.IsAbs(path) { if !filepath.IsAbs(path) {
path = "/" + path path = "/" + path
} }
return r.stream("GET", "/v3/fs/"+url.PathEscape(name)+path, nil, "", nil) var header http.Header = nil
if offset > 0 {
header = make(http.Header)
header.Set("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
}
return r.stream("GET", "/v3/fs/"+url.PathEscape(name)+path, nil, header, "", nil)
} }
func (r *restclient) FilesystemDeleteFile(name, path string) error { func (r *restclient) FilesystemDeleteFile(name, path string) error {
@@ -61,7 +74,7 @@ func (r *restclient) FilesystemDeleteFile(name, path string) error {
path = "/" + path path = "/" + path
} }
_, err := r.call("DELETE", "/v3/fs/"+url.PathEscape(name)+path, nil, "", nil) _, err := r.call("DELETE", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "", nil)
return err return err
} }
@@ -71,7 +84,7 @@ func (r *restclient) FilesystemAddFile(name, path string, data io.Reader) error
path = "/" + path path = "/" + path
} }
_, err := r.call("PUT", "/v3/fs/"+url.PathEscape(name)+path, nil, "application/data", data) _, err := r.call("PUT", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "application/data", data)
return err return err
} }

View File

@@ -14,7 +14,7 @@ func (r *restclient) Graph(query api.GraphQuery) (api.GraphResponse, error) {
e := json.NewEncoder(&buf) e := json.NewEncoder(&buf)
e.Encode(query) e.Encode(query)
data, err := r.call("PUT", "/v3/graph", nil, "application/json", &buf) data, err := r.call("PUT", "/v3/graph", nil, nil, "application/json", &buf)
if err != nil { if err != nil {
return resp, err return resp, err
} }

View File

@@ -13,7 +13,7 @@ func (r *restclient) Log() ([]api.LogEvent, error) {
query := &url.Values{} query := &url.Values{}
query.Set("format", "raw") query.Set("format", "raw")
data, err := r.call("GET", "/v3/log", query, "", nil) data, err := r.call("GET", "/v3/log", query, nil, "", nil)
if err != nil { if err != nil {
return log, err return log, err
} }

View File

@@ -16,7 +16,7 @@ func (r *restclient) Metadata(key string) (api.Metadata, error) {
path += "/" + url.PathEscape(key) path += "/" + url.PathEscape(key)
} }
data, err := r.call("GET", path, nil, "", nil) data, err := r.call("GET", path, nil, nil, "", nil)
if err != nil { if err != nil {
return m, err return m, err
} }
@@ -37,7 +37,7 @@ func (r *restclient) MetadataSet(key string, metadata api.Metadata) error {
path += "/" + url.PathEscape(key) path += "/" + url.PathEscape(key)
} }
_, err := r.call("PUT", path, nil, "application/json", &buf) _, err := r.call("PUT", path, nil, nil, "application/json", &buf)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -10,7 +10,7 @@ import (
func (r *restclient) MetricsList() ([]api.MetricsDescription, error) { func (r *restclient) MetricsList() ([]api.MetricsDescription, error) {
descriptions := []api.MetricsDescription{} descriptions := []api.MetricsDescription{}
data, err := r.call("GET", "/v3/metrics", nil, "application/json", nil) data, err := r.call("GET", "/v3/metrics", nil, nil, "application/json", nil)
if err != nil { if err != nil {
return descriptions, err return descriptions, err
} }
@@ -27,7 +27,7 @@ func (r *restclient) Metrics(query api.MetricsQuery) (api.MetricsResponse, error
e := json.NewEncoder(&buf) e := json.NewEncoder(&buf)
e.Encode(query) e.Encode(query)
data, err := r.call("POST", "/v3/metrics", nil, "application/json", &buf) data, err := r.call("POST", "/v3/metrics", nil, nil, "application/json", &buf)
if err != nil { if err != nil {
return m, err return m, err
} }

View File

@@ -45,6 +45,8 @@ type ProcessListOptions struct {
Reference string Reference string
IDPattern string IDPattern string
RefPattern string RefPattern string
OwnerPattern string
DomainPattern string
} }
func (p *ProcessListOptions) Query() *url.Values { func (p *ProcessListOptions) Query() *url.Values {
@@ -55,6 +57,8 @@ func (p *ProcessListOptions) Query() *url.Values {
values.Set("reference", p.Reference) values.Set("reference", p.Reference)
values.Set("idpattern", p.IDPattern) values.Set("idpattern", p.IDPattern)
values.Set("refpattern", p.RefPattern) values.Set("refpattern", p.RefPattern)
values.Set("ownerpattern", p.OwnerPattern)
values.Set("domainpattern", p.DomainPattern)
return values return values
} }
@@ -62,7 +66,7 @@ func (p *ProcessListOptions) Query() *url.Values {
func (r *restclient) ProcessList(opts ProcessListOptions) ([]api.Process, error) { func (r *restclient) ProcessList(opts ProcessListOptions) ([]api.Process, error) {
var processes []api.Process var processes []api.Process
data, err := r.call("GET", "/v3/process", opts.Query(), "", nil) data, err := r.call("GET", "/v3/process", opts.Query(), nil, "", nil)
if err != nil { if err != nil {
return processes, err return processes, err
} }
@@ -79,7 +83,7 @@ func (r *restclient) Process(id ProcessID, filter []string) (api.Process, error)
values.Set("filter", strings.Join(filter, ",")) values.Set("filter", strings.Join(filter, ","))
values.Set("domain", id.Domain) values.Set("domain", id.Domain)
data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID), values, "", nil) data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID), values, nil, "", nil)
if err != nil { if err != nil {
return info, err return info, err
} }
@@ -95,7 +99,7 @@ func (r *restclient) ProcessAdd(p api.ProcessConfig) error {
e := json.NewEncoder(&buf) e := json.NewEncoder(&buf)
e.Encode(p) e.Encode(p)
_, err := r.call("POST", "/v3/process", nil, "application/json", &buf) _, err := r.call("POST", "/v3/process", nil, nil, "application/json", &buf)
if err != nil { if err != nil {
return err return err
} }
@@ -112,7 +116,7 @@ func (r *restclient) ProcessUpdate(id ProcessID, p api.ProcessConfig) error {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, "application/json", &buf) _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", &buf)
if err != nil { if err != nil {
return err return err
} }
@@ -124,7 +128,7 @@ func (r *restclient) ProcessDelete(id ProcessID) error {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
r.call("DELETE", "/v3/process/"+url.PathEscape(id.ID), query, "", nil) r.call("DELETE", "/v3/process/"+url.PathEscape(id.ID), query, nil, "", nil)
return nil return nil
} }
@@ -140,7 +144,7 @@ func (r *restclient) ProcessCommand(id ProcessID, command string) error {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, "application/json", &buf) _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", &buf)
if err != nil { if err != nil {
return err return err
} }
@@ -154,7 +158,7 @@ func (r *restclient) ProcessProbe(id ProcessID) (api.Probe, error) {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/probe", query, "", nil) data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/probe", query, nil, "", nil)
if err != nil { if err != nil {
return p, err return p, err
} }
@@ -170,7 +174,7 @@ func (r *restclient) ProcessConfig(id ProcessID) (api.ProcessConfig, error) {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/config", query, "", nil) data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/config", query, nil, "", nil)
if err != nil { if err != nil {
return p, err return p, err
} }
@@ -186,7 +190,7 @@ func (r *restclient) ProcessReport(id ProcessID) (api.ProcessReport, error) {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, "", nil) data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "", nil)
if err != nil { if err != nil {
return p, err return p, err
} }
@@ -202,7 +206,7 @@ func (r *restclient) ProcessState(id ProcessID) (api.ProcessState, error) {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/state", query, "", nil) data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/state", query, nil, "", nil)
if err != nil { if err != nil {
return p, err return p, err
} }
@@ -223,7 +227,7 @@ func (r *restclient) ProcessMetadata(id ProcessID, key string) (api.Metadata, er
path += "/" + url.PathEscape(key) path += "/" + url.PathEscape(key)
} }
data, err := r.call("GET", path, query, "", nil) data, err := r.call("GET", path, query, nil, "", nil)
if err != nil { if err != nil {
return m, err return m, err
} }
@@ -242,7 +246,7 @@ func (r *restclient) ProcessMetadataSet(id ProcessID, key string, metadata api.M
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, "application/json", &buf) _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", &buf)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -9,7 +9,7 @@ import (
func (r *restclient) RTMPChannels() ([]api.RTMPChannel, error) { func (r *restclient) RTMPChannels() ([]api.RTMPChannel, error) {
var m []api.RTMPChannel var m []api.RTMPChannel
data, err := r.call("GET", "/v3/rtmp", nil, "", nil) data, err := r.call("GET", "/v3/rtmp", nil, nil, "", nil)
if err != nil { if err != nil {
return m, err return m, err
} }

View File

@@ -14,7 +14,7 @@ func (r *restclient) Sessions(collectors []string) (api.SessionsSummary, error)
query := &url.Values{} query := &url.Values{}
query.Set("collectors", strings.Join(collectors, ",")) query.Set("collectors", strings.Join(collectors, ","))
data, err := r.call("GET", "/v3/sessions", query, "", nil) data, err := r.call("GET", "/v3/sessions", query, nil, "", nil)
if err != nil { if err != nil {
return sessions, err return sessions, err
} }
@@ -30,7 +30,7 @@ func (r *restclient) SessionsActive(collectors []string) (api.SessionsActive, er
query := &url.Values{} query := &url.Values{}
query.Set("collectors", strings.Join(collectors, ",")) query.Set("collectors", strings.Join(collectors, ","))
data, err := r.call("GET", "/v3/sessions/active", query, "", nil) data, err := r.call("GET", "/v3/sessions/active", query, nil, "", nil)
if err != nil { if err != nil {
return sessions, err return sessions, err
} }

View File

@@ -9,7 +9,7 @@ import (
func (r *restclient) Skills() (api.Skills, error) { func (r *restclient) Skills() (api.Skills, error) {
var skills api.Skills var skills api.Skills
data, err := r.call("GET", "/v3/skills", nil, "", nil) data, err := r.call("GET", "/v3/skills", nil, nil, "", nil)
if err != nil { if err != nil {
return skills, err return skills, err
} }
@@ -20,7 +20,7 @@ func (r *restclient) Skills() (api.Skills, error) {
} }
func (r *restclient) SkillsReload() error { func (r *restclient) SkillsReload() error {
_, err := r.call("GET", "/v3/skills/reload", nil, "", nil) _, err := r.call("GET", "/v3/skills/reload", nil, nil, "", nil)
return err return err
} }

View File

@@ -9,7 +9,7 @@ import (
func (r *restclient) SRTChannels() ([]api.SRTChannel, error) { func (r *restclient) SRTChannels() ([]api.SRTChannel, error) {
var m []api.SRTChannel var m []api.SRTChannel
data, err := r.call("GET", "/v3/srt", nil, "", nil) data, err := r.call("GET", "/v3/srt", nil, nil, "", nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -13,7 +13,7 @@ func (r *restclient) WidgetProcess(id ProcessID) (api.WidgetProcess, error) {
query := &url.Values{} query := &url.Values{}
query.Set("domain", id.Domain) query.Set("domain", id.Domain)
data, err := r.call("GET", "/v3/widget/process"+url.PathEscape(id.ID), query, "", nil) data, err := r.call("GET", "/v3/widget/process"+url.PathEscape(id.ID), query, nil, "", nil)
if err != nil { if err != nil {
return w, err return w, err
} }

2
vendor/modules.txt vendored
View File

@@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
# github.com/cpuguy83/go-md2man/v2 v2.0.2 # github.com/cpuguy83/go-md2man/v2 v2.0.2
## explicit; go 1.11 ## explicit; go 1.11
github.com/cpuguy83/go-md2man/v2/md2man github.com/cpuguy83/go-md2man/v2/md2man
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 # github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c
## explicit; go 1.18 ## explicit; go 1.18
github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16
github.com/datarhei/core-client-go/v16/api github.com/datarhei/core-client-go/v16/api