WIP enforcing policies on process level

This commit is contained in:
Ingo Oppermann
2023-02-14 15:23:50 +01:00
parent a186307746
commit a9459bda7c
7 changed files with 434 additions and 219 deletions

View File

@@ -61,7 +61,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())
} }
p, _ := h.getProcess(config.ID, "config") p, _ := h.getProcess(config.ID, config.Owner, config.Group, "config")
return c.JSON(http.StatusOK, p.Config) return c.JSON(http.StatusOK, p.Config)
} }
@@ -88,14 +88,16 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
}) })
idpattern := util.DefaultQuery(c, "idpattern", "") idpattern := util.DefaultQuery(c, "idpattern", "")
refpattern := util.DefaultQuery(c, "refpattern", "") refpattern := util.DefaultQuery(c, "refpattern", "")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
ids := h.restream.GetProcessIDs(idpattern, refpattern) ids := h.restream.GetProcessIDs(idpattern, refpattern, user, group)
processes := []api.Process{} processes := []api.Process{}
if len(wantids) == 0 || len(reference) != 0 { if len(wantids) == 0 || len(reference) != 0 {
for _, id := range ids { for _, id := range ids {
if p, err := h.getProcess(id, filter); err == nil { if p, err := h.getProcess(id, user, group, filter); err == nil {
if len(reference) != 0 && p.Reference != reference { if len(reference) != 0 && p.Reference != reference {
continue continue
} }
@@ -106,7 +108,7 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
for _, id := range ids { for _, id := range ids {
for _, wantid := range wantids { for _, wantid := range wantids {
if wantid == id { if wantid == id {
if p, err := h.getProcess(id, filter); err == nil { if p, err := h.getProcess(id, user, group, filter); err == nil {
processes = append(processes, p) processes = append(processes, p)
} }
} }
@@ -132,8 +134,10 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
func (h *RestreamHandler) Get(c echo.Context) error { func (h *RestreamHandler) Get(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
filter := util.DefaultQuery(c, "filter", "") filter := util.DefaultQuery(c, "filter", "")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
p, err := h.getProcess(id, filter) p, err := h.getProcess(id, user, group, filter)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
@@ -154,12 +158,14 @@ func (h *RestreamHandler) Get(c echo.Context) error {
// @Router /api/v3/process/{id} [delete] // @Router /api/v3/process/{id} [delete]
func (h *RestreamHandler) Delete(c echo.Context) error { func (h *RestreamHandler) Delete(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
if err := h.restream.StopProcess(id); err != nil { if err := h.restream.StopProcess(id, user, group); err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
if err := h.restream.DeleteProcess(id); err != nil { if err := h.restream.DeleteProcess(id, user, group); err != nil {
return api.Err(http.StatusInternalServerError, "Process can't be deleted", "%s", err) return api.Err(http.StatusInternalServerError, "Process can't be deleted", "%s", err)
} }
@@ -182,6 +188,8 @@ func (h *RestreamHandler) Delete(c echo.Context) error {
// @Router /api/v3/process/{id} [put] // @Router /api/v3/process/{id} [put]
func (h *RestreamHandler) Update(c echo.Context) error { func (h *RestreamHandler) Update(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
process := api.ProcessConfig{ process := api.ProcessConfig{
ID: id, ID: id,
@@ -189,7 +197,7 @@ func (h *RestreamHandler) Update(c echo.Context) error {
Autostart: true, Autostart: true,
} }
current, err := h.restream.GetProcess(id) current, err := h.restream.GetProcess(id, user, group)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Process not found", "%s", id) return api.Err(http.StatusNotFound, "Process not found", "%s", id)
} }
@@ -203,7 +211,7 @@ func (h *RestreamHandler) Update(c echo.Context) error {
config := process.Marshal() config := process.Marshal()
if err := h.restream.UpdateProcess(id, config); err != nil { if err := h.restream.UpdateProcess(id, user, group, config); err != nil {
if err == restream.ErrUnknownProcess { if err == restream.ErrUnknownProcess {
return api.Err(http.StatusNotFound, "Process not found", "%s", id) return api.Err(http.StatusNotFound, "Process not found", "%s", id)
} }
@@ -211,7 +219,7 @@ func (h *RestreamHandler) Update(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err)
} }
p, _ := h.getProcess(config.ID, "config") p, _ := h.getProcess(config.ID, config.Owner, config.Group, "config")
return c.JSON(http.StatusOK, p.Config) return c.JSON(http.StatusOK, p.Config)
} }
@@ -232,6 +240,8 @@ func (h *RestreamHandler) Update(c echo.Context) error {
// @Router /api/v3/process/{id}/command [put] // @Router /api/v3/process/{id}/command [put]
func (h *RestreamHandler) Command(c echo.Context) error { func (h *RestreamHandler) Command(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
var command api.Command var command api.Command
@@ -241,13 +251,13 @@ func (h *RestreamHandler) Command(c echo.Context) error {
var err error var err error
if command.Command == "start" { if command.Command == "start" {
err = h.restream.StartProcess(id) err = h.restream.StartProcess(id, user, group)
} else if command.Command == "stop" { } else if command.Command == "stop" {
err = h.restream.StopProcess(id) err = h.restream.StopProcess(id, user, group)
} else if command.Command == "restart" { } else if command.Command == "restart" {
err = h.restream.RestartProcess(id) err = h.restream.RestartProcess(id, user, group)
} else if command.Command == "reload" { } else if command.Command == "reload" {
err = h.restream.ReloadProcess(id) err = h.restream.ReloadProcess(id, user, group)
} else { } else {
return api.Err(http.StatusBadRequest, "Unknown command provided", "Known commands are: start, stop, reload, restart") return api.Err(http.StatusBadRequest, "Unknown command provided", "Known commands are: start, stop, reload, restart")
} }
@@ -273,8 +283,10 @@ func (h *RestreamHandler) Command(c echo.Context) error {
// @Router /api/v3/process/{id}/config [get] // @Router /api/v3/process/{id}/config [get]
func (h *RestreamHandler) GetConfig(c echo.Context) error { func (h *RestreamHandler) GetConfig(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
p, err := h.restream.GetProcess(id) p, err := h.restream.GetProcess(id, user, group)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
@@ -299,8 +311,10 @@ func (h *RestreamHandler) GetConfig(c echo.Context) error {
// @Router /api/v3/process/{id}/state [get] // @Router /api/v3/process/{id}/state [get]
func (h *RestreamHandler) GetState(c echo.Context) error { func (h *RestreamHandler) GetState(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
s, err := h.restream.GetProcessState(id) s, err := h.restream.GetProcessState(id, user, group)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
@@ -325,8 +339,10 @@ func (h *RestreamHandler) GetState(c echo.Context) error {
// @Router /api/v3/process/{id}/report [get] // @Router /api/v3/process/{id}/report [get]
func (h *RestreamHandler) GetReport(c echo.Context) error { func (h *RestreamHandler) GetReport(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
l, err := h.restream.GetProcessLog(id) l, err := h.restream.GetProcessLog(id, user, group)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
@@ -349,8 +365,10 @@ func (h *RestreamHandler) GetReport(c echo.Context) error {
// @Router /api/v3/process/{id}/probe [get] // @Router /api/v3/process/{id}/probe [get]
func (h *RestreamHandler) Probe(c echo.Context) error { func (h *RestreamHandler) Probe(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
probe := h.restream.Probe(id) probe := h.restream.Probe(id, user, group)
apiprobe := api.Probe{} apiprobe := api.Probe{}
apiprobe.Unmarshal(&probe) apiprobe.Unmarshal(&probe)
@@ -411,8 +429,10 @@ func (h *RestreamHandler) ReloadSkills(c echo.Context) error {
func (h *RestreamHandler) GetProcessMetadata(c echo.Context) error { func (h *RestreamHandler) GetProcessMetadata(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
key := util.PathParam(c, "key") key := util.PathParam(c, "key")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
data, err := h.restream.GetProcessMetadata(id, key) data, err := h.restream.GetProcessMetadata(id, user, group, key)
if err != nil { if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
@@ -437,6 +457,8 @@ func (h *RestreamHandler) GetProcessMetadata(c echo.Context) error {
func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error { func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error {
id := util.PathParam(c, "id") id := util.PathParam(c, "id")
key := util.PathParam(c, "key") key := util.PathParam(c, "key")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
if len(key) == 0 { if len(key) == 0 {
return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0") return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0")
@@ -448,7 +470,7 @@ func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
} }
if err := h.restream.SetProcessMetadata(id, key, data); err != nil { if err := h.restream.SetProcessMetadata(id, key, user, group, data); err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
} }
@@ -510,7 +532,7 @@ func (h *RestreamHandler) SetMetadata(c echo.Context) error {
return c.JSON(http.StatusOK, data) return c.JSON(http.StatusOK, data)
} }
func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, error) { func (h *RestreamHandler) getProcess(id, user, group, filterString string) (api.Process, error) {
filter := strings.FieldsFunc(filterString, func(r rune) bool { filter := strings.FieldsFunc(filterString, func(r rune) bool {
return r == rune(',') return r == rune(',')
}) })
@@ -534,7 +556,7 @@ func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, erro
} }
} }
process, err := h.restream.GetProcess(id) process, err := h.restream.GetProcess(id, user, group)
if err != nil { if err != nil {
return api.Process{}, err return api.Process{}, err
} }
@@ -552,21 +574,21 @@ func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, erro
} }
if wants["state"] { if wants["state"] {
if state, err := h.restream.GetProcessState(id); err == nil { if state, err := h.restream.GetProcessState(id, user, group); err == nil {
info.State = &api.ProcessState{} info.State = &api.ProcessState{}
info.State.Unmarshal(state) info.State.Unmarshal(state)
} }
} }
if wants["report"] { if wants["report"] {
if log, err := h.restream.GetProcessLog(id); err == nil { if log, err := h.restream.GetProcessLog(id, user, group); err == nil {
info.Report = &api.ProcessReport{} info.Report = &api.ProcessReport{}
info.Report.Unmarshal(log) info.Report.Unmarshal(log)
} }
} }
if wants["metadata"] { if wants["metadata"] {
if data, err := h.restream.GetProcessMetadata(id, ""); err == nil { if data, err := h.restream.GetProcessMetadata(id, "", user, group); err == nil {
info.Metadata = api.NewMetadata(data) info.Metadata = api.NewMetadata(data)
} }
} }

View File

@@ -76,3 +76,12 @@ func DefaultQuery(c echo.Context, name, defValue string) string {
return param return param
} }
func DefaultContext(c echo.Context, name, defValue string) string {
value, ok := c.Get(name).(string)
if !ok {
return defValue
}
return value
}

View File

@@ -370,6 +370,10 @@ func NewIdentityManager(config IdentityConfig) (IdentityManager, error) {
im.logger = log.New("") im.logger = log.New("")
} }
if im.fs == nil {
return nil, fmt.Errorf("no filesystem provided")
}
err := im.load(im.filePath) err := im.load(im.filePath)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -552,10 +556,6 @@ func (im *identityManager) Rename(oldname, newname string) error {
} }
func (im *identityManager) load(filePath string) error { func (im *identityManager) load(filePath string) error {
if im.fs == nil {
return fmt.Errorf("no filesystem provided")
}
if _, err := im.fs.Stat(filePath); os.IsNotExist(err) { if _, err := im.fs.Stat(filePath); os.IsNotExist(err) {
return nil return nil
} }
@@ -587,10 +587,6 @@ func (im *identityManager) Save() error {
} }
func (im *identityManager) save(filePath string) error { func (im *identityManager) save(filePath string) error {
if im.fs == nil {
return fmt.Errorf("no filesystem provided")
}
if filePath == "" { if filePath == "" {
return fmt.Errorf("invalid file path, file path cannot be empty") return fmt.Errorf("invalid file path, file path cannot be empty")
} }

View File

@@ -57,15 +57,15 @@ func (c *restreamCollector) Collect() metric.Metrics {
"starting": 0, "starting": 0,
} }
ids := c.r.GetProcessIDs("", "") ids := c.r.GetProcessIDs("", "", "$superuser", "$none")
for _, id := range ids { for _, id := range ids {
state, _ := c.r.GetProcessState(id) state, _ := c.r.GetProcessState(id, "$superuser", "$none")
if state == nil { if state == nil {
continue continue
} }
proc, _ := c.r.GetProcess(id) proc, _ := c.r.GetProcess(id, "$superuser", "$none")
if proc == nil { if proc == nil {
continue continue
} }

View File

@@ -36,6 +36,8 @@ func (io ConfigIO) Clone() ConfigIO {
type Config struct { type Config struct {
ID string `json:"id"` ID string `json:"id"`
Reference string `json:"reference"` Reference string `json:"reference"`
Owner string `json:"owner"`
Group string `json:"group"`
FFVersion string `json:"ffversion"` FFVersion string `json:"ffversion"`
Input []ConfigIO `json:"input"` Input []ConfigIO `json:"input"`
Output []ConfigIO `json:"output"` Output []ConfigIO `json:"output"`
@@ -103,6 +105,8 @@ func (config *Config) CreateCommand() []string {
type Process struct { type Process struct {
ID string `json:"id"` ID string `json:"id"`
Owner string `json:"owner"`
Group string `json:"group"`
Reference string `json:"reference"` Reference string `json:"reference"`
Config *Config `json:"config"` Config *Config `json:"config"`
CreatedAt int64 `json:"created_at"` CreatedAt int64 `json:"created_at"`

View File

@@ -15,6 +15,7 @@ import (
"github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/net"
@@ -30,31 +31,33 @@ import (
// The Restreamer interface // The Restreamer interface
type Restreamer interface { type Restreamer interface {
ID() string // ID of this instance ID() string // ID of this instance
Name() string // Arbitrary name of this instance Name() string // Arbitrary name of this instance
CreatedAt() time.Time // Time of when this instance has been created CreatedAt() time.Time // Time of when this instance has been created
Start() // Start all processes that have a "start" order Start() // Start all processes that have a "start" order
Stop() // Stop all running process but keep their "start" order Stop() // Stop all running process but keep their "start" order
AddProcess(config *app.Config) error // Add a new process
GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference Skills() skills.Skills // Get the ffmpeg skills
DeleteProcess(id string) error // Delete a process ReloadSkills() error // Reload the ffmpeg skills
UpdateProcess(id string, config *app.Config) error // Update a process SetMetadata(key string, data interface{}) error // Set general metadata
StartProcess(id string) error // Start a process GetMetadata(key string) (interface{}, error) // Get previously set general metadata
StopProcess(id string) error // Stop a process
RestartProcess(id string) error // Restart a process AddProcess(config *app.Config) error // Add a new process
ReloadProcess(id string) error // Reload a process GetProcessIDs(idpattern, refpattern, user, group string) []string // Get a list of process IDs based on patterns for ID and reference
GetProcess(id string) (*app.Process, error) // Get a process DeleteProcess(id, user, group string) error // Delete a process
GetProcessState(id string) (*app.State, error) // Get the state of a process UpdateProcess(id, user, group string, config *app.Config) error // Update a process
GetProcessLog(id string) (*app.Log, error) // Get the logs of a process StartProcess(id, user, group string) error // Start a process
GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process StopProcess(id, user, group string) error // Stop a process
Probe(id string) app.Probe // Probe a process RestartProcess(id, user, group string) error // Restart a process
ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout ReloadProcess(id, user, group string) error // Reload a process
Skills() skills.Skills // Get the ffmpeg skills GetProcess(id, user, group string) (*app.Process, error) // Get a process
ReloadSkills() error // Reload the ffmpeg skills GetProcessState(id, user, group string) (*app.State, error) // Get the state of a process
SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process GetProcessLog(id, user, group string) (*app.Log, error) // Get the logs of a process
GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process GetPlayout(id, user, group, inputid string) (string, error) // Get the URL of the playout API for a process
SetMetadata(key string, data interface{}) error // Set general metadata Probe(id, user, group string) app.Probe // Probe a process
GetMetadata(key string) (interface{}, error) // Get previously set general metadata ProbeWithTimeout(id, user, group string, timeout time.Duration) app.Probe // Probe a process with specific timeout
SetProcessMetadata(id, user, group, key string, data interface{}) error // Set metatdata to a process
GetProcessMetadata(id, user, group, key string) (interface{}, error) // Get previously set metadata from a process
} }
// Config is the required configuration for a new restreamer instance. // Config is the required configuration for a new restreamer instance.
@@ -67,11 +70,14 @@ type Config struct {
FFmpeg ffmpeg.FFmpeg FFmpeg ffmpeg.FFmpeg
MaxProcesses int64 MaxProcesses int64
Logger log.Logger Logger log.Logger
IAM iam.IAM
} }
type task struct { type task struct {
valid bool valid bool
id string // ID of the task/process id string // ID of the task/process
owner string
group string
reference string reference string
process *app.Process process *app.Process
config *app.Config config *app.Config
@@ -84,6 +90,14 @@ type task struct {
metadata map[string]interface{} metadata map[string]interface{}
} }
func newTaskid(id, group string) string {
return id + "~" + group
}
func (t *task) String() string {
return newTaskid(t.id, t.group)
}
type restream struct { type restream struct {
id string id string
name string name string
@@ -106,6 +120,8 @@ type restream struct {
startOnce sync.Once startOnce sync.Once
stopOnce sync.Once stopOnce sync.Once
iam iam.IAM
} }
// New returns a new instance that implements the Restreamer interface // New returns a new instance that implements the Restreamer interface
@@ -117,12 +133,17 @@ func New(config Config) (Restreamer, error) {
store: config.Store, store: config.Store,
replace: config.Replace, replace: config.Replace,
logger: config.Logger, logger: config.Logger,
iam: config.IAM,
} }
if r.logger == nil { if r.logger == nil {
r.logger = log.New("") r.logger = log.New("")
} }
if r.iam == nil {
return nil, fmt.Errorf("missing IAM")
}
if r.store == nil { if r.store == nil {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
s, err := store.NewJSON(store.JSONConfig{ s, err := store.NewJSON(store.JSONConfig{
@@ -281,27 +302,33 @@ func (r *restream) load() error {
ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor()) ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor())
} }
for id, process := range data.Process { for _, process := range data.Process {
if len(process.Config.FFVersion) == 0 { if len(process.Config.FFVersion) == 0 {
process.Config.FFVersion = "^" + ffversion process.Config.FFVersion = "^" + ffversion
} }
t := &task{ t := &task{
id: id, id: process.ID,
owner: process.Owner,
group: process.Group,
reference: process.Reference, reference: process.Reference,
process: process, process: process,
config: process.Config.Clone(), config: process.Config.Clone(),
logger: r.logger.WithField("id", id), logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"owner": process.Owner,
"group": process.Group,
}),
} }
// Replace all placeholders in the config // Replace all placeholders in the config
resolvePlaceholders(t.config, r.replace) resolvePlaceholders(t.config, r.replace)
tasks[id] = t tasks[t.String()] = t
} }
for id, userdata := range data.Metadata.Process { for tid, userdata := range data.Metadata.Process {
t, ok := tasks[id] t, ok := tasks[tid]
if !ok { if !ok {
continue continue
} }
@@ -312,44 +339,48 @@ func (r *restream) load() error {
// Now that all tasks are defined and all placeholders are // Now that all tasks are defined and all placeholders are
// replaced, we can resolve references and validate the // replaced, we can resolve references and validate the
// inputs and outputs. // inputs and outputs.
for _, t := range tasks { for tid, t := range tasks {
// Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version
if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if c, err := semver.NewConstraint(t.config.FFVersion); err == nil {
if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil {
if !c.Check(v) { if !c.Check(v) {
r.logger.Warn().WithFields(log.Fields{ t.logger.Warn().WithFields(log.Fields{
"id": t.id,
"constraint": t.config.FFVersion, "constraint": t.config.FFVersion,
"version": skills.FFmpeg.Version, "version": skills.FFmpeg.Version,
}).WithError(fmt.Errorf("available FFmpeg version doesn't fit constraint; you have to update this process to adjust the constraint")).Log("") }).WithError(fmt.Errorf("available FFmpeg version doesn't fit constraint; you have to update this process to adjust the constraint")).Log("")
} }
} else { } else {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("") t.logger.Warn().WithError(err).Log("")
} }
} else { } else {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("") t.logger.Warn().WithError(err).Log("")
}
if !r.enforce(t.owner, t.group, t.id, "CREATE") {
t.logger.Warn().WithError(fmt.Errorf("forbidden")).Log("Ignoring")
continue
} }
err := r.resolveAddresses(tasks, t.config) err := r.resolveAddresses(tasks, t.config)
if err != nil { if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") t.logger.Warn().WithError(err).Log("Ignoring")
continue continue
} }
t.usesDisk, err = r.validateConfig(t.config) t.usesDisk, err = r.validateConfig(t.config)
if err != nil { if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") t.logger.Warn().WithError(err).Log("Ignoring")
continue continue
} }
err = r.setPlayoutPorts(t) err = r.setPlayoutPorts(t)
if err != nil { if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") t.logger.Warn().WithError(err).Log("Ignoring")
continue continue
} }
t.command = t.config.CreateCommand() t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) t.parser = r.ffmpeg.NewProcessParser(t.logger, tid, t.reference)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect, Reconnect: t.config.Reconnect,
@@ -376,15 +407,29 @@ func (r *restream) load() error {
func (r *restream) save() { func (r *restream) save() {
data := store.NewStoreData() data := store.NewStoreData()
for id, t := range r.tasks { for tid, t := range r.tasks {
data.Process[id] = t.process data.Process[tid] = t.process
data.Metadata.System = r.metadata data.Metadata.System = r.metadata
data.Metadata.Process[id] = t.metadata data.Metadata.Process[tid] = t.metadata
} }
r.store.Store(data) r.store.Store(data)
} }
func (r *restream) enforce(name, group, processid, action string) bool {
if len(name) == 0 {
name = "$anon"
}
if len(group) == 0 {
group = "$none"
}
ok, _ := r.iam.Enforce(name, group, "process:"+processid, action)
return ok
}
func (r *restream) ID() string { func (r *restream) ID() string {
return r.id return r.id
} }
@@ -398,9 +443,15 @@ func (r *restream) CreatedAt() time.Time {
} }
var ErrUnknownProcess = errors.New("unknown process") var ErrUnknownProcess = errors.New("unknown process")
var ErrUnknownProcessGroup = errors.New("unknown process group")
var ErrProcessExists = errors.New("process already exists") var ErrProcessExists = errors.New("process already exists")
var ErrForbidden = errors.New("forbidden")
func (r *restream) AddProcess(config *app.Config) error { func (r *restream) AddProcess(config *app.Config) error {
if !r.enforce(config.Owner, config.Group, config.ID, "CREATE") {
return ErrForbidden
}
r.lock.RLock() r.lock.RLock()
t, err := r.createTask(config) t, err := r.createTask(config)
r.lock.RUnlock() r.lock.RUnlock()
@@ -412,20 +463,22 @@ func (r *restream) AddProcess(config *app.Config) error {
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
_, ok := r.tasks[t.id] tid := t.String()
_, ok := r.tasks[tid]
if ok { if ok {
return ErrProcessExists return ErrProcessExists
} }
r.tasks[t.id] = t r.tasks[tid] = t
// set filesystem cleanup rules // set filesystem cleanup rules
r.setCleanup(t.id, t.config) r.setCleanup(tid, t.config)
if t.process.Order == "start" { if t.process.Order == "start" {
err := r.startProcess(t.id) err := r.startProcess(tid)
if err != nil { if err != nil {
delete(r.tasks, t.id) delete(r.tasks, tid)
return err return err
} }
} }
@@ -450,6 +503,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
process := &app.Process{ process := &app.Process{
ID: config.ID, ID: config.ID,
Group: config.Group,
Reference: config.Reference, Reference: config.Reference,
Config: config.Clone(), Config: config.Clone(),
Order: "stop", Order: "stop",
@@ -462,10 +516,14 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t := &task{ t := &task{
id: config.ID, id: config.ID,
group: config.Group,
reference: process.Reference, reference: process.Reference,
process: process, process: process,
config: process.Config.Clone(), config: process.Config.Clone(),
logger: r.logger.WithField("id", process.ID), logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"group": process.Group,
}),
} }
resolvePlaceholders(t.config, r.replace) resolvePlaceholders(t.config, r.replace)
@@ -486,7 +544,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
} }
t.command = t.config.CreateCommand() t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) t.parser = r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect, Reconnect: t.config.Reconnect,
@@ -853,7 +911,11 @@ func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (s
return address, fmt.Errorf("the process '%s' has no outputs with the ID '%s' (%s)", matches[1], matches[2], address) return address, fmt.Errorf("the process '%s' has no outputs with the ID '%s' (%s)", matches[1], matches[2], address)
} }
func (r *restream) UpdateProcess(id string, config *app.Config) error { func (r *restream) UpdateProcess(id, user, group string, config *app.Config) error {
if !r.enforce(user, group, id, "UPDATE") {
return ErrForbidden
}
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
@@ -862,35 +924,39 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error {
return err return err
} }
task, ok := r.tasks[id] tid := newTaskid(id, group)
task, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
t.process.Order = task.process.Order t.process.Order = task.process.Order
if id != t.id { if tid != t.String() {
_, ok := r.tasks[t.id] _, ok := r.tasks[t.String()]
if ok { if ok {
return ErrProcessExists return ErrProcessExists
} }
} }
if err := r.stopProcess(id); err != nil { if err := r.stopProcess(tid); err != nil {
return err return err
} }
if err := r.deleteProcess(id); err != nil { if err := r.deleteProcess(tid); err != nil {
return err return err
} }
r.tasks[t.id] = t tid = t.String()
r.tasks[tid] = t
// set filesystem cleanup rules // set filesystem cleanup rules
r.setCleanup(t.id, t.config) r.setCleanup(tid, t.config)
if t.process.Order == "start" { if t.process.Order == "start" {
r.startProcess(t.id) r.startProcess(tid)
} }
r.save() r.save()
@@ -898,17 +964,23 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error {
return nil return nil
} }
func (r *restream) GetProcessIDs(idpattern, refpattern string) []string { func (r *restream) GetProcessIDs(idpattern, refpattern, user, group string) []string {
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
if len(idpattern) == 0 && len(refpattern) == 0 { if len(idpattern) == 0 && len(refpattern) == 0 {
ids := make([]string, len(r.tasks)) ids := []string{}
i := 0
for id := range r.tasks { for _, t := range r.tasks {
ids[i] = id if t.group != group {
i++ continue
}
if !r.enforce(user, group, t.id, "GET") {
continue
}
ids = append(ids, t.id)
} }
return ids return ids
@@ -918,8 +990,16 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string {
count := 0 count := 0
if len(idpattern) != 0 { if len(idpattern) != 0 {
for id := range r.tasks { for _, t := range r.tasks {
match, err := glob.Match(idpattern, id) if t.group != group {
continue
}
if !r.enforce(user, group, t.id, "GET") {
continue
}
match, err := glob.Match(idpattern, t.id)
if err != nil { if err != nil {
return nil return nil
} }
@@ -928,7 +1008,7 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string {
continue continue
} }
idmap[id]++ idmap[t.id]++
} }
count++ count++
@@ -936,6 +1016,14 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string {
if len(refpattern) != 0 { if len(refpattern) != 0 {
for _, t := range r.tasks { for _, t := range r.tasks {
if t.group != group {
continue
}
if !r.enforce(user, group, t.id, "GET") {
continue
}
match, err := glob.Match(refpattern, t.reference) match, err := glob.Match(refpattern, t.reference)
if err != nil { if err != nil {
return nil return nil
@@ -964,11 +1052,17 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string {
return ids return ids
} }
func (r *restream) GetProcess(id string) (*app.Process, error) { func (r *restream) GetProcess(id, user, group string) (*app.Process, error) {
if !r.enforce(user, group, id, "GET") {
return nil, ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return &app.Process{}, ErrUnknownProcess return &app.Process{}, ErrUnknownProcess
} }
@@ -978,11 +1072,17 @@ func (r *restream) GetProcess(id string) (*app.Process, error) {
return process, nil return process, nil
} }
func (r *restream) DeleteProcess(id string) error { func (r *restream) DeleteProcess(id, user, group string) error {
if !r.enforce(user, group, id, "DELETE") {
return ErrForbidden
}
tid := newTaskid(id, group)
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
err := r.deleteProcess(id) err := r.deleteProcess(tid)
if err != nil { if err != nil {
return err return err
} }
@@ -992,29 +1092,35 @@ func (r *restream) DeleteProcess(id string) error {
return nil return nil
} }
func (r *restream) deleteProcess(id string) error { func (r *restream) deleteProcess(tid string) error {
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
if task.process.Order != "stop" { if task.process.Order != "stop" {
return fmt.Errorf("the process with the ID '%s' is still running", id) return fmt.Errorf("the process with the ID '%s' is still running", task.id)
} }
r.unsetPlayoutPorts(task) r.unsetPlayoutPorts(task)
r.unsetCleanup(id) r.unsetCleanup(tid)
delete(r.tasks, id) delete(r.tasks, tid)
return nil return nil
} }
func (r *restream) StartProcess(id string) error { func (r *restream) StartProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
err := r.startProcess(id) err := r.startProcess(tid)
if err != nil { if err != nil {
return err return err
} }
@@ -1024,8 +1130,8 @@ func (r *restream) StartProcess(id string) error {
return nil return nil
} }
func (r *restream) startProcess(id string) error { func (r *restream) startProcess(tid string) error {
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
@@ -1053,11 +1159,17 @@ func (r *restream) startProcess(id string) error {
return nil return nil
} }
func (r *restream) StopProcess(id string) error { func (r *restream) StopProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
err := r.stopProcess(id) err := r.stopProcess(tid)
if err != nil { if err != nil {
return err return err
} }
@@ -1067,8 +1179,8 @@ func (r *restream) StopProcess(id string) error {
return nil return nil
} }
func (r *restream) stopProcess(id string) error { func (r *restream) stopProcess(tid string) error {
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
@@ -1092,15 +1204,21 @@ func (r *restream) stopProcess(id string) error {
return nil return nil
} }
func (r *restream) RestartProcess(id string) error { func (r *restream) RestartProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
return r.restartProcess(id) return r.restartProcess(tid)
} }
func (r *restream) restartProcess(id string) error { func (r *restream) restartProcess(tid string) error {
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
@@ -1118,11 +1236,17 @@ func (r *restream) restartProcess(id string) error {
return nil return nil
} }
func (r *restream) ReloadProcess(id string) error { func (r *restream) ReloadProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
err := r.reloadProcess(id) err := r.reloadProcess(tid)
if err != nil { if err != nil {
return err return err
} }
@@ -1132,8 +1256,8 @@ func (r *restream) ReloadProcess(id string) error {
return nil return nil
} }
func (r *restream) reloadProcess(id string) error { func (r *restream) reloadProcess(tid string) error {
t, ok := r.tasks[id] t, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
@@ -1164,7 +1288,7 @@ func (r *restream) reloadProcess(id string) error {
order := "stop" order := "stop"
if t.process.Order == "start" { if t.process.Order == "start" {
order = "start" order = "start"
r.stopProcess(id) r.stopProcess(tid)
} }
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
@@ -1185,19 +1309,25 @@ func (r *restream) reloadProcess(id string) error {
t.valid = true t.valid = true
if order == "start" { if order == "start" {
r.startProcess(id) r.startProcess(tid)
} }
return nil return nil
} }
func (r *restream) GetProcessState(id string) (*app.State, error) { func (r *restream) GetProcessState(id, user, group string) (*app.State, error) {
state := &app.State{} state := &app.State{}
if !r.enforce(user, group, id, "GET") {
return state, ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return state, ErrUnknownProcess return state, ErrUnknownProcess
} }
@@ -1254,21 +1384,27 @@ func (r *restream) GetProcessState(id string) (*app.State, error) {
return state, nil return state, nil
} }
func (r *restream) GetProcessLog(id string) (*app.Log, error) { func (r *restream) GetProcessLog(id, user, group string) (*app.Log, error) {
log := &app.Log{}
if !r.enforce(user, group, id, "GET") {
return log, ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return &app.Log{}, ErrUnknownProcess return log, ErrUnknownProcess
} }
if !task.valid { if !task.valid {
return &app.Log{}, nil return log, nil
} }
log := &app.Log{}
current := task.parser.Report() current := task.parser.Report()
log.CreatedAt = current.CreatedAt log.CreatedAt = current.CreatedAt
@@ -1303,16 +1439,23 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
return log, nil return log, nil
} }
func (r *restream) Probe(id string) app.Probe { func (r *restream) Probe(id, user, group string) app.Probe {
return r.ProbeWithTimeout(id, 20*time.Second) return r.ProbeWithTimeout(id, user, group, 20*time.Second)
} }
func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe { func (r *restream) ProbeWithTimeout(id, user, group string, timeout time.Duration) app.Probe {
r.lock.RLock()
appprobe := app.Probe{} appprobe := app.Probe{}
task, ok := r.tasks[id] if !r.enforce(user, group, id, "PROBE") {
appprobe.Log = append(appprobe.Log, ErrForbidden.Error())
return appprobe
}
tid := newTaskid(id, group)
r.lock.RLock()
task, ok := r.tasks[tid]
if !ok { if !ok {
appprobe.Log = append(appprobe.Log, fmt.Sprintf("Unknown process ID (%s)", id)) appprobe.Log = append(appprobe.Log, fmt.Sprintf("Unknown process ID (%s)", id))
r.lock.RUnlock() r.lock.RUnlock()
@@ -1376,11 +1519,17 @@ func (r *restream) ReloadSkills() error {
return r.ffmpeg.ReloadSkills() return r.ffmpeg.ReloadSkills()
} }
func (r *restream) GetPlayout(id, inputid string) (string, error) { func (r *restream) GetPlayout(id, user, group, inputid string) (string, error) {
if !r.enforce(user, group, id, "PLAYOUT") {
return "", ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return "", ErrUnknownProcess return "", ErrUnknownProcess
} }
@@ -1399,15 +1548,21 @@ func (r *restream) GetPlayout(id, inputid string) (string, error) {
var ErrMetadataKeyNotFound = errors.New("unknown key") var ErrMetadataKeyNotFound = errors.New("unknown key")
func (r *restream) SetProcessMetadata(id, key string, data interface{}) error { func (r *restream) SetProcessMetadata(id, user, group, key string, data interface{}) error {
r.lock.Lock() if !r.enforce(user, group, id, "METADATA") {
defer r.lock.Unlock() return ErrForbidden
}
if len(key) == 0 { if len(key) == 0 {
return fmt.Errorf("a key for storing the data has to be provided") return fmt.Errorf("a key for storing the data has to be provided")
} }
task, ok := r.tasks[id] tid := newTaskid(id, group)
r.lock.Lock()
defer r.lock.Unlock()
task, ok := r.tasks[tid]
if !ok { if !ok {
return ErrUnknownProcess return ErrUnknownProcess
} }
@@ -1431,11 +1586,17 @@ func (r *restream) SetProcessMetadata(id, key string, data interface{}) error {
return nil return nil
} }
func (r *restream) GetProcessMetadata(id, key string) (interface{}, error) { func (r *restream) GetProcessMetadata(id, user, group, key string) (interface{}, error) {
if !r.enforce(user, group, id, "METADATA") {
return nil, ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock() r.lock.RLock()
defer r.lock.RUnlock() defer r.lock.RUnlock()
task, ok := r.tasks[id] task, ok := r.tasks[tid]
if !ok { if !ok {
return nil, ErrUnknownProcess return nil, ErrUnknownProcess
} }
@@ -1501,6 +1662,7 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
vars := map[string]string{ vars := map[string]string{
"processid": config.ID, "processid": config.ID,
"reference": config.Reference, "reference": config.Reference,
"group": config.Group,
} }
for i, option := range config.Options { for i, option := range config.Options {

View File

@@ -6,7 +6,9 @@ import (
"time" "time"
"github.com/datarhei/core/v16/ffmpeg" "github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/replace"
@@ -30,9 +32,27 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
return nil, err return nil, err
} }
dummyfs, err := fs.NewMemFilesystem(fs.MemConfig{})
if err != nil {
return nil, err
}
iam, err := iam.NewIAM(iam.Config{
FS: dummyfs,
Superuser: iam.User{
Name: "foobar",
},
JWTRealm: "",
JWTSecret: "",
Logger: nil,
})
iam.AddPolicy("$anon", "$none", "process:*", "CREATE|GET|DELETE|UPDATE|COMMAND|PROBE|METADATA|PLAYOUT")
rs, err := New(Config{ rs, err := New(Config{
FFmpeg: ffmpeg, FFmpeg: ffmpeg,
Replace: replacer, Replace: replacer,
IAM: iam,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -85,16 +105,16 @@ func TestAddProcess(t *testing.T) {
process := getDummyProcess() process := getDummyProcess()
require.NotNil(t, process) require.NotNil(t, process)
_, err = rs.GetProcess(process.ID) _, err = rs.GetProcess(process.ID, "", "")
require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) require.Equal(t, ErrUnknownProcess, err)
err = rs.AddProcess(process) err = rs.AddProcess(process)
require.Equal(t, nil, err, "Failed to add process (%s)", err) require.Equal(t, nil, err, "Failed to add process (%s)", err)
_, err = rs.GetProcess(process.ID) _, err = rs.GetProcess(process.ID, "", "")
require.Equal(t, nil, err, "Set process not found (%s)", process.ID) require.Equal(t, nil, err, "Set process not found (%s)", process.ID)
state, _ := rs.GetProcessState(process.ID) state, _ := rs.GetProcessState(process.ID, "", "")
require.Equal(t, "stop", state.Order, "Process should be stopped") require.Equal(t, "stop", state.Order, "Process should be stopped")
} }
@@ -107,10 +127,10 @@ func TestAutostartProcess(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
state, _ := rs.GetProcessState(process.ID) state, _ := rs.GetProcessState(process.ID, "", "")
require.Equal(t, "start", state.Order, "Process should be started") require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID) rs.StopProcess(process.ID, "", "")
} }
func TestAddInvalidProcess(t *testing.T) { func TestAddInvalidProcess(t *testing.T) {
@@ -190,10 +210,10 @@ func TestRemoveProcess(t *testing.T) {
err = rs.AddProcess(process) err = rs.AddProcess(process)
require.Equal(t, nil, err, "Failed to add process (%s)", err) require.Equal(t, nil, err, "Failed to add process (%s)", err)
err = rs.DeleteProcess(process.ID) err = rs.DeleteProcess(process.ID, "", "")
require.Equal(t, nil, err, "Set process not found (%s)", process.ID) require.Equal(t, nil, err, "Set process not found (%s)", process.ID)
_, err = rs.GetProcess(process.ID) _, err = rs.GetProcess(process.ID, "", "")
require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID)
} }
@@ -219,17 +239,17 @@ func TestUpdateProcess(t *testing.T) {
require.NotNil(t, process3) require.NotNil(t, process3)
process3.ID = "process2" process3.ID = "process2"
err = rs.UpdateProcess("process1", process3) err = rs.UpdateProcess("process1", "", "", process3)
require.Error(t, err) require.Error(t, err)
process3.ID = "process3" process3.ID = "process3"
err = rs.UpdateProcess("process1", process3) err = rs.UpdateProcess("process1", "", "", process3)
require.NoError(t, err) require.NoError(t, err)
_, err = rs.GetProcess(process1.ID) _, err = rs.GetProcess(process1.ID, "", "")
require.Error(t, err) require.Error(t, err)
_, err = rs.GetProcess(process3.ID) _, err = rs.GetProcess(process3.ID, "", "")
require.NoError(t, err) require.NoError(t, err)
} }
@@ -255,34 +275,34 @@ func TestGetProcess(t *testing.T) {
rs.AddProcess(process3) rs.AddProcess(process3)
rs.AddProcess(process4) rs.AddProcess(process4)
_, err = rs.GetProcess(process1.ID) _, err = rs.GetProcess(process1.ID, "", "")
require.Equal(t, nil, err) require.Equal(t, nil, err)
list := rs.GetProcessIDs("", "") list := rs.GetProcessIDs("", "", "", "")
require.Len(t, list, 4) require.Len(t, list, 4)
require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list) require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list)
list = rs.GetProcessIDs("foo_*", "") list = rs.GetProcessIDs("foo_*", "", "", "")
require.Len(t, list, 2) require.Len(t, list, 2)
require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list)
list = rs.GetProcessIDs("bar_*", "") list = rs.GetProcessIDs("bar_*", "", "", "")
require.Len(t, list, 2) require.Len(t, list, 2)
require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list)
list = rs.GetProcessIDs("*_bbb_*", "") list = rs.GetProcessIDs("*_bbb_*", "", "", "")
require.Len(t, list, 1) require.Len(t, list, 1)
require.ElementsMatch(t, []string{"bar_bbb_2"}, list) require.ElementsMatch(t, []string{"bar_bbb_2"}, list)
list = rs.GetProcessIDs("", "foo_*") list = rs.GetProcessIDs("", "foo_*", "", "")
require.Len(t, list, 2) require.Len(t, list, 2)
require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list)
list = rs.GetProcessIDs("", "bar_*") list = rs.GetProcessIDs("", "bar_*", "", "")
require.Len(t, list, 2) require.Len(t, list, 2)
require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list)
list = rs.GetProcessIDs("", "*_bbb_*") list = rs.GetProcessIDs("", "*_bbb_*", "", "")
require.Len(t, list, 1) require.Len(t, list, 1)
require.ElementsMatch(t, []string{"bar_bbb_2"}, list) require.ElementsMatch(t, []string{"bar_bbb_2"}, list)
} }
@@ -295,22 +315,22 @@ func TestStartProcess(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
err = rs.StartProcess("foobar") err = rs.StartProcess("foobar", "", "")
require.NotEqual(t, nil, err, "shouldn't be able to start non-existing process") require.NotEqual(t, nil, err, "shouldn't be able to start non-existing process")
err = rs.StartProcess(process.ID) err = rs.StartProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to start existing process") require.Equal(t, nil, err, "should be able to start existing process")
state, _ := rs.GetProcessState(process.ID) state, _ := rs.GetProcessState(process.ID, "", "")
require.Equal(t, "start", state.Order, "Process should be started") require.Equal(t, "start", state.Order, "Process should be started")
err = rs.StartProcess(process.ID) err = rs.StartProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to start already running process") require.Equal(t, nil, err, "should be able to start already running process")
state, _ = rs.GetProcessState(process.ID) state, _ = rs.GetProcessState(process.ID, "", "")
require.Equal(t, "start", state.Order, "Process should be started") require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID) rs.StopProcess(process.ID, "", "")
} }
func TestStopProcess(t *testing.T) { func TestStopProcess(t *testing.T) {
@@ -320,21 +340,21 @@ func TestStopProcess(t *testing.T) {
process := getDummyProcess() process := getDummyProcess()
rs.AddProcess(process) rs.AddProcess(process)
rs.StartProcess(process.ID) rs.StartProcess(process.ID, "", "")
err = rs.StopProcess("foobar") err = rs.StopProcess("foobar", "", "")
require.NotEqual(t, nil, err, "shouldn't be able to stop non-existing process") require.NotEqual(t, nil, err, "shouldn't be able to stop non-existing process")
err = rs.StopProcess(process.ID) err = rs.StopProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to stop existing running process") require.Equal(t, nil, err, "should be able to stop existing running process")
state, _ := rs.GetProcessState(process.ID) state, _ := rs.GetProcessState(process.ID, "", "")
require.Equal(t, "stop", state.Order, "Process should be stopped") require.Equal(t, "stop", state.Order, "Process should be stopped")
err = rs.StopProcess(process.ID) err = rs.StopProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to stop already stopped process") require.Equal(t, nil, err, "should be able to stop already stopped process")
state, _ = rs.GetProcessState(process.ID) state, _ = rs.GetProcessState(process.ID, "", "")
require.Equal(t, "stop", state.Order, "Process should be stopped") require.Equal(t, "stop", state.Order, "Process should be stopped")
} }
@@ -346,21 +366,21 @@ func TestRestartProcess(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
err = rs.RestartProcess("foobar") err = rs.RestartProcess("foobar", "", "")
require.NotEqual(t, nil, err, "shouldn't be able to restart non-existing process") require.NotEqual(t, nil, err, "shouldn't be able to restart non-existing process")
err = rs.RestartProcess(process.ID) err = rs.RestartProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to restart existing stopped process") require.Equal(t, nil, err, "should be able to restart existing stopped process")
state, _ := rs.GetProcessState(process.ID) state, _ := rs.GetProcessState(process.ID, "", "")
require.Equal(t, "stop", state.Order, "Process should be stopped") require.Equal(t, "stop", state.Order, "Process should be stopped")
rs.StartProcess(process.ID) rs.StartProcess(process.ID, "", "")
state, _ = rs.GetProcessState(process.ID) state, _ = rs.GetProcessState(process.ID, "", "")
require.Equal(t, "start", state.Order, "Process should be started") require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID) rs.StopProcess(process.ID, "", "")
} }
func TestReloadProcess(t *testing.T) { func TestReloadProcess(t *testing.T) {
@@ -371,27 +391,27 @@ func TestReloadProcess(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
err = rs.ReloadProcess("foobar") err = rs.ReloadProcess("foobar", "", "")
require.NotEqual(t, nil, err, "shouldn't be able to reload non-existing process") require.NotEqual(t, nil, err, "shouldn't be able to reload non-existing process")
err = rs.ReloadProcess(process.ID) err = rs.ReloadProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to reload existing stopped process") require.Equal(t, nil, err, "should be able to reload existing stopped process")
state, _ := rs.GetProcessState(process.ID) state, _ := rs.GetProcessState(process.ID, "", "")
require.Equal(t, "stop", state.Order, "Process should be stopped") require.Equal(t, "stop", state.Order, "Process should be stopped")
rs.StartProcess(process.ID) rs.StartProcess(process.ID, "", "")
state, _ = rs.GetProcessState(process.ID) state, _ = rs.GetProcessState(process.ID, "", "")
require.Equal(t, "start", state.Order, "Process should be started") require.Equal(t, "start", state.Order, "Process should be started")
err = rs.ReloadProcess(process.ID) err = rs.ReloadProcess(process.ID, "", "")
require.Equal(t, nil, err, "should be able to reload existing process") require.Equal(t, nil, err, "should be able to reload existing process")
state, _ = rs.GetProcessState(process.ID) state, _ = rs.GetProcessState(process.ID, "", "")
require.Equal(t, "start", state.Order, "Process should be started") require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID) rs.StopProcess(process.ID, "", "")
} }
func TestProbeProcess(t *testing.T) { func TestProbeProcess(t *testing.T) {
@@ -402,7 +422,7 @@ func TestProbeProcess(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
probe := rs.ProbeWithTimeout(process.ID, 5*time.Second) probe := rs.ProbeWithTimeout(process.ID, "", "", 5*time.Second)
require.Equal(t, 3, len(probe.Streams)) require.Equal(t, 3, len(probe.Streams))
} }
@@ -415,12 +435,15 @@ func TestProcessMetadata(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
data, _ := rs.GetProcessMetadata(process.ID, "foobar") data, err := rs.GetProcessMetadata(process.ID, "", "", "foobar")
require.Error(t, ErrMetadataKeyNotFound)
require.Equal(t, nil, data, "nothing should be stored under the key") require.Equal(t, nil, data, "nothing should be stored under the key")
rs.SetProcessMetadata(process.ID, "foobar", process) err = rs.SetProcessMetadata(process.ID, "", "", "foobar", process)
require.NoError(t, err)
data, _ = rs.GetProcessMetadata(process.ID, "foobar") data, err = rs.GetProcessMetadata(process.ID, "", "", "foobar")
require.NoError(t, err)
require.NotEqual(t, nil, data, "there should be something stored under the key") require.NotEqual(t, nil, data, "there should be something stored under the key")
p := data.(*app.Config) p := data.(*app.Config)
@@ -436,26 +459,26 @@ func TestLog(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
_, err = rs.GetProcessLog("foobar") _, err = rs.GetProcessLog("foobar", "", "")
require.NotEqual(t, nil, err, "shouldn't be able to get log from non-existing process") require.NotEqual(t, nil, err, "shouldn't be able to get log from non-existing process")
log, err := rs.GetProcessLog(process.ID) log, err := rs.GetProcessLog(process.ID, "", "")
require.Equal(t, nil, err, "should be able to get log from existing process") require.Equal(t, nil, err, "should be able to get log from existing process")
require.Equal(t, 0, len(log.Prelude)) require.Equal(t, 0, len(log.Prelude))
require.Equal(t, 0, len(log.Log)) require.Equal(t, 0, len(log.Log))
rs.StartProcess(process.ID) rs.StartProcess(process.ID, "", "")
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
log, _ = rs.GetProcessLog(process.ID) log, _ = rs.GetProcessLog(process.ID, "", "")
require.NotEqual(t, 0, len(log.Prelude)) require.NotEqual(t, 0, len(log.Prelude))
require.NotEqual(t, 0, len(log.Log)) require.NotEqual(t, 0, len(log.Log))
rs.StopProcess(process.ID) rs.StopProcess(process.ID, "", "")
log, _ = rs.GetProcessLog(process.ID) log, _ = rs.GetProcessLog(process.ID, "", "")
require.NotEqual(t, 0, len(log.Prelude)) require.NotEqual(t, 0, len(log.Prelude))
require.NotEqual(t, 0, len(log.Log)) require.NotEqual(t, 0, len(log.Log))
@@ -471,13 +494,13 @@ func TestPlayoutNoRange(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
_, err = rs.GetPlayout("foobar", process.Input[0].ID) _, err = rs.GetPlayout("foobar", "", "", process.Input[0].ID)
require.NotEqual(t, nil, err, "playout of non-existing process should error") require.Equal(t, ErrUnknownProcess, err)
_, err = rs.GetPlayout(process.ID, "foobar") _, err = rs.GetPlayout(process.ID, "", "", "foobar")
require.NotEqual(t, nil, err, "playout of non-existing input should error") require.NotEqual(t, nil, err, "playout of non-existing input should error")
addr, _ := rs.GetPlayout(process.ID, process.Input[0].ID) addr, _ := rs.GetPlayout(process.ID, "", "", process.Input[0].ID)
require.Equal(t, 0, len(addr), "the playout address should be empty if no port range is given") require.Equal(t, 0, len(addr), "the playout address should be empty if no port range is given")
} }
@@ -494,13 +517,13 @@ func TestPlayoutRange(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
_, err = rs.GetPlayout("foobar", process.Input[0].ID) _, err = rs.GetPlayout("foobar", "", "", process.Input[0].ID)
require.NotEqual(t, nil, err, "playout of non-existing process should error") require.Equal(t, ErrUnknownProcess, err)
_, err = rs.GetPlayout(process.ID, "foobar") _, err = rs.GetPlayout(process.ID, "", "", "foobar")
require.NotEqual(t, nil, err, "playout of non-existing input should error") require.NotEqual(t, nil, err, "playout of non-existing input should error")
addr, _ := rs.GetPlayout(process.ID, process.Input[0].ID) addr, _ := rs.GetPlayout(process.ID, "", "", process.Input[0].ID)
require.NotEqual(t, 0, len(addr), "the playout address should not be empty if a port range is given") require.NotEqual(t, 0, len(addr), "the playout address should not be empty if a port range is given")
require.Equal(t, "127.0.0.1:3000", addr, "the playout address should be 127.0.0.1:3000") require.Equal(t, "127.0.0.1:3000", addr, "the playout address should be 127.0.0.1:3000")
} }
@@ -512,10 +535,9 @@ func TestAddressReference(t *testing.T) {
process1 := getDummyProcess() process1 := getDummyProcess()
process2 := getDummyProcess() process2 := getDummyProcess()
process2.ID = "process2"
rs.AddProcess(process1) rs.AddProcess(process1)
process2.ID = "process2"
process2.Input[0].Address = "#process:foobar=out" process2.Input[0].Address = "#process:foobar=out"
err = rs.AddProcess(process2) err = rs.AddProcess(process2)