Allow to set process metadata in the cluster

This commit is contained in:
Ingo Oppermann
2023-06-02 14:17:35 +02:00
parent e9ba394c2e
commit e532531eeb
15 changed files with 284 additions and 67 deletions

View File

@@ -212,6 +212,31 @@ func NewAPI(config APIConfig) (API, error) {
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
}) })
a.router.PUT("/v1/process/:id/metadata/:key", func(c echo.Context) error {
id := util.PathParam(c, "id")
key := util.PathParam(c, "key")
r := client.SetProcessMetadataRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil {
return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
}
err := a.cluster.SetProcessMetadata(origin, id, key, r.Metadata)
if err != nil {
a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to update metadata")
return httpapi.Err(http.StatusInternalServerError, "unable to update metadata", "%s", err)
}
return c.JSON(http.StatusOK, "OK")
})
a.router.POST("/v1/iam/user", func(c echo.Context) error { a.router.POST("/v1/iam/user", func(c echo.Context) error {
r := client.AddIdentityRequest{} r := client.AddIdentityRequest{}

View File

@@ -32,6 +32,12 @@ type UpdateProcessRequest struct {
Config app.Config `json:"config"` Config app.Config `json:"config"`
} }
type SetProcessMetadataRequest struct {
ID string `json:"id"`
Key string `json:"key"`
Metadata interface{} `json:"metadata"`
}
type AddIdentityRequest struct { type AddIdentityRequest struct {
Identity iamidentity.User `json:"identity"` Identity iamidentity.User `json:"identity"`
} }
@@ -111,6 +117,17 @@ func (c *APIClient) UpdateProcess(origin string, r UpdateProcessRequest) error {
return err return err
} }
func (c *APIClient) SetProcessMetadata(origin string, r SetProcessMetadataRequest) error {
data, err := json.Marshal(r)
if err != nil {
return err
}
_, err = c.call(http.MethodPut, "/process/"+r.ID+"/metadata/"+r.Key, "application/json", bytes.NewReader(data), origin)
return err
}
func (c *APIClient) AddIdentity(origin string, r AddIdentityRequest) error { func (c *APIClient) AddIdentity(origin string, r AddIdentityRequest) error {
data, err := json.Marshal(r) data, err := json.Marshal(r)
if err != nil { if err != nil {

View File

@@ -70,6 +70,7 @@ type Cluster interface {
AddProcess(origin string, config *app.Config) error AddProcess(origin string, config *app.Config) error
RemoveProcess(origin, id string) error RemoveProcess(origin, id string) error
UpdateProcess(origin, id string, config *app.Config) error UpdateProcess(origin, id string, config *app.Config) error
SetProcessMetadata(origin, id, key string, data interface{}) error
IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error)
ListIdentities() (time.Time, []iamidentity.User) ListIdentities() (time.Time, []iamidentity.User)
@@ -756,6 +757,23 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error {
return c.applyCommand(cmd) return c.applyCommand(cmd)
} }
func (c *cluster) SetProcessMetadata(origin, id, key string, data interface{}) error {
if !c.IsRaftLeader() {
return c.forwarder.SetProcessMetadata(origin, id, key, data)
}
cmd := &store.Command{
Operation: store.OpSetProcessMetadata,
Data: &store.CommandSetProcessMetadata{
ID: id,
Key: key,
Data: data,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) { func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) {
policyAdapter, err := clusteriam.NewPolicyAdapter(c.store) policyAdapter, err := clusteriam.NewPolicyAdapter(c.store)
if err != nil { if err != nil {

View File

@@ -24,6 +24,7 @@ type Forwarder interface {
AddProcess(origin string, config *app.Config) error AddProcess(origin string, config *app.Config) error
UpdateProcess(origin, id string, config *app.Config) error UpdateProcess(origin, id string, config *app.Config) error
SetProcessMetadata(origin, id, key string, data interface{}) error
RemoveProcess(origin, id string) error RemoveProcess(origin, id string) error
AddIdentity(origin string, identity iamidentity.User) error AddIdentity(origin string, identity iamidentity.User) error
@@ -171,6 +172,24 @@ func (f *forwarder) UpdateProcess(origin, id string, config *app.Config) error {
return client.UpdateProcess(origin, r) return client.UpdateProcess(origin, r)
} }
func (f *forwarder) SetProcessMetadata(origin, id, key string, data interface{}) error {
if origin == "" {
origin = f.id
}
r := apiclient.SetProcessMetadataRequest{
ID: id,
Key: key,
Metadata: data,
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.SetProcessMetadata(origin, r)
}
func (f *forwarder) RemoveProcess(origin, id string) error { func (f *forwarder) RemoveProcess(origin, id string) error {
if origin == "" { if origin == "" {
origin = f.id origin = f.id

View File

@@ -331,6 +331,7 @@ type processOpMove struct {
fromNodeid string fromNodeid string
toNodeid string toNodeid string
config *app.Config config *app.Config
metadata map[string]interface{}
} }
type processOpStart struct { type processOpStart struct {
@@ -341,12 +342,14 @@ type processOpStart struct {
type processOpAdd struct { type processOpAdd struct {
nodeid string nodeid string
config *app.Config config *app.Config
metadata map[string]interface{}
} }
type processOpUpdate struct { type processOpUpdate struct {
nodeid string nodeid string
processid string processid string
config *app.Config config *app.Config
metadata map[string]interface{}
} }
type processOpReject struct { type processOpReject struct {
@@ -364,7 +367,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
for _, op := range stack { for _, op := range stack {
switch v := op.(type) { switch v := op.(type) {
case processOpAdd: case processOpAdd:
err := c.proxy.ProcessAdd(v.nodeid, v.config) err := c.proxy.ProcessAdd(v.nodeid, v.config, v.metadata)
if err != nil { if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ID,
@@ -385,7 +388,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Adding process") }).Log("Adding process")
case processOpUpdate: case processOpUpdate:
err := c.proxy.ProcessUpdate(v.nodeid, v.processid, v.config) err := c.proxy.ProcessUpdate(v.nodeid, v.processid, v.config, v.metadata)
if err != nil { if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ID,
@@ -411,7 +414,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
"nodeid": v.nodeid, "nodeid": v.nodeid,
}).Log("Removing process") }).Log("Removing process")
case processOpMove: case processOpMove:
err := c.proxy.ProcessAdd(v.toNodeid, v.config) err := c.proxy.ProcessAdd(v.toNodeid, v.config, v.metadata)
if err != nil { if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{ c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID, "processid": v.config.ID,
@@ -538,6 +541,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
nodeid: p.NodeID, nodeid: p.NodeID,
processid: p.Config.ID, processid: p.Config.ID,
config: wantP.Config, config: wantP.Config,
metadata: wantP.Metadata,
}) })
} }
} }
@@ -581,7 +585,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
if len(process.Config.Reference) != 0 { if len(process.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[process.Config.Reference] { for _, count := range haveReferenceAffinityMap[process.Config.Reference] {
r := resources[count.nodeid] r := resources[count.nodeid]
cpu := process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given cpu := process.Config.LimitCPU
mem := process.Config.LimitMemory mem := process.Config.LimitMemory
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
@@ -594,7 +598,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
// Find the node with the most resources available // Find the node with the most resources available
if len(nodeid) == 0 { if len(nodeid) == 0 {
for id, r := range resources { for id, r := range resources {
cpu := process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given cpu := process.Config.LimitCPU
mem := process.Config.LimitMemory mem := process.Config.LimitMemory
if len(nodeid) == 0 { if len(nodeid) == 0 {
@@ -615,12 +619,13 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
opStack = append(opStack, processOpAdd{ opStack = append(opStack, processOpAdd{
nodeid: nodeid, nodeid: nodeid,
config: process.Config, config: process.Config,
metadata: process.Metadata,
}) })
// Adjust the resources // Adjust the resources
r, ok := resources[nodeid] r, ok := resources[nodeid]
if ok { if ok {
r.CPU += process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given r.CPU += process.Config.LimitCPU
r.Mem += process.Config.LimitMemory r.Mem += process.Config.LimitMemory
resources[nodeid] = r resources[nodeid] = r
} }
@@ -785,6 +790,7 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [
fromNodeid: overloadedNodeid, fromNodeid: overloadedNodeid,
toNodeid: availableNodeid, toNodeid: availableNodeid,
config: p.Config, config: p.Config,
metadata: p.Metadata,
}) })
// Adjust the process // Adjust the process

View File

@@ -26,11 +26,11 @@ type Node interface {
GetURL(prefix, path string) (*url.URL, error) GetURL(prefix, path string) (*url.URL, error)
GetFile(prefix, path string) (io.ReadCloser, error) GetFile(prefix, path string) (io.ReadCloser, error)
ProcessAdd(*app.Config) error ProcessAdd(config *app.Config, metadata map[string]interface{}) error
ProcessStart(id string) error ProcessStart(id string) error
ProcessStop(id string) error ProcessStop(id string) error
ProcessDelete(id string) error ProcessDelete(id string) error
ProcessUpdate(id string, config *app.Config) error ProcessUpdate(id string, config *app.Config, metadata map[string]interface{}) error
NodeReader NodeReader
} }
@@ -696,6 +696,7 @@ func (n *node) ProcessList() ([]Process, error) {
Filter: []string{ Filter: []string{
"state", "state",
"config", "config",
"metadata",
}, },
}) })
if err != nil { if err != nil {
@@ -713,6 +714,7 @@ func (n *node) ProcessList() ([]Process, error) {
CPU: p.State.CPU * n.resources.ncpu, CPU: p.State.CPU * n.resources.ncpu,
Runtime: time.Duration(p.State.Runtime) * time.Second, Runtime: time.Duration(p.State.Runtime) * time.Second,
UpdatedAt: time.Unix(p.UpdatedAt, 0), UpdatedAt: time.Unix(p.UpdatedAt, 0),
Metadata: p.Metadata,
} }
cfg := &app.Config{ cfg := &app.Config{
@@ -726,7 +728,7 @@ func (n *node) ProcessList() ([]Process, error) {
Autostart: p.Config.Autostart, Autostart: p.Config.Autostart,
StaleTimeout: p.Config.StaleTimeout, StaleTimeout: p.Config.StaleTimeout,
LimitCPU: p.Config.Limits.CPU, LimitCPU: p.Config.Limits.CPU,
LimitMemory: p.Config.Limits.Memory, LimitMemory: p.Config.Limits.Memory * 1024 * 1024,
LimitWaitFor: p.Config.Limits.WaitFor, LimitWaitFor: p.Config.Limits.WaitFor,
} }
@@ -766,7 +768,7 @@ func (n *node) ProcessList() ([]Process, error) {
return processes, nil return processes, nil
} }
func (n *node) ProcessAdd(config *app.Config) error { func (n *node) ProcessAdd(config *app.Config, metadata map[string]interface{}) error {
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
@@ -774,12 +776,12 @@ func (n *node) ProcessAdd(config *app.Config) error {
return fmt.Errorf("not connected") return fmt.Errorf("not connected")
} }
cfg := convertConfig(config) cfg := convertConfig(config, metadata)
return n.peer.ProcessAdd(cfg) return n.peer.ProcessAdd(cfg)
} }
func convertConfig(config *app.Config) clientapi.ProcessConfig { func convertConfig(config *app.Config, metadata map[string]interface{}) clientapi.ProcessConfig {
cfg := clientapi.ProcessConfig{ cfg := clientapi.ProcessConfig{
ID: config.ID, ID: config.ID,
Type: "ffmpeg", Type: "ffmpeg",
@@ -793,9 +795,10 @@ func convertConfig(config *app.Config) clientapi.ProcessConfig {
StaleTimeout: config.StaleTimeout, StaleTimeout: config.StaleTimeout,
Limits: clientapi.ProcessConfigLimits{ Limits: clientapi.ProcessConfigLimits{
CPU: config.LimitCPU, CPU: config.LimitCPU,
Memory: config.LimitMemory, Memory: config.LimitMemory / 1024 / 1024,
WaitFor: config.LimitWaitFor, WaitFor: config.LimitWaitFor,
}, },
Metadata: metadata,
} }
for _, d := range config.Input { for _, d := range config.Input {
@@ -862,7 +865,7 @@ func (n *node) ProcessDelete(id string) error {
return n.peer.ProcessDelete(id) return n.peer.ProcessDelete(id)
} }
func (n *node) ProcessUpdate(id string, config *app.Config) error { func (n *node) ProcessUpdate(id string, config *app.Config, metadata map[string]interface{}) error {
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()
@@ -870,7 +873,7 @@ func (n *node) ProcessUpdate(id string, config *app.Config) error {
return fmt.Errorf("not connected") return fmt.Errorf("not connected")
} }
cfg := convertConfig(config) cfg := convertConfig(config, metadata)
return n.peer.ProcessUpdate(id, cfg) return n.peer.ProcessUpdate(id, cfg)
} }

View File

@@ -24,10 +24,10 @@ type Proxy interface {
ProxyReader ProxyReader
Reader() ProxyReader Reader() ProxyReader
ProcessAdd(nodeid string, config *app.Config) error ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error
ProcessDelete(nodeid string, id string) error ProcessDelete(nodeid string, id string) error
ProcessStart(nodeid string, id string) error ProcessStart(nodeid string, id string) error
ProcessUpdate(nodeid string, id string, config *app.Config) error ProcessUpdate(nodeid string, id string, config *app.Config, metadata map[string]interface{}) error
} }
type ProxyReader interface { type ProxyReader interface {
@@ -433,6 +433,7 @@ type Process struct {
Runtime time.Duration Runtime time.Duration
UpdatedAt time.Time UpdatedAt time.Time
Config *app.Config Config *app.Config
Metadata map[string]interface{}
} }
func (p *proxy) ListProcesses() []Process { func (p *proxy) ListProcesses() []Process {
@@ -480,7 +481,7 @@ func (p *proxy) ListProcesses() []Process {
return processList return processList
} }
func (p *proxy) ProcessAdd(nodeid string, config *app.Config) error { func (p *proxy) ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@@ -489,7 +490,7 @@ func (p *proxy) ProcessAdd(nodeid string, config *app.Config) error {
return fmt.Errorf("node not found") return fmt.Errorf("node not found")
} }
err := node.ProcessAdd(config) err := node.ProcessAdd(config, metadata)
if err != nil { if err != nil {
return err return err
} }
@@ -536,7 +537,7 @@ func (p *proxy) ProcessStart(nodeid string, id string) error {
return nil return nil
} }
func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) error { func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config, metadata map[string]interface{}) error {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@@ -545,5 +546,5 @@ func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) erro
return fmt.Errorf("node not found") return fmt.Errorf("node not found")
} }
return node.ProcessUpdate(id, config) return node.ProcessUpdate(id, config, metadata)
} }

View File

@@ -44,6 +44,7 @@ type Process struct {
CreatedAt time.Time CreatedAt time.Time
UpdatedAt time.Time UpdatedAt time.Time
Config *app.Config Config *app.Config
Metadata map[string]interface{}
} }
type Users struct { type Users struct {
@@ -62,6 +63,7 @@ const (
OpAddProcess Operation = "addProcess" OpAddProcess Operation = "addProcess"
OpRemoveProcess Operation = "removeProcess" OpRemoveProcess Operation = "removeProcess"
OpUpdateProcess Operation = "updateProcess" OpUpdateProcess Operation = "updateProcess"
OpSetProcessMetadata Operation = "setProcessMetadata"
OpAddIdentity Operation = "addIdentity" OpAddIdentity Operation = "addIdentity"
OpUpdateIdentity Operation = "updateIdentity" OpUpdateIdentity Operation = "updateIdentity"
OpRemoveIdentity Operation = "removeIdentity" OpRemoveIdentity Operation = "removeIdentity"
@@ -86,6 +88,12 @@ type CommandRemoveProcess struct {
ID string ID string
} }
type CommandSetProcessMetadata struct {
ID string
Key string
Data interface{}
}
type CommandAddIdentity struct { type CommandAddIdentity struct {
Identity identity.User Identity identity.User
} }
@@ -181,6 +189,12 @@ func (s *store) Apply(entry *raft.Log) interface{} {
json.Unmarshal(b, &cmd) json.Unmarshal(b, &cmd)
err = s.updateProcess(cmd) err = s.updateProcess(cmd)
case OpSetProcessMetadata:
b, _ := json.Marshal(c.Data)
cmd := CommandSetProcessMetadata{}
json.Unmarshal(b, &cmd)
err = s.setProcessMetadata(cmd)
case OpAddIdentity: case OpAddIdentity:
b, _ := json.Marshal(c.Data) b, _ := json.Marshal(c.Data)
cmd := CommandAddIdentity{} cmd := CommandAddIdentity{}
@@ -238,6 +252,7 @@ func (s *store) addProcess(cmd CommandAddProcess) error {
CreatedAt: now, CreatedAt: now,
UpdatedAt: now, UpdatedAt: now,
Config: cmd.Config, Config: cmd.Config,
Metadata: map[string]interface{}{},
} }
return nil return nil
@@ -289,6 +304,31 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
return nil return nil
} }
func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error {
s.lock.Lock()
defer s.lock.Unlock()
p, ok := s.Process[cmd.ID]
if !ok {
return NewStoreError("the process with the ID '%s' doesn't exists", cmd.ID)
}
if p.Metadata == nil {
p.Metadata = map[string]interface{}{}
}
if cmd.Data == nil {
delete(p.Metadata, cmd.Key)
} else {
p.Metadata[cmd.Key] = cmd.Data
}
p.UpdatedAt = time.Now()
s.Process[cmd.ID] = p
return nil
}
func (s *store) addIdentity(cmd CommandAddIdentity) error { func (s *store) addIdentity(cmd CommandAddIdentity) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@@ -386,6 +426,15 @@ func (s *store) Restore(snapshot io.ReadCloser) error {
return err return err
} }
for id, p := range s.Process {
if p.Metadata != nil {
continue
}
p.Metadata = map[string]interface{}{}
s.Process[id] = p
}
return nil return nil
} }
@@ -397,8 +446,10 @@ func (s *store) ProcessList() []Process {
for _, p := range s.Process { for _, p := range s.Process {
processes = append(processes, Process{ processes = append(processes, Process{
CreatedAt: p.CreatedAt,
UpdatedAt: p.UpdatedAt, UpdatedAt: p.UpdatedAt,
Config: p.Config.Clone(), Config: p.Config.Clone(),
Metadata: p.Metadata,
}) })
} }
@@ -415,8 +466,10 @@ func (s *store) GetProcess(id string) (Process, error) {
} }
return Process{ return Process{
CreatedAt: process.CreatedAt,
UpdatedAt: process.UpdatedAt, UpdatedAt: process.UpdatedAt,
Config: process.Config.Clone(), Config: process.Config.Clone(),
Metadata: process.Metadata,
}, nil }, nil
} }

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.17.2 github.com/caddyserver/certmagic v0.17.2
github.com/casbin/casbin/v2 v2.69.1 github.com/casbin/casbin/v2 v2.69.1
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208
github.com/datarhei/gosrt v0.4.1 github.com/datarhei/gosrt v0.4.1
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
github.com/fujiwara/shapeio v1.0.0 github.com/fujiwara/shapeio v1.0.0

4
go.sum
View File

@@ -47,8 +47,8 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a h1:GFT9alzx9UXytQ+lo3MBuPLqB8HsVE2jNqhu+UpAxaY= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208 h1:voT+m+r0r112S0BIbQDvW9S4BGBv2JXGW/1L5Cmmvq4=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a/go.mod h1:2eAeJtBPTyiI+9uhGcCEHZqATBt9J06Bb7Fbxj07lw4= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208/go.mod h1:2eAeJtBPTyiI+9uhGcCEHZqATBt9J06Bb7Fbxj07lw4=
github.com/datarhei/gosrt v0.4.1 h1:08km3wKy72jOdC+JzBDWN57H7xST4mz5lFeJQHuWmMs= github.com/datarhei/gosrt v0.4.1 h1:08km3wKy72jOdC+JzBDWN57H7xST4mz5lFeJQHuWmMs=
github.com/datarhei/gosrt v0.4.1/go.mod h1:FtsulRiUc67Oi3Ii9JH9aQkpO+ZfgeauRAtIE40mIVA= github.com/datarhei/gosrt v0.4.1/go.mod h1:FtsulRiUc67Oi3Ii9JH9aQkpO+ZfgeauRAtIE40mIVA=
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=

View File

@@ -345,6 +345,7 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error {
Reference: p.Config.Reference, Reference: p.Config.Reference,
CreatedAt: 0, CreatedAt: 0,
UpdatedAt: p.UpdatedAt.Unix(), UpdatedAt: p.UpdatedAt.Unix(),
Metadata: p.Metadata,
} }
config := &api.ProcessConfig{} config := &api.ProcessConfig{}
@@ -404,12 +405,16 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error {
return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined")
} }
config, _ := process.Marshal() config, metadata := process.Marshal()
if err := h.cluster.AddProcess("", config); err != nil { if err := h.cluster.AddProcess("", config); err != nil {
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())
} }
for key, value := range metadata {
h.cluster.SetProcessMetadata("", config.ID, key, value)
}
return c.JSON(http.StatusOK, process) return c.JSON(http.StatusOK, process)
} }
@@ -468,7 +473,7 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error {
} }
} }
config, _ := process.Marshal() config, metadata := process.Marshal()
if err := h.cluster.UpdateProcess("", id, config); err != nil { if err := h.cluster.UpdateProcess("", id, config); err != nil {
if err == restream.ErrUnknownProcess { if err == restream.ErrUnknownProcess {
@@ -478,9 +483,61 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err)
} }
for key, value := range metadata {
h.cluster.SetProcessMetadata("", id, key, value)
}
return c.JSON(http.StatusOK, process) return c.JSON(http.StatusOK, process)
} }
// SetProcessMetadata stores metadata with a process
// @Summary Add JSON metadata with a process under the given key
// @Description Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.
// @Tags v16.?.?
// @ID cluster-3-set-process-metadata
// @Produce json
// @Param id path string true "Process ID"
// @Param key path string true "Key for data store"
// @Param domain query string false "Domain to act on"
// @Param data body api.Metadata true "Arbitrary JSON data. The null value will remove the key and its contents"
// @Success 200 {object} api.Metadata
// @Failure 400 {object} api.Error
// @Failure 403 {object} api.Error
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process/{id}/metadata/{key} [put]
func (h *ClusterHandler) SetProcessMetadata(c echo.Context) error {
id := util.PathParam(c, "id")
key := util.PathParam(c, "key")
ctxuser := util.DefaultContext(c, "user", "")
domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
if len(key) == 0 {
return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0")
}
var data api.Metadata
if err := util.ShouldBindJSONValidation(c, &data, false); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
/*
tid := restream.TaskID{
ID: id,
Domain: domain,
}
*/
if err := h.cluster.SetProcessMetadata("", id, key, data); err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
return c.JSON(http.StatusOK, data)
}
// Delete deletes the process with the given ID from the cluster // Delete deletes the process with the given ID from the cluster
// @Summary Delete a process by its ID // @Summary Delete a process by its ID
// @Description Delete a process by its ID // @Description Delete a process by its ID

View File

@@ -689,6 +689,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.POST("/cluster/process", s.v3handler.cluster.AddProcess) v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)
v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess) v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess)
v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess) v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess)
v3.PUT("/cluster/process/:id/metadata/:key", s.v3handler.cluster.SetProcessMetadata)
v3.GET("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM) v3.GET("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM)
v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity) v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity)

View File

@@ -446,6 +446,18 @@ type ConfigV3 struct {
Routes map[string]string `json:"routes"` Routes map[string]string `json:"routes"`
UIPath string `json:"ui_path"` UIPath string `json:"ui_path"`
} `json:"router"` } `json:"router"`
Resources struct {
MaxCPUUsage float64 `json:"max_cpu_usage"`
MaxMemoryUsage float64 `json:"max_memory_usage"`
} `json:"resources"`
Cluster struct {
Enable bool `json:"enable"`
Bootstrap bool `json:"bootstrap"`
Recover bool `json:"recover"`
Debug bool `json:"debug"`
Address string `json:"address"`
Peers []string `json:"peers"`
} `json:"cluster"`
} }
type Config struct { type Config struct {

View File

@@ -10,7 +10,7 @@ type Process struct {
Config *ProcessConfig `json:"config,omitempty"` Config *ProcessConfig `json:"config,omitempty"`
State *ProcessState `json:"state,omitempty"` State *ProcessState `json:"state,omitempty"`
Report *ProcessReport `json:"report,omitempty"` Report *ProcessReport `json:"report,omitempty"`
Metadata Metadata `json:"metadata,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"`
} }
// ProcessConfigIO represents an input or output of an ffmpeg process config // ProcessConfigIO represents an input or output of an ffmpeg process config
@@ -46,7 +46,11 @@ type ProcessConfig struct {
ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"` ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"`
Autostart bool `json:"autostart"` Autostart bool `json:"autostart"`
StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"`
Timeout uint64 `json:"runtime_duration_seconds" format:"uint64"`
Scheduler string `json:"scheduler"`
LogPatterns []string `json:"log_patterns"`
Limits ProcessConfigLimits `json:"limits"` Limits ProcessConfigLimits `json:"limits"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
} }
// ProcessState represents the current state of an ffmpeg process // ProcessState represents the current state of an ffmpeg process
@@ -59,6 +63,7 @@ type ProcessState struct {
Progress *Progress `json:"progress"` Progress *Progress `json:"progress"`
Memory uint64 `json:"memory_bytes" format:"uint64"` Memory uint64 `json:"memory_bytes" format:"uint64"`
CPU float64 `json:"cpu_usage" swaggertype:"number" jsonschema:"type=number"` CPU float64 `json:"cpu_usage" swaggertype:"number" jsonschema:"type=number"`
Resources ProcessUsage `json:"resources"`
Command []string `json:"command"` Command []string `json:"command"`
} }

2
vendor/modules.txt vendored
View File

@@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
# github.com/cpuguy83/go-md2man/v2 v2.0.2 # github.com/cpuguy83/go-md2man/v2 v2.0.2
## explicit; go 1.11 ## explicit; go 1.11
github.com/cpuguy83/go-md2man/v2/md2man github.com/cpuguy83/go-md2man/v2/md2man
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a # github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208
## explicit; go 1.18 ## explicit; go 1.18
github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16
github.com/datarhei/core-client-go/v16/api github.com/datarhei/core-client-go/v16/api