mirror of
https://github.com/datarhei/core.git
synced 2025-12-24 13:07:56 +08:00
137 lines
2.6 KiB
Go
137 lines
2.6 KiB
Go
package cluster
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/datarhei/core/v16/cluster/store"
|
|
"github.com/datarhei/core/v16/restream/app"
|
|
)
|
|
|
|
func (c *cluster) ListProcesses() []store.Process {
|
|
return c.store.ListProcesses()
|
|
}
|
|
|
|
func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) {
|
|
return c.store.GetProcess(id)
|
|
}
|
|
|
|
func (c *cluster) AddProcess(origin string, config *app.Config) error {
|
|
if ok, _ := c.IsDegraded(); ok {
|
|
return ErrDegraded
|
|
}
|
|
|
|
if !c.IsRaftLeader() {
|
|
return c.forwarder.AddProcess(origin, config)
|
|
}
|
|
|
|
cmd := &store.Command{
|
|
Operation: store.OpAddProcess,
|
|
Data: &store.CommandAddProcess{
|
|
Config: config,
|
|
},
|
|
}
|
|
|
|
return c.applyCommand(cmd)
|
|
}
|
|
|
|
func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error {
|
|
if ok, _ := c.IsDegraded(); ok {
|
|
return ErrDegraded
|
|
}
|
|
|
|
if !c.IsRaftLeader() {
|
|
return c.forwarder.RemoveProcess(origin, id)
|
|
}
|
|
|
|
cmd := &store.Command{
|
|
Operation: store.OpRemoveProcess,
|
|
Data: &store.CommandRemoveProcess{
|
|
ID: id,
|
|
},
|
|
}
|
|
|
|
return c.applyCommand(cmd)
|
|
}
|
|
|
|
func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error {
|
|
if ok, _ := c.IsDegraded(); ok {
|
|
return ErrDegraded
|
|
}
|
|
|
|
if !c.IsRaftLeader() {
|
|
return c.forwarder.UpdateProcess(origin, id, config)
|
|
}
|
|
|
|
cmd := &store.Command{
|
|
Operation: store.OpUpdateProcess,
|
|
Data: &store.CommandUpdateProcess{
|
|
ID: id,
|
|
Config: config,
|
|
},
|
|
}
|
|
|
|
return c.applyCommand(cmd)
|
|
}
|
|
|
|
func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command string) error {
|
|
if ok, _ := c.IsDegraded(); ok {
|
|
return ErrDegraded
|
|
}
|
|
|
|
if !c.IsRaftLeader() {
|
|
return c.forwarder.SetProcessCommand(origin, id, command)
|
|
}
|
|
|
|
if command == "start" || command == "stop" {
|
|
cmd := &store.Command{
|
|
Operation: store.OpSetProcessOrder,
|
|
Data: &store.CommandSetProcessOrder{
|
|
ID: id,
|
|
Order: command,
|
|
},
|
|
}
|
|
|
|
return c.applyCommand(cmd)
|
|
}
|
|
|
|
procs := c.proxy.ListProxyProcesses()
|
|
nodeid := ""
|
|
|
|
for _, p := range procs {
|
|
if p.Config.ProcessID() != id {
|
|
continue
|
|
}
|
|
|
|
nodeid = p.NodeID
|
|
|
|
break
|
|
}
|
|
|
|
if len(nodeid) == 0 {
|
|
return fmt.Errorf("the process '%s' is not registered with any node", id.String())
|
|
}
|
|
|
|
return c.proxy.CommandProcess(nodeid, id, command)
|
|
}
|
|
|
|
func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error {
|
|
if ok, _ := c.IsDegraded(); ok {
|
|
return ErrDegraded
|
|
}
|
|
|
|
if !c.IsRaftLeader() {
|
|
return c.forwarder.SetProcessMetadata(origin, id, key, data)
|
|
}
|
|
|
|
cmd := &store.Command{
|
|
Operation: store.OpSetProcessMetadata,
|
|
Data: &store.CommandSetProcessMetadata{
|
|
ID: id,
|
|
Key: key,
|
|
Data: data,
|
|
},
|
|
}
|
|
|
|
return c.applyCommand(cmd)
|
|
}
|