diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index 56800329..4546fd92 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -698,7 +698,7 @@ const docTemplateClusterAPI = `{ "type": "integer" }, "logPatterns": { - "description": "will we interpreted as regualr expressions", + "description": "will we interpreted as regular expressions", "type": "array", "items": { "type": "string" diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index b7d6a1f3..dcc23d05 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -690,7 +690,7 @@ "type": "integer" }, "logPatterns": { - "description": "will we interpreted as regualr expressions", + "description": "will we interpreted as regular expressions", "type": "array", "items": { "type": "string" diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index d39617a3..e34e8698 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -37,7 +37,7 @@ definitions: description: seconds type: integer logPatterns: - description: will we interpreted as regualr expressions + description: will we interpreted as regular expressions items: type: string type: array diff --git a/cluster/leader.go b/cluster/leader.go index a868875c..f252420a 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -514,7 +514,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { func (c *cluster) doSynchronize(emergency bool) { wish := c.store.GetProcessNodeMap() want := c.store.ListProcesses() - have := c.proxy.ListProcesses() + have := c.proxy.ListProxyProcesses() nodes := c.proxy.ListNodes() nodesMap := map[string]proxy.NodeAbout{} @@ -547,7 +547,7 @@ func (c *cluster) doSynchronize(emergency bool) { } func (c *cluster) doRebalance(emergency bool) { - have := c.proxy.ListProcesses() + have := c.proxy.ListProxyProcesses() nodes := c.proxy.ListNodes() nodesMap := map[string]proxy.NodeAbout{} diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 9c9691f8..70b6b523 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -29,7 +29,8 @@ type Node interface { StopFiles() GetURL(prefix, path string) (*url.URL, error) - GetFile(prefix, path string) (io.ReadCloser, error) + GetFile(prefix, path string, offset int64) (io.ReadCloser, error) + GetFileInfo(prefix, path string) (int64, time.Time, error) AddProcess(config *app.Config, metadata map[string]interface{}) error StartProcess(id app.ProcessID) error @@ -48,7 +49,8 @@ type NodeReader interface { Resources() NodeResources Files() NodeFiles - ProcessList() ([]Process, error) + ProcessList(ProcessListOptions) ([]clientapi.Process, error) + ProxyProcessList() ([]Process, error) } type NodeFiles struct { @@ -854,7 +856,7 @@ func (n *node) GetURL(prefix, resource string) (*url.URL, error) { return u, nil } -func (n *node) GetFile(prefix, path string) (io.ReadCloser, error) { +func (n *node) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -862,12 +864,30 @@ func (n *node) GetFile(prefix, path string) (io.ReadCloser, error) { return nil, fmt.Errorf("not connected") } - return n.peer.FilesystemGetFile(prefix, path) + return n.peer.FilesystemGetFileOffset(prefix, path, offset) } -func (n *node) ProcessList() ([]Process, error) { - id := n.About().ID +func (n *node) GetFileInfo(prefix, path string) (int64, time.Time, error) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + if n.peer == nil { + return 0, time.Time{}, fmt.Errorf("not connected") + } + + info, err := n.peer.FilesystemList(prefix, path, "", "") + if err != nil { + return 0, time.Time{}, fmt.Errorf("not found: %w", err) + } + + if len(info) != 1 { + return 0, time.Time{}, fmt.Errorf("ambigous result") + } + + return info[0].Size, time.Unix(info[0].LastMod, 0), nil +} + +func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -875,22 +895,31 @@ func (n *node) ProcessList() ([]Process, error) { return nil, fmt.Errorf("not connected") } - list, err := n.peer.ProcessList(client.ProcessListOptions{ - Filter: []string{ - "state", - "config", - "metadata", - }, + return n.peer.ProcessList(client.ProcessListOptions{ + ID: options.ID, + Filter: options.Filter, + Domain: options.Domain, + Reference: options.Reference, + IDPattern: options.IDPattern, + RefPattern: options.RefPattern, + OwnerPattern: options.OwnerPattern, + DomainPattern: options.DomainPattern, }) +} + +func (n *node) ProxyProcessList() ([]Process, error) { + list, err := n.ProcessList(ProcessListOptions{}) if err != nil { return nil, err } + nodeid := n.About().ID + processes := []Process{} for _, p := range list { process := Process{ - NodeID: id, + NodeID: nodeid, Order: p.State.Order, State: p.State.State, Mem: p.State.Memory, diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index da90ccb7..6a26e51d 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -12,6 +12,8 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/restream/app" + + clientapi "github.com/datarhei/core-client-go/v16/api" ) type Proxy interface { @@ -35,10 +37,12 @@ type ProxyReader interface { GetNode(id string) (NodeReader, error) Resources() map[string]NodeResources - ListProcesses() []Process + ListProcesses(ProcessListOptions) []clientapi.Process + ListProxyProcesses() []Process GetURL(prefix, path string) (*url.URL, error) - GetFile(prefix, path string) (io.ReadCloser, error) + GetFile(prefix, path string, offset int64) (io.ReadCloser, error) + GetFileInfo(prefix, path string) (int64, time.Time, error) } func NewNullProxyReader() ProxyReader { @@ -73,12 +77,20 @@ func (p *proxyReader) Resources() map[string]NodeResources { return p.proxy.Resources() } -func (p *proxyReader) ListProcesses() []Process { +func (p *proxyReader) ListProcesses(options ProcessListOptions) []clientapi.Process { if p.proxy == nil { return nil } - return p.proxy.ListProcesses() + return p.proxy.ListProcesses(options) +} + +func (p *proxyReader) ListProxyProcesses() []Process { + if p.proxy == nil { + return nil + } + + return p.proxy.ListProxyProcesses() } func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) { @@ -89,12 +101,20 @@ func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) { return p.proxy.GetURL(prefix, path) } -func (p *proxyReader) GetFile(prefix, path string) (io.ReadCloser, error) { +func (p *proxyReader) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) { if p.proxy == nil { return nil, fmt.Errorf("no proxy provided") } - return p.proxy.GetFile(prefix, path) + return p.proxy.GetFile(prefix, path, offset) +} + +func (p *proxyReader) GetFileInfo(prefix, path string) (int64, time.Time, error) { + if p.proxy == nil { + return 0, time.Time{}, fmt.Errorf("no proxy provided") + } + + return p.proxy.GetFileInfo(prefix, path) } type ProxyConfig struct { @@ -380,39 +400,19 @@ func (p *proxy) GetURL(prefix, path string) (*url.URL, error) { return url, nil } -func (p *proxy) GetFile(prefix, path string) (io.ReadCloser, error) { - p.lock.RLock() - defer p.lock.RUnlock() - +func (p *proxy) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) { logger := p.logger.WithFields(log.Fields{ "path": path, "prefix": prefix, }) - id, ok := p.fileid[prefix+":"+path] - if !ok { - logger.Debug().Log("Not found") + node, err := p.getNodeForFile(prefix, path) + if err != nil { + logger.Debug().WithError(err).Log("File not available") return nil, fmt.Errorf("file not found") } - ts, ok := p.idupdate[id] - if !ok { - logger.Debug().Log("No age information found") - return nil, fmt.Errorf("file not found") - } - - if time.Since(ts) > 2*time.Second { - logger.Debug().Log("File too old") - return nil, fmt.Errorf("file not found") - } - - node, ok := p.nodes[id] - if !ok { - logger.Debug().Log("Unknown node") - return nil, fmt.Errorf("file not found") - } - - data, err := node.GetFile(prefix, path) + data, err := node.GetFile(prefix, path, offset) if err != nil { logger.Debug().Log("Invalid path") return nil, fmt.Errorf("file not found") @@ -423,6 +423,55 @@ func (p *proxy) GetFile(prefix, path string) (io.ReadCloser, error) { return data, nil } +func (p *proxy) GetFileInfo(prefix, path string) (int64, time.Time, error) { + logger := p.logger.WithFields(log.Fields{ + "path": path, + "prefix": prefix, + }) + + node, err := p.getNodeForFile(prefix, path) + if err != nil { + logger.Debug().WithError(err).Log("File not available") + return 0, time.Time{}, fmt.Errorf("file not found") + } + + size, lastModified, err := node.GetFileInfo(prefix, path) + if err != nil { + logger.Debug().Log("Invalid path") + return 0, time.Time{}, fmt.Errorf("file not found") + } + + logger.Debug().Log("File cluster path") + + return size, lastModified, nil +} + +func (p *proxy) getNodeForFile(prefix, path string) (Node, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + id, ok := p.fileid[prefix+":"+path] + if !ok { + return nil, fmt.Errorf("file not found") + } + + ts, ok := p.idupdate[id] + if !ok { + return nil, fmt.Errorf("no age information found") + } + + if time.Since(ts) > 2*time.Second { + return nil, fmt.Errorf("file too old") + } + + node, ok := p.nodes[id] + if !ok { + return nil, fmt.Errorf("unknown node") + } + + return node, nil +} + type Process struct { NodeID string Order string @@ -435,7 +484,18 @@ type Process struct { Metadata map[string]interface{} } -func (p *proxy) ListProcesses() []Process { +type ProcessListOptions struct { + ID []string + Filter []string + Domain string + Reference string + IDPattern string + RefPattern string + OwnerPattern string + DomainPattern string +} + +func (p *proxy) ListProxyProcesses() []Process { processChan := make(chan Process, 64) processList := []Process{} @@ -459,7 +519,52 @@ func (p *proxy) ListProcesses() []Process { go func(node Node, p chan<- Process) { defer wg.Done() - processes, err := node.ProcessList() + processes, err := node.ProxyProcessList() + if err != nil { + return + } + + for _, process := range processes { + p <- process + } + }(node, processChan) + } + p.lock.RUnlock() + + wg.Wait() + + close(processChan) + + wgList.Wait() + + return processList +} + +func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { + processChan := make(chan clientapi.Process, 64) + processList := []clientapi.Process{} + + wgList := sync.WaitGroup{} + wgList.Add(1) + + go func() { + defer wgList.Done() + + for process := range processChan { + processList = append(processList, process) + } + }() + + wg := sync.WaitGroup{} + + p.lock.RLock() + for _, node := range p.nodes { + wg.Add(1) + + go func(node Node, p chan<- clientapi.Process) { + defer wg.Done() + + processes, err := node.ProcessList(options) if err != nil { return } diff --git a/docs/docs.go b/docs/docs.go index 312e81c3..29ec0a62 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -838,6 +838,48 @@ const docTemplate = `{ "description": "Domain to act on", "name": "domain", "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.", + "name": "filter", + "in": "query" + }, + { + "type": "string", + "description": "Return only these process that have this reference value. If empty, the reference will be ignored.", + "name": "reference", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.", + "name": "id", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "idpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "refpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "ownerpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "domainpattern", + "in": "query" } ], "responses": { @@ -846,7 +888,7 @@ const docTemplate = `{ "schema": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterProcess" + "$ref": "#/definitions/api.Process" } } }, @@ -928,6 +970,48 @@ const docTemplate = `{ "description": "Domain to act on", "name": "domain", "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.", + "name": "filter", + "in": "query" + }, + { + "type": "string", + "description": "Return only these process that have this reference value. If empty, the reference will be ignored.", + "name": "reference", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.", + "name": "id", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "idpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "refpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "ownerpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "domainpattern", + "in": "query" } ], "responses": { @@ -936,7 +1020,7 @@ const docTemplate = `{ "schema": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterProcess" + "$ref": "#/definitions/api.Process" } } } @@ -994,6 +1078,63 @@ const docTemplate = `{ } }, "/api/v3/cluster/process/{id}": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List a process by its ID. Use the filter parameter to specifiy the level of detail of the output.", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List a process by its ID", + "operationId": "cluster-3-get-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output", + "name": "filter", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Process" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + }, "put": { "security": [ { @@ -4012,44 +4153,6 @@ const docTemplate = `{ } } }, - "api.ClusterProcess": { - "type": "object", - "properties": { - "cpu": { - "description": "percent 0-100*ncpu", - "type": "number" - }, - "domain": { - "type": "string" - }, - "id": { - "type": "string" - }, - "memory_bytes": { - "description": "bytes", - "type": "integer" - }, - "node_id": { - "type": "string" - }, - "order": { - "type": "string" - }, - "owner": { - "type": "string" - }, - "reference": { - "type": "string" - }, - "runtime_seconds": { - "description": "seconds", - "type": "integer" - }, - "state": { - "type": "string" - } - } - }, "api.ClusterRaft": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index e1fc98ff..bb29847a 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -830,6 +830,48 @@ "description": "Domain to act on", "name": "domain", "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.", + "name": "filter", + "in": "query" + }, + { + "type": "string", + "description": "Return only these process that have this reference value. If empty, the reference will be ignored.", + "name": "reference", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.", + "name": "id", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "idpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "refpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "ownerpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "domainpattern", + "in": "query" } ], "responses": { @@ -838,7 +880,7 @@ "schema": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterProcess" + "$ref": "#/definitions/api.Process" } } }, @@ -920,6 +962,48 @@ "description": "Domain to act on", "name": "domain", "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output.", + "name": "filter", + "in": "query" + }, + { + "type": "string", + "description": "Return only these process that have this reference value. If empty, the reference will be ignored.", + "name": "reference", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned.", + "name": "id", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "idpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "refpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "ownerpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches.", + "name": "domainpattern", + "in": "query" } ], "responses": { @@ -928,7 +1012,7 @@ "schema": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterProcess" + "$ref": "#/definitions/api.Process" } } } @@ -986,6 +1070,63 @@ } }, "/api/v3/cluster/process/{id}": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List a process by its ID. Use the filter parameter to specifiy the level of detail of the output.", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List a process by its ID", + "operationId": "cluster-3-get-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output", + "name": "filter", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Process" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + }, "put": { "security": [ { @@ -4004,44 +4145,6 @@ } } }, - "api.ClusterProcess": { - "type": "object", - "properties": { - "cpu": { - "description": "percent 0-100*ncpu", - "type": "number" - }, - "domain": { - "type": "string" - }, - "id": { - "type": "string" - }, - "memory_bytes": { - "description": "bytes", - "type": "integer" - }, - "node_id": { - "type": "string" - }, - "order": { - "type": "string" - }, - "owner": { - "type": "string" - }, - "reference": { - "type": "string" - }, - "runtime_seconds": { - "description": "seconds", - "type": "integer" - }, - "state": { - "type": "string" - } - } - }, "api.ClusterRaft": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index cc3fc269..6e127625 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -142,32 +142,6 @@ definitions: ncpu: type: number type: object - api.ClusterProcess: - properties: - cpu: - description: percent 0-100*ncpu - type: number - domain: - type: string - id: - type: string - memory_bytes: - description: bytes - type: integer - node_id: - type: string - order: - type: string - owner: - type: string - reference: - type: string - runtime_seconds: - description: seconds - type: integer - state: - type: string - type: object api.ClusterRaft: properties: server: @@ -2916,6 +2890,42 @@ paths: in: query name: domain type: string + - description: Comma separated list of fields (config, state, report, metadata) + that will be part of the output. If empty, all fields will be part of the + output. + in: query + name: filter + type: string + - description: Return only these process that have this reference value. If + empty, the reference will be ignored. + in: query + name: reference + type: string + - description: Comma separated list of process ids to list. Overrides the reference. + If empty all IDs will be returned. + in: query + name: id + type: string + - description: Glob pattern for process IDs. If empty all IDs will be returned. + Intersected with results from other pattern matches. + in: query + name: idpattern + type: string + - description: Glob pattern for process references. If empty all IDs will be + returned. Intersected with results from other pattern matches. + in: query + name: refpattern + type: string + - description: Glob pattern for process owners. If empty all IDs will be returned. + Intersected with results from other pattern matches. + in: query + name: ownerpattern + type: string + - description: Glob pattern for process domain. If empty all IDs will be returned. + Intersected with results from other pattern matches. + in: query + name: domainpattern + type: string produces: - application/json responses: @@ -2923,7 +2933,7 @@ paths: description: OK schema: items: - $ref: '#/definitions/api.ClusterProcess' + $ref: '#/definitions/api.Process' type: array "404": description: Not Found @@ -2973,6 +2983,42 @@ paths: in: query name: domain type: string + - description: Comma separated list of fields (config, state, report, metadata) + that will be part of the output. If empty, all fields will be part of the + output. + in: query + name: filter + type: string + - description: Return only these process that have this reference value. If + empty, the reference will be ignored. + in: query + name: reference + type: string + - description: Comma separated list of process ids to list. Overrides the reference. + If empty all IDs will be returned. + in: query + name: id + type: string + - description: Glob pattern for process IDs. If empty all IDs will be returned. + Intersected with results from other pattern matches. + in: query + name: idpattern + type: string + - description: Glob pattern for process references. If empty all IDs will be + returned. Intersected with results from other pattern matches. + in: query + name: refpattern + type: string + - description: Glob pattern for process owners. If empty all IDs will be returned. + Intersected with results from other pattern matches. + in: query + name: ownerpattern + type: string + - description: Glob pattern for process domain. If empty all IDs will be returned. + Intersected with results from other pattern matches. + in: query + name: domainpattern + type: string produces: - application/json responses: @@ -2980,7 +3026,7 @@ paths: description: OK schema: items: - $ref: '#/definitions/api.ClusterProcess' + $ref: '#/definitions/api.Process' type: array security: - ApiKeyAuth: [] @@ -3049,6 +3095,45 @@ paths: summary: Delete a process by its ID tags: - v16.?.? + get: + description: List a process by its ID. Use the filter parameter to specifiy + the level of detail of the output. + operationId: cluster-3-get-process + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + - description: Domain to act on + in: query + name: domain + type: string + - description: Comma separated list of fields (config, state, report, metadata) + to be part of the output. If empty, all fields will be part of the output + in: query + name: filter + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.Process' + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: List a process by its ID + tags: + - v16.?.? put: consumes: - application/json diff --git a/go.mod b/go.mod index a7d284a1..91e26926 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.18.0 github.com/casbin/casbin/v2 v2.71.1 - github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 + github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c github.com/datarhei/gosrt v0.5.2 github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/fujiwara/shapeio v1.0.0 diff --git a/go.sum b/go.sum index 6829c35a..66dcec52 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,16 @@ github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e h1 github.com/datarhei/core-client-go/v16 v16.11.1-0.20230614141756-a25a5fc3c60e/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 h1:CILOzUB7CJGHtZHOxMJn+dN6rKzH29TOOOOep0AnFWM= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620190007-900b37caabbf h1:3/t2rLE/Vh51z7kJrT3WZYS+JKCQvZ1afdpLXLDi28o= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620190007-900b37caabbf/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620192304-1a2e44319306 h1:9CwCYAeo1rEccFDZ0yFkk4uHZ01rLdW6hARrZCU8ywg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620192304-1a2e44319306/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090506-6b04f0277e7a h1:aFlwQNcgyp7k8Vv8ESEvKwmKUI2KiiK53qCFaHYk5wY= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090506-6b04f0277e7a/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090732-c6ae9699cea6 h1:iS1ji9i8gmYJe+LhCxHnHcvC/ieftBwdteqNg2yay+k= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621090732-c6ae9699cea6/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c h1:WUjP8x7hl1EsTbV5w8WEhK6c8t3uRqLUSn7+FFHDpJU= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c/go.mod h1:6L0zr/NUwvaPsCTK/IL17m8JUEtgLp3BDtlsBREwacg= github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc= github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= diff --git a/http/fs/cluster.go b/http/fs/cluster.go index 8af5e3ce..00428c74 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -42,14 +42,18 @@ func (fs *filesystem) Open(path string) fs.File { } // Check if the file is available in the cluster - data, err := fs.proxy.GetFile(fs.name, path) + size, lastModified, err := fs.proxy.GetFileInfo(fs.name, path) if err != nil { return nil } file := &file{ - ReadCloser: data, - name: path, + getFile: func(offset int64) (io.ReadCloser, error) { + return fs.proxy.GetFile(fs.name, path, offset) + }, + name: path, + size: size, + lastModiefied: lastModified, } return file @@ -58,11 +62,53 @@ func (fs *filesystem) Open(path string) fs.File { type file struct { io.ReadCloser - name string + getFile func(offset int64) (io.ReadCloser, error) + name string + size int64 + lastModiefied time.Time +} + +func (f *file) Read(p []byte) (int, error) { + if f.ReadCloser == nil { + data, err := f.getFile(0) + if err != nil { + return 0, err + } + + f.ReadCloser = data + } + + return f.ReadCloser.Read(p) +} + +func (f *file) Close() error { + if f.ReadCloser == nil { + return nil + } + + return f.ReadCloser.Close() } func (f *file) Seek(offset int64, whence int) (int64, error) { - return 0, fmt.Errorf("not implemented") + if whence != io.SeekStart { + return 0, fmt.Errorf("not implemented") + } + + if f.ReadCloser != nil { + f.ReadCloser.Close() + f.ReadCloser = nil + } + + if f.ReadCloser == nil { + data, err := f.getFile(offset) + if err != nil { + return 0, err + } + + f.ReadCloser = data + } + + return offset, nil } func (f *file) Name() string { @@ -78,11 +124,11 @@ func (f *file) Mode() gofs.FileMode { } func (f *file) Size() int64 { - return 0 + return f.size } func (f *file) ModTime() time.Time { - return time.Now() + return f.lastModiefied } func (f *file) IsLink() (string, bool) { diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index b21a9e32..e632e229 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -9,7 +9,6 @@ import ( "github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/cluster/proxy" - "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/iam" @@ -18,6 +17,7 @@ import ( "github.com/datarhei/core/v16/restream" "github.com/datarhei/core/v16/restream/app" + clientapi "github.com/datarhei/core-client-go/v16/api" "github.com/labstack/echo/v4" "github.com/lithammer/shortuuid/v4" ) @@ -117,46 +117,101 @@ func (h *ClusterHandler) Leave(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } -// ListAllNodeProcesses returns the list of processes running on all nodes of the cluster +// ListAllNodesProcesses returns the list of processes running on all nodes of the cluster // @Summary List of processes in the cluster // @Description List of processes in the cluster // @Tags v16.?.? // @ID cluster-3-list-all-node-processes // @Produce json // @Param domain query string false "Domain to act on" -// @Success 200 {array} api.ClusterProcess +// @Param filter query string false "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output." +// @Param reference query string false "Return only these process that have this reference value. If empty, the reference will be ignored." +// @Param id query string false "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned." +// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Param ownerpattern query string false "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Param domainpattern query string false "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Success 200 {array} api.Process // @Security ApiKeyAuth // @Router /api/v3/cluster/process [get] -func (h *ClusterHandler) ListAllNodeProcesses(c echo.Context) error { +func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error { ctxuser := util.DefaultContext(c, "user", "") + filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool { + return r == rune(',') + }) + reference := util.DefaultQuery(c, "reference", "") + wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool { + return r == rune(',') + }) domain := util.DefaultQuery(c, "domain", "") + idpattern := util.DefaultQuery(c, "idpattern", "") + refpattern := util.DefaultQuery(c, "refpattern", "") + ownerpattern := util.DefaultQuery(c, "ownerpattern", "") + domainpattern := util.DefaultQuery(c, "domainpattern", "") - procs := h.proxy.ListProcesses() + procs := h.proxy.ListProcesses(proxy.ProcessListOptions{ + ID: wantids, + Filter: filter, + Domain: domain, + Reference: reference, + IDPattern: idpattern, + RefPattern: refpattern, + OwnerPattern: ownerpattern, + DomainPattern: domainpattern, + }) - processes := []api.ClusterProcess{} + processes := []clientapi.Process{} for _, p := range procs { - if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") { + if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") { continue } - processes = append(processes, api.ClusterProcess{ - ID: p.Config.ID, - Owner: p.Config.Owner, - Domain: p.Config.Domain, - NodeID: p.NodeID, - Reference: p.Config.Reference, - Order: p.Order, - State: p.State, - CPU: json.ToNumber(p.CPU), - Memory: p.Mem, - Runtime: int64(p.Runtime.Seconds()), - }) + processes = append(processes, p) } return c.JSON(http.StatusOK, processes) } +// GetAllNodesProcess returns the process with the given ID whereever it's running on the cluster +// @Summary List a process by its ID +// @Description List a process by its ID. Use the filter parameter to specifiy the level of detail of the output. +// @Tags v16.?.? +// @ID cluster-3-get-process +// @Produce json +// @Param id path string true "Process ID" +// @Param domain query string false "Domain to act on" +// @Param filter query string false "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output" +// @Success 200 {object} api.Process +// @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process/{id} [get] +func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error { + ctxuser := util.DefaultContext(c, "user", "") + id := util.PathParam(c, "id") + filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool { + return r == rune(',') + }) + domain := util.DefaultQuery(c, "domain", "") + + if !h.iam.Enforce(ctxuser, domain, "process:"+id, "read") { + return api.Err(http.StatusForbidden, "Forbidden") + } + + procs := h.proxy.ListProcesses(proxy.ProcessListOptions{ + ID: []string{id}, + Filter: filter, + Domain: domain, + }) + + if len(procs) == 0 { + return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) + } + + return c.JSON(http.StatusOK, procs[0]) +} + // GetNodes returns the list of proxy nodes in the cluster // @Summary List of proxy nodes in the cluster // @Description List of proxy nodes in the cluster @@ -301,45 +356,61 @@ func (h *ClusterHandler) GetNodeFiles(c echo.Context) error { // @Produce json // @Param id path string true "Node ID" // @Param domain query string false "Domain to act on" -// @Success 200 {array} api.ClusterProcess +// @Param filter query string false "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output." +// @Param reference query string false "Return only these process that have this reference value. If empty, the reference will be ignored." +// @Param id query string false "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned." +// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Param ownerpattern query string false "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Param domainpattern query string false "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches." +// @Success 200 {array} api.Process // @Failure 404 {object} api.Error // @Failure 500 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/cluster/node/{id}/process [get] func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { - ctxuser := util.DefaultContext(c, "user", "") - domain := util.DefaultQuery(c, "domain", "") id := util.PathParam(c, "id") + ctxuser := util.DefaultContext(c, "user", "") + filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool { + return r == rune(',') + }) + reference := util.DefaultQuery(c, "reference", "") + wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool { + return r == rune(',') + }) + domain := util.DefaultQuery(c, "domain", "") + idpattern := util.DefaultQuery(c, "idpattern", "") + refpattern := util.DefaultQuery(c, "refpattern", "") + ownerpattern := util.DefaultQuery(c, "ownerpattern", "") + domainpattern := util.DefaultQuery(c, "domainpattern", "") peer, err := h.proxy.GetNode(id) if err != nil { return api.Err(http.StatusNotFound, "Node not found", "%s", err) } - procs, err := peer.ProcessList() + procs, err := peer.ProcessList(proxy.ProcessListOptions{ + ID: wantids, + Filter: filter, + Domain: domain, + Reference: reference, + IDPattern: idpattern, + RefPattern: refpattern, + OwnerPattern: ownerpattern, + DomainPattern: domainpattern, + }) if err != nil { - return api.Err(http.StatusInternalServerError, "", "Node not connected: %s", err) + return api.Err(http.StatusInternalServerError, "", "Node not available: %s", err) } - processes := []api.ClusterProcess{} + processes := []clientapi.Process{} for _, p := range procs { if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") { continue } - processes = append(processes, api.ClusterProcess{ - ID: p.Config.ID, - Owner: p.Config.Owner, - Domain: p.Config.Domain, - NodeID: p.NodeID, - Reference: p.Config.Reference, - Order: p.Order, - State: p.State, - CPU: json.ToNumber(p.CPU), - Memory: p.Mem, - Runtime: int64(p.Runtime.Seconds()), - }) + processes = append(processes, p) } return c.JSON(http.StatusOK, processes) diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 132c7575..6ca3457b 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -124,7 +124,7 @@ func (h *RestreamHandler) GetAll(c echo.Context) error { domain := util.DefaultQuery(c, "domain", "") idpattern := util.DefaultQuery(c, "idpattern", "") refpattern := util.DefaultQuery(c, "refpattern", "") - ownerpattern := util.DefaultContext(c, "ownerpattern", "") + ownerpattern := util.DefaultQuery(c, "ownerpattern", "") domainpattern := util.DefaultQuery(c, "domainpattern", "") preids := h.restream.GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern) @@ -195,7 +195,7 @@ func (h *RestreamHandler) Get(c echo.Context) error { p, err := h.getProcess(tid, filter) if err != nil { - return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) + return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", err.Error()) } return c.JSON(http.StatusOK, p) diff --git a/http/handler/filesystem.go b/http/handler/filesystem.go index d55f1bac..ef7552e7 100644 --- a/http/handler/filesystem.go +++ b/http/handler/filesystem.go @@ -82,7 +82,7 @@ func (h *FSHandler) GetFile(c echo.Context) error { return c.Blob(http.StatusOK, "application/data", nil) } - var streamFile io.Reader = file + var streamFile io.ReadCloser = file status := http.StatusOK ifRange := c.Request().Header.Get("If-Range") @@ -101,7 +101,7 @@ func (h *FSHandler) GetFile(c echo.Context) error { if len(byteRange) != 0 { ranges, err := parseRange(byteRange, stat.Size()) if err != nil { - return api.Err(http.StatusRequestedRangeNotSatisfiable, "") + return api.Err(http.StatusRequestedRangeNotSatisfiable, "", "%s", err.Error()) } if len(ranges) > 1 { @@ -111,7 +111,7 @@ func (h *FSHandler) GetFile(c echo.Context) error { if len(ranges) == 1 { _, err := file.Seek(ranges[0].start, io.SeekStart) if err != nil { - return api.Err(http.StatusRequestedRangeNotSatisfiable, "") + return api.Err(http.StatusRequestedRangeNotSatisfiable, "", "%s", err.Error()) } c.Response().Header().Set("Content-Range", ranges[0].contentRange(stat.Size())) @@ -331,7 +331,7 @@ func (h *FSHandler) ListFiles(c echo.Context) error { } type limitReader struct { - r io.Reader + r io.ReadCloser size int } @@ -348,14 +348,27 @@ func (l *limitReader) Read(p []byte) (int, error) { i, err := l.r.Read(p) if err != nil { + l.r.Close() return i, err } l.size -= i + if l.size == 0 { + l.r.Close() + } + return i, nil } +func (l *limitReader) Close() error { + if l.r != nil { + l.r.Close() + } + + return nil +} + // From: github.com/golang/go/net/http/fs.go@7dc9fcb // errNoOverlap is returned by serveContent's parseRange if first-byte-pos of diff --git a/http/server.go b/http/server.go index 68c44c45..b540c508 100644 --- a/http/server.go +++ b/http/server.go @@ -683,7 +683,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity) v3.GET("/cluster/iam/policies", s.v3handler.cluster.ListPolicies) - v3.GET("/cluster/process", s.v3handler.cluster.ListAllNodeProcesses) + v3.GET("/cluster/process", s.v3handler.cluster.ListAllNodesProcesses) + v3.GET("/cluster/process/:id", s.v3handler.cluster.GetAllNodesProcess) v3.GET("/cluster/node", s.v3handler.cluster.GetNodes) v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode) diff --git a/restream/app/process.go b/restream/app/process.go index 09fea2a5..3f435481 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -78,7 +78,7 @@ type Config struct { StaleTimeout uint64 // seconds Timeout uint64 // seconds Scheduler string // crontab pattern or RFC3339 timestamp - LogPatterns []string // will we interpreted as regualr expressions + LogPatterns []string // will we interpreted as regular expressions LimitCPU float64 // percent LimitMemory uint64 // bytes LimitWaitFor uint64 // seconds diff --git a/session/collector.go b/session/collector.go index 9c1739eb..03baf662 100644 --- a/session/collector.go +++ b/session/collector.go @@ -342,7 +342,6 @@ func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, confi // Only log session that have been active logger.Info().Log("Closed") - logger.Debug().Log("Closed") c.lock.history.Lock() @@ -557,9 +556,10 @@ func (c *collector) Activate(id string) bool { return false } - c.lock.session.RLock() + c.lock.session.Lock() + defer c.lock.session.Unlock() + sess, ok := c.sessions[id] - c.lock.session.RUnlock() if !ok { return false @@ -571,7 +571,6 @@ func (c *collector) Activate(id string) bool { c.totalSessions++ sess.logger.Info().Log("Active") - sess.logger.Debug().Log("Active") return true } diff --git a/vendor/github.com/datarhei/core-client-go/v16/client.go b/vendor/github.com/datarhei/core-client-go/v16/client.go index f7f49811..275bc277 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/client.go +++ b/vendor/github.com/datarhei/core-client-go/v16/client.go @@ -61,11 +61,12 @@ type RestClient interface { MemFSDeleteFile(path string) error // DELETE /v3/fs/mem/{path} MemFSAddFile(path string, data io.Reader) error // PUT /v3/fs/mem/{path} - FilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/fs/{name} - FilesystemHasFile(name, path string) bool // HEAD /v3/fs/{name}/{path} - FilesystemGetFile(name, path string) (io.ReadCloser, error) // GET /v3/fs/{name}/{path} - FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path} - FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path} + FilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/fs/{name} + FilesystemHasFile(name, path string) bool // HEAD /v3/fs/{name}/{path} + FilesystemGetFile(name, path string) (io.ReadCloser, error) // GET /v3/fs/{name}/{path} + FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) // GET /v3/fs/{name}/{path} + FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path} + FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path} Log() ([]api.LogEvent, error) // GET /v3/log @@ -483,7 +484,7 @@ func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) { return resp.StatusCode, resp.Body, nil } -func (r *restclient) stream(method, path string, query *url.Values, contentType string, data io.Reader) (io.ReadCloser, error) { +func (r *restclient) stream(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) (io.ReadCloser, error) { if err := r.checkVersion(method, r.prefix+path); err != nil { return nil, err } @@ -498,6 +499,10 @@ func (r *restclient) stream(method, path string, query *url.Values, contentType return nil, err } + if header != nil { + req.Header = header.Clone() + } + if method == "POST" || method == "PUT" { req.Header.Add("Content-Type", contentType) } @@ -553,8 +558,8 @@ func (r *restclient) stream(method, path string, query *url.Values, contentType return body, nil } -func (r *restclient) call(method, path string, query *url.Values, contentType string, data io.Reader) ([]byte, error) { - body, err := r.stream(method, path, query, contentType, data) +func (r *restclient) call(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) ([]byte, error) { + body, err := r.stream(method, path, query, header, contentType, data) if err != nil { return nil, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/config.go b/vendor/github.com/datarhei/core-client-go/v16/config.go index f1d1fd76..0e7d9283 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/config.go +++ b/vendor/github.com/datarhei/core-client-go/v16/config.go @@ -16,7 +16,7 @@ type configVersion struct { func (r *restclient) Config() (int64, api.Config, error) { version := configVersion{} - data, err := r.call("GET", "/v3/config", nil, "", nil) + data, err := r.call("GET", "/v3/config", nil, nil, "", nil) if err != nil { return 0, api.Config{}, err } @@ -69,7 +69,7 @@ func (r *restclient) ConfigSet(config interface{}) error { e := json.NewEncoder(&buf) e.Encode(config) - _, err := r.call("PUT", "/v3/config", nil, "application/json", &buf) + _, err := r.call("PUT", "/v3/config", nil, nil, "application/json", &buf) if e, ok := err.(api.Error); ok { if e.Code == 409 { @@ -85,7 +85,7 @@ func (r *restclient) ConfigSet(config interface{}) error { } func (r *restclient) ConfigReload() error { - _, err := r.call("GET", "/v3/config/reload", nil, "", nil) + _, err := r.call("GET", "/v3/config/reload", nil, nil, "", nil) return err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/fs.go b/vendor/github.com/datarhei/core-client-go/v16/fs.go index e4b7bd03..50336486 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/fs.go +++ b/vendor/github.com/datarhei/core-client-go/v16/fs.go @@ -3,8 +3,10 @@ package coreclient import ( "encoding/json" "io" + "net/http" "net/url" "path/filepath" + "strconv" "github.com/datarhei/core-client-go/v16/api" ) @@ -28,7 +30,7 @@ func (r *restclient) FilesystemList(name, pattern, sort, order string) ([]api.Fi query.Set("sort", sort) query.Set("order", order) - data, err := r.call("GET", "/v3/fs/"+url.PathEscape(name), query, "", nil) + data, err := r.call("GET", "/v3/fs/"+url.PathEscape(name), query, nil, "", nil) if err != nil { return files, err } @@ -43,17 +45,28 @@ func (r *restclient) FilesystemHasFile(name, path string) bool { path = "/" + path } - _, err := r.call("HEAD", "/v3/fs/"+url.PathEscape(name)+path, nil, "", nil) + _, err := r.call("HEAD", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "", nil) return err == nil } func (r *restclient) FilesystemGetFile(name, path string) (io.ReadCloser, error) { + return r.FilesystemGetFileOffset(name, path, 0) +} + +func (r *restclient) FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) { if !filepath.IsAbs(path) { path = "/" + path } - return r.stream("GET", "/v3/fs/"+url.PathEscape(name)+path, nil, "", nil) + var header http.Header = nil + + if offset > 0 { + header = make(http.Header) + header.Set("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") + } + + return r.stream("GET", "/v3/fs/"+url.PathEscape(name)+path, nil, header, "", nil) } func (r *restclient) FilesystemDeleteFile(name, path string) error { @@ -61,7 +74,7 @@ func (r *restclient) FilesystemDeleteFile(name, path string) error { path = "/" + path } - _, err := r.call("DELETE", "/v3/fs/"+url.PathEscape(name)+path, nil, "", nil) + _, err := r.call("DELETE", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "", nil) return err } @@ -71,7 +84,7 @@ func (r *restclient) FilesystemAddFile(name, path string, data io.Reader) error path = "/" + path } - _, err := r.call("PUT", "/v3/fs/"+url.PathEscape(name)+path, nil, "application/data", data) + _, err := r.call("PUT", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "application/data", data) return err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/graph.go b/vendor/github.com/datarhei/core-client-go/v16/graph.go index 6e7d53c9..02f5c2bc 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/graph.go +++ b/vendor/github.com/datarhei/core-client-go/v16/graph.go @@ -14,7 +14,7 @@ func (r *restclient) Graph(query api.GraphQuery) (api.GraphResponse, error) { e := json.NewEncoder(&buf) e.Encode(query) - data, err := r.call("PUT", "/v3/graph", nil, "application/json", &buf) + data, err := r.call("PUT", "/v3/graph", nil, nil, "application/json", &buf) if err != nil { return resp, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/log.go b/vendor/github.com/datarhei/core-client-go/v16/log.go index a46f74a3..0b454267 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/log.go +++ b/vendor/github.com/datarhei/core-client-go/v16/log.go @@ -13,7 +13,7 @@ func (r *restclient) Log() ([]api.LogEvent, error) { query := &url.Values{} query.Set("format", "raw") - data, err := r.call("GET", "/v3/log", query, "", nil) + data, err := r.call("GET", "/v3/log", query, nil, "", nil) if err != nil { return log, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/metadata.go b/vendor/github.com/datarhei/core-client-go/v16/metadata.go index 7eccc6c8..69f3ca63 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/metadata.go +++ b/vendor/github.com/datarhei/core-client-go/v16/metadata.go @@ -16,7 +16,7 @@ func (r *restclient) Metadata(key string) (api.Metadata, error) { path += "/" + url.PathEscape(key) } - data, err := r.call("GET", path, nil, "", nil) + data, err := r.call("GET", path, nil, nil, "", nil) if err != nil { return m, err } @@ -37,7 +37,7 @@ func (r *restclient) MetadataSet(key string, metadata api.Metadata) error { path += "/" + url.PathEscape(key) } - _, err := r.call("PUT", path, nil, "application/json", &buf) + _, err := r.call("PUT", path, nil, nil, "application/json", &buf) if err != nil { return err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/metrics.go b/vendor/github.com/datarhei/core-client-go/v16/metrics.go index 36239300..75a13343 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/metrics.go +++ b/vendor/github.com/datarhei/core-client-go/v16/metrics.go @@ -10,7 +10,7 @@ import ( func (r *restclient) MetricsList() ([]api.MetricsDescription, error) { descriptions := []api.MetricsDescription{} - data, err := r.call("GET", "/v3/metrics", nil, "application/json", nil) + data, err := r.call("GET", "/v3/metrics", nil, nil, "application/json", nil) if err != nil { return descriptions, err } @@ -27,7 +27,7 @@ func (r *restclient) Metrics(query api.MetricsQuery) (api.MetricsResponse, error e := json.NewEncoder(&buf) e.Encode(query) - data, err := r.call("POST", "/v3/metrics", nil, "application/json", &buf) + data, err := r.call("POST", "/v3/metrics", nil, nil, "application/json", &buf) if err != nil { return m, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/process.go b/vendor/github.com/datarhei/core-client-go/v16/process.go index d52a7b45..6576a82c 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/process.go +++ b/vendor/github.com/datarhei/core-client-go/v16/process.go @@ -39,12 +39,14 @@ func (p ProcessID) String() string { } type ProcessListOptions struct { - ID []string - Filter []string - Domain string - Reference string - IDPattern string - RefPattern string + ID []string + Filter []string + Domain string + Reference string + IDPattern string + RefPattern string + OwnerPattern string + DomainPattern string } func (p *ProcessListOptions) Query() *url.Values { @@ -55,6 +57,8 @@ func (p *ProcessListOptions) Query() *url.Values { values.Set("reference", p.Reference) values.Set("idpattern", p.IDPattern) values.Set("refpattern", p.RefPattern) + values.Set("ownerpattern", p.OwnerPattern) + values.Set("domainpattern", p.DomainPattern) return values } @@ -62,7 +66,7 @@ func (p *ProcessListOptions) Query() *url.Values { func (r *restclient) ProcessList(opts ProcessListOptions) ([]api.Process, error) { var processes []api.Process - data, err := r.call("GET", "/v3/process", opts.Query(), "", nil) + data, err := r.call("GET", "/v3/process", opts.Query(), nil, "", nil) if err != nil { return processes, err } @@ -79,7 +83,7 @@ func (r *restclient) Process(id ProcessID, filter []string) (api.Process, error) values.Set("filter", strings.Join(filter, ",")) values.Set("domain", id.Domain) - data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID), values, "", nil) + data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID), values, nil, "", nil) if err != nil { return info, err } @@ -95,7 +99,7 @@ func (r *restclient) ProcessAdd(p api.ProcessConfig) error { e := json.NewEncoder(&buf) e.Encode(p) - _, err := r.call("POST", "/v3/process", nil, "application/json", &buf) + _, err := r.call("POST", "/v3/process", nil, nil, "application/json", &buf) if err != nil { return err } @@ -112,7 +116,7 @@ func (r *restclient) ProcessUpdate(id ProcessID, p api.ProcessConfig) error { query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", &buf) if err != nil { return err } @@ -124,7 +128,7 @@ func (r *restclient) ProcessDelete(id ProcessID) error { query := &url.Values{} query.Set("domain", id.Domain) - r.call("DELETE", "/v3/process/"+url.PathEscape(id.ID), query, "", nil) + r.call("DELETE", "/v3/process/"+url.PathEscape(id.ID), query, nil, "", nil) return nil } @@ -140,7 +144,7 @@ func (r *restclient) ProcessCommand(id ProcessID, command string) error { query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", &buf) if err != nil { return err } @@ -154,7 +158,7 @@ func (r *restclient) ProcessProbe(id ProcessID) (api.Probe, error) { query := &url.Values{} query.Set("domain", id.Domain) - data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/probe", query, "", nil) + data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/probe", query, nil, "", nil) if err != nil { return p, err } @@ -170,7 +174,7 @@ func (r *restclient) ProcessConfig(id ProcessID) (api.ProcessConfig, error) { query := &url.Values{} query.Set("domain", id.Domain) - data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/config", query, "", nil) + data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/config", query, nil, "", nil) if err != nil { return p, err } @@ -186,7 +190,7 @@ func (r *restclient) ProcessReport(id ProcessID) (api.ProcessReport, error) { query := &url.Values{} query.Set("domain", id.Domain) - data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, "", nil) + data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "", nil) if err != nil { return p, err } @@ -202,7 +206,7 @@ func (r *restclient) ProcessState(id ProcessID) (api.ProcessState, error) { query := &url.Values{} query.Set("domain", id.Domain) - data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/state", query, "", nil) + data, err := r.call("GET", "/v3/process/"+url.PathEscape(id.ID)+"/state", query, nil, "", nil) if err != nil { return p, err } @@ -223,7 +227,7 @@ func (r *restclient) ProcessMetadata(id ProcessID, key string) (api.Metadata, er path += "/" + url.PathEscape(key) } - data, err := r.call("GET", path, query, "", nil) + data, err := r.call("GET", path, query, nil, "", nil) if err != nil { return m, err } @@ -242,7 +246,7 @@ func (r *restclient) ProcessMetadataSet(id ProcessID, key string, metadata api.M query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", &buf) if err != nil { return err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/rtmp.go b/vendor/github.com/datarhei/core-client-go/v16/rtmp.go index 052f0fb0..c7e03d92 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/rtmp.go +++ b/vendor/github.com/datarhei/core-client-go/v16/rtmp.go @@ -9,7 +9,7 @@ import ( func (r *restclient) RTMPChannels() ([]api.RTMPChannel, error) { var m []api.RTMPChannel - data, err := r.call("GET", "/v3/rtmp", nil, "", nil) + data, err := r.call("GET", "/v3/rtmp", nil, nil, "", nil) if err != nil { return m, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/session.go b/vendor/github.com/datarhei/core-client-go/v16/session.go index f1e161dc..4f0c99f0 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/session.go +++ b/vendor/github.com/datarhei/core-client-go/v16/session.go @@ -14,7 +14,7 @@ func (r *restclient) Sessions(collectors []string) (api.SessionsSummary, error) query := &url.Values{} query.Set("collectors", strings.Join(collectors, ",")) - data, err := r.call("GET", "/v3/sessions", query, "", nil) + data, err := r.call("GET", "/v3/sessions", query, nil, "", nil) if err != nil { return sessions, err } @@ -30,7 +30,7 @@ func (r *restclient) SessionsActive(collectors []string) (api.SessionsActive, er query := &url.Values{} query.Set("collectors", strings.Join(collectors, ",")) - data, err := r.call("GET", "/v3/sessions/active", query, "", nil) + data, err := r.call("GET", "/v3/sessions/active", query, nil, "", nil) if err != nil { return sessions, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/skills.go b/vendor/github.com/datarhei/core-client-go/v16/skills.go index 1141bbe1..e7ef5c35 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/skills.go +++ b/vendor/github.com/datarhei/core-client-go/v16/skills.go @@ -9,7 +9,7 @@ import ( func (r *restclient) Skills() (api.Skills, error) { var skills api.Skills - data, err := r.call("GET", "/v3/skills", nil, "", nil) + data, err := r.call("GET", "/v3/skills", nil, nil, "", nil) if err != nil { return skills, err } @@ -20,7 +20,7 @@ func (r *restclient) Skills() (api.Skills, error) { } func (r *restclient) SkillsReload() error { - _, err := r.call("GET", "/v3/skills/reload", nil, "", nil) + _, err := r.call("GET", "/v3/skills/reload", nil, nil, "", nil) return err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/srt.go b/vendor/github.com/datarhei/core-client-go/v16/srt.go index 426df39d..7ee623ea 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/srt.go +++ b/vendor/github.com/datarhei/core-client-go/v16/srt.go @@ -9,7 +9,7 @@ import ( func (r *restclient) SRTChannels() ([]api.SRTChannel, error) { var m []api.SRTChannel - data, err := r.call("GET", "/v3/srt", nil, "", nil) + data, err := r.call("GET", "/v3/srt", nil, nil, "", nil) if err != nil { return nil, err } diff --git a/vendor/github.com/datarhei/core-client-go/v16/widget.go b/vendor/github.com/datarhei/core-client-go/v16/widget.go index 349f8d7b..07030569 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/widget.go +++ b/vendor/github.com/datarhei/core-client-go/v16/widget.go @@ -13,7 +13,7 @@ func (r *restclient) WidgetProcess(id ProcessID) (api.WidgetProcess, error) { query := &url.Values{} query.Set("domain", id.Domain) - data, err := r.call("GET", "/v3/widget/process"+url.PathEscape(id.ID), query, "", nil) + data, err := r.call("GET", "/v3/widget/process"+url.PathEscape(id.ID), query, nil, "", nil) if err != nil { return w, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 69e3212a..52d1406b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2 # github.com/cpuguy83/go-md2man/v2 v2.0.2 ## explicit; go 1.11 github.com/cpuguy83/go-md2man/v2/md2man -# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230620131644-140b3a61d4c8 +# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230621092648-5cc658d8a73c ## explicit; go 1.18 github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16/api