From 7d2b7b48362b38b931ba24f9b36b01a6f1b83b10 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 12 May 2023 12:59:01 +0200 Subject: [PATCH] WIP: allow update processes in cluster --- client/client.go | 1 - cluster/leader.go | 78 +++++++++---- cluster/leader_test.go | 116 +++++++++++-------- cluster/proxy/node.go | 141 +++++++++++++++-------- cluster/proxy/proxy.go | 131 +++++++++------------ cluster/store/store.go | 41 ++++--- docs/docs.go | 224 +++++++++++++++++++++++++++++++++--- docs/swagger.json | 224 +++++++++++++++++++++++++++++++++--- docs/swagger.yaml | 153 +++++++++++++++++++++--- encoding/json/json.go | 12 ++ http/api/cluster.go | 43 ++++--- http/api/json.go | 2 +- http/api/probe.go | 6 +- http/api/process.go | 2 +- http/api/progress.go | 10 +- http/api/session.go | 12 +- http/handler/api/cluster.go | 97 +++++++++++----- http/server.go | 8 +- 18 files changed, 990 insertions(+), 311 deletions(-) diff --git a/client/client.go b/client/client.go index 21c7d666..7cf047f8 100644 --- a/client/client.go +++ b/client/client.go @@ -141,7 +141,6 @@ func New(config Config) (RestClient, error) { u.Fragment = "" r.address = u.String() - fmt.Printf("address: %s\n", r.address) if r.client == nil { r.client = &http.Client{ diff --git a/cluster/leader.go b/cluster/leader.go index 4d746540..b5cf8067 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -9,6 +9,7 @@ import ( "time" "github.com/datarhei/core/v16/cluster/proxy" + "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/restream/app" ) @@ -342,6 +343,12 @@ type processOpAdd struct { config *app.Config } +type processOpUpdate struct { + nodeid string + processid string + config *app.Config +} + type processOpReject struct { processid string err error @@ -377,6 +384,19 @@ func (c *cluster) applyOpStack(stack []interface{}) { "processid": v.config.ID, "nodeid": v.nodeid, }).Log("Adding process") + case processOpUpdate: + err := c.proxy.ProcessUpdate(v.nodeid, v.processid, v.config) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "nodeid": v.nodeid, + }).Log("Updating process") + break + } + c.logger.Info().WithFields(log.Fields{ + "processid": v.config.ID, + "nodeid": v.nodeid, + }).Log("Updating process") case processOpDelete: err := c.proxy.ProcessDelete(v.nodeid, v.processid) if err != nil { @@ -451,7 +471,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { func (c *cluster) doSynchronize() { want := c.store.ProcessList() - have := c.proxy.ProcessList() + have := c.proxy.ListProcesses() resources := c.proxy.Resources() c.logger.Debug().WithFields(log.Fields{ @@ -466,7 +486,7 @@ func (c *cluster) doSynchronize() { } func (c *cluster) doRebalance() { - have := c.proxy.ProcessList() + have := c.proxy.ListProcesses() resources := c.proxy.Resources() c.logger.Debug().WithFields(log.Fields{ @@ -482,7 +502,7 @@ func (c *cluster) doRebalance() { // normalizeProcessesAndResources normalizes the CPU and memory consumption of the processes and resources in-place. 
// // Deprecated: all values are absolute or already normed to 0-100*ncpu percent -func normalizeProcessesAndResources(processes []proxy.ProcessConfig, resources map[string]proxy.NodeResources) { +func normalizeProcessesAndResources(processes []proxy.Process, resources map[string]proxy.NodeResources) { maxNCPU := .0 for _, r := range resources { @@ -520,12 +540,12 @@ func normalizeProcessesAndResources(processes []proxy.ProcessConfig, resources m // synchronize returns a list of operations in order to adjust the "have" list to the "want" list // with taking the available resources on each node into account. -func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[string]proxy.NodeResources) []interface{} { +func synchronize(want []store.Process, have []proxy.Process, resources map[string]proxy.NodeResources) []interface{} { // A map from the process ID to the process config of the processes // we want to be running on the nodes. - wantMap := map[string]*app.Config{} - for _, config := range want { - wantMap[config.ID] = &config + wantMap := map[string]store.Process{} + for _, process := range want { + wantMap[process.Config.ID] = process } opStack := []interface{}{} @@ -533,10 +553,10 @@ func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[st // Now we iterate through the processes we actually have running on the nodes // and remove them from the wantMap. We also make sure that they are running. // If a process is not on the wantMap, it will be deleted from the nodes. - haveAfterRemove := []proxy.ProcessConfig{} + haveAfterRemove := []proxy.Process{} for _, p := range have { - if _, ok := wantMap[p.Config.ID]; !ok { + if wantP, ok := wantMap[p.Config.ID]; !ok { opStack = append(opStack, processOpDelete{ nodeid: p.NodeID, processid: p.Config.ID, @@ -551,6 +571,14 @@ func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[st } continue + } else { + if wantP.UpdatedAt.After(p.UpdatedAt) { + opStack = append(opStack, processOpUpdate{ + nodeid: p.NodeID, + processid: p.Config.ID, + config: wantP.Config, + }) + } } delete(wantMap, p.Config.ID) @@ -571,11 +599,11 @@ func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[st haveReferenceAffinityMap := createReferenceAffinityMap(have) // Now all remaining processes in the wantMap must be added to one of the nodes - for _, config := range wantMap { + for _, process := range wantMap { // If a process doesn't have any limits defined, reject that process - if config.LimitCPU <= 0 || config.LimitMemory <= 0 { + if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 { opStack = append(opStack, processOpReject{ - processid: config.ID, + processid: process.Config.ID, err: errNoLimitsDefined, }) @@ -589,11 +617,11 @@ func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[st // Try to add the process to a node where other processes with the same // reference currently reside. 
- if len(config.Reference) != 0 { - for _, count := range haveReferenceAffinityMap[config.Reference] { + if len(process.Config.Reference) != 0 { + for _, count := range haveReferenceAffinityMap[process.Config.Reference] { r := resources[count.nodeid] - cpu := config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given - mem := config.LimitMemory + cpu := process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given + mem := process.Config.LimitMemory if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { nodeid = count.nodeid @@ -605,8 +633,8 @@ func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[st // Find the node with the most resources available if len(nodeid) == 0 { for id, r := range resources { - cpu := config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given - mem := config.LimitMemory + cpu := process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given + mem := process.Config.LimitMemory if len(nodeid) == 0 { if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { @@ -625,19 +653,19 @@ func synchronize(want []app.Config, have []proxy.ProcessConfig, resources map[st if len(nodeid) != 0 { opStack = append(opStack, processOpAdd{ nodeid: nodeid, - config: config, + config: process.Config, }) // Adjust the resources r, ok := resources[nodeid] if ok { - r.CPU += config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given - r.Mem += config.LimitMemory + r.CPU += process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given + r.Mem += process.Config.LimitMemory resources[nodeid] = r } } else { opStack = append(opStack, processOpReject{ - processid: config.ID, + processid: process.Config.ID, err: errNotEnoughResources, }) } @@ -651,7 +679,7 @@ type referenceAffinityNodeCount struct { count uint64 } -func createReferenceAffinityMap(processes []proxy.ProcessConfig) map[string][]referenceAffinityNodeCount { +func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenceAffinityNodeCount { referenceAffinityMap := map[string][]referenceAffinityNodeCount{} for _, p := range processes { if len(p.Config.Reference) == 0 { @@ -696,9 +724,9 @@ func createReferenceAffinityMap(processes []proxy.ProcessConfig) map[string][]re // rebalance returns a list of operations that will move running processes away from nodes // that are overloaded. 
-func rebalance(have []proxy.ProcessConfig, resources map[string]proxy.NodeResources) []interface{} { +func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) []interface{} { // Group the processes by node - processNodeMap := map[string][]proxy.ProcessConfig{} + processNodeMap := map[string][]proxy.Process{} for _, p := range have { processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p) diff --git a/cluster/leader_test.go b/cluster/leader_test.go index d3516a77..64a62643 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -2,23 +2,28 @@ package cluster import ( "testing" + "time" "github.com/datarhei/core/v16/cluster/proxy" + "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/restream/app" "github.com/stretchr/testify/require" ) func TestSynchronizeAdd(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar", - LimitCPU: 10, - LimitMemory: 50, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 50, + }, }, } - have := []proxy.ProcessConfig{} + have := []proxy.Process{} resources := map[string]proxy.NodeResources{ "node1": { @@ -69,22 +74,28 @@ func TestSynchronizeAdd(t *testing.T) { } func TestSynchronizeAddReferenceAffinity(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar", - Reference: "barfoo", - LimitCPU: 10, - LimitMemory: 20, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 20, + }, }, { - ID: "foobar2", - Reference: "barfoo", - LimitCPU: 10, - LimitMemory: 30, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar2", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 30, + }, }, } - have := []proxy.ProcessConfig{ + have := []proxy.Process{ { NodeID: "node2", Order: "start", @@ -132,15 +143,18 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { } func TestSynchronizeAddLimit(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar", - LimitCPU: 10, - LimitMemory: 5, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + }, }, } - have := []proxy.ProcessConfig{} + have := []proxy.Process{} resources := map[string]proxy.NodeResources{ "node1": { @@ -191,15 +205,18 @@ func TestSynchronizeAddLimit(t *testing.T) { } func TestSynchronizeAddNoResourcesCPU(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar", - LimitCPU: 30, - LimitMemory: 5, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 30, + LimitMemory: 5, + }, }, } - have := []proxy.ProcessConfig{} + have := []proxy.Process{} resources := map[string]proxy.NodeResources{ "node1": { @@ -229,15 +246,18 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) { } func TestSynchronizeAddNoResourcesMemory(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar", - LimitCPU: 1, - LimitMemory: 50, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 1, + LimitMemory: 50, + }, }, } - have := []proxy.ProcessConfig{} + have := []proxy.Process{} resources := map[string]proxy.NodeResources{ "node1": { @@ -267,13 +287,16 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) { } func TestSynchronizeAddNoLimits(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar", + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + }, }, } - have := []proxy.ProcessConfig{} + have := 
[]proxy.Process{} resources := map[string]proxy.NodeResources{ "node1": { @@ -303,9 +326,9 @@ func TestSynchronizeAddNoLimits(t *testing.T) { } func TestSynchronizeRemove(t *testing.T) { - want := []app.Config{} + want := []store.Process{} - have := []proxy.ProcessConfig{ + have := []proxy.Process{ { NodeID: "node2", Order: "start", @@ -364,15 +387,18 @@ func TestSynchronizeRemove(t *testing.T) { } func TestSynchronizeAddRemove(t *testing.T) { - want := []app.Config{ + want := []store.Process{ { - ID: "foobar1", - LimitCPU: 10, - LimitMemory: 5, + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar1", + LimitCPU: 10, + LimitMemory: 5, + }, }, } - have := []proxy.ProcessConfig{ + have := []proxy.Process{ { NodeID: "node2", Order: "start", @@ -439,7 +465,7 @@ func TestSynchronizeAddRemove(t *testing.T) { } func TestRebalanceNothingToDo(t *testing.T) { - processes := []proxy.ProcessConfig{ + processes := []proxy.Process{ { NodeID: "node1", Order: "start", @@ -487,7 +513,7 @@ func TestRebalanceNothingToDo(t *testing.T) { } func TestRebalanceOverload(t *testing.T) { - processes := []proxy.ProcessConfig{ + processes := []proxy.Process{ { NodeID: "node1", Order: "start", @@ -573,7 +599,7 @@ func TestRebalanceOverload(t *testing.T) { } func TestRebalanceSkip(t *testing.T) { - processes := []proxy.ProcessConfig{ + processes := []proxy.Process{ { NodeID: "node1", Order: "start", @@ -667,7 +693,7 @@ func TestRebalanceSkip(t *testing.T) { } func TestRebalanceReferenceAffinity(t *testing.T) { - processes := []proxy.ProcessConfig{ + processes := []proxy.Process{ { NodeID: "node1", Order: "start", @@ -794,7 +820,7 @@ func TestRebalanceReferenceAffinity(t *testing.T) { } func TestCreateReferenceAffinityNodeMap(t *testing.T) { - processes := []proxy.ProcessConfig{ + processes := []proxy.Process{ { NodeID: "node1", Order: "start", diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 6fea2539..1a594fcc 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -27,7 +27,7 @@ type Node interface { GetURL(path string) (string, error) GetFile(path string) (io.ReadCloser, error) - ProcessList() ([]ProcessConfig, error) + ProcessList() ([]Process, error) ProcessAdd(*app.Config) error ProcessStart(id string) error ProcessStop(id string) error @@ -37,11 +37,10 @@ type Node interface { } type NodeReader interface { - ID() string - Address() string IPs() []string Files() NodeFiles - State() NodeState + About() NodeAbout + Version() NodeVersion } type NodeFiles struct { @@ -58,14 +57,27 @@ type NodeResources struct { MemLimit uint64 // Defined memory limit in bytes } -type NodeState struct { +type NodeAbout struct { ID string + Name string + Address string State string + CreatedAt time.Time + Uptime time.Duration LastContact time.Time Latency time.Duration Resources NodeResources } +type NodeVersion struct { + Number string + Commit string + Branch string + Build time.Time + Arch string + Compiler string +} + type nodeState string func (n nodeState) String() string { @@ -364,50 +376,34 @@ func (n *node) StopFiles() { n.cancelFiles() } -func (n *node) Address() string { - return n.address -} - -func (n *node) IPs() []string { - return n.ips -} - -func (n *node) ID() string { +func (n *node) About() NodeAbout { n.peerLock.RLock() - defer n.peerLock.RUnlock() if n.peer == nil { - return "" + n.peerLock.RUnlock() + return NodeAbout{} } - return n.peer.ID() -} + about := n.peer.About() + + n.peerLock.RUnlock() + + createdAt, err := time.Parse(time.RFC3339, about.CreatedAt) + if err != nil { + 
createdAt = time.Now() + } -func (n *node) Files() NodeFiles { n.stateLock.RLock() defer n.stateLock.RUnlock() - state := NodeFiles{ - ID: n.ID(), - LastUpdate: n.lastUpdate, - } - - if n.state != stateDisconnected && time.Since(n.lastUpdate) <= 2*time.Second { - state.Files = make([]string, len(n.filesList)) - copy(state.Files, n.filesList) - } - - return state -} - -func (n *node) State() NodeState { - n.stateLock.RLock() - defer n.stateLock.RUnlock() - - state := NodeState{ - ID: n.ID(), - LastContact: n.lastContact, + state := NodeAbout{ + ID: about.ID, + Name: about.Name, + Address: n.address, State: n.state.String(), + CreatedAt: createdAt, + Uptime: time.Since(createdAt), + LastContact: n.lastContact, Latency: time.Duration(n.latency * float64(time.Second)), Resources: NodeResources{ NCPU: n.resources.ncpu, @@ -421,6 +417,54 @@ func (n *node) State() NodeState { return state } +func (n *node) Version() NodeVersion { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + return NodeVersion{} + } + + about := n.peer.About() + + build, err := time.Parse(time.RFC3339, about.Version.Build) + if err != nil { + build = time.Time{} + } + + version := NodeVersion{ + Number: about.Version.Number, + Commit: about.Version.Commit, + Branch: about.Version.Branch, + Build: build, + Arch: about.Version.Arch, + Compiler: about.Version.Compiler, + } + + return version +} + +func (n *node) IPs() []string { + return n.ips +} + +func (n *node) Files() NodeFiles { + n.stateLock.RLock() + defer n.stateLock.RUnlock() + + state := NodeFiles{ + ID: n.About().ID, + LastUpdate: n.lastUpdate, + } + + if n.state != stateDisconnected && time.Since(n.lastUpdate) <= 2*time.Second { + state.Files = make([]string, len(n.filesList)) + copy(state.Files, n.filesList) + } + + return state +} + func (n *node) files() { filesChan := make(chan string, 1024) filesList := []string{} @@ -603,7 +647,7 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) { return nil, fmt.Errorf("unknown prefix") } -func (n *node) ProcessList() ([]ProcessConfig, error) { +func (n *node) ProcessList() ([]Process, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -619,16 +663,17 @@ func (n *node) ProcessList() ([]ProcessConfig, error) { return nil, err } - processes := []ProcessConfig{} + processes := []Process{} for _, p := range list { - process := ProcessConfig{ - NodeID: n.ID(), - Order: p.State.Order, - State: p.State.State, - Mem: p.State.Memory, - Runtime: time.Duration(p.State.Runtime) * time.Second, - Config: p.Config.Marshal(), + process := Process{ + NodeID: n.About().ID, + Order: p.State.Order, + State: p.State.State, + Mem: p.State.Memory, + Runtime: time.Duration(p.State.Runtime) * time.Second, + UpdatedAt: time.Unix(p.UpdatedAt, 0), + Config: p.Config.Marshal(), } if x, err := p.State.CPU.Float64(); err == nil { diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 52a41ea2..866bbd07 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -23,67 +23,19 @@ type Proxy interface { ProxyReader Reader() ProxyReader - ProxyProcessor - Processor() ProxyProcessor -} - -type ProxyProcessor interface { - Resources() map[string]NodeResources - - ProcessList() []ProcessConfig ProcessAdd(nodeid string, config *app.Config) error ProcessDelete(nodeid string, id string) error ProcessStart(nodeid string, id string) error -} - -type proxyProcessor struct { - proxy *proxy -} - -func (p *proxyProcessor) Resources() map[string]NodeResources { - if p.proxy == nil { - return nil - } - - return 
p.proxy.Resources() -} - -func (p *proxyProcessor) ProcessList() []ProcessConfig { - if p.proxy == nil { - return nil - } - - return p.proxy.ProcessList() -} - -func (p *proxyProcessor) ProcessAdd(nodeid string, config *app.Config) error { - if p.proxy == nil { - return fmt.Errorf("no proxy provided") - } - - return p.proxy.ProcessAdd(nodeid, config) -} - -func (p *proxyProcessor) ProcessDelete(nodeid string, id string) error { - if p.proxy == nil { - return fmt.Errorf("no proxy provided") - } - - return p.proxy.ProcessDelete(nodeid, id) -} - -func (p *proxyProcessor) ProcessStart(nodeid string, id string) error { - if p.proxy == nil { - return fmt.Errorf("no proxy provided") - } - - return p.proxy.ProcessStart(nodeid, id) + ProcessUpdate(nodeid string, id string, config *app.Config) error } type ProxyReader interface { ListNodes() []NodeReader GetNode(id string) (NodeReader, error) + Resources() map[string]NodeResources + ListProcesses() []Process + GetURL(path string) (string, error) GetFile(path string) (io.ReadCloser, error) } @@ -112,6 +64,22 @@ func (p *proxyReader) GetNode(id string) (NodeReader, error) { return p.proxy.GetNode(id) } +func (p *proxyReader) Resources() map[string]NodeResources { + if p.proxy == nil { + return nil + } + + return p.proxy.Resources() +} + +func (p *proxyReader) ListProcesses() []Process { + if p.proxy == nil { + return nil + } + + return p.proxy.ListProcesses() +} + func (p *proxyReader) GetURL(path string) (string, error) { if p.proxy == nil { return "", fmt.Errorf("no proxy provided") @@ -264,12 +232,6 @@ func (p *proxy) Reader() ProxyReader { } } -func (p *proxy) Processor() ProxyProcessor { - return &proxyProcessor{ - proxy: p, - } -} - func (p *proxy) Resources() map[string]NodeResources { resources := map[string]NodeResources{} @@ -277,15 +239,18 @@ func (p *proxy) Resources() map[string]NodeResources { defer p.lock.RUnlock() for _, node := range p.nodes { - resources[node.ID()] = node.State().Resources + about := node.About() + resources[about.ID] = about.Resources } return resources } func (p *proxy) AddNode(id string, node Node) (string, error) { - if id != node.ID() { - return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, node.ID()) + about := node.About() + + if id != about.ID { + return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID) } p.lock.Lock() @@ -315,7 +280,8 @@ func (p *proxy) AddNode(id string, node Node) (string, error) { node.StartFiles(p.updates) p.logger.Info().WithFields(log.Fields{ - "address": node.Address(), + "address": about.Address, + "name": about.Name, "id": id, }).Log("Added node") @@ -449,19 +415,20 @@ func (p *proxy) GetFile(path string) (io.ReadCloser, error) { return data, nil } -type ProcessConfig struct { - NodeID string - Order string - State string - CPU float64 // Current CPU load of this process, 0-100*ncpu - Mem uint64 // Currently consumed memory of this process in bytes - Runtime time.Duration - Config *app.Config +type Process struct { + NodeID string + Order string + State string + CPU float64 // Current CPU load of this process, 0-100*ncpu + Mem uint64 // Currently consumed memory of this process in bytes + Runtime time.Duration + UpdatedAt time.Time + Config *app.Config } -func (p *proxy) ProcessList() []ProcessConfig { - processChan := make(chan ProcessConfig, 64) - processList := []ProcessConfig{} +func (p *proxy) ListProcesses() []Process { + processChan := make(chan Process, 64) + processList := []Process{} wgList := 
sync.WaitGroup{} wgList.Add(1) @@ -469,8 +436,8 @@ func (p *proxy) ProcessList() []ProcessConfig { go func() { defer wgList.Done() - for file := range processChan { - processList = append(processList, file) + for process := range processChan { + processList = append(processList, process) } }() @@ -480,7 +447,7 @@ func (p *proxy) ProcessList() []ProcessConfig { for _, node := range p.nodes { wg.Add(1) - go func(node Node, p chan<- ProcessConfig) { + go func(node Node, p chan<- Process) { defer wg.Done() processes, err := node.ProcessList() @@ -559,3 +526,15 @@ func (p *proxy) ProcessStart(nodeid string, id string) error { return nil } + +func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) error { + p.lock.RLock() + defer p.lock.RUnlock() + + _, ok := p.nodes[nodeid] + if !ok { + return fmt.Errorf("node not found") + } + + return fmt.Errorf("not implemented") +} diff --git a/cluster/store/store.go b/cluster/store/store.go index 9d32eaf3..1abfb0aa 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/restream/app" @@ -15,8 +16,13 @@ import ( type Store interface { raft.FSM - ProcessList() []app.Config - GetProcess(id string) (app.Config, error) + ProcessList() []Process + GetProcess(id string) (Process, error) +} + +type Process struct { + UpdatedAt time.Time + Config *app.Config } type operation string @@ -42,7 +48,7 @@ type CommandRemoveProcess struct { // Implement a FSM type store struct { lock sync.RWMutex - Process map[string]app.Config + Process map[string]Process logger log.Logger } @@ -53,7 +59,7 @@ type Config struct { func NewStore(config Config) (Store, error) { s := &store{ - Process: map[string]app.Config{}, + Process: map[string]Process{}, logger: config.Logger, } @@ -89,7 +95,10 @@ func (s *store) Apply(entry *raft.Log) interface{} { json.Unmarshal(b, &cmd) s.lock.Lock() - s.Process[cmd.ID] = cmd.Config + s.Process[cmd.ID] = Process{ + UpdatedAt: time.Now(), + Config: &cmd.Config, + } s.lock.Unlock() case OpRemoveProcess: b, _ := json.Marshal(c.Data) @@ -139,29 +148,35 @@ func (s *store) Restore(snapshot io.ReadCloser) error { return nil } -func (s *store) ProcessList() []app.Config { +func (s *store) ProcessList() []Process { s.lock.RLock() defer s.lock.RUnlock() - processes := []app.Config{} + processes := []Process{} - for _, cfg := range s.Process { - processes = append(processes, *cfg.Clone()) + for _, p := range s.Process { + processes = append(processes, Process{ + UpdatedAt: p.UpdatedAt, + Config: p.Config.Clone(), + }) } return processes } -func (s *store) GetProcess(id string) (app.Config, error) { +func (s *store) GetProcess(id string) (Process, error) { s.lock.RLock() defer s.lock.RUnlock() - cfg, ok := s.Process[id] + process, ok := s.Process[id] if !ok { - return app.Config{}, fmt.Errorf("not found") + return Process{}, fmt.Errorf("not found") } - return *cfg.Clone(), nil + return Process{ + UpdatedAt: process.UpdatedAt, + Config: process.Config.Clone(), + }, nil } type fsmSnapshot struct { diff --git a/docs/docs.go b/docs/docs.go index cb56b85f..6b005990 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -219,6 +219,9 @@ const docTemplate = `{ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" 
+ ], "summary": "List of nodes in the cluster", "operationId": "cluster-3-get-cluster", "responses": { @@ -237,7 +240,7 @@ const docTemplate = `{ } } }, - "/api/v3/cluster/proxy": { + "/api/v3/cluster/node": { "get": { "security": [ { @@ -248,8 +251,11 @@ const docTemplate = `{ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" + ], "summary": "List of proxy nodes in the cluster", - "operationId": "cluster-3-get-proxy-nodes", + "operationId": "cluster-3-get-nodes", "responses": { "200": { "description": "OK", @@ -269,7 +275,7 @@ const docTemplate = `{ } } }, - "/api/v3/cluster/proxy/node/{id}": { + "/api/v3/cluster/node/{id}": { "get": { "security": [ { @@ -280,8 +286,11 @@ const docTemplate = `{ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" + ], "summary": "List a proxy node by its ID", - "operationId": "cluster-3-get-proxy-node", + "operationId": "cluster-3-get-node", "parameters": [ { "type": "string", @@ -307,7 +316,7 @@ const docTemplate = `{ } } }, - "/api/v3/cluster/proxy/node/{id}/files": { + "/api/v3/cluster/node/{id}/files": { "get": { "security": [ { @@ -318,8 +327,11 @@ const docTemplate = `{ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" + ], "summary": "List the files of a proxy node by its ID", - "operationId": "cluster-3-get-proxy-node-files", + "operationId": "cluster-3-get-node-files", "parameters": [ { "type": "string", @@ -345,6 +357,120 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/process": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of processes in the cluster", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List of processes in the cluster", + "operationId": "cluster-3-list-processes", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterProcess" + } + } + } + } + }, + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Add a new FFmpeg process", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Add a new process", + "operationId": "cluster-3-add-process", + "parameters": [ + { + "description": "Process config", + "name": "config", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, + "/api/v3/cluster/process/{id}": { + "delete": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Delete a process by its ID", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" 
+ ], + "summary": "Delete a process by its ID", + "operationId": "cluster-3-delete-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/config": { "get": { "security": [ @@ -2251,7 +2377,7 @@ const docTemplate = `{ "id": { "type": "string" }, - "nodes": { + "server": { "type": "array", "items": { "$ref": "#/definitions/api.ClusterServer" @@ -2268,29 +2394,99 @@ const docTemplate = `{ "address": { "type": "string" }, + "created_at": { + "type": "string" + }, "id": { "type": "string" }, - "last_ping": { - "type": "integer" - }, - "last_update": { + "last_contact": { + "description": "unix timestamp", "type": "integer" }, "latency_ms": { "description": "milliseconds", "type": "number" }, + "name": { + "type": "string" + }, + "resources": { + "$ref": "#/definitions/api.ClusterNodeResources" + }, "state": { "type": "string" + }, + "uptime_seconds": { + "type": "integer" } } }, "api.ClusterNodeFiles": { "type": "object", - "additionalProperties": { - "type": "array", - "items": { + "properties": { + "files": { + "type": "object", + "additionalProperties": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "last_update": { + "description": "unix timestamp", + "type": "integer" + } + } + }, + "api.ClusterNodeResources": { + "type": "object", + "properties": { + "cpu_limit": { + "description": "percent 0-100*npcu", + "type": "number" + }, + "cpu_used": { + "description": "percent 0-100*npcu", + "type": "number" + }, + "memory_limit_bytes": { + "type": "integer" + }, + "memory_used_bytes": { + "type": "integer" + }, + "ncpu": { + "type": "number" + } + } + }, + "api.ClusterProcess": { + "type": "object", + "properties": { + "cpu": { + "type": "number" + }, + "id": { + "type": "string" + }, + "memory_bytes": { + "type": "integer" + }, + "node_id": { + "type": "string" + }, + "order": { + "type": "string" + }, + "reference": { + "type": "string" + }, + "runtime_seconds": { + "type": "integer" + }, + "state": { "type": "string" } } diff --git a/docs/swagger.json b/docs/swagger.json index 15bcee14..5409f3b1 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -212,6 +212,9 @@ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" + ], "summary": "List of nodes in the cluster", "operationId": "cluster-3-get-cluster", "responses": { @@ -230,7 +233,7 @@ } } }, - "/api/v3/cluster/proxy": { + "/api/v3/cluster/node": { "get": { "security": [ { @@ -241,8 +244,11 @@ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" + ], "summary": "List of proxy nodes in the cluster", - "operationId": "cluster-3-get-proxy-nodes", + "operationId": "cluster-3-get-nodes", "responses": { "200": { "description": "OK", @@ -262,7 +268,7 @@ } } }, - "/api/v3/cluster/proxy/node/{id}": { + "/api/v3/cluster/node/{id}": { "get": { "security": [ { @@ -273,8 +279,11 @@ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" + ], "summary": "List a proxy node by its ID", - "operationId": "cluster-3-get-proxy-node", + "operationId": "cluster-3-get-node", "parameters": [ { "type": "string", @@ -300,7 +309,7 @@ } } }, - "/api/v3/cluster/proxy/node/{id}/files": { + "/api/v3/cluster/node/{id}/files": { "get": { "security": [ { @@ -311,8 +320,11 @@ "produces": [ "application/json" ], + "tags": [ + "v16.?.?" 
+ ], "summary": "List the files of a proxy node by its ID", - "operationId": "cluster-3-get-proxy-node-files", + "operationId": "cluster-3-get-node-files", "parameters": [ { "type": "string", @@ -338,6 +350,120 @@ } } }, + "/api/v3/cluster/process": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of processes in the cluster", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List of processes in the cluster", + "operationId": "cluster-3-list-processes", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterProcess" + } + } + } + } + }, + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Add a new FFmpeg process", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Add a new process", + "operationId": "cluster-3-add-process", + "parameters": [ + { + "description": "Process config", + "name": "config", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, + "/api/v3/cluster/process/{id}": { + "delete": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Delete a process by its ID", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Delete a process by its ID", + "operationId": "cluster-3-delete-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/config": { "get": { "security": [ @@ -2244,7 +2370,7 @@ "id": { "type": "string" }, - "nodes": { + "server": { "type": "array", "items": { "$ref": "#/definitions/api.ClusterServer" @@ -2261,29 +2387,99 @@ "address": { "type": "string" }, + "created_at": { + "type": "string" + }, "id": { "type": "string" }, - "last_ping": { - "type": "integer" - }, - "last_update": { + "last_contact": { + "description": "unix timestamp", "type": "integer" }, "latency_ms": { "description": "milliseconds", "type": "number" }, + "name": { + "type": "string" + }, + "resources": { + "$ref": "#/definitions/api.ClusterNodeResources" + }, "state": { "type": "string" + }, + "uptime_seconds": { + "type": "integer" } } }, "api.ClusterNodeFiles": { "type": "object", - "additionalProperties": { - "type": "array", - "items": { + "properties": { + "files": { + "type": "object", + "additionalProperties": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "last_update": { + "description": "unix timestamp", + "type": "integer" + } + } + }, + "api.ClusterNodeResources": { + "type": "object", + "properties": { + "cpu_limit": { + "description": "percent 0-100*npcu", + "type": "number" + }, + "cpu_used": { + "description": "percent 0-100*npcu", + "type": "number" + }, + "memory_limit_bytes": { + "type": "integer" + }, + "memory_used_bytes": { + "type": "integer" + }, + "ncpu": { + "type": "number" + } + } + }, + "api.ClusterProcess": { + "type": "object", + "properties": { + "cpu": { + "type": 
"number" + }, + "id": { + "type": "string" + }, + "memory_bytes": { + "type": "integer" + }, + "node_id": { + "type": "string" + }, + "order": { + "type": "string" + }, + "reference": { + "type": "string" + }, + "runtime_seconds": { + "type": "integer" + }, + "state": { "type": "string" } } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 93b37b8b..24c12f3f 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -75,7 +75,7 @@ definitions: type: string id: type: string - nodes: + server: items: $ref: '#/definitions/api.ClusterServer' type: array @@ -86,23 +86,70 @@ definitions: properties: address: type: string + created_at: + type: string id: type: string - last_ping: - type: integer - last_update: + last_contact: + description: unix timestamp type: integer latency_ms: description: milliseconds type: number + name: + type: string + resources: + $ref: '#/definitions/api.ClusterNodeResources' state: type: string + uptime_seconds: + type: integer type: object api.ClusterNodeFiles: - additionalProperties: - items: + properties: + files: + additionalProperties: + items: + type: string + type: array + type: object + last_update: + description: unix timestamp + type: integer + type: object + api.ClusterNodeResources: + properties: + cpu_limit: + description: percent 0-100*npcu + type: number + cpu_used: + description: percent 0-100*npcu + type: number + memory_limit_bytes: + type: integer + memory_used_bytes: + type: integer + ncpu: + type: number + type: object + api.ClusterProcess: + properties: + cpu: + type: number + id: + type: string + memory_bytes: + type: integer + node_id: + type: string + order: + type: string + reference: + type: string + runtime_seconds: + type: integer + state: type: string - type: array type: object api.ClusterServer: properties: @@ -2179,10 +2226,12 @@ paths: security: - ApiKeyAuth: [] summary: List of nodes in the cluster - /api/v3/cluster/proxy: + tags: + - v16.?.? + /api/v3/cluster/node: get: description: List of proxy nodes in the cluster - operationId: cluster-3-get-proxy-nodes + operationId: cluster-3-get-nodes produces: - application/json responses: @@ -2199,10 +2248,12 @@ paths: security: - ApiKeyAuth: [] summary: List of proxy nodes in the cluster - /api/v3/cluster/proxy/node/{id}: + tags: + - v16.?.? + /api/v3/cluster/node/{id}: get: description: List a proxy node by its ID - operationId: cluster-3-get-proxy-node + operationId: cluster-3-get-node parameters: - description: Node ID in: path @@ -2223,10 +2274,12 @@ paths: security: - ApiKeyAuth: [] summary: List a proxy node by its ID - /api/v3/cluster/proxy/node/{id}/files: + tags: + - v16.?.? + /api/v3/cluster/node/{id}/files: get: description: List the files of a proxy node by its ID - operationId: cluster-3-get-proxy-node-files + operationId: cluster-3-get-node-files parameters: - description: Node ID in: path @@ -2247,6 +2300,80 @@ paths: security: - ApiKeyAuth: [] summary: List the files of a proxy node by its ID + tags: + - v16.?.? + /api/v3/cluster/process: + get: + description: List of processes in the cluster + operationId: cluster-3-list-processes + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/api.ClusterProcess' + type: array + security: + - ApiKeyAuth: [] + summary: List of processes in the cluster + tags: + - v16.?.? 
+ post: + consumes: + - application/json + description: Add a new FFmpeg process + operationId: cluster-3-add-process + parameters: + - description: Process config + in: body + name: config + required: true + schema: + $ref: '#/definitions/api.ProcessConfig' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.ProcessConfig' + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Add a new process + tags: + - v16.?.? + /api/v3/cluster/process/{id}: + delete: + description: Delete a process by its ID + operationId: cluster-3-delete-process + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Delete a process by its ID + tags: + - v16.?.? /api/v3/config: get: description: Retrieve the currently active Restreamer configuration diff --git a/encoding/json/json.go b/encoding/json/json.go index dd8d19bf..e0984bfd 100644 --- a/encoding/json/json.go +++ b/encoding/json/json.go @@ -58,3 +58,15 @@ func lineAndCharacter(input []byte, offset int) (line int, character int, err er return line, character, nil } + +func ToNumber(f float64) json.Number { + var s string + + if f == float64(int64(f)) { + s = fmt.Sprintf("%.0f", f) // no decimals for integral values + } else { + s = fmt.Sprintf("%.3f", f) // exactly three decimals otherwise + } + + return json.Number(s) +} diff --git a/http/api/cluster.go b/http/api/cluster.go index 807394b5..b14948c8 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -1,19 +1,25 @@ package api -type ClusterNodeConfig struct { - Address string `json:"address"` - Username string `json:"username"` - Password string `json:"password"` -} +import "encoding/json" type ClusterNode struct { - Address string `json:"address"` - ID string `json:"id"` - LastContact int64 `json:"last_contact"` // unix timestamp - Latency float64 `json:"latency_ms"` // milliseconds - State string `json:"state"` - CPU float64 `json:"cpu_used"` // percent 0-100*npcu - Mem uint64 `json:"mem_used" format:"uint64"` // bytes + ID string `json:"id"` + Name string `json:"name"` + Address string `json:"address"` + CreatedAt string `json:"created_at"` + Uptime int64 `json:"uptime_seconds"` + LastContact int64 `json:"last_contact"` // unix timestamp + Latency float64 `json:"latency_ms"` // milliseconds + State string `json:"state"` + Resources ClusterNodeResources `json:"resources"` +} + +type ClusterNodeResources struct { + NCPU float64 `json:"ncpu"` + CPU float64 `json:"cpu_used"` // percent 0-100*ncpu + CPULimit float64 `json:"cpu_limit"` // percent 0-100*ncpu + Mem uint64 `json:"memory_used_bytes"` + MemLimit uint64 `json:"memory_limit_bytes"` } type ClusterNodeFiles struct { @@ -28,6 +34,17 @@ type ClusterServer struct { Leader bool `json:"leader"` } +type ClusterProcess struct { + ProcessID string `json:"id"` + NodeID string `json:"node_id"` + Reference string `json:"reference"` + Order string `json:"order"` + State string `json:"state"` + CPU json.Number `json:"cpu" swaggertype:"number" jsonschema:"type=number"` + Memory uint64 `json:"memory_bytes"` + Runtime int64 `json:"runtime_seconds"` +} + type ClusterStats struct { State string `json:"state"` LastContact float64 `json:"last_contact_ms"` @@ -39,6 +56,6 @@ type ClusterAbout struct {
Address string `json:"address"` ClusterAPIAddress string `json:"cluster_api_address"` CoreAPIAddress string `json:"core_api_address"` - Nodes []ClusterServer `json:"nodes"` + Server []ClusterServer `json:"server"` Stats ClusterStats `json:"stats"` } diff --git a/http/api/json.go b/http/api/json.go index 2b2dcd60..365f3ce0 100644 --- a/http/api/json.go +++ b/http/api/json.go @@ -5,7 +5,7 @@ import ( "fmt" ) -func toNumber(f float64) json.Number { +func ToNumber(f float64) json.Number { var s string if f == float64(int64(f)) { diff --git a/http/api/probe.go b/http/api/probe.go index dda8b260..75a975e3 100644 --- a/http/api/probe.go +++ b/http/api/probe.go @@ -45,10 +45,10 @@ func (i *ProbeIO) Unmarshal(io *app.ProbeIO) { i.Type = io.Type i.Codec = io.Codec i.Coder = io.Coder - i.Bitrate = toNumber(io.Bitrate) - i.Duration = toNumber(io.Duration) + i.Bitrate = ToNumber(io.Bitrate) + i.Duration = ToNumber(io.Duration) - i.FPS = toNumber(io.FPS) + i.FPS = ToNumber(io.FPS) i.Pixfmt = io.Pixfmt i.Width = io.Width i.Height = io.Height diff --git a/http/api/process.go b/http/api/process.go index d52d4393..98472459 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -258,7 +258,7 @@ func (s *ProcessState) Unmarshal(state *app.State) { s.LastLog = state.LastLog s.Progress = &Progress{} s.Memory = state.Memory - s.CPU = toNumber(state.CPU) + s.CPU = ToNumber(state.CPU) s.Command = state.Command s.Progress.Unmarshal(&state.Progress) diff --git a/http/api/progress.go b/http/api/progress.go index 051eea5b..63ad801e 100644 --- a/http/api/progress.go +++ b/http/api/progress.go @@ -118,12 +118,12 @@ func (progress *Progress) Unmarshal(p *app.Progress) { progress.Output = make([]ProgressIO, len(p.Output)) progress.Frame = p.Frame progress.Packet = p.Packet - progress.FPS = toNumber(p.FPS) - progress.Quantizer = toNumber(p.Quantizer) + progress.FPS = ToNumber(p.FPS) + progress.Quantizer = ToNumber(p.Quantizer) progress.Size = p.Size / 1024 - progress.Time = toNumber(p.Time) - progress.Bitrate = toNumber(p.Bitrate / 1024) - progress.Speed = toNumber(p.Speed) + progress.Time = ToNumber(p.Time) + progress.Bitrate = ToNumber(p.Bitrate / 1024) + progress.Speed = ToNumber(p.Speed) progress.Drop = p.Drop progress.Dup = p.Dup diff --git a/http/api/session.go b/http/api/session.go index c616121f..d76275c4 100644 --- a/http/api/session.go +++ b/http/api/session.go @@ -43,8 +43,8 @@ func (s *Session) Unmarshal(sess session.Session) { s.Extra = sess.Extra s.RxBytes = sess.RxBytes s.TxBytes = sess.TxBytes - s.RxBitrate = toNumber(sess.RxBitrate / 1024) - s.TxBitrate = toNumber(sess.TxBitrate / 1024) + s.RxBitrate = ToNumber(sess.RxBitrate / 1024) + s.TxBitrate = ToNumber(sess.TxBitrate / 1024) } // SessionSummaryActive represents the currently active sessions @@ -81,12 +81,12 @@ type SessionsActive map[string][]Session // Unmarshal creates a new SessionSummary from a session.Summary func (summary *SessionSummary) Unmarshal(sum session.Summary) { summary.Active.MaxSessions = sum.MaxSessions - summary.Active.MaxRxBitrate = toNumber(sum.MaxRxBitrate / 1024 / 1024) - summary.Active.MaxTxBitrate = toNumber(sum.MaxTxBitrate / 1024 / 1024) + summary.Active.MaxRxBitrate = ToNumber(sum.MaxRxBitrate / 1024 / 1024) + summary.Active.MaxTxBitrate = ToNumber(sum.MaxTxBitrate / 1024 / 1024) summary.Active.Sessions = sum.CurrentSessions - summary.Active.RxBitrate = toNumber(sum.CurrentRxBitrate / 1024 / 1024) - summary.Active.TxBitrate = toNumber(sum.CurrentTxBitrate / 1024 / 1024) + summary.Active.RxBitrate = 
ToNumber(sum.CurrentRxBitrate / 1024 / 1024) + summary.Active.TxBitrate = ToNumber(sum.CurrentTxBitrate / 1024 / 1024) summary.Active.SessionList = make([]Session, len(sum.Active)) diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 0794c08b..598ba7cc 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -4,11 +4,14 @@ import ( "net/http" "sort" "strings" + "time" "github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/cluster/proxy" + "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" + "github.com/labstack/echo/v4" "github.com/lithammer/shortuuid/v4" ) @@ -27,31 +30,33 @@ func NewCluster(cluster cluster.Cluster) *ClusterHandler { } } -// GetProxyNodes returns the list of proxy nodes in the cluster +// GetNodes returns the list of proxy nodes in the cluster // @Summary List of proxy nodes in the cluster // @Description List of proxy nodes in the cluster // @Tags v16.?.? -// @ID cluster-3-get-proxy-nodes +// @ID cluster-3-get-nodes // @Produce json // @Success 200 {array} api.ClusterNode // @Failure 404 {object} api.Error // @Security ApiKeyAuth -// @Router /api/v3/cluster/proxy [get] -func (h *ClusterHandler) GetProxyNodes(c echo.Context) error { +// @Router /api/v3/cluster/node [get] +func (h *ClusterHandler) GetNodes(c echo.Context) error { nodes := h.proxy.ListNodes() list := []api.ClusterNode{} for _, node := range nodes { - state := node.State() + about := node.About() n := api.ClusterNode{ - Address: node.Address(), - ID: state.ID, - LastContact: state.LastContact.Unix(), - Latency: state.Latency.Seconds() * 1000, - State: state.State, - CPU: state.Resources.CPU, - Mem: state.Resources.Mem, + ID: about.ID, + Name: about.Name, + Address: about.Address, + CreatedAt: about.CreatedAt.Format(time.RFC3339), + Uptime: int64(about.Uptime.Seconds()), + LastContact: about.LastContact.Unix(), + Latency: about.Latency.Seconds() * 1000, + State: about.State, + Resources: api.ClusterNodeResources(about.Resources), } list = append(list, n) @@ -60,18 +65,18 @@ func (h *ClusterHandler) GetProxyNodes(c echo.Context) error { return c.JSON(http.StatusOK, list) } -// GetProxyNode returns the proxy node with the given ID +// GetNode returns the proxy node with the given ID // @Summary List a proxy node by its ID // @Description List a proxy node by its ID // @Tags v16.?.? 
-// @ID cluster-3-get-proxy-node +// @ID cluster-3-get-node // @Produce json // @Param id path string true "Node ID" // @Success 200 {object} api.ClusterNode // @Failure 404 {object} api.Error // @Security ApiKeyAuth -// @Router /api/v3/cluster/proxy/node/{id} [get] -func (h *ClusterHandler) GetProxyNode(c echo.Context) error { +// @Router /api/v3/cluster/node/{id} [get] +func (h *ClusterHandler) GetNode(c echo.Context) error { id := util.PathParam(c, "id") peer, err := h.proxy.GetNode(id) @@ -79,33 +84,35 @@ func (h *ClusterHandler) GetProxyNode(c echo.Context) error { return api.Err(http.StatusNotFound, "Node not found", "%s", err) } - state := peer.State() + about := peer.About() node := api.ClusterNode{ - Address: peer.Address(), - ID: state.ID, - LastContact: state.LastContact.Unix(), - Latency: state.Latency.Seconds() * 1000, - State: state.State, - CPU: state.Resources.CPU, - Mem: state.Resources.Mem, + ID: about.ID, + Name: about.Name, + Address: about.Address, + CreatedAt: about.CreatedAt.Format(time.RFC3339), + Uptime: int64(about.Uptime.Seconds()), + LastContact: about.LastContact.Unix(), + Latency: about.Latency.Seconds() * 1000, + State: about.State, + Resources: api.ClusterNodeResources(about.Resources), } return c.JSON(http.StatusOK, node) } -// GetProxyNodeFiles returns the files from the proxy node with the given ID +// GetNodeFiles returns the files from the proxy node with the given ID // @Summary List the files of a proxy node by its ID // @Description List the files of a proxy node by its ID // @Tags v16.?.? -// @ID cluster-3-get-proxy-node-files +// @ID cluster-3-get-node-files // @Produce json // @Param id path string true "Node ID" // @Success 200 {object} api.ClusterNodeFiles // @Failure 404 {object} api.Error // @Security ApiKeyAuth -// @Router /api/v3/cluster/proxy/node/{id}/files [get] -func (h *ClusterHandler) GetProxyNodeFiles(c echo.Context) error { +// @Router /api/v3/cluster/node/{id}/files [get] +func (h *ClusterHandler) GetNodeFiles(c echo.Context) error { id := util.PathParam(c, "id") peer, err := h.proxy.GetNode(id) @@ -153,7 +160,7 @@ func (h *ClusterHandler) About(c echo.Context) error { Address: state.Address, ClusterAPIAddress: state.ClusterAPIAddress, CoreAPIAddress: state.CoreAPIAddress, - Nodes: []api.ClusterServer{}, + Server: []api.ClusterServer{}, Stats: api.ClusterStats{ State: state.Stats.State, LastContact: state.Stats.LastContact.Seconds() * 1000, @@ -162,7 +169,7 @@ func (h *ClusterHandler) About(c echo.Context) error { } for _, n := range state.Nodes { - about.Nodes = append(about.Nodes, api.ClusterServer{ + about.Server = append(about.Server, api.ClusterServer{ ID: n.ID, Address: n.Address, Voter: n.Voter, @@ -173,6 +180,36 @@ func (h *ClusterHandler) About(c echo.Context) error { return c.JSON(http.StatusOK, about) } +// ListProcesses returns the list of processes in the cluster +// @Summary List of processes in the cluster +// @Description List of processes in the cluster +// @Tags v16.?.? 
+// @ID cluster-3-list-processes +// @Produce json +// @Success 200 {array} api.ClusterProcess +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process [get] +func (h *ClusterHandler) ListProcesses(c echo.Context) error { + procs := h.proxy.ListProcesses() + + processes := []api.ClusterProcess{} + + for _, p := range procs { + processes = append(processes, api.ClusterProcess{ + ProcessID: p.Config.ID, + NodeID: p.NodeID, + Reference: p.Config.Reference, + Order: p.Order, + State: p.State, + CPU: json.ToNumber(p.CPU), + Memory: p.Mem, + Runtime: int64(p.Runtime.Seconds()), + }) + } + + return c.JSON(http.StatusOK, processes) +} + // Add adds a new process to the cluster // @Summary Add a new process // @Description Add a new FFmpeg process diff --git a/http/server.go b/http/server.go index 9b7e5583..99b8d988 100644 --- a/http/server.go +++ b/http/server.go @@ -661,9 +661,11 @@ func (s *server) setRoutesV3(v3 *echo.Group) { if s.v3handler.cluster != nil { v3.GET("/cluster", s.v3handler.cluster.About) - v3.GET("/cluster/proxy", s.v3handler.cluster.GetProxyNodes) - v3.GET("/cluster/proxy/node/:id", s.v3handler.cluster.GetProxyNode) - v3.GET("/cluster/proxy/node/:id/files", s.v3handler.cluster.GetProxyNodeFiles) + v3.GET("/cluster/node", s.v3handler.cluster.GetNodes) + v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode) + v3.GET("/cluster/node/:id/files", s.v3handler.cluster.GetNodeFiles) + + v3.GET("/cluster/process", s.v3handler.cluster.ListProcesses) if !s.readOnly { v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)
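
A few notes on the parts of this patch that are still WIP. First, the new update path in synchronize() — a processOpUpdate is emitted when the store's copy of a config is newer than what the node reports — has no coverage in leader_test.go yet. A test in the style of the existing ones could look like the sketch below; it assumes that for a wanted, already-running process the update op is the only op emitted.

func TestSynchronizeUpdate(t *testing.T) {
	now := time.Now()

	want := []store.Process{
		{
			UpdatedAt: now, // store entry is newer than the node's copy
			Config: &app.Config{
				ID:          "foobar",
				LimitCPU:    10,
				LimitMemory: 5,
			},
		},
	}

	have := []proxy.Process{
		{
			NodeID:    "node1",
			Order:     "start",
			State:     "running",
			CPU:       1,
			Mem:       1,
			UpdatedAt: now.Add(-time.Minute), // node still runs the old config
			Config: &app.Config{
				ID:          "foobar",
				LimitCPU:    10,
				LimitMemory: 5,
			},
		},
	}

	resources := map[string]proxy.NodeResources{
		"node1": {
			NCPU:     1,
			CPU:      1,
			CPULimit: 90,
			Mem:      1,
			MemLimit: 90,
		},
	}

	opStack := synchronize(want, have, resources)

	require.Equal(t, []interface{}{
		processOpUpdate{
			nodeid:    "node1",
			processid: "foobar",
			config:    want[0].Config,
		},
	}, opStack)
}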
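Second, store.Apply stamps each process with time.Now(). Apply runs independently on every raft replica, so UpdatedAt can differ from node to node, and after a leader change the wantP.UpdatedAt.After(p.UpdatedAt) comparison in synchronize() may resolve differently than it did on the old leader. One way to keep the FSM deterministic — a sketch only; it extends the replicated command, which this patch does not do — is to stamp the command once, on the leader, before it is proposed:

// Hypothetical variant of the OpAddProcess handling in (*store).Apply.
// The command carries the timestamp through the raft log, so every
// replica applies the identical value.
type CommandAddProcess struct {
	ID        string
	Config    app.Config
	UpdatedAt time.Time // set once by the leader before proposing
}

func (s *store) applyAddProcess(cmd CommandAddProcess) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.Process[cmd.ID] = Process{
		UpdatedAt: cmd.UpdatedAt, // deterministic across replicas
		Config:    &cmd.Config,
	}
}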
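Third, proxy.ProcessUpdate currently validates the node ID and then returns "not implemented". The finished version will presumably mirror ProcessAdd and ProcessDelete and forward to the node. The sketch below assumes the Node interface gains a matching ProcessUpdate(id string, config *app.Config) method backed by the peer's REST client — neither exists in this patch yet:

// Sketch under the assumption above, not the final implementation.
func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) error {
	p.lock.RLock()
	node, ok := p.nodes[nodeid]
	p.lock.RUnlock() // release before the network round trip to the node

	if !ok {
		return fmt.Errorf("node not found")
	}

	// Delegate to the node, which talks to the peer's core API.
	return node.ProcessUpdate(id, config)
}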
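Also worth noting: ToNumber now exists twice with identical formatting rules — newly in encoding/json (used by the cluster handler) and as the exported rename of toNumber in http/api — and the two could later be collapsed into one. For reference, what it produces (self-contained, same logic as the patched code):

package main

import (
	"encoding/json"
	"fmt"
)

// Same logic as the ToNumber added in encoding/json/json.go.
func ToNumber(f float64) json.Number {
	var s string

	if f == float64(int64(f)) {
		s = fmt.Sprintf("%.0f", f) // no decimals for integral values
	} else {
		s = fmt.Sprintf("%.3f", f) // exactly three decimals otherwise
	}

	return json.Number(s)
}

func main() {
	fmt.Println(ToNumber(3))    // 3
	fmt.Println(ToNumber(12.5)) // 12.500

	// json.Number marshals as a bare JSON number, not a quoted string.
	b, _ := json.Marshal(ToNumber(12.5))
	fmt.Println(string(b)) // 12.500
}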
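Finally, the wire format of the new GET /api/v3/cluster/process endpoint, derived from the api.ClusterProcess struct tags. The payload below is illustrative (the cpu field is rendered by ToNumber, hence 12.500):

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors api.ClusterProcess from this patch (wire format only).
type ClusterProcess struct {
	ProcessID string      `json:"id"`
	NodeID    string      `json:"node_id"`
	Reference string      `json:"reference"`
	Order     string      `json:"order"`
	State     string      `json:"state"`
	CPU       json.Number `json:"cpu"`
	Memory    uint64      `json:"memory_bytes"`
	Runtime   int64       `json:"runtime_seconds"`
}

func main() {
	// Illustrative response body for GET /api/v3/cluster/process.
	payload := []byte(`[{"id":"foobar","node_id":"node1","reference":"barfoo",
		"order":"start","state":"running","cpu":12.500,"memory_bytes":1048576,
		"runtime_seconds":42}]`)

	var procs []ClusterProcess
	if err := json.Unmarshal(payload, &procs); err != nil {
		panic(err)
	}

	fmt.Println(procs[0].ProcessID, procs[0].NodeID, procs[0].CPU) // foobar node1 12.500
}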