diff --git a/cluster/api.go b/cluster/api.go index 6f801d4d..9b216be2 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -212,6 +212,31 @@ func NewAPI(config APIConfig) (API, error) { return c.JSON(http.StatusOK, "OK") }) + a.router.PUT("/v1/process/:id/metadata/:key", func(c echo.Context) error { + id := util.PathParam(c, "id") + key := util.PathParam(c, "key") + + r := client.SetProcessMetadataRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") + } + + err := a.cluster.SetProcessMetadata(origin, id, key, r.Metadata) + if err != nil { + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to update metadata") + return httpapi.Err(http.StatusInternalServerError, "unable to update metadata", "%s", err) + } + + return c.JSON(http.StatusOK, "OK") + }) + a.router.POST("/v1/iam/user", func(c echo.Context) error { r := client.AddIdentityRequest{} diff --git a/cluster/client/client.go b/cluster/client/client.go index b8a0ef04..4aeeb205 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -32,6 +32,12 @@ type UpdateProcessRequest struct { Config app.Config `json:"config"` } +type SetProcessMetadataRequest struct { + ID string `json:"id"` + Key string `json:"key"` + Metadata interface{} `json:"metadata"` +} + type AddIdentityRequest struct { Identity iamidentity.User `json:"identity"` } @@ -111,6 +117,17 @@ func (c *APIClient) UpdateProcess(origin string, r UpdateProcessRequest) error { return err } +func (c *APIClient) SetProcessMetadata(origin string, r SetProcessMetadataRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPut, "/process/"+r.ID+"/metadata/"+r.Key, "application/json", bytes.NewReader(data), origin) + + return err +} + func (c *APIClient) AddIdentity(origin string, r AddIdentityRequest) error { data, err := json.Marshal(r) if err != nil { diff --git a/cluster/cluster.go b/cluster/cluster.go index 4977dbe4..fc3200df 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -70,6 +70,7 @@ type Cluster interface { AddProcess(origin string, config *app.Config) error RemoveProcess(origin, id string) error UpdateProcess(origin, id string, config *app.Config) error + SetProcessMetadata(origin, id, key string, data interface{}) error IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) ListIdentities() (time.Time, []iamidentity.User) @@ -756,6 +757,23 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error { return c.applyCommand(cmd) } +func (c *cluster) SetProcessMetadata(origin, id, key string, data interface{}) error { + if !c.IsRaftLeader() { + return c.forwarder.SetProcessMetadata(origin, id, key, data) + } + + cmd := &store.Command{ + Operation: store.OpSetProcessMetadata, + Data: &store.CommandSetProcessMetadata{ + ID: id, + Key: key, + Data: data, + }, + } + + return c.applyCommand(cmd) +} + func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) { policyAdapter, err := clusteriam.NewPolicyAdapter(c.store) if err != nil { diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index 7489f8f6..fd56c595 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -24,6 +24,7 @@ type Forwarder interface { AddProcess(origin string, config *app.Config) error UpdateProcess(origin, id string, config *app.Config) error + SetProcessMetadata(origin, id, key string, data interface{}) error RemoveProcess(origin, id string) error AddIdentity(origin string, identity iamidentity.User) error @@ -171,6 +172,24 @@ func (f *forwarder) UpdateProcess(origin, id string, config *app.Config) error { return client.UpdateProcess(origin, r) } +func (f *forwarder) SetProcessMetadata(origin, id, key string, data interface{}) error { + if origin == "" { + origin = f.id + } + + r := apiclient.SetProcessMetadataRequest{ + ID: id, + Key: key, + Metadata: data, + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.SetProcessMetadata(origin, r) +} + func (f *forwarder) RemoveProcess(origin, id string) error { if origin == "" { origin = f.id diff --git a/cluster/leader.go b/cluster/leader.go index 3f073604..a639c102 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -331,6 +331,7 @@ type processOpMove struct { fromNodeid string toNodeid string config *app.Config + metadata map[string]interface{} } type processOpStart struct { @@ -339,14 +340,16 @@ type processOpStart struct { } type processOpAdd struct { - nodeid string - config *app.Config + nodeid string + config *app.Config + metadata map[string]interface{} } type processOpUpdate struct { nodeid string processid string config *app.Config + metadata map[string]interface{} } type processOpReject struct { @@ -364,7 +367,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { for _, op := range stack { switch v := op.(type) { case processOpAdd: - err := c.proxy.ProcessAdd(v.nodeid, v.config) + err := c.proxy.ProcessAdd(v.nodeid, v.config, v.metadata) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -385,7 +388,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "nodeid": v.nodeid, }).Log("Adding process") case processOpUpdate: - err := c.proxy.ProcessUpdate(v.nodeid, v.processid, v.config) + err := c.proxy.ProcessUpdate(v.nodeid, v.processid, v.config, v.metadata) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -411,7 +414,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "nodeid": v.nodeid, }).Log("Removing process") case processOpMove: - err := c.proxy.ProcessAdd(v.toNodeid, v.config) + err := c.proxy.ProcessAdd(v.toNodeid, v.config, v.metadata) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -538,6 +541,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin nodeid: p.NodeID, processid: p.Config.ID, config: wantP.Config, + metadata: wantP.Metadata, }) } } @@ -581,7 +585,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin if len(process.Config.Reference) != 0 { for _, count := range haveReferenceAffinityMap[process.Config.Reference] { r := resources[count.nodeid] - cpu := process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given + cpu := process.Config.LimitCPU mem := process.Config.LimitMemory if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { @@ -594,7 +598,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin // Find the node with the most resources available if len(nodeid) == 0 { for id, r := range resources { - cpu := process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given + cpu := process.Config.LimitCPU mem := process.Config.LimitMemory if len(nodeid) == 0 { @@ -613,14 +617,15 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin if len(nodeid) != 0 { opStack = append(opStack, processOpAdd{ - nodeid: nodeid, - config: process.Config, + nodeid: nodeid, + config: process.Config, + metadata: process.Metadata, }) // Adjust the resources r, ok := resources[nodeid] if ok { - r.CPU += process.Config.LimitCPU * r.NCPU // TODO: in the vod branch this changed if system-wide limits are given + r.CPU += process.Config.LimitCPU r.Mem += process.Config.LimitMemory resources[nodeid] = r } @@ -785,6 +790,7 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [ fromNodeid: overloadedNodeid, toNodeid: availableNodeid, config: p.Config, + metadata: p.Metadata, }) // Adjust the process diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 35d0cb64..a12a3f03 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -26,11 +26,11 @@ type Node interface { GetURL(prefix, path string) (*url.URL, error) GetFile(prefix, path string) (io.ReadCloser, error) - ProcessAdd(*app.Config) error + ProcessAdd(config *app.Config, metadata map[string]interface{}) error ProcessStart(id string) error ProcessStop(id string) error ProcessDelete(id string) error - ProcessUpdate(id string, config *app.Config) error + ProcessUpdate(id string, config *app.Config, metadata map[string]interface{}) error NodeReader } @@ -696,6 +696,7 @@ func (n *node) ProcessList() ([]Process, error) { Filter: []string{ "state", "config", + "metadata", }, }) if err != nil { @@ -713,6 +714,7 @@ func (n *node) ProcessList() ([]Process, error) { CPU: p.State.CPU * n.resources.ncpu, Runtime: time.Duration(p.State.Runtime) * time.Second, UpdatedAt: time.Unix(p.UpdatedAt, 0), + Metadata: p.Metadata, } cfg := &app.Config{ @@ -726,7 +728,7 @@ func (n *node) ProcessList() ([]Process, error) { Autostart: p.Config.Autostart, StaleTimeout: p.Config.StaleTimeout, LimitCPU: p.Config.Limits.CPU, - LimitMemory: p.Config.Limits.Memory, + LimitMemory: p.Config.Limits.Memory * 1024 * 1024, LimitWaitFor: p.Config.Limits.WaitFor, } @@ -766,7 +768,7 @@ func (n *node) ProcessList() ([]Process, error) { return processes, nil } -func (n *node) ProcessAdd(config *app.Config) error { +func (n *node) ProcessAdd(config *app.Config, metadata map[string]interface{}) error { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -774,12 +776,12 @@ func (n *node) ProcessAdd(config *app.Config) error { return fmt.Errorf("not connected") } - cfg := convertConfig(config) + cfg := convertConfig(config, metadata) return n.peer.ProcessAdd(cfg) } -func convertConfig(config *app.Config) clientapi.ProcessConfig { +func convertConfig(config *app.Config, metadata map[string]interface{}) clientapi.ProcessConfig { cfg := clientapi.ProcessConfig{ ID: config.ID, Type: "ffmpeg", @@ -793,9 +795,10 @@ func convertConfig(config *app.Config) clientapi.ProcessConfig { StaleTimeout: config.StaleTimeout, Limits: clientapi.ProcessConfigLimits{ CPU: config.LimitCPU, - Memory: config.LimitMemory, + Memory: config.LimitMemory / 1024 / 1024, WaitFor: config.LimitWaitFor, }, + Metadata: metadata, } for _, d := range config.Input { @@ -862,7 +865,7 @@ func (n *node) ProcessDelete(id string) error { return n.peer.ProcessDelete(id) } -func (n *node) ProcessUpdate(id string, config *app.Config) error { +func (n *node) ProcessUpdate(id string, config *app.Config, metadata map[string]interface{}) error { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -870,7 +873,7 @@ func (n *node) ProcessUpdate(id string, config *app.Config) error { return fmt.Errorf("not connected") } - cfg := convertConfig(config) + cfg := convertConfig(config, metadata) return n.peer.ProcessUpdate(id, cfg) } diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index b05a9fa8..6bc5b395 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -24,10 +24,10 @@ type Proxy interface { ProxyReader Reader() ProxyReader - ProcessAdd(nodeid string, config *app.Config) error + ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error ProcessDelete(nodeid string, id string) error ProcessStart(nodeid string, id string) error - ProcessUpdate(nodeid string, id string, config *app.Config) error + ProcessUpdate(nodeid string, id string, config *app.Config, metadata map[string]interface{}) error } type ProxyReader interface { @@ -433,6 +433,7 @@ type Process struct { Runtime time.Duration UpdatedAt time.Time Config *app.Config + Metadata map[string]interface{} } func (p *proxy) ListProcesses() []Process { @@ -480,7 +481,7 @@ func (p *proxy) ListProcesses() []Process { return processList } -func (p *proxy) ProcessAdd(nodeid string, config *app.Config) error { +func (p *proxy) ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error { p.lock.RLock() defer p.lock.RUnlock() @@ -489,7 +490,7 @@ func (p *proxy) ProcessAdd(nodeid string, config *app.Config) error { return fmt.Errorf("node not found") } - err := node.ProcessAdd(config) + err := node.ProcessAdd(config, metadata) if err != nil { return err } @@ -536,7 +537,7 @@ func (p *proxy) ProcessStart(nodeid string, id string) error { return nil } -func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) error { +func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config, metadata map[string]interface{}) error { p.lock.RLock() defer p.lock.RUnlock() @@ -545,5 +546,5 @@ func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) erro return fmt.Errorf("node not found") } - return node.ProcessUpdate(id, config) + return node.ProcessUpdate(id, config, metadata) } diff --git a/cluster/store/store.go b/cluster/store/store.go index 351f80b4..603042b5 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -44,6 +44,7 @@ type Process struct { CreatedAt time.Time UpdatedAt time.Time Config *app.Config + Metadata map[string]interface{} } type Users struct { @@ -59,13 +60,14 @@ type Policies struct { type Operation string const ( - OpAddProcess Operation = "addProcess" - OpRemoveProcess Operation = "removeProcess" - OpUpdateProcess Operation = "updateProcess" - OpAddIdentity Operation = "addIdentity" - OpUpdateIdentity Operation = "updateIdentity" - OpRemoveIdentity Operation = "removeIdentity" - OpSetPolicies Operation = "setPolicies" + OpAddProcess Operation = "addProcess" + OpRemoveProcess Operation = "removeProcess" + OpUpdateProcess Operation = "updateProcess" + OpSetProcessMetadata Operation = "setProcessMetadata" + OpAddIdentity Operation = "addIdentity" + OpUpdateIdentity Operation = "updateIdentity" + OpRemoveIdentity Operation = "removeIdentity" + OpSetPolicies Operation = "setPolicies" ) type Command struct { @@ -86,6 +88,12 @@ type CommandRemoveProcess struct { ID string } +type CommandSetProcessMetadata struct { + ID string + Key string + Data interface{} +} + type CommandAddIdentity struct { Identity identity.User } @@ -181,6 +189,12 @@ func (s *store) Apply(entry *raft.Log) interface{} { json.Unmarshal(b, &cmd) err = s.updateProcess(cmd) + case OpSetProcessMetadata: + b, _ := json.Marshal(c.Data) + cmd := CommandSetProcessMetadata{} + json.Unmarshal(b, &cmd) + + err = s.setProcessMetadata(cmd) case OpAddIdentity: b, _ := json.Marshal(c.Data) cmd := CommandAddIdentity{} @@ -238,6 +252,7 @@ func (s *store) addProcess(cmd CommandAddProcess) error { CreatedAt: now, UpdatedAt: now, Config: cmd.Config, + Metadata: map[string]interface{}{}, } return nil @@ -289,6 +304,31 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error { return nil } +func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error { + s.lock.Lock() + defer s.lock.Unlock() + + p, ok := s.Process[cmd.ID] + if !ok { + return NewStoreError("the process with the ID '%s' doesn't exists", cmd.ID) + } + + if p.Metadata == nil { + p.Metadata = map[string]interface{}{} + } + + if cmd.Data == nil { + delete(p.Metadata, cmd.Key) + } else { + p.Metadata[cmd.Key] = cmd.Data + } + p.UpdatedAt = time.Now() + + s.Process[cmd.ID] = p + + return nil +} + func (s *store) addIdentity(cmd CommandAddIdentity) error { s.lock.Lock() defer s.lock.Unlock() @@ -386,6 +426,15 @@ func (s *store) Restore(snapshot io.ReadCloser) error { return err } + for id, p := range s.Process { + if p.Metadata != nil { + continue + } + + p.Metadata = map[string]interface{}{} + s.Process[id] = p + } + return nil } @@ -397,8 +446,10 @@ func (s *store) ProcessList() []Process { for _, p := range s.Process { processes = append(processes, Process{ + CreatedAt: p.CreatedAt, UpdatedAt: p.UpdatedAt, Config: p.Config.Clone(), + Metadata: p.Metadata, }) } @@ -415,8 +466,10 @@ func (s *store) GetProcess(id string) (Process, error) { } return Process{ + CreatedAt: process.CreatedAt, UpdatedAt: process.UpdatedAt, Config: process.Config.Clone(), + Metadata: process.Metadata, }, nil } diff --git a/go.mod b/go.mod index d29c169a..a6aad2cd 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.17.2 github.com/casbin/casbin/v2 v2.69.1 - github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a + github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208 github.com/datarhei/gosrt v0.4.1 github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/fujiwara/shapeio v1.0.0 diff --git a/go.sum b/go.sum index 35b02440..9042d041 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,8 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a h1:GFT9alzx9UXytQ+lo3MBuPLqB8HsVE2jNqhu+UpAxaY= -github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a/go.mod h1:2eAeJtBPTyiI+9uhGcCEHZqATBt9J06Bb7Fbxj07lw4= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208 h1:voT+m+r0r112S0BIbQDvW9S4BGBv2JXGW/1L5Cmmvq4= +github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208/go.mod h1:2eAeJtBPTyiI+9uhGcCEHZqATBt9J06Bb7Fbxj07lw4= github.com/datarhei/gosrt v0.4.1 h1:08km3wKy72jOdC+JzBDWN57H7xST4mz5lFeJQHuWmMs= github.com/datarhei/gosrt v0.4.1/go.mod h1:FtsulRiUc67Oi3Ii9JH9aQkpO+ZfgeauRAtIE40mIVA= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 6a756a21..1c5f6ef9 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -345,6 +345,7 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error { Reference: p.Config.Reference, CreatedAt: 0, UpdatedAt: p.UpdatedAt.Unix(), + Metadata: p.Metadata, } config := &api.ProcessConfig{} @@ -404,12 +405,16 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error { return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") } - config, _ := process.Marshal() + config, metadata := process.Marshal() if err := h.cluster.AddProcess("", config); err != nil { return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) } + for key, value := range metadata { + h.cluster.SetProcessMetadata("", config.ID, key, value) + } + return c.JSON(http.StatusOK, process) } @@ -468,7 +473,7 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error { } } - config, _ := process.Marshal() + config, metadata := process.Marshal() if err := h.cluster.UpdateProcess("", id, config); err != nil { if err == restream.ErrUnknownProcess { @@ -478,9 +483,61 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error { return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) } + for key, value := range metadata { + h.cluster.SetProcessMetadata("", id, key, value) + } + return c.JSON(http.StatusOK, process) } +// SetProcessMetadata stores metadata with a process +// @Summary Add JSON metadata with a process under the given key +// @Description Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created. +// @Tags v16.?.? +// @ID cluster-3-set-process-metadata +// @Produce json +// @Param id path string true "Process ID" +// @Param key path string true "Key for data store" +// @Param domain query string false "Domain to act on" +// @Param data body api.Metadata true "Arbitrary JSON data. The null value will remove the key and its contents" +// @Success 200 {object} api.Metadata +// @Failure 400 {object} api.Error +// @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process/{id}/metadata/{key} [put] +func (h *ClusterHandler) SetProcessMetadata(c echo.Context) error { + id := util.PathParam(c, "id") + key := util.PathParam(c, "key") + ctxuser := util.DefaultContext(c, "user", "") + domain := util.DefaultQuery(c, "domain", "") + + if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") { + return api.Err(http.StatusForbidden, "Forbidden") + } + + if len(key) == 0 { + return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0") + } + + var data api.Metadata + + if err := util.ShouldBindJSONValidation(c, &data, false); err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + /* + tid := restream.TaskID{ + ID: id, + Domain: domain, + } + */ + if err := h.cluster.SetProcessMetadata("", id, key, data); err != nil { + return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) + } + + return c.JSON(http.StatusOK, data) +} + // Delete deletes the process with the given ID from the cluster // @Summary Delete a process by its ID // @Description Delete a process by its ID diff --git a/http/server.go b/http/server.go index 4b717c16..b995709d 100644 --- a/http/server.go +++ b/http/server.go @@ -689,6 +689,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.POST("/cluster/process", s.v3handler.cluster.AddProcess) v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess) v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess) + v3.PUT("/cluster/process/:id/metadata/:key", s.v3handler.cluster.SetProcessMetadata) v3.GET("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM) v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity) diff --git a/vendor/github.com/datarhei/core-client-go/v16/api/config.go b/vendor/github.com/datarhei/core-client-go/v16/api/config.go index 30e81ae1..aa3e186c 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/api/config.go +++ b/vendor/github.com/datarhei/core-client-go/v16/api/config.go @@ -446,6 +446,18 @@ type ConfigV3 struct { Routes map[string]string `json:"routes"` UIPath string `json:"ui_path"` } `json:"router"` + Resources struct { + MaxCPUUsage float64 `json:"max_cpu_usage"` + MaxMemoryUsage float64 `json:"max_memory_usage"` + } `json:"resources"` + Cluster struct { + Enable bool `json:"enable"` + Bootstrap bool `json:"bootstrap"` + Recover bool `json:"recover"` + Debug bool `json:"debug"` + Address string `json:"address"` + Peers []string `json:"peers"` + } `json:"cluster"` } type Config struct { diff --git a/vendor/github.com/datarhei/core-client-go/v16/api/process.go b/vendor/github.com/datarhei/core-client-go/v16/api/process.go index c3b474ff..f7c6a9a1 100644 --- a/vendor/github.com/datarhei/core-client-go/v16/api/process.go +++ b/vendor/github.com/datarhei/core-client-go/v16/api/process.go @@ -2,15 +2,15 @@ package api // Process represents all information on a process type Process struct { - ID string `json:"id" jsonschema:"minLength=1"` - Type string `json:"type" jsonschema:"enum=ffmpeg"` - Reference string `json:"reference"` - CreatedAt int64 `json:"created_at" jsonschema:"minimum=0" format:"int64"` - UpdatedAt int64 `json:"updated_at" jsonschema:"minimum=0" format:"int64"` - Config *ProcessConfig `json:"config,omitempty"` - State *ProcessState `json:"state,omitempty"` - Report *ProcessReport `json:"report,omitempty"` - Metadata Metadata `json:"metadata,omitempty"` + ID string `json:"id" jsonschema:"minLength=1"` + Type string `json:"type" jsonschema:"enum=ffmpeg"` + Reference string `json:"reference"` + CreatedAt int64 `json:"created_at" jsonschema:"minimum=0" format:"int64"` + UpdatedAt int64 `json:"updated_at" jsonschema:"minimum=0" format:"int64"` + Config *ProcessConfig `json:"config,omitempty"` + State *ProcessState `json:"state,omitempty"` + Report *ProcessReport `json:"report,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` } // ProcessConfigIO represents an input or output of an ffmpeg process config @@ -36,30 +36,35 @@ type ProcessConfigLimits struct { // ProcessConfig represents the configuration of an ffmpeg process type ProcessConfig struct { - ID string `json:"id"` - Type string `json:"type" validate:"oneof='ffmpeg' ''" jsonschema:"enum=ffmpeg,enum="` - Reference string `json:"reference"` - Input []ProcessConfigIO `json:"input" validate:"required"` - Output []ProcessConfigIO `json:"output" validate:"required"` - Options []string `json:"options"` - Reconnect bool `json:"reconnect"` - ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"` - Autostart bool `json:"autostart"` - StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` - Limits ProcessConfigLimits `json:"limits"` + ID string `json:"id"` + Type string `json:"type" validate:"oneof='ffmpeg' ''" jsonschema:"enum=ffmpeg,enum="` + Reference string `json:"reference"` + Input []ProcessConfigIO `json:"input" validate:"required"` + Output []ProcessConfigIO `json:"output" validate:"required"` + Options []string `json:"options"` + Reconnect bool `json:"reconnect"` + ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"` + Autostart bool `json:"autostart"` + StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` + Timeout uint64 `json:"runtime_duration_seconds" format:"uint64"` + Scheduler string `json:"scheduler"` + LogPatterns []string `json:"log_patterns"` + Limits ProcessConfigLimits `json:"limits"` + Metadata map[string]interface{} `json:"metadata,omitempty"` } // ProcessState represents the current state of an ffmpeg process type ProcessState struct { - Order string `json:"order" jsonschema:"enum=start,enum=stop"` - State string `json:"exec" jsonschema:"enum=finished,enum=starting,enum=running,enum=finishing,enum=killed,enum=failed"` - Runtime int64 `json:"runtime_seconds" jsonschema:"minimum=0" format:"int64"` - Reconnect int64 `json:"reconnect_seconds" format:"int64"` - LastLog string `json:"last_logline"` - Progress *Progress `json:"progress"` - Memory uint64 `json:"memory_bytes" format:"uint64"` - CPU float64 `json:"cpu_usage" swaggertype:"number" jsonschema:"type=number"` - Command []string `json:"command"` + Order string `json:"order" jsonschema:"enum=start,enum=stop"` + State string `json:"exec" jsonschema:"enum=finished,enum=starting,enum=running,enum=finishing,enum=killed,enum=failed"` + Runtime int64 `json:"runtime_seconds" jsonschema:"minimum=0" format:"int64"` + Reconnect int64 `json:"reconnect_seconds" format:"int64"` + LastLog string `json:"last_logline"` + Progress *Progress `json:"progress"` + Memory uint64 `json:"memory_bytes" format:"uint64"` + CPU float64 `json:"cpu_usage" swaggertype:"number" jsonschema:"type=number"` + Resources ProcessUsage `json:"resources"` + Command []string `json:"command"` } type ProcessUsageCPU struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index c161808c..a15f7b29 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2 # github.com/cpuguy83/go-md2man/v2 v2.0.2 ## explicit; go 1.11 github.com/cpuguy83/go-md2man/v2/md2man -# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230512155342-18a7ac72df3a +# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230602102832-3d80767a2208 ## explicit; go 1.18 github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16/api