diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index c61f363a..ac750c23 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -61,7 +61,7 @@ func (h *RestreamHandler) Add(c echo.Context) error { return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) } - p, _ := h.getProcess(config.ID, "config") + p, _ := h.getProcess(config.ID, config.Owner, config.Group, "config") return c.JSON(http.StatusOK, p.Config) } @@ -88,14 +88,16 @@ func (h *RestreamHandler) GetAll(c echo.Context) error { }) idpattern := util.DefaultQuery(c, "idpattern", "") refpattern := util.DefaultQuery(c, "refpattern", "") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - ids := h.restream.GetProcessIDs(idpattern, refpattern) + ids := h.restream.GetProcessIDs(idpattern, refpattern, user, group) processes := []api.Process{} if len(wantids) == 0 || len(reference) != 0 { for _, id := range ids { - if p, err := h.getProcess(id, filter); err == nil { + if p, err := h.getProcess(id, user, group, filter); err == nil { if len(reference) != 0 && p.Reference != reference { continue } @@ -106,7 +108,7 @@ func (h *RestreamHandler) GetAll(c echo.Context) error { for _, id := range ids { for _, wantid := range wantids { if wantid == id { - if p, err := h.getProcess(id, filter); err == nil { + if p, err := h.getProcess(id, user, group, filter); err == nil { processes = append(processes, p) } } @@ -132,8 +134,10 @@ func (h *RestreamHandler) GetAll(c echo.Context) error { func (h *RestreamHandler) Get(c echo.Context) error { id := util.PathParam(c, "id") filter := util.DefaultQuery(c, "filter", "") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - p, err := h.getProcess(id, filter) + p, err := h.getProcess(id, user, group, filter) if err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } @@ -154,12 +158,14 @@ func (h *RestreamHandler) Get(c echo.Context) error { // @Router /api/v3/process/{id} [delete] func (h *RestreamHandler) Delete(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - if err := h.restream.StopProcess(id); err != nil { + if err := h.restream.StopProcess(id, user, group); err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } - if err := h.restream.DeleteProcess(id); err != nil { + if err := h.restream.DeleteProcess(id, user, group); err != nil { return api.Err(http.StatusInternalServerError, "Process can't be deleted", "%s", err) } @@ -182,6 +188,8 @@ func (h *RestreamHandler) Delete(c echo.Context) error { // @Router /api/v3/process/{id} [put] func (h *RestreamHandler) Update(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") process := api.ProcessConfig{ ID: id, @@ -189,7 +197,7 @@ func (h *RestreamHandler) Update(c echo.Context) error { Autostart: true, } - current, err := h.restream.GetProcess(id) + current, err := h.restream.GetProcess(id, user, group) if err != nil { return api.Err(http.StatusNotFound, "Process not found", "%s", id) } @@ -203,7 +211,7 @@ func (h *RestreamHandler) Update(c echo.Context) error { config := process.Marshal() - if err := h.restream.UpdateProcess(id, config); err != nil { + if err := h.restream.UpdateProcess(id, user, group, config); err != nil { if err == restream.ErrUnknownProcess { return api.Err(http.StatusNotFound, "Process not found", "%s", id) } @@ -211,7 +219,7 @@ func (h *RestreamHandler) Update(c echo.Context) error { return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) } - p, _ := h.getProcess(config.ID, "config") + p, _ := h.getProcess(config.ID, config.Owner, config.Group, "config") return c.JSON(http.StatusOK, p.Config) } @@ -232,6 +240,8 @@ func (h *RestreamHandler) Update(c echo.Context) error { // @Router /api/v3/process/{id}/command [put] func (h *RestreamHandler) Command(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") var command api.Command @@ -241,13 +251,13 @@ func (h *RestreamHandler) Command(c echo.Context) error { var err error if command.Command == "start" { - err = h.restream.StartProcess(id) + err = h.restream.StartProcess(id, user, group) } else if command.Command == "stop" { - err = h.restream.StopProcess(id) + err = h.restream.StopProcess(id, user, group) } else if command.Command == "restart" { - err = h.restream.RestartProcess(id) + err = h.restream.RestartProcess(id, user, group) } else if command.Command == "reload" { - err = h.restream.ReloadProcess(id) + err = h.restream.ReloadProcess(id, user, group) } else { return api.Err(http.StatusBadRequest, "Unknown command provided", "Known commands are: start, stop, reload, restart") } @@ -273,8 +283,10 @@ func (h *RestreamHandler) Command(c echo.Context) error { // @Router /api/v3/process/{id}/config [get] func (h *RestreamHandler) GetConfig(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - p, err := h.restream.GetProcess(id) + p, err := h.restream.GetProcess(id, user, group) if err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } @@ -299,8 +311,10 @@ func (h *RestreamHandler) GetConfig(c echo.Context) error { // @Router /api/v3/process/{id}/state [get] func (h *RestreamHandler) GetState(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - s, err := h.restream.GetProcessState(id) + s, err := h.restream.GetProcessState(id, user, group) if err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } @@ -325,8 +339,10 @@ func (h *RestreamHandler) GetState(c echo.Context) error { // @Router /api/v3/process/{id}/report [get] func (h *RestreamHandler) GetReport(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - l, err := h.restream.GetProcessLog(id) + l, err := h.restream.GetProcessLog(id, user, group) if err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } @@ -349,8 +365,10 @@ func (h *RestreamHandler) GetReport(c echo.Context) error { // @Router /api/v3/process/{id}/probe [get] func (h *RestreamHandler) Probe(c echo.Context) error { id := util.PathParam(c, "id") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - probe := h.restream.Probe(id) + probe := h.restream.Probe(id, user, group) apiprobe := api.Probe{} apiprobe.Unmarshal(&probe) @@ -411,8 +429,10 @@ func (h *RestreamHandler) ReloadSkills(c echo.Context) error { func (h *RestreamHandler) GetProcessMetadata(c echo.Context) error { id := util.PathParam(c, "id") key := util.PathParam(c, "key") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") - data, err := h.restream.GetProcessMetadata(id, key) + data, err := h.restream.GetProcessMetadata(id, user, group, key) if err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } @@ -437,6 +457,8 @@ func (h *RestreamHandler) GetProcessMetadata(c echo.Context) error { func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error { id := util.PathParam(c, "id") key := util.PathParam(c, "key") + user := util.DefaultContext(c, "user", "") + group := util.DefaultQuery(c, "group", "") if len(key) == 0 { return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0") @@ -448,7 +470,7 @@ func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error { return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - if err := h.restream.SetProcessMetadata(id, key, data); err != nil { + if err := h.restream.SetProcessMetadata(id, key, user, group, data); err != nil { return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) } @@ -510,7 +532,7 @@ func (h *RestreamHandler) SetMetadata(c echo.Context) error { return c.JSON(http.StatusOK, data) } -func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, error) { +func (h *RestreamHandler) getProcess(id, user, group, filterString string) (api.Process, error) { filter := strings.FieldsFunc(filterString, func(r rune) bool { return r == rune(',') }) @@ -534,7 +556,7 @@ func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, erro } } - process, err := h.restream.GetProcess(id) + process, err := h.restream.GetProcess(id, user, group) if err != nil { return api.Process{}, err } @@ -552,21 +574,21 @@ func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, erro } if wants["state"] { - if state, err := h.restream.GetProcessState(id); err == nil { + if state, err := h.restream.GetProcessState(id, user, group); err == nil { info.State = &api.ProcessState{} info.State.Unmarshal(state) } } if wants["report"] { - if log, err := h.restream.GetProcessLog(id); err == nil { + if log, err := h.restream.GetProcessLog(id, user, group); err == nil { info.Report = &api.ProcessReport{} info.Report.Unmarshal(log) } } if wants["metadata"] { - if data, err := h.restream.GetProcessMetadata(id, ""); err == nil { + if data, err := h.restream.GetProcessMetadata(id, "", user, group); err == nil { info.Metadata = api.NewMetadata(data) } } diff --git a/http/handler/util/util.go b/http/handler/util/util.go index fbbcd2fd..5219aa59 100644 --- a/http/handler/util/util.go +++ b/http/handler/util/util.go @@ -76,3 +76,12 @@ func DefaultQuery(c echo.Context, name, defValue string) string { return param } + +func DefaultContext(c echo.Context, name, defValue string) string { + value, ok := c.Get(name).(string) + if !ok { + return defValue + } + + return value +} diff --git a/iam/identity.go b/iam/identity.go index c9c500d8..39c80ff8 100644 --- a/iam/identity.go +++ b/iam/identity.go @@ -370,6 +370,10 @@ func NewIdentityManager(config IdentityConfig) (IdentityManager, error) { im.logger = log.New("") } + if im.fs == nil { + return nil, fmt.Errorf("no filesystem provided") + } + err := im.load(im.filePath) if err != nil { return nil, err @@ -552,10 +556,6 @@ func (im *identityManager) Rename(oldname, newname string) error { } func (im *identityManager) load(filePath string) error { - if im.fs == nil { - return fmt.Errorf("no filesystem provided") - } - if _, err := im.fs.Stat(filePath); os.IsNotExist(err) { return nil } @@ -587,10 +587,6 @@ func (im *identityManager) Save() error { } func (im *identityManager) save(filePath string) error { - if im.fs == nil { - return fmt.Errorf("no filesystem provided") - } - if filePath == "" { return fmt.Errorf("invalid file path, file path cannot be empty") } diff --git a/monitor/restream.go b/monitor/restream.go index cfd069f4..300f5dc5 100644 --- a/monitor/restream.go +++ b/monitor/restream.go @@ -57,15 +57,15 @@ func (c *restreamCollector) Collect() metric.Metrics { "starting": 0, } - ids := c.r.GetProcessIDs("", "") + ids := c.r.GetProcessIDs("", "", "$superuser", "$none") for _, id := range ids { - state, _ := c.r.GetProcessState(id) + state, _ := c.r.GetProcessState(id, "$superuser", "$none") if state == nil { continue } - proc, _ := c.r.GetProcess(id) + proc, _ := c.r.GetProcess(id, "$superuser", "$none") if proc == nil { continue } diff --git a/restream/app/process.go b/restream/app/process.go index 4ec6036a..7be919f2 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -36,6 +36,8 @@ func (io ConfigIO) Clone() ConfigIO { type Config struct { ID string `json:"id"` Reference string `json:"reference"` + Owner string `json:"owner"` + Group string `json:"group"` FFVersion string `json:"ffversion"` Input []ConfigIO `json:"input"` Output []ConfigIO `json:"output"` @@ -103,6 +105,8 @@ func (config *Config) CreateCommand() []string { type Process struct { ID string `json:"id"` + Owner string `json:"owner"` + Group string `json:"group"` Reference string `json:"reference"` Config *Config `json:"config"` CreatedAt int64 `json:"created_at"` diff --git a/restream/restream.go b/restream/restream.go index fcf38999..847225cf 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -15,6 +15,7 @@ import ( "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/glob" + "github.com/datarhei/core/v16/iam" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" @@ -30,31 +31,33 @@ import ( // The Restreamer interface type Restreamer interface { - ID() string // ID of this instance - Name() string // Arbitrary name of this instance - CreatedAt() time.Time // Time of when this instance has been created - Start() // Start all processes that have a "start" order - Stop() // Stop all running process but keep their "start" order - AddProcess(config *app.Config) error // Add a new process - GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference - DeleteProcess(id string) error // Delete a process - UpdateProcess(id string, config *app.Config) error // Update a process - StartProcess(id string) error // Start a process - StopProcess(id string) error // Stop a process - RestartProcess(id string) error // Restart a process - ReloadProcess(id string) error // Reload a process - GetProcess(id string) (*app.Process, error) // Get a process - GetProcessState(id string) (*app.State, error) // Get the state of a process - GetProcessLog(id string) (*app.Log, error) // Get the logs of a process - GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process - Probe(id string) app.Probe // Probe a process - ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout - Skills() skills.Skills // Get the ffmpeg skills - ReloadSkills() error // Reload the ffmpeg skills - SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process - GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process - SetMetadata(key string, data interface{}) error // Set general metadata - GetMetadata(key string) (interface{}, error) // Get previously set general metadata + ID() string // ID of this instance + Name() string // Arbitrary name of this instance + CreatedAt() time.Time // Time of when this instance has been created + Start() // Start all processes that have a "start" order + Stop() // Stop all running process but keep their "start" order + + Skills() skills.Skills // Get the ffmpeg skills + ReloadSkills() error // Reload the ffmpeg skills + SetMetadata(key string, data interface{}) error // Set general metadata + GetMetadata(key string) (interface{}, error) // Get previously set general metadata + + AddProcess(config *app.Config) error // Add a new process + GetProcessIDs(idpattern, refpattern, user, group string) []string // Get a list of process IDs based on patterns for ID and reference + DeleteProcess(id, user, group string) error // Delete a process + UpdateProcess(id, user, group string, config *app.Config) error // Update a process + StartProcess(id, user, group string) error // Start a process + StopProcess(id, user, group string) error // Stop a process + RestartProcess(id, user, group string) error // Restart a process + ReloadProcess(id, user, group string) error // Reload a process + GetProcess(id, user, group string) (*app.Process, error) // Get a process + GetProcessState(id, user, group string) (*app.State, error) // Get the state of a process + GetProcessLog(id, user, group string) (*app.Log, error) // Get the logs of a process + GetPlayout(id, user, group, inputid string) (string, error) // Get the URL of the playout API for a process + Probe(id, user, group string) app.Probe // Probe a process + ProbeWithTimeout(id, user, group string, timeout time.Duration) app.Probe // Probe a process with specific timeout + SetProcessMetadata(id, user, group, key string, data interface{}) error // Set metatdata to a process + GetProcessMetadata(id, user, group, key string) (interface{}, error) // Get previously set metadata from a process } // Config is the required configuration for a new restreamer instance. @@ -67,11 +70,14 @@ type Config struct { FFmpeg ffmpeg.FFmpeg MaxProcesses int64 Logger log.Logger + IAM iam.IAM } type task struct { valid bool id string // ID of the task/process + owner string + group string reference string process *app.Process config *app.Config @@ -84,6 +90,14 @@ type task struct { metadata map[string]interface{} } +func newTaskid(id, group string) string { + return id + "~" + group +} + +func (t *task) String() string { + return newTaskid(t.id, t.group) +} + type restream struct { id string name string @@ -106,6 +120,8 @@ type restream struct { startOnce sync.Once stopOnce sync.Once + + iam iam.IAM } // New returns a new instance that implements the Restreamer interface @@ -117,12 +133,17 @@ func New(config Config) (Restreamer, error) { store: config.Store, replace: config.Replace, logger: config.Logger, + iam: config.IAM, } if r.logger == nil { r.logger = log.New("") } + if r.iam == nil { + return nil, fmt.Errorf("missing IAM") + } + if r.store == nil { dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) s, err := store.NewJSON(store.JSONConfig{ @@ -281,27 +302,33 @@ func (r *restream) load() error { ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor()) } - for id, process := range data.Process { + for _, process := range data.Process { if len(process.Config.FFVersion) == 0 { process.Config.FFVersion = "^" + ffversion } t := &task{ - id: id, + id: process.ID, + owner: process.Owner, + group: process.Group, reference: process.Reference, process: process, config: process.Config.Clone(), - logger: r.logger.WithField("id", id), + logger: r.logger.WithFields(log.Fields{ + "id": process.ID, + "owner": process.Owner, + "group": process.Group, + }), } // Replace all placeholders in the config resolvePlaceholders(t.config, r.replace) - tasks[id] = t + tasks[t.String()] = t } - for id, userdata := range data.Metadata.Process { - t, ok := tasks[id] + for tid, userdata := range data.Metadata.Process { + t, ok := tasks[tid] if !ok { continue } @@ -312,44 +339,48 @@ func (r *restream) load() error { // Now that all tasks are defined and all placeholders are // replaced, we can resolve references and validate the // inputs and outputs. - for _, t := range tasks { + for tid, t := range tasks { // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { if !c.Check(v) { - r.logger.Warn().WithFields(log.Fields{ - "id": t.id, + t.logger.Warn().WithFields(log.Fields{ "constraint": t.config.FFVersion, "version": skills.FFmpeg.Version, }).WithError(fmt.Errorf("available FFmpeg version doesn't fit constraint; you have to update this process to adjust the constraint")).Log("") } } else { - r.logger.Warn().WithField("id", t.id).WithError(err).Log("") + t.logger.Warn().WithError(err).Log("") } } else { - r.logger.Warn().WithField("id", t.id).WithError(err).Log("") + t.logger.Warn().WithError(err).Log("") + } + + if !r.enforce(t.owner, t.group, t.id, "CREATE") { + t.logger.Warn().WithError(fmt.Errorf("forbidden")).Log("Ignoring") + continue } err := r.resolveAddresses(tasks, t.config) if err != nil { - r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") + t.logger.Warn().WithError(err).Log("Ignoring") continue } t.usesDisk, err = r.validateConfig(t.config) if err != nil { - r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") + t.logger.Warn().WithError(err).Log("Ignoring") continue } err = r.setPlayoutPorts(t) if err != nil { - r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") + t.logger.Warn().WithError(err).Log("Ignoring") continue } t.command = t.config.CreateCommand() - t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) + t.parser = r.ffmpeg.NewProcessParser(t.logger, tid, t.reference) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, @@ -376,15 +407,29 @@ func (r *restream) load() error { func (r *restream) save() { data := store.NewStoreData() - for id, t := range r.tasks { - data.Process[id] = t.process + for tid, t := range r.tasks { + data.Process[tid] = t.process data.Metadata.System = r.metadata - data.Metadata.Process[id] = t.metadata + data.Metadata.Process[tid] = t.metadata } r.store.Store(data) } +func (r *restream) enforce(name, group, processid, action string) bool { + if len(name) == 0 { + name = "$anon" + } + + if len(group) == 0 { + group = "$none" + } + + ok, _ := r.iam.Enforce(name, group, "process:"+processid, action) + + return ok +} + func (r *restream) ID() string { return r.id } @@ -398,9 +443,15 @@ func (r *restream) CreatedAt() time.Time { } var ErrUnknownProcess = errors.New("unknown process") +var ErrUnknownProcessGroup = errors.New("unknown process group") var ErrProcessExists = errors.New("process already exists") +var ErrForbidden = errors.New("forbidden") func (r *restream) AddProcess(config *app.Config) error { + if !r.enforce(config.Owner, config.Group, config.ID, "CREATE") { + return ErrForbidden + } + r.lock.RLock() t, err := r.createTask(config) r.lock.RUnlock() @@ -412,20 +463,22 @@ func (r *restream) AddProcess(config *app.Config) error { r.lock.Lock() defer r.lock.Unlock() - _, ok := r.tasks[t.id] + tid := t.String() + + _, ok := r.tasks[tid] if ok { return ErrProcessExists } - r.tasks[t.id] = t + r.tasks[tid] = t // set filesystem cleanup rules - r.setCleanup(t.id, t.config) + r.setCleanup(tid, t.config) if t.process.Order == "start" { - err := r.startProcess(t.id) + err := r.startProcess(tid) if err != nil { - delete(r.tasks, t.id) + delete(r.tasks, tid) return err } } @@ -450,6 +503,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { process := &app.Process{ ID: config.ID, + Group: config.Group, Reference: config.Reference, Config: config.Clone(), Order: "stop", @@ -462,10 +516,14 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t := &task{ id: config.ID, + group: config.Group, reference: process.Reference, process: process, config: process.Config.Clone(), - logger: r.logger.WithField("id", process.ID), + logger: r.logger.WithFields(log.Fields{ + "id": process.ID, + "group": process.Group, + }), } resolvePlaceholders(t.config, r.replace) @@ -486,7 +544,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { } t.command = t.config.CreateCommand() - t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) + t.parser = r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, @@ -853,7 +911,11 @@ func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (s return address, fmt.Errorf("the process '%s' has no outputs with the ID '%s' (%s)", matches[1], matches[2], address) } -func (r *restream) UpdateProcess(id string, config *app.Config) error { +func (r *restream) UpdateProcess(id, user, group string, config *app.Config) error { + if !r.enforce(user, group, id, "UPDATE") { + return ErrForbidden + } + r.lock.Lock() defer r.lock.Unlock() @@ -862,35 +924,39 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error { return err } - task, ok := r.tasks[id] + tid := newTaskid(id, group) + + task, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } t.process.Order = task.process.Order - if id != t.id { - _, ok := r.tasks[t.id] + if tid != t.String() { + _, ok := r.tasks[t.String()] if ok { return ErrProcessExists } } - if err := r.stopProcess(id); err != nil { + if err := r.stopProcess(tid); err != nil { return err } - if err := r.deleteProcess(id); err != nil { + if err := r.deleteProcess(tid); err != nil { return err } - r.tasks[t.id] = t + tid = t.String() + + r.tasks[tid] = t // set filesystem cleanup rules - r.setCleanup(t.id, t.config) + r.setCleanup(tid, t.config) if t.process.Order == "start" { - r.startProcess(t.id) + r.startProcess(tid) } r.save() @@ -898,17 +964,23 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error { return nil } -func (r *restream) GetProcessIDs(idpattern, refpattern string) []string { +func (r *restream) GetProcessIDs(idpattern, refpattern, user, group string) []string { r.lock.RLock() defer r.lock.RUnlock() if len(idpattern) == 0 && len(refpattern) == 0 { - ids := make([]string, len(r.tasks)) - i := 0 + ids := []string{} - for id := range r.tasks { - ids[i] = id - i++ + for _, t := range r.tasks { + if t.group != group { + continue + } + + if !r.enforce(user, group, t.id, "GET") { + continue + } + + ids = append(ids, t.id) } return ids @@ -918,8 +990,16 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string { count := 0 if len(idpattern) != 0 { - for id := range r.tasks { - match, err := glob.Match(idpattern, id) + for _, t := range r.tasks { + if t.group != group { + continue + } + + if !r.enforce(user, group, t.id, "GET") { + continue + } + + match, err := glob.Match(idpattern, t.id) if err != nil { return nil } @@ -928,7 +1008,7 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string { continue } - idmap[id]++ + idmap[t.id]++ } count++ @@ -936,6 +1016,14 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string { if len(refpattern) != 0 { for _, t := range r.tasks { + if t.group != group { + continue + } + + if !r.enforce(user, group, t.id, "GET") { + continue + } + match, err := glob.Match(refpattern, t.reference) if err != nil { return nil @@ -964,11 +1052,17 @@ func (r *restream) GetProcessIDs(idpattern, refpattern string) []string { return ids } -func (r *restream) GetProcess(id string) (*app.Process, error) { +func (r *restream) GetProcess(id, user, group string) (*app.Process, error) { + if !r.enforce(user, group, id, "GET") { + return nil, ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.RLock() defer r.lock.RUnlock() - task, ok := r.tasks[id] + task, ok := r.tasks[tid] if !ok { return &app.Process{}, ErrUnknownProcess } @@ -978,11 +1072,17 @@ func (r *restream) GetProcess(id string) (*app.Process, error) { return process, nil } -func (r *restream) DeleteProcess(id string) error { +func (r *restream) DeleteProcess(id, user, group string) error { + if !r.enforce(user, group, id, "DELETE") { + return ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.Lock() defer r.lock.Unlock() - err := r.deleteProcess(id) + err := r.deleteProcess(tid) if err != nil { return err } @@ -992,29 +1092,35 @@ func (r *restream) DeleteProcess(id string) error { return nil } -func (r *restream) deleteProcess(id string) error { - task, ok := r.tasks[id] +func (r *restream) deleteProcess(tid string) error { + task, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } if task.process.Order != "stop" { - return fmt.Errorf("the process with the ID '%s' is still running", id) + return fmt.Errorf("the process with the ID '%s' is still running", task.id) } r.unsetPlayoutPorts(task) - r.unsetCleanup(id) + r.unsetCleanup(tid) - delete(r.tasks, id) + delete(r.tasks, tid) return nil } -func (r *restream) StartProcess(id string) error { +func (r *restream) StartProcess(id, user, group string) error { + if !r.enforce(user, group, id, "COMMAND") { + return ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.Lock() defer r.lock.Unlock() - err := r.startProcess(id) + err := r.startProcess(tid) if err != nil { return err } @@ -1024,8 +1130,8 @@ func (r *restream) StartProcess(id string) error { return nil } -func (r *restream) startProcess(id string) error { - task, ok := r.tasks[id] +func (r *restream) startProcess(tid string) error { + task, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } @@ -1053,11 +1159,17 @@ func (r *restream) startProcess(id string) error { return nil } -func (r *restream) StopProcess(id string) error { +func (r *restream) StopProcess(id, user, group string) error { + if !r.enforce(user, group, id, "COMMAND") { + return ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.Lock() defer r.lock.Unlock() - err := r.stopProcess(id) + err := r.stopProcess(tid) if err != nil { return err } @@ -1067,8 +1179,8 @@ func (r *restream) StopProcess(id string) error { return nil } -func (r *restream) stopProcess(id string) error { - task, ok := r.tasks[id] +func (r *restream) stopProcess(tid string) error { + task, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } @@ -1092,15 +1204,21 @@ func (r *restream) stopProcess(id string) error { return nil } -func (r *restream) RestartProcess(id string) error { +func (r *restream) RestartProcess(id, user, group string) error { + if !r.enforce(user, group, id, "COMMAND") { + return ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.RLock() defer r.lock.RUnlock() - return r.restartProcess(id) + return r.restartProcess(tid) } -func (r *restream) restartProcess(id string) error { - task, ok := r.tasks[id] +func (r *restream) restartProcess(tid string) error { + task, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } @@ -1118,11 +1236,17 @@ func (r *restream) restartProcess(id string) error { return nil } -func (r *restream) ReloadProcess(id string) error { +func (r *restream) ReloadProcess(id, user, group string) error { + if !r.enforce(user, group, id, "COMMAND") { + return ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.Lock() defer r.lock.Unlock() - err := r.reloadProcess(id) + err := r.reloadProcess(tid) if err != nil { return err } @@ -1132,8 +1256,8 @@ func (r *restream) ReloadProcess(id string) error { return nil } -func (r *restream) reloadProcess(id string) error { - t, ok := r.tasks[id] +func (r *restream) reloadProcess(tid string) error { + t, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } @@ -1164,7 +1288,7 @@ func (r *restream) reloadProcess(id string) error { order := "stop" if t.process.Order == "start" { order = "start" - r.stopProcess(id) + r.stopProcess(tid) } t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) @@ -1185,19 +1309,25 @@ func (r *restream) reloadProcess(id string) error { t.valid = true if order == "start" { - r.startProcess(id) + r.startProcess(tid) } return nil } -func (r *restream) GetProcessState(id string) (*app.State, error) { +func (r *restream) GetProcessState(id, user, group string) (*app.State, error) { state := &app.State{} + if !r.enforce(user, group, id, "GET") { + return state, ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.RLock() defer r.lock.RUnlock() - task, ok := r.tasks[id] + task, ok := r.tasks[tid] if !ok { return state, ErrUnknownProcess } @@ -1254,21 +1384,27 @@ func (r *restream) GetProcessState(id string) (*app.State, error) { return state, nil } -func (r *restream) GetProcessLog(id string) (*app.Log, error) { +func (r *restream) GetProcessLog(id, user, group string) (*app.Log, error) { + log := &app.Log{} + + if !r.enforce(user, group, id, "GET") { + return log, ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.RLock() defer r.lock.RUnlock() - task, ok := r.tasks[id] + task, ok := r.tasks[tid] if !ok { - return &app.Log{}, ErrUnknownProcess + return log, ErrUnknownProcess } if !task.valid { - return &app.Log{}, nil + return log, nil } - log := &app.Log{} - current := task.parser.Report() log.CreatedAt = current.CreatedAt @@ -1303,16 +1439,23 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { return log, nil } -func (r *restream) Probe(id string) app.Probe { - return r.ProbeWithTimeout(id, 20*time.Second) +func (r *restream) Probe(id, user, group string) app.Probe { + return r.ProbeWithTimeout(id, user, group, 20*time.Second) } -func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe { - r.lock.RLock() - +func (r *restream) ProbeWithTimeout(id, user, group string, timeout time.Duration) app.Probe { appprobe := app.Probe{} - task, ok := r.tasks[id] + if !r.enforce(user, group, id, "PROBE") { + appprobe.Log = append(appprobe.Log, ErrForbidden.Error()) + return appprobe + } + + tid := newTaskid(id, group) + + r.lock.RLock() + + task, ok := r.tasks[tid] if !ok { appprobe.Log = append(appprobe.Log, fmt.Sprintf("Unknown process ID (%s)", id)) r.lock.RUnlock() @@ -1376,11 +1519,17 @@ func (r *restream) ReloadSkills() error { return r.ffmpeg.ReloadSkills() } -func (r *restream) GetPlayout(id, inputid string) (string, error) { +func (r *restream) GetPlayout(id, user, group, inputid string) (string, error) { + if !r.enforce(user, group, id, "PLAYOUT") { + return "", ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.RLock() defer r.lock.RUnlock() - task, ok := r.tasks[id] + task, ok := r.tasks[tid] if !ok { return "", ErrUnknownProcess } @@ -1399,15 +1548,21 @@ func (r *restream) GetPlayout(id, inputid string) (string, error) { var ErrMetadataKeyNotFound = errors.New("unknown key") -func (r *restream) SetProcessMetadata(id, key string, data interface{}) error { - r.lock.Lock() - defer r.lock.Unlock() +func (r *restream) SetProcessMetadata(id, user, group, key string, data interface{}) error { + if !r.enforce(user, group, id, "METADATA") { + return ErrForbidden + } if len(key) == 0 { return fmt.Errorf("a key for storing the data has to be provided") } - task, ok := r.tasks[id] + tid := newTaskid(id, group) + + r.lock.Lock() + defer r.lock.Unlock() + + task, ok := r.tasks[tid] if !ok { return ErrUnknownProcess } @@ -1431,11 +1586,17 @@ func (r *restream) SetProcessMetadata(id, key string, data interface{}) error { return nil } -func (r *restream) GetProcessMetadata(id, key string) (interface{}, error) { +func (r *restream) GetProcessMetadata(id, user, group, key string) (interface{}, error) { + if !r.enforce(user, group, id, "METADATA") { + return nil, ErrForbidden + } + + tid := newTaskid(id, group) + r.lock.RLock() defer r.lock.RUnlock() - task, ok := r.tasks[id] + task, ok := r.tasks[tid] if !ok { return nil, ErrUnknownProcess } @@ -1501,6 +1662,7 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) { vars := map[string]string{ "processid": config.ID, "reference": config.Reference, + "group": config.Group, } for i, option := range config.Options { diff --git a/restream/restream_test.go b/restream/restream_test.go index 11b08240..ca256fe6 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -6,7 +6,9 @@ import ( "time" "github.com/datarhei/core/v16/ffmpeg" + "github.com/datarhei/core/v16/iam" "github.com/datarhei/core/v16/internal/testhelper" + "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/replace" @@ -30,9 +32,27 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp return nil, err } + dummyfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + if err != nil { + return nil, err + } + + iam, err := iam.NewIAM(iam.Config{ + FS: dummyfs, + Superuser: iam.User{ + Name: "foobar", + }, + JWTRealm: "", + JWTSecret: "", + Logger: nil, + }) + + iam.AddPolicy("$anon", "$none", "process:*", "CREATE|GET|DELETE|UPDATE|COMMAND|PROBE|METADATA|PLAYOUT") + rs, err := New(Config{ FFmpeg: ffmpeg, Replace: replacer, + IAM: iam, }) if err != nil { return nil, err @@ -85,16 +105,16 @@ func TestAddProcess(t *testing.T) { process := getDummyProcess() require.NotNil(t, process) - _, err = rs.GetProcess(process.ID) - require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) + _, err = rs.GetProcess(process.ID, "", "") + require.Equal(t, ErrUnknownProcess, err) err = rs.AddProcess(process) require.Equal(t, nil, err, "Failed to add process (%s)", err) - _, err = rs.GetProcess(process.ID) + _, err = rs.GetProcess(process.ID, "", "") require.Equal(t, nil, err, "Set process not found (%s)", process.ID) - state, _ := rs.GetProcessState(process.ID) + state, _ := rs.GetProcessState(process.ID, "", "") require.Equal(t, "stop", state.Order, "Process should be stopped") } @@ -107,10 +127,10 @@ func TestAutostartProcess(t *testing.T) { rs.AddProcess(process) - state, _ := rs.GetProcessState(process.ID) + state, _ := rs.GetProcessState(process.ID, "", "") require.Equal(t, "start", state.Order, "Process should be started") - rs.StopProcess(process.ID) + rs.StopProcess(process.ID, "", "") } func TestAddInvalidProcess(t *testing.T) { @@ -190,10 +210,10 @@ func TestRemoveProcess(t *testing.T) { err = rs.AddProcess(process) require.Equal(t, nil, err, "Failed to add process (%s)", err) - err = rs.DeleteProcess(process.ID) + err = rs.DeleteProcess(process.ID, "", "") require.Equal(t, nil, err, "Set process not found (%s)", process.ID) - _, err = rs.GetProcess(process.ID) + _, err = rs.GetProcess(process.ID, "", "") require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) } @@ -219,17 +239,17 @@ func TestUpdateProcess(t *testing.T) { require.NotNil(t, process3) process3.ID = "process2" - err = rs.UpdateProcess("process1", process3) + err = rs.UpdateProcess("process1", "", "", process3) require.Error(t, err) process3.ID = "process3" - err = rs.UpdateProcess("process1", process3) + err = rs.UpdateProcess("process1", "", "", process3) require.NoError(t, err) - _, err = rs.GetProcess(process1.ID) + _, err = rs.GetProcess(process1.ID, "", "") require.Error(t, err) - _, err = rs.GetProcess(process3.ID) + _, err = rs.GetProcess(process3.ID, "", "") require.NoError(t, err) } @@ -255,34 +275,34 @@ func TestGetProcess(t *testing.T) { rs.AddProcess(process3) rs.AddProcess(process4) - _, err = rs.GetProcess(process1.ID) + _, err = rs.GetProcess(process1.ID, "", "") require.Equal(t, nil, err) - list := rs.GetProcessIDs("", "") + list := rs.GetProcessIDs("", "", "", "") require.Len(t, list, 4) require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list) - list = rs.GetProcessIDs("foo_*", "") + list = rs.GetProcessIDs("foo_*", "", "", "") require.Len(t, list, 2) require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) - list = rs.GetProcessIDs("bar_*", "") + list = rs.GetProcessIDs("bar_*", "", "", "") require.Len(t, list, 2) require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) - list = rs.GetProcessIDs("*_bbb_*", "") + list = rs.GetProcessIDs("*_bbb_*", "", "", "") require.Len(t, list, 1) require.ElementsMatch(t, []string{"bar_bbb_2"}, list) - list = rs.GetProcessIDs("", "foo_*") + list = rs.GetProcessIDs("", "foo_*", "", "") require.Len(t, list, 2) require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) - list = rs.GetProcessIDs("", "bar_*") + list = rs.GetProcessIDs("", "bar_*", "", "") require.Len(t, list, 2) require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) - list = rs.GetProcessIDs("", "*_bbb_*") + list = rs.GetProcessIDs("", "*_bbb_*", "", "") require.Len(t, list, 1) require.ElementsMatch(t, []string{"bar_bbb_2"}, list) } @@ -295,22 +315,22 @@ func TestStartProcess(t *testing.T) { rs.AddProcess(process) - err = rs.StartProcess("foobar") + err = rs.StartProcess("foobar", "", "") require.NotEqual(t, nil, err, "shouldn't be able to start non-existing process") - err = rs.StartProcess(process.ID) + err = rs.StartProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to start existing process") - state, _ := rs.GetProcessState(process.ID) + state, _ := rs.GetProcessState(process.ID, "", "") require.Equal(t, "start", state.Order, "Process should be started") - err = rs.StartProcess(process.ID) + err = rs.StartProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to start already running process") - state, _ = rs.GetProcessState(process.ID) + state, _ = rs.GetProcessState(process.ID, "", "") require.Equal(t, "start", state.Order, "Process should be started") - rs.StopProcess(process.ID) + rs.StopProcess(process.ID, "", "") } func TestStopProcess(t *testing.T) { @@ -320,21 +340,21 @@ func TestStopProcess(t *testing.T) { process := getDummyProcess() rs.AddProcess(process) - rs.StartProcess(process.ID) + rs.StartProcess(process.ID, "", "") - err = rs.StopProcess("foobar") + err = rs.StopProcess("foobar", "", "") require.NotEqual(t, nil, err, "shouldn't be able to stop non-existing process") - err = rs.StopProcess(process.ID) + err = rs.StopProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to stop existing running process") - state, _ := rs.GetProcessState(process.ID) + state, _ := rs.GetProcessState(process.ID, "", "") require.Equal(t, "stop", state.Order, "Process should be stopped") - err = rs.StopProcess(process.ID) + err = rs.StopProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to stop already stopped process") - state, _ = rs.GetProcessState(process.ID) + state, _ = rs.GetProcessState(process.ID, "", "") require.Equal(t, "stop", state.Order, "Process should be stopped") } @@ -346,21 +366,21 @@ func TestRestartProcess(t *testing.T) { rs.AddProcess(process) - err = rs.RestartProcess("foobar") + err = rs.RestartProcess("foobar", "", "") require.NotEqual(t, nil, err, "shouldn't be able to restart non-existing process") - err = rs.RestartProcess(process.ID) + err = rs.RestartProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to restart existing stopped process") - state, _ := rs.GetProcessState(process.ID) + state, _ := rs.GetProcessState(process.ID, "", "") require.Equal(t, "stop", state.Order, "Process should be stopped") - rs.StartProcess(process.ID) + rs.StartProcess(process.ID, "", "") - state, _ = rs.GetProcessState(process.ID) + state, _ = rs.GetProcessState(process.ID, "", "") require.Equal(t, "start", state.Order, "Process should be started") - rs.StopProcess(process.ID) + rs.StopProcess(process.ID, "", "") } func TestReloadProcess(t *testing.T) { @@ -371,27 +391,27 @@ func TestReloadProcess(t *testing.T) { rs.AddProcess(process) - err = rs.ReloadProcess("foobar") + err = rs.ReloadProcess("foobar", "", "") require.NotEqual(t, nil, err, "shouldn't be able to reload non-existing process") - err = rs.ReloadProcess(process.ID) + err = rs.ReloadProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to reload existing stopped process") - state, _ := rs.GetProcessState(process.ID) + state, _ := rs.GetProcessState(process.ID, "", "") require.Equal(t, "stop", state.Order, "Process should be stopped") - rs.StartProcess(process.ID) + rs.StartProcess(process.ID, "", "") - state, _ = rs.GetProcessState(process.ID) + state, _ = rs.GetProcessState(process.ID, "", "") require.Equal(t, "start", state.Order, "Process should be started") - err = rs.ReloadProcess(process.ID) + err = rs.ReloadProcess(process.ID, "", "") require.Equal(t, nil, err, "should be able to reload existing process") - state, _ = rs.GetProcessState(process.ID) + state, _ = rs.GetProcessState(process.ID, "", "") require.Equal(t, "start", state.Order, "Process should be started") - rs.StopProcess(process.ID) + rs.StopProcess(process.ID, "", "") } func TestProbeProcess(t *testing.T) { @@ -402,7 +422,7 @@ func TestProbeProcess(t *testing.T) { rs.AddProcess(process) - probe := rs.ProbeWithTimeout(process.ID, 5*time.Second) + probe := rs.ProbeWithTimeout(process.ID, "", "", 5*time.Second) require.Equal(t, 3, len(probe.Streams)) } @@ -415,12 +435,15 @@ func TestProcessMetadata(t *testing.T) { rs.AddProcess(process) - data, _ := rs.GetProcessMetadata(process.ID, "foobar") + data, err := rs.GetProcessMetadata(process.ID, "", "", "foobar") + require.Error(t, ErrMetadataKeyNotFound) require.Equal(t, nil, data, "nothing should be stored under the key") - rs.SetProcessMetadata(process.ID, "foobar", process) + err = rs.SetProcessMetadata(process.ID, "", "", "foobar", process) + require.NoError(t, err) - data, _ = rs.GetProcessMetadata(process.ID, "foobar") + data, err = rs.GetProcessMetadata(process.ID, "", "", "foobar") + require.NoError(t, err) require.NotEqual(t, nil, data, "there should be something stored under the key") p := data.(*app.Config) @@ -436,26 +459,26 @@ func TestLog(t *testing.T) { rs.AddProcess(process) - _, err = rs.GetProcessLog("foobar") + _, err = rs.GetProcessLog("foobar", "", "") require.NotEqual(t, nil, err, "shouldn't be able to get log from non-existing process") - log, err := rs.GetProcessLog(process.ID) + log, err := rs.GetProcessLog(process.ID, "", "") require.Equal(t, nil, err, "should be able to get log from existing process") require.Equal(t, 0, len(log.Prelude)) require.Equal(t, 0, len(log.Log)) - rs.StartProcess(process.ID) + rs.StartProcess(process.ID, "", "") time.Sleep(3 * time.Second) - log, _ = rs.GetProcessLog(process.ID) + log, _ = rs.GetProcessLog(process.ID, "", "") require.NotEqual(t, 0, len(log.Prelude)) require.NotEqual(t, 0, len(log.Log)) - rs.StopProcess(process.ID) + rs.StopProcess(process.ID, "", "") - log, _ = rs.GetProcessLog(process.ID) + log, _ = rs.GetProcessLog(process.ID, "", "") require.NotEqual(t, 0, len(log.Prelude)) require.NotEqual(t, 0, len(log.Log)) @@ -471,13 +494,13 @@ func TestPlayoutNoRange(t *testing.T) { rs.AddProcess(process) - _, err = rs.GetPlayout("foobar", process.Input[0].ID) - require.NotEqual(t, nil, err, "playout of non-existing process should error") + _, err = rs.GetPlayout("foobar", "", "", process.Input[0].ID) + require.Equal(t, ErrUnknownProcess, err) - _, err = rs.GetPlayout(process.ID, "foobar") + _, err = rs.GetPlayout(process.ID, "", "", "foobar") require.NotEqual(t, nil, err, "playout of non-existing input should error") - addr, _ := rs.GetPlayout(process.ID, process.Input[0].ID) + addr, _ := rs.GetPlayout(process.ID, "", "", process.Input[0].ID) require.Equal(t, 0, len(addr), "the playout address should be empty if no port range is given") } @@ -494,13 +517,13 @@ func TestPlayoutRange(t *testing.T) { rs.AddProcess(process) - _, err = rs.GetPlayout("foobar", process.Input[0].ID) - require.NotEqual(t, nil, err, "playout of non-existing process should error") + _, err = rs.GetPlayout("foobar", "", "", process.Input[0].ID) + require.Equal(t, ErrUnknownProcess, err) - _, err = rs.GetPlayout(process.ID, "foobar") + _, err = rs.GetPlayout(process.ID, "", "", "foobar") require.NotEqual(t, nil, err, "playout of non-existing input should error") - addr, _ := rs.GetPlayout(process.ID, process.Input[0].ID) + addr, _ := rs.GetPlayout(process.ID, "", "", process.Input[0].ID) require.NotEqual(t, 0, len(addr), "the playout address should not be empty if a port range is given") require.Equal(t, "127.0.0.1:3000", addr, "the playout address should be 127.0.0.1:3000") } @@ -512,10 +535,9 @@ func TestAddressReference(t *testing.T) { process1 := getDummyProcess() process2 := getDummyProcess() - process2.ID = "process2" - rs.AddProcess(process1) + process2.ID = "process2" process2.Input[0].Address = "#process:foobar=out" err = rs.AddProcess(process2)