Use puzpuzpuz/xsync.MapOf for tasks, abstract tasks

This commit is contained in:
Ingo Oppermann
2024-07-17 16:54:26 +02:00
parent 4d0eed092e
commit 15e1cd7b6f
4 changed files with 632 additions and 421 deletions

View File

@@ -12,14 +12,12 @@ import (
"time"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/net/url"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/restream/app"
rfs "github.com/datarhei/core/v16/restream/fs"
@@ -77,34 +75,6 @@ type Config struct {
Logger log.Logger
}
type task struct {
valid bool
id string // ID of the task/process
owner string
domain string
reference string
process *app.Process
config *app.Config // Process config with replaced static placeholders
command []string // The actual command parameter for ffmpeg
ffmpeg process.Process
parser parse.Parser
playout map[string]int
logger log.Logger
usesDisk bool // Whether this task uses the disk
metadata map[string]interface{}
}
func (t *task) ID() app.ProcessID {
return app.ProcessID{
ID: t.id,
Domain: t.domain,
}
}
func (t *task) String() string {
return t.ID().String()
}
type restream struct {
id string
name string
@@ -118,7 +88,7 @@ type restream struct {
}
replace replace.Replacer
rewrite rewrite.Rewriter
tasks map[app.ProcessID]*task // domain:ProcessID
tasks *Storage // domain:ProcessID
metadata map[string]interface{} // global metadata
logger log.Logger
@@ -143,7 +113,7 @@ func New(config Config) (Restreamer, error) {
replace: config.Replace,
rewrite: config.Rewrite,
logger: config.Logger,
tasks: map[app.ProcessID]*task{},
tasks: NewStorage(),
metadata: map[string]interface{}{},
}
@@ -151,19 +121,6 @@ func New(config Config) (Restreamer, error) {
r.logger = log.New("")
}
/*
if r.store == nil {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
s, err := jsonstore.New(jsonstore.Config{
Filesystem: dummyfs,
})
if err != nil {
return nil, err
}
r.store = s
}
*/
if len(config.Filesystems) == 0 {
return nil, fmt.Errorf("at least one filesystem must be provided")
}
@@ -226,14 +183,14 @@ func (r *restream) Start() {
go r.resourceObserver(ctx, r.resources, time.Second)
}
for id, t := range r.tasks {
if t.process.Order == "start" {
r.startProcess(id)
}
r.tasks.Range(func(id app.ProcessID, t *task) bool {
t.Restore()
// The filesystem cleanup rules can be set
r.setCleanup(id, t.config)
}
r.setCleanup(id, t.Config())
return true
})
for _, fs := range r.fs.list {
fs.Start()
@@ -252,28 +209,27 @@ func (r *restream) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
wg := sync.WaitGroup{}
// Stop the currently running processes without altering their order such that on a subsequent
// Start() they will get restarted.
wg := sync.WaitGroup{}
for _, t := range r.tasks {
if t.ffmpeg == nil {
continue
}
r.tasks.Range(func(_ app.ProcessID, t *task) bool {
wg.Add(1)
go func(p process.Process) {
go func(t *task) {
defer wg.Done()
p.Stop(true)
}(t.ffmpeg)
}
t.Kill()
}(t)
return true
})
wg.Wait()
for id := range r.tasks {
r.tasks.Range(func(id app.ProcessID, _ *task) bool {
r.unsetCleanup(id)
}
return true
})
r.cancelObserver()
@@ -303,24 +259,16 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int
if isFull {
// Stop all tasks that write to this filesystem
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
continue
r.tasks.Range(func(id app.ProcessID, t *task) bool {
if !t.UsesDisk() {
return true
}
if !t.usesDisk {
continue
}
r.logger.Warn().WithField("id", id).Log("Shutting down because filesystem is full")
t.Stop()
if t.process.Order != "start" {
continue
}
r.logger.Warn().Log("Shutting down because filesystem is full")
r.stopProcess(id)
}
r.lock.Unlock()
return true
})
}
}
}
@@ -355,20 +303,17 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
break
}
r.lock.RLock()
for id, t := range r.tasks {
if !t.valid {
continue
}
r.tasks.Range(func(id app.ProcessID, t *task) bool {
if t.Limit(limitCPU, limitMemory) {
r.logger.Debug().WithFields(log.Fields{
"limit_cpu": limitCPU,
"limit_memory": limitMemory,
"id": id,
}).Log("Limiting process CPU and memory consumption")
t.ffmpeg.Limit(limitCPU, limitMemory)
}
r.lock.RUnlock()
return true
})
}
}
}
@@ -384,7 +329,7 @@ func (r *restream) load() error {
return err
}
tasks := make(map[app.ProcessID]*task)
tasks := NewStorage()
skills := r.ffmpeg.Skills()
ffversion := skills.FFmpeg.Version
@@ -420,7 +365,7 @@ func (r *restream) load() error {
// Replace all placeholders in the config
resolveStaticPlaceholders(t.config, r.replace)
tasks[t.ID()] = t
tasks.LoadOrStore(t.ID(), t)
}
}
@@ -428,9 +373,7 @@ func (r *restream) load() error {
// replaced, we can resolve references and validate the
// inputs and outputs.
for _, t := range tasks {
t := t
tasks.Range(func(_ app.ProcessID, t *task) bool {
// Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version
if c, err := semver.NewConstraint(t.config.FFVersion); err == nil {
if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil {
@@ -450,7 +393,7 @@ func (r *restream) load() error {
err := r.resolveAddresses(tasks, t.config)
if err != nil {
t.logger.Warn().WithError(err).Log("Ignoring")
continue
return true
}
// Validate config with all placeholders replaced. However, we need to take care
@@ -461,13 +404,13 @@ func (r *restream) load() error {
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
t.logger.Warn().WithError(err).Log("Ignoring")
continue
return true
}
err = r.setPlayoutPorts(t)
if err != nil {
t.logger.Warn().WithError(err).Log("Ignoring")
continue
return true
}
t.command = t.config.CreateCommand()
@@ -505,14 +448,18 @@ func (r *restream) load() error {
},
})
if err != nil {
return err
return true
}
t.ffmpeg = ffmpeg
t.valid = true
}
return true
})
r.tasks.Clear()
r.tasks = tasks
r.metadata = data.Metadata
return nil
@@ -525,7 +472,7 @@ func (r *restream) save() {
data := store.NewData()
for tid, t := range r.tasks {
r.tasks.Range(func(tid app.ProcessID, t *task) bool {
domain := data.Process[tid.Domain]
if domain == nil {
domain = map[string]store.Process{}
@@ -537,7 +484,9 @@ func (r *restream) save() {
}
data.Process[tid.Domain] = domain
}
return true
})
data.Metadata = r.metadata
@@ -562,36 +511,27 @@ var ErrProcessExists = errors.New("process already exists")
var ErrForbidden = errors.New("forbidden")
func (r *restream) AddProcess(config *app.Config) error {
r.lock.RLock()
t, err := r.createTask(config)
r.lock.RUnlock()
if err != nil {
return err
}
r.lock.Lock()
defer r.lock.Unlock()
tid := t.ID()
_, ok := r.tasks[tid]
_, ok := r.tasks.LoadOrStore(tid, t)
if ok {
return ErrProcessExists
}
r.tasks[tid] = t
// set filesystem cleanup rules
r.setCleanup(tid, t.config)
if t.process.Order == "start" {
err := r.startProcess(tid)
err = t.Restore()
if err != nil {
delete(r.tasks, tid)
r.tasks.Delete(tid)
return err
}
}
r.save()
@@ -1032,7 +972,7 @@ func validateOutputAddress(address, basedir string, ffmpeg ffmpeg.FFmpeg) (strin
}
// resolveAddresses replaces the addresse reference from each input in a config with the actual address.
func (r *restream) resolveAddresses(tasks map[app.ProcessID]*task, config *app.Config) error {
func (r *restream) resolveAddresses(tasks *Storage, config *app.Config) error {
for i, input := range config.Input {
// Resolve any references
address, err := r.resolveAddress(tasks, config.ID, input.Address)
@@ -1049,7 +989,7 @@ func (r *restream) resolveAddresses(tasks map[app.ProcessID]*task, config *app.C
}
// resolveAddress replaces the address reference with the actual address.
func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address string) (string, error) {
func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, error) {
matches, err := parseAddressReference(address)
if err != nil {
return address, err
@@ -1066,13 +1006,15 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str
var t *task = nil
for _, tsk := range tasks {
tasks.Range(func(_ app.ProcessID, tsk *task) bool {
if tsk.id == matches["id"] && tsk.domain == matches["domain"] {
t = tsk
break
}
return false
}
return true
})
if t == nil {
return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address)
}
@@ -1186,13 +1128,13 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
r.lock.Lock()
defer r.lock.Unlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
// If the new config has the same hash as the current config, do nothing.
if task.process.Config.Equal(config) {
if task.Equal(config) {
return nil
}
@@ -1204,22 +1146,18 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
tid := t.ID()
if !tid.Equal(id) {
_, ok := r.tasks[tid]
_, ok := r.tasks.Load(tid)
if ok {
return ErrProcessExists
}
}
t.process.Order = task.process.Order
t.process.Order = task.Order()
if err := r.stopProcess(id); err != nil {
return fmt.Errorf("stop process: %w", err)
}
if err := r.deleteProcess(id); err != nil {
return fmt.Errorf("delete process: %w", err)
}
// This would require a major version jump
//t.process.CreatedAt = task.process.CreatedAt
t.process.UpdatedAt = time.Now().Unix()
@@ -1231,14 +1169,16 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
// Transfer the metadata to the new process
t.metadata = task.metadata
r.tasks[tid] = t
if err := r.deleteProcess(id); err != nil {
return fmt.Errorf("delete process: %w", err)
}
r.tasks.Store(tid, t)
// set filesystem cleanup rules
r.setCleanup(tid, t.config)
r.setCleanup(tid, t.Config())
if t.process.Order == "start" {
r.startProcess(tid)
}
t.Restore()
r.save()
@@ -1246,112 +1186,64 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
}
func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []app.ProcessID {
count := 0
var idglob glob.Glob
var refglob glob.Glob
var ownerglob glob.Glob
var domainglob glob.Glob
var idglob glob.Glob = nil
var refglob glob.Glob = nil
var ownerglob glob.Glob = nil
var domainglob glob.Glob = nil
if len(idpattern) != 0 {
count++
idglob, _ = glob.Compile(idpattern)
}
if len(refpattern) != 0 {
count++
refglob, _ = glob.Compile(refpattern)
}
if len(ownerpattern) != 0 {
count++
ownerglob, _ = glob.Compile(ownerpattern)
}
if len(domainpattern) != 0 {
count++
domainglob, _ = glob.Compile(domainpattern)
}
var ids []app.ProcessID
r.lock.RLock()
defer r.lock.RUnlock()
if idglob == nil && refglob == nil && ownerglob == nil && domainglob == nil {
ids = make([]app.ProcessID, 0, r.tasks.Size())
if count == 0 {
ids = make([]app.ProcessID, 0, len(r.tasks))
r.tasks.Range(func(id app.ProcessID, t *task) bool {
ids = append(ids, id)
for _, t := range r.tasks {
tid := app.ProcessID{
ID: t.id,
Domain: t.domain,
}
ids = append(ids, tid)
}
return true
})
} else {
ids = []app.ProcessID{}
for _, t := range r.tasks {
matches := 0
if idglob != nil {
if match := idglob.Match(t.id); match {
matches++
}
r.tasks.Range(func(id app.ProcessID, t *task) bool {
if !t.Match(idglob, refglob, ownerglob, domainglob) {
return true
}
if refglob != nil {
if match := refglob.Match(t.reference); match {
matches++
}
}
ids = append(ids, id)
if ownerglob != nil {
if match := ownerglob.Match(t.owner); match {
matches++
}
}
if domainglob != nil {
if match := domainglob.Match(t.domain); match {
matches++
}
}
if count != matches {
continue
}
tid := app.ProcessID{
ID: t.id,
Domain: t.domain,
}
ids = append(ids, tid)
}
return true
})
}
return ids
}
func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return &app.Process{}, ErrUnknownProcess
}
process := task.process.Clone()
return process, nil
return task.Process(), nil
}
func (r *restream) DeleteProcess(id app.ProcessID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.deleteProcess(id)
if err != nil {
return err
@@ -1363,27 +1255,24 @@ func (r *restream) DeleteProcess(id app.ProcessID) error {
}
func (r *restream) deleteProcess(tid app.ProcessID) error {
task, ok := r.tasks[tid]
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
if task.process.Order != "stop" {
if task.Order() != "stop" {
return fmt.Errorf("the process with the ID '%s' is still running", tid)
}
r.unsetPlayoutPorts(task)
r.unsetCleanup(tid)
delete(r.tasks, tid)
r.tasks.Delete(tid)
return nil
}
func (r *restream) StartProcess(id app.ProcessID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.startProcess(id)
if err != nil {
return err
@@ -1395,40 +1284,22 @@ func (r *restream) StartProcess(id app.ProcessID) error {
}
func (r *restream) startProcess(tid app.ProcessID) error {
task, ok := r.tasks[tid]
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
if !task.valid {
return fmt.Errorf("invalid process definition")
err := task.Start()
if err != nil {
return err
}
if task.ffmpeg != nil {
status := task.ffmpeg.Status()
if task.process.Order == "start" && status.Order == "start" {
return nil
}
}
if r.maxProc > 0 && r.nProc >= r.maxProc {
return fmt.Errorf("max. number of running processes (%d) reached", r.maxProc)
}
task.process.Order = "start"
task.ffmpeg.Start()
r.nProc++
return nil
}
func (r *restream) StopProcess(id app.ProcessID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.stopProcess(id)
if err != nil {
return err
@@ -1440,63 +1311,38 @@ func (r *restream) StopProcess(id app.ProcessID) error {
}
func (r *restream) stopProcess(tid app.ProcessID) error {
task, ok := r.tasks[tid]
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
if task.ffmpeg == nil {
return nil
// TODO: aufpassen mit nProc und nil error. In task.Stop() noch einen error einführen, falls der process nicht läuft.
err := task.Stop()
if err != nil {
return err
}
status := task.ffmpeg.Status()
if task.process.Order == "stop" && status.Order == "stop" {
return nil
}
task.process.Order = "stop"
task.ffmpeg.Stop(true)
r.nProc--
return nil
}
func (r *restream) RestartProcess(id app.ProcessID) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.restartProcess(id)
}
func (r *restream) restartProcess(tid app.ProcessID) error {
task, ok := r.tasks[tid]
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
if !task.valid {
return fmt.Errorf("invalid process definition")
}
if task.process.Order == "stop" {
return nil
}
if task.ffmpeg != nil {
task.ffmpeg.Stop(true)
task.ffmpeg.Start()
}
task.Restart()
return nil
}
func (r *restream) ReloadProcess(id app.ProcessID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.reloadProcess(id)
if err != nil {
return err
@@ -1508,7 +1354,7 @@ func (r *restream) ReloadProcess(id app.ProcessID) error {
}
func (r *restream) reloadProcess(tid app.ProcessID) error {
t, ok := r.tasks[tid]
t, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
@@ -1601,125 +1447,32 @@ func (r *restream) reloadProcess(tid app.ProcessID) error {
func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) {
state := &app.State{}
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return state, ErrUnknownProcess
}
if !task.valid {
return state, nil
}
status := task.ffmpeg.Status()
state.Order = task.process.Order
state.State = status.State
state.States.Marshal(status.States)
state.Time = status.Time.Unix()
state.Memory = status.Memory.Current
state.CPU = status.CPU.Current / status.CPU.NCPU
state.LimitMode = status.LimitMode
state.Resources.CPU = status.CPU
state.Resources.Memory = status.Memory
state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds()
state.Reconnect = -1
state.Command = status.CommandArgs
state.LastLog = task.parser.LastLogline()
if status.Reconnect >= time.Duration(0) {
state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds()
}
progress := task.parser.Progress()
state.Progress.UnmarshalParser(&progress)
for i, p := range state.Progress.Input {
if int(p.Index) >= len(task.process.Config.Input) {
continue
}
state.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID
}
for i, p := range state.Progress.Output {
if int(p.Index) >= len(task.process.Config.Output) {
continue
}
state.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID
}
return state, nil
return task.State()
}
func (r *restream) GetProcessReport(id app.ProcessID) (*app.Report, error) {
report := &app.Report{}
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return report, ErrUnknownProcess
}
if !task.valid {
return report, nil
}
current := task.parser.Report()
report.UnmarshalParser(&current)
history := task.parser.ReportHistory()
report.History = make([]app.ReportHistoryEntry, len(history))
for i, h := range history {
report.History[i].UnmarshalParser(&h)
e := &report.History[i]
for i, p := range e.Progress.Input {
if int(p.Index) >= len(task.process.Config.Input) {
continue
}
e.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID
}
for i, p := range e.Progress.Output {
if int(p.Index) >= len(task.process.Config.Output) {
continue
}
e.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID
}
}
return report, nil
return task.Report()
}
func (r *restream) SetProcessReport(id app.ProcessID, report *app.Report) error {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
if !task.valid {
return nil
}
_, history := report.MarshalParser()
task.parser.ImportReportHistory(history)
return nil
return task.SetReport(report)
}
func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.ReportHistorySearchResult {
@@ -1727,26 +1480,15 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string,
ids := r.GetProcessIDs(idpattern, refpattern, "", "")
r.lock.RLock()
defer r.lock.RUnlock()
for _, id := range ids {
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
continue
}
presult := task.parser.SearchReportHistory(state, from, to)
presult := task.SearchReportHistory(state, from, to)
for _, f := range presult {
result = append(result, app.ReportHistorySearchResult{
ProcessID: task.id,
Reference: task.reference,
ExitState: f.ExitState,
CreatedAt: f.CreatedAt,
ExitedAt: f.ExitedAt,
})
}
result = append(result, presult...)
}
return result
@@ -1836,10 +1578,7 @@ func (r *restream) ReloadSkills() error {
}
func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return "", ErrUnknownProcess
}
@@ -1856,33 +1595,15 @@ func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error)
return "127.0.0.1:" + strconv.Itoa(port), nil
}
var ErrMetadataKeyNotFound = errors.New("unknown key")
func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interface{}) error {
if len(key) == 0 {
return fmt.Errorf("a key for storing the data has to be provided")
}
r.lock.Lock()
defer r.lock.Unlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
if task.metadata == nil {
task.metadata = make(map[string]interface{})
}
if data == nil {
delete(task.metadata, key)
} else {
task.metadata[key] = data
}
if len(task.metadata) == 0 {
task.metadata = nil
err := task.SetMetadata(key, data)
if err != nil {
return err
}
r.save()
@@ -1891,24 +1612,12 @@ func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interfa
}
func (r *restream) GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
task, ok := r.tasks.Load(id)
if !ok {
return nil, ErrUnknownProcess
}
if len(key) == 0 {
return task.metadata, nil
}
data, ok := task.metadata[key]
if !ok {
return nil, ErrMetadataKeyNotFound
}
return data, nil
return task.GetMetadata(key)
}
func (r *restream) SetMetadata(key string, data interface{}) error {

View File

@@ -884,9 +884,17 @@ func TestTeeAddressReference(t *testing.T) {
r := rs.(*restream)
require.Equal(t, "http://example.com/live.m3u8", r.tasks[app.ProcessID{ID: "process2"}].config.Input[0].Address)
require.Equal(t, "http://example.com/live.m3u8", r.tasks[app.ProcessID{ID: "process3"}].config.Input[0].Address)
require.Equal(t, "rtmp://example.com/live.stream?token=123", r.tasks[app.ProcessID{ID: "process4"}].config.Input[0].Address)
task, ok := r.tasks.Load(app.ProcessID{ID: "process2"})
require.True(t, ok)
require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address)
task, ok = r.tasks.Load(app.ProcessID{ID: "process3"})
require.True(t, ok)
require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address)
task, ok = r.tasks.Load(app.ProcessID{ID: "process4"})
require.True(t, ok)
require.Equal(t, "rtmp://example.com/live.stream?token=123", task.config.Input[0].Address)
}
func TestConfigValidation(t *testing.T) {
@@ -1466,7 +1474,7 @@ func TestProcessReplacer(t *testing.T) {
LogPatterns: []string{},
}
task, ok := rs.tasks[app.ProcessID{ID: "314159265359"}]
task, ok := rs.tasks.Load(app.ProcessID{ID: "314159265359"})
require.True(t, ok)
require.Equal(t, process, task.config)
@@ -1517,7 +1525,7 @@ func TestProcessLimit(t *testing.T) {
rs := rsi.(*restream)
task, ok := rs.tasks[app.ProcessID{ID: process.ID}]
task, ok := rs.tasks.Load(app.ProcessID{ID: process.ID})
require.True(t, ok)
status := task.ffmpeg.Status()

64
restream/manager.go Normal file
View File

@@ -0,0 +1,64 @@
package restream
import (
"github.com/datarhei/core/v16/restream/app"
"github.com/puzpuzpuz/xsync/v3"
)
type Storage struct {
tasks *xsync.MapOf[app.ProcessID, *task]
}
func NewStorage() *Storage {
m := &Storage{
tasks: xsync.NewMapOf[app.ProcessID, *task](),
}
return m
}
func (m *Storage) Range(f func(key app.ProcessID, value *task) bool) {
m.tasks.Range(f)
}
func (m *Storage) Store(id app.ProcessID, t *task) {
m.tasks.Store(id, t)
}
func (m *Storage) LoadOrStore(id app.ProcessID, t *task) (*task, bool) {
return m.tasks.LoadOrStore(id, t)
}
func (m *Storage) Has(id app.ProcessID) bool {
_, hasTask := m.Load(id)
return hasTask
}
func (m *Storage) Load(id app.ProcessID) (*task, bool) {
return m.tasks.Load(id)
}
func (m *Storage) Delete(id app.ProcessID) bool {
if t, ok := m.Load(id); ok {
m.tasks.Delete(id)
t.Destroy()
return true
}
return false
}
func (m *Storage) Size() int {
return m.tasks.Size()
}
func (m *Storage) Clear() {
m.tasks.Range(func(_ app.ProcessID, t *task) bool {
t.Destroy()
return true
})
m.tasks.Clear()
}

430
restream/task.go Normal file
View File

@@ -0,0 +1,430 @@
package restream
import (
"errors"
"sync"
"time"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/restream/app"
)
var ErrInvalidProcessConfig = errors.New("invalid process config")
var ErrMetadataKeyNotFound = errors.New("unknown metadata key")
var ErrMetadataKeyRequired = errors.New("a key for storing metadata is required")
type task struct {
valid bool
id string // ID of the task/process
owner string
domain string
reference string
process *app.Process
config *app.Config // Process config with replaced static placeholders
command []string // The actual command parameter for ffmpeg
ffmpeg process.Process
parser parse.Parser
playout map[string]int
logger log.Logger
usesDisk bool // Whether this task uses the disk
metadata map[string]interface{}
lock sync.RWMutex
}
func (t *task) IsValid() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.valid
}
func (t *task) UsesDisk() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.usesDisk
}
func (t *task) ID() app.ProcessID {
return app.ProcessID{
ID: t.id,
Domain: t.domain,
}
}
func (t *task) String() string {
return t.ID().String()
}
// Restore restores the task's order
func (t *task) Restore() error {
t.lock.RLock()
defer t.lock.RUnlock()
if !t.valid {
return ErrInvalidProcessConfig
}
if t.ffmpeg == nil {
return ErrInvalidProcessConfig
}
if t.process.Order == "start" {
err := t.ffmpeg.Start()
if err != nil {
return err
}
}
return nil
}
func (t *task) Start() error {
t.lock.Lock()
defer t.lock.Unlock()
if !t.valid {
return ErrInvalidProcessConfig
}
if t.ffmpeg == nil {
return nil
}
status := t.ffmpeg.Status()
if t.process.Order == "start" && status.Order == "start" {
return nil
}
t.process.Order = "start"
t.ffmpeg.Start()
return nil
}
func (t *task) Stop() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.ffmpeg == nil {
return nil
}
status := t.ffmpeg.Status()
if t.process.Order == "stop" && status.Order == "stop" {
return nil
}
t.process.Order = "stop"
t.ffmpeg.Stop(true)
return nil
}
// Kill stops a process without changing the tasks order
func (t *task) Kill() {
t.lock.RLock()
defer t.lock.RUnlock()
if t.ffmpeg == nil {
return
}
t.ffmpeg.Stop(true)
}
func (t *task) Restart() error {
t.lock.RLock()
defer t.lock.RUnlock()
if !t.valid {
return ErrInvalidProcessConfig
}
if t.process.Order == "stop" {
return nil
}
if t.ffmpeg != nil {
t.ffmpeg.Stop(true)
t.ffmpeg.Start()
}
return nil
}
func (t *task) State() (*app.State, error) {
t.lock.RLock()
defer t.lock.RUnlock()
state := &app.State{}
if !t.valid {
return state, nil
}
status := t.ffmpeg.Status()
state.Order = t.process.Order
state.State = status.State
state.States.Marshal(status.States)
state.Time = status.Time.Unix()
state.Memory = status.Memory.Current
state.CPU = status.CPU.Current / status.CPU.NCPU
state.LimitMode = status.LimitMode
state.Resources.CPU = status.CPU
state.Resources.Memory = status.Memory
state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds()
state.Reconnect = -1
state.Command = status.CommandArgs
state.LastLog = t.parser.LastLogline()
if status.Reconnect >= time.Duration(0) {
state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds()
}
progress := t.parser.Progress()
state.Progress.UnmarshalParser(&progress)
for i, p := range state.Progress.Input {
if int(p.Index) >= len(t.process.Config.Input) {
continue
}
state.Progress.Input[i].ID = t.process.Config.Input[p.Index].ID
}
for i, p := range state.Progress.Output {
if int(p.Index) >= len(t.process.Config.Output) {
continue
}
state.Progress.Output[i].ID = t.process.Config.Output[p.Index].ID
}
return state, nil
}
func (t *task) Report() (*app.Report, error) {
t.lock.RLock()
defer t.lock.RUnlock()
report := &app.Report{}
if !t.valid {
return report, nil
}
current := t.parser.Report()
report.UnmarshalParser(&current)
history := t.parser.ReportHistory()
report.History = make([]app.ReportHistoryEntry, len(history))
for i, h := range history {
report.History[i].UnmarshalParser(&h)
e := &report.History[i]
for i, p := range e.Progress.Input {
if int(p.Index) >= len(t.process.Config.Input) {
continue
}
e.Progress.Input[i].ID = t.process.Config.Input[p.Index].ID
}
for i, p := range e.Progress.Output {
if int(p.Index) >= len(t.process.Config.Output) {
continue
}
e.Progress.Output[i].ID = t.process.Config.Output[p.Index].ID
}
}
return report, nil
}
func (t *task) SetReport(report *app.Report) error {
t.lock.RLock()
defer t.lock.RUnlock()
if !t.valid {
return nil
}
_, history := report.MarshalParser()
t.parser.ImportReportHistory(history)
return nil
}
func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult {
t.lock.RLock()
defer t.lock.RUnlock()
result := []app.ReportHistorySearchResult{}
presult := t.parser.SearchReportHistory(state, from, to)
for _, f := range presult {
result = append(result, app.ReportHistorySearchResult{
ProcessID: t.id,
Reference: t.reference,
ExitState: f.ExitState,
CreatedAt: f.CreatedAt,
ExitedAt: f.ExitedAt,
})
}
return result
}
func (t *task) SetMetadata(key string, data interface{}) error {
t.lock.Lock()
defer t.lock.Unlock()
if len(key) == 0 {
return ErrMetadataKeyRequired
}
if t.metadata == nil {
t.metadata = make(map[string]interface{})
}
if data == nil {
delete(t.metadata, key)
} else {
t.metadata[key] = data
}
if len(t.metadata) == 0 {
t.metadata = nil
}
return nil
}
func (t *task) GetMetadata(key string) (interface{}, error) {
t.lock.RLock()
defer t.lock.RUnlock()
if len(key) == 0 {
return t.metadata, nil
}
data, ok := t.metadata[key]
if !ok {
return nil, ErrMetadataKeyNotFound
}
return data, nil
}
func (t *task) Limit(cpu, memory bool) bool {
t.lock.RLock()
defer t.lock.RUnlock()
if !t.valid {
return false
}
if t.ffmpeg == nil {
return false
}
t.ffmpeg.Limit(cpu, memory)
return true
}
func (t *task) Equal(config *app.Config) bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.process.Config.Equal(config)
}
func (t *task) Config() *app.Config {
t.lock.RLock()
defer t.lock.RUnlock()
return t.config.Clone()
}
func (t *task) Destroy() {
t.Stop()
t.lock.Lock()
defer t.lock.Unlock()
t.valid = false
t.process = nil
t.config = nil
t.command = nil
t.ffmpeg = nil
t.parser = nil
t.metadata = nil
}
func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
t.lock.RLock()
defer t.lock.RUnlock()
count := 0
matches := 0
if id != nil {
count++
if match := id.Match(t.id); match {
matches++
}
}
if reference != nil {
count++
if match := reference.Match(t.reference); match {
matches++
}
}
if owner != nil {
count++
if match := owner.Match(t.owner); match {
matches++
}
}
if domain != nil {
count++
if match := domain.Match(t.domain); match {
matches++
}
}
return count == matches
}
func (t *task) Process() *app.Process {
t.lock.RLock()
defer t.lock.RUnlock()
return t.process.Clone()
}
func (t *task) Order() string {
t.lock.RLock()
defer t.lock.RUnlock()
return t.process.Order
}