diff --git a/restream/core.go b/restream/core.go index c3d8e775..12fd9799 100644 --- a/restream/core.go +++ b/restream/core.go @@ -183,7 +183,9 @@ func (r *restream) Start() { go r.resourceObserver(ctx, r.resources, time.Second) } - r.tasks.Range(func(id app.ProcessID, t *task) bool { + r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { + defer t.Release(token) + t.Restore() // The filesystem cleanup rules can be set @@ -214,9 +216,10 @@ func (r *restream) Stop() { // Stop the currently running processes without altering their order such that on a subsequent // Start() they will get restarted. - r.tasks.Range(func(_ app.ProcessID, t *task) bool { + r.tasks.Range(true, func(_ app.ProcessID, t *task, token string) bool { wg.Add(1) go func(t *task) { + defer t.Release(token) defer wg.Done() t.Kill() }(t) @@ -226,7 +229,8 @@ func (r *restream) Stop() { wg.Wait() - r.tasks.Range(func(id app.ProcessID, _ *task) bool { + r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { + defer t.Release(token) r.unsetCleanup(id) return true }) @@ -259,7 +263,8 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int if isFull { // Stop all tasks that write to this filesystem - r.tasks.Range(func(id app.ProcessID, t *task) bool { + r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { + defer t.Release(token) if !t.UsesDisk() { return true } @@ -315,7 +320,8 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources break } - r.tasks.Range(func(id app.ProcessID, t *task) bool { + r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { + defer t.Release(token) limitGPU := false gpuindex := t.GetHWDevice() if gpuindex >= 0 { @@ -375,7 +381,7 @@ func (r *restream) load() error { // Replace all placeholders in the config resolveStaticPlaceholders(t.config, r.replace) - tasks.LoadOrStore(t.ID(), t) + tasks.Store(t.ID(), t) } } @@ -383,7 +389,9 @@ func (r *restream) load() error { // replaced, we can resolve references and validate the // inputs and outputs. - tasks.Range(func(_ app.ProcessID, t *task) bool { + tasks.Range(false, func(_ app.ProcessID, t *task, token string) bool { + defer t.Release(token) + // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { @@ -460,7 +468,7 @@ func (r *restream) load() error { } t.ffmpeg = ffmpeg - t.Valid(true) + t.SetValid(true) return true }) @@ -480,7 +488,13 @@ func (r *restream) save() { data := store.NewData() - r.tasks.Range(func(tid app.ProcessID, t *task) bool { + r.tasks.Range(true, func(tid app.ProcessID, t *task, token string) bool { + defer t.Release(token) + + if !t.IsValid() { + return true + } + domain := data.Process[tid.Domain] if domain == nil { domain = map[string]store.Process{} @@ -520,7 +534,6 @@ var ErrForbidden = errors.New("forbidden") func (r *restream) AddProcess(config *app.Config) error { t, err := r.createTask(config) - if err != nil { return err } @@ -529,6 +542,7 @@ func (r *restream) AddProcess(config *app.Config) error { _, ok := r.tasks.LoadOrStore(tid, t) if ok { + t.Destroy() return ErrProcessExists } @@ -538,6 +552,7 @@ func (r *restream) AddProcess(config *app.Config) error { err = t.Restore() if err != nil { r.tasks.Delete(tid) + t.Destroy() return err } @@ -645,7 +660,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t.ffmpeg = ffmpeg - t.Valid(true) + t.SetValid(true) return t, nil } @@ -673,8 +688,9 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro selectedGPU = 0 } - if t, hasTask := r.tasks.Load(cfg.ProcessID()); hasTask { + if t, token, hasTask := r.tasks.Load(cfg.ProcessID()); hasTask { t.SetHWDevice(selectedGPU) + t.Release(token) } config := cfg.Clone() @@ -742,12 +758,12 @@ func (r *restream) unsetCleanup(id app.ProcessID) { } } -func (r *restream) setPlayoutPorts(t *task) error { - r.unsetPlayoutPorts(t) +func (r *restream) setPlayoutPorts(task *task) error { + r.unsetPlayoutPorts(task) - t.playout = make(map[string]int) + task.playout = make(map[string]int) - for i, input := range t.config.Input { + for i, input := range task.config.Input { if !strings.HasPrefix(input.Address, "avstream:") && !strings.HasPrefix(input.Address, "playout:") { continue } @@ -771,18 +787,18 @@ func (r *restream) setPlayoutPorts(t *task) error { if port, err := r.ffmpeg.GetPort(); err == nil { options = append(options, "-playout_httpport", strconv.Itoa(port)) - t.logger.WithFields(log.Fields{ + task.logger.WithFields(log.Fields{ "port": port, "input": input.ID, }).Debug().Log("Assinging playout port") - t.playout[input.ID] = port + task.playout[input.ID] = port } else if err != net.ErrNoPortrangerProvided { return err } input.Options = options - t.config.Input[i] = input + task.config.Input[i] = input } return nil @@ -1029,13 +1045,16 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e } var t *task = nil + var ttoken string = "" - tasks.Range(func(_ app.ProcessID, tsk *task) bool { - if tsk.id == matches["id"] && tsk.domain == matches["domain"] { - t = tsk + tasks.Range(true, func(_ app.ProcessID, task *task, token string) bool { + if task.id == matches["id"] && task.domain == matches["domain"] { + t = task + ttoken = token return false } + task.Release(token) return true }) @@ -1043,6 +1062,8 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address) } + defer t.Release(ttoken) + teeOptions := regexp.MustCompile(`^\[[^\]]*\]`) for _, x := range t.config.Output { @@ -1149,7 +1170,15 @@ func parseAddressReference(address string) (map[string]string, error) { } func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { - err := r.updateProcess(id, config) + task, ok := r.tasks.LoadAndLock(id) + if !ok { + return ErrUnknownProcess + } + + err := r.updateProcess(task, config) + + task.Unlock() + if err != nil { return err } @@ -1159,12 +1188,7 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { return nil } -func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error { - task, ok := r.tasks.Load(id) - if !ok { - return ErrUnknownProcess - } - +func (r *restream) updateProcess(task *task, config *app.Config) error { // If the new config has the same hash as the current config, do nothing. if task.Equal(config) { return nil @@ -1177,16 +1201,23 @@ func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error { tid := t.ID() - if !tid.Equal(id) { - _, ok := r.tasks.Load(tid) - if ok { + if !tid.Equal(task.ID()) { + if r.tasks.Has(tid) { + t.Destroy() return ErrProcessExists } } - t.process.Order.Set(task.Order()) + order := task.Order() + if len(order) == 0 { + t.Destroy() + return ErrUnknownProcess + } - if err := r.stopProcess(id); err != nil { + t.process.Order.Set(order) + + if err := r.stopProcess(task); err != nil { + t.Destroy() return fmt.Errorf("stop process: %w", err) } @@ -1199,7 +1230,8 @@ func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error { // Transfer the metadata to the new process t.ImportMetadata(task.ExportMetadata()) - if err := r.deleteProcess(id); err != nil { + if err := r.deleteProcess(task); err != nil { + t.Destroy() return fmt.Errorf("delete process: %w", err) } @@ -1240,7 +1272,9 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt if idglob == nil && refglob == nil && ownerglob == nil && domainglob == nil { ids = make([]app.ProcessID, 0, r.tasks.Size()) - r.tasks.Range(func(id app.ProcessID, t *task) bool { + r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { + defer t.Release(token) + ids = append(ids, id) return true @@ -1248,7 +1282,9 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt } else { ids = []app.ProcessID{} - r.tasks.Range(func(id app.ProcessID, t *task) bool { + r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { + defer t.Release(token) + if !t.Match(idglob, refglob, ownerglob, domainglob) { return true } @@ -1263,16 +1299,25 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt } func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) { - task, ok := r.tasks.Load(id) + task, token, ok := r.tasks.Load(id) if !ok { return &app.Process{}, ErrUnknownProcess } + defer task.Release(token) return task.Process(), nil } func (r *restream) DeleteProcess(id app.ProcessID) error { - err := r.deleteProcess(id) + task, ok := r.tasks.LoadAndLock(id) + if !ok { + return ErrUnknownProcess + } + + err := r.deleteProcess(task) + + task.Unlock() + if err != nil { return err } @@ -1282,26 +1327,31 @@ func (r *restream) DeleteProcess(id app.ProcessID) error { return nil } -func (r *restream) deleteProcess(tid app.ProcessID) error { - task, ok := r.tasks.Load(tid) - if !ok { - return ErrUnknownProcess - } - +func (r *restream) deleteProcess(task *task) error { if task.Order() != "stop" { - return fmt.Errorf("the process with the ID '%s' is still running", tid) + return fmt.Errorf("the process with the ID '%s' is still running", task.String()) } r.unsetPlayoutPorts(task) - r.unsetCleanup(tid) + r.unsetCleanup(task.ID()) - r.tasks.Delete(tid) + r.tasks.Delete(task.ID()) + + task.Destroy() return nil } func (r *restream) StartProcess(id app.ProcessID) error { - err := r.startProcess(id) + task, token, ok := r.tasks.Load(id) + if !ok { + return ErrUnknownProcess + } + + err := r.startProcess(task) + + task.Release(token) + if err != nil { return err } @@ -1311,12 +1361,7 @@ func (r *restream) StartProcess(id app.ProcessID) error { return nil } -func (r *restream) startProcess(tid app.ProcessID) error { - task, ok := r.tasks.Load(tid) - if !ok { - return ErrUnknownProcess - } - +func (r *restream) startProcess(task *task) error { err := task.Start() if err != nil { return err @@ -1328,7 +1373,15 @@ func (r *restream) startProcess(tid app.ProcessID) error { } func (r *restream) StopProcess(id app.ProcessID) error { - err := r.stopProcess(id) + task, token, ok := r.tasks.Load(id) + if !ok { + return ErrUnknownProcess + } + + err := r.stopProcess(task) + + task.Release(token) + if err != nil { return err } @@ -1338,12 +1391,7 @@ func (r *restream) StopProcess(id app.ProcessID) error { return nil } -func (r *restream) stopProcess(tid app.ProcessID) error { - task, ok := r.tasks.Load(tid) - if !ok { - return ErrUnknownProcess - } - +func (r *restream) stopProcess(task *task) error { // TODO: aufpassen mit nProc und nil error. In task.Stop() noch einen error einführen, falls der process nicht läuft. err := task.Stop() if err != nil { @@ -1356,22 +1404,31 @@ func (r *restream) stopProcess(tid app.ProcessID) error { } func (r *restream) RestartProcess(id app.ProcessID) error { - return r.restartProcess(id) -} - -func (r *restream) restartProcess(tid app.ProcessID) error { - task, ok := r.tasks.Load(tid) + task, token, ok := r.tasks.Load(id) if !ok { return ErrUnknownProcess } + defer task.Release(token) + return r.restartProcess(task) +} + +func (r *restream) restartProcess(task *task) error { task.Restart() return nil } func (r *restream) ReloadProcess(id app.ProcessID) error { - err := r.reloadProcess(id) + task, ok := r.tasks.LoadAndLock(id) + if !ok { + return ErrUnknownProcess + } + + err := r.reloadProcess(task) + + task.Unlock() + if err != nil { return err } @@ -1381,12 +1438,7 @@ func (r *restream) ReloadProcess(id app.ProcessID) error { return nil } -func (r *restream) reloadProcess(id app.ProcessID) error { - task, ok := r.tasks.Load(id) - if !ok { - return ErrUnknownProcess - } - +func (r *restream) reloadProcess(task *task) error { t, err := r.createTask(task.Config()) if err != nil { return err @@ -1394,20 +1446,27 @@ func (r *restream) reloadProcess(id app.ProcessID) error { tid := t.ID() - t.process.Order.Set(task.Order()) + order := task.Order() + if len(order) == 0 { + t.Destroy() + return ErrUnknownProcess + } + + t.process.Order.Set(order) if err := task.Stop(); err != nil { + t.Destroy() return fmt.Errorf("stop process: %w", err) } // Transfer the report history to the new process - history := task.parser.ReportHistory() - t.parser.ImportReportHistory(history) + t.parser.ImportReportHistory(task.parser.ReportHistory()) // Transfer the metadata to the new process t.metadata = task.metadata - if err := r.deleteProcess(id); err != nil { + if err := r.deleteProcess(task); err != nil { + t.Destroy() return fmt.Errorf("delete process: %w", err) } @@ -1418,18 +1477,17 @@ func (r *restream) reloadProcess(id app.ProcessID) error { t.Restore() - r.save() - return nil } func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { state := &app.State{} - task, ok := r.tasks.Load(id) + task, token, ok := r.tasks.Load(id) if !ok { return state, ErrUnknownProcess } + defer task.Release(token) return task.State() } @@ -1437,19 +1495,21 @@ func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { func (r *restream) GetProcessReport(id app.ProcessID) (*app.Report, error) { report := &app.Report{} - task, ok := r.tasks.Load(id) + task, token, ok := r.tasks.Load(id) if !ok { return report, ErrUnknownProcess } + defer task.Release(token) return task.Report() } func (r *restream) SetProcessReport(id app.ProcessID, report *app.Report) error { - task, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadAndLock(id) if !ok { return ErrUnknownProcess } + defer task.Unlock() return task.SetReport(report) } @@ -1460,14 +1520,15 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, ids := r.GetProcessIDs(idpattern, refpattern, "", "") for _, id := range ids { - task, ok := r.tasks.Load(id) + task, token, ok := r.tasks.Load(id) if !ok { continue } presult := task.SearchReportHistory(state, from, to) - result = append(result, presult...) + + task.Release(token) } return result @@ -1561,12 +1622,13 @@ func (r *restream) ReloadSkills() error { } func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) { - task, ok := r.tasks.Load(id) + task, token, ok := r.tasks.Load(id) if !ok { return "", ErrUnknownProcess } + defer task.Release(token) - if !task.valid { + if !task.IsValid() { return "", fmt.Errorf("invalid process definition") } @@ -1579,12 +1641,15 @@ func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) } func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interface{}) error { - task, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadAndLock(id) if !ok { return ErrUnknownProcess } err := task.SetMetadata(key, data) + + task.Unlock() + if err != nil { return err } @@ -1595,10 +1660,11 @@ func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interfa } func (r *restream) GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) { - task, ok := r.tasks.Load(id) + task, token, ok := r.tasks.Load(id) if !ok { return nil, ErrUnknownProcess } + defer task.Release(token) return task.GetMetadata(key) } diff --git a/restream/core_test.go b/restream/core_test.go index d0c4fb5e..4bf0de12 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -988,15 +988,15 @@ func TestTeeAddressReference(t *testing.T) { r := rs.(*restream) - task, ok := r.tasks.Load(app.ProcessID{ID: "process2"}) + task, _, ok := r.tasks.Load(app.ProcessID{ID: "process2"}) require.True(t, ok) require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address) - task, ok = r.tasks.Load(app.ProcessID{ID: "process3"}) + task, _, ok = r.tasks.Load(app.ProcessID{ID: "process3"}) require.True(t, ok) require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address) - task, ok = r.tasks.Load(app.ProcessID{ID: "process4"}) + task, _, ok = r.tasks.Load(app.ProcessID{ID: "process4"}) require.True(t, ok) require.Equal(t, "rtmp://example.com/live.stream?token=123", task.config.Input[0].Address) } @@ -1598,8 +1598,9 @@ func TestProcessReplacer(t *testing.T) { LogPatterns: []string{}, } - task, ok := rs.tasks.Load(app.ProcessID{ID: "314159265359"}) + task, token, ok := rs.tasks.Load(app.ProcessID{ID: "314159265359"}) require.True(t, ok) + task.Release(token) require.Equal(t, process, task.config) @@ -1692,8 +1693,9 @@ func TestProcessLimit(t *testing.T) { rs := rsi.(*restream) - task, ok := rs.tasks.Load(app.ProcessID{ID: process.ID}) + task, token, ok := rs.tasks.Load(app.ProcessID{ID: process.ID}) require.True(t, ok) + task.Release(token) status := task.ffmpeg.Status() diff --git a/restream/manager.go b/restream/manager.go index c2ce403d..136a7a2e 100644 --- a/restream/manager.go +++ b/restream/manager.go @@ -17,12 +17,24 @@ func NewStorage() *Storage { return m } -func (m *Storage) Range(f func(key app.ProcessID, value *task) bool) { - m.tasks.Range(f) +func (m *Storage) Range(onlyValid bool, f func(key app.ProcessID, value *task, token string) bool) { + m.tasks.Range(func(id app.ProcessID, task *task) bool { + token := task.RLock() + if onlyValid && !task.IsValid() { + task.Release(token) + return true + } + return f(id, task, token) + }) } func (m *Storage) Store(id app.ProcessID, t *task) { - m.tasks.Store(id, t) + t, ok := m.tasks.LoadAndStore(id, t) + if ok { + t.Lock() + t.Destroy() + t.Unlock() + } } func (m *Storage) LoadOrStore(id app.ProcessID, t *task) (*task, bool) { @@ -30,17 +42,41 @@ func (m *Storage) LoadOrStore(id app.ProcessID, t *task) (*task, bool) { } func (m *Storage) Has(id app.ProcessID) bool { - _, hasTask := m.Load(id) + _, hasTask := m.tasks.Load(id) return hasTask } -func (m *Storage) Load(id app.ProcessID) (*task, bool) { - return m.tasks.Load(id) +func (m *Storage) Load(id app.ProcessID) (*task, string, bool) { + task, ok := m.tasks.Load(id) + if !ok { + return nil, "", false + } + + token := task.RLock() + if !task.IsValid() { + task.Release(token) + return nil, "", false + } + return task, token, true +} + +func (m *Storage) LoadAndLock(id app.ProcessID) (*task, bool) { + task, ok := m.tasks.Load(id) + if !ok { + return nil, false + } + + task.lock.Lock() + if !task.IsValid() { + task.Unlock() + return nil, false + } + return task, true } func (m *Storage) Delete(id app.ProcessID) bool { - if t, ok := m.Load(id); ok { + if t, ok := m.tasks.Load(id); ok { m.tasks.Delete(id) t.Destroy() return true diff --git a/restream/task.go b/restream/task.go index 24eeece0..f0ae7a78 100644 --- a/restream/task.go +++ b/restream/task.go @@ -3,12 +3,12 @@ package restream import ( "errors" "maps" - "sync/atomic" "time" "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/math/rand" "github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/restream/app" @@ -32,11 +32,12 @@ type task struct { parser parse.Parser playout map[string]int logger log.Logger - usesDisk bool // Whether this task uses the disk - hwdevice atomic.Int32 // Index of the GPU this task uses + usesDisk bool // Whether this task uses the disk + hwdevice int // Index of the GPU this task uses metadata map[string]interface{} - lock *xsync.RBMutex + lock *xsync.RBMutex + tokens *xsync.MapOf[string, *xsync.RToken] } func NewTask(process *app.Process, logger log.Logger) *task { @@ -51,29 +52,55 @@ func NewTask(process *app.Process, logger log.Logger) *task { logger: logger, metadata: nil, lock: xsync.NewRBMutex(), + tokens: xsync.NewMapOf[string, *xsync.RToken](), } return t } -func (t *task) IsValid() bool { - token := t.lock.RLock() - defer t.lock.RUnlock(token) +func (t *task) Lock() { + t.lock.Lock() +} +func (t *task) Unlock() { + t.lock.Unlock() +} + +func (t *task) RLock() string { + token := "" + for { + token = rand.String(16) + rtoken := t.lock.RLock() + + _, loaded := t.tokens.LoadOrStore(token, rtoken) + if !loaded { + break + } + + t.lock.RUnlock(rtoken) + } + + return token +} + +func (t *task) Release(token string) { + rtoken, ok := t.tokens.LoadAndDelete(token) + if !ok { + return + } + + t.lock.RUnlock(rtoken) +} + +func (t *task) IsValid() bool { return t.valid } -func (t *task) Valid(valid bool) { - t.lock.Lock() - defer t.lock.Unlock() - +func (t *task) SetValid(valid bool) { t.valid = valid } func (t *task) UsesDisk() bool { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - return t.usesDisk } @@ -90,9 +117,6 @@ func (t *task) String() string { // Restore restores the task's order func (t *task) Restore() error { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if !t.valid { return ErrInvalidProcessConfig } @@ -116,9 +140,6 @@ func (t *task) Restore() error { } func (t *task) Start() error { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if !t.valid { return ErrInvalidProcessConfig } @@ -145,9 +166,6 @@ func (t *task) Start() error { } func (t *task) Stop() error { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.ffmpeg == nil { return nil } @@ -171,9 +189,6 @@ func (t *task) Stop() error { // Kill stops a process without changing the tasks order func (t *task) Kill() { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.ffmpeg == nil { return } @@ -182,9 +197,6 @@ func (t *task) Kill() { } func (t *task) Restart() error { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if !t.valid { return ErrInvalidProcessConfig } @@ -206,9 +218,6 @@ func (t *task) Restart() error { } func (t *task) State() (*app.State, error) { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - state := &app.State{} if !t.valid { @@ -312,9 +321,6 @@ func assignConfigID(progress []app.ProgressIO, config []app.ConfigIO) []app.Prog } func (t *task) Report() (*app.Report, error) { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - report := &app.Report{} if !t.valid { @@ -345,9 +351,6 @@ func (t *task) Report() (*app.Report, error) { } func (t *task) SetReport(report *app.Report) error { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if !t.valid { return nil } @@ -364,9 +367,6 @@ func (t *task) SetReport(report *app.Report) error { } func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.parser == nil { return []app.ReportHistorySearchResult{} } @@ -389,9 +389,6 @@ func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.Repo } func (t *task) SetMetadata(key string, data interface{}) error { - t.lock.Lock() - defer t.lock.Unlock() - if len(key) == 0 { return ErrMetadataKeyRequired } @@ -414,16 +411,10 @@ func (t *task) SetMetadata(key string, data interface{}) error { } func (t *task) ImportMetadata(m map[string]interface{}) { - t.lock.Lock() - defer t.lock.Unlock() - t.metadata = m } func (t *task) GetMetadata(key string) (interface{}, error) { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if len(key) == 0 { if t.metadata == nil { return nil, nil @@ -445,16 +436,10 @@ func (t *task) GetMetadata(key string) (interface{}, error) { } func (t *task) ExportMetadata() map[string]interface{} { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - return t.metadata } func (t *task) Limit(cpu, memory, gpu bool) bool { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.ffmpeg == nil { return false } @@ -465,17 +450,14 @@ func (t *task) Limit(cpu, memory, gpu bool) bool { } func (t *task) SetHWDevice(index int) { - t.hwdevice.Store(int32(index)) + t.hwdevice = index } func (t *task) GetHWDevice() int { - return int(t.hwdevice.Load()) + return t.hwdevice } func (t *task) Equal(config *app.Config) bool { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.process == nil { return false } @@ -483,10 +465,15 @@ func (t *task) Equal(config *app.Config) bool { return t.process.Config.Equal(config) } -func (t *task) Config() *app.Config { - token := t.lock.RLock() - defer t.lock.RUnlock(token) +func (t *task) ResolvedConfig() *app.Config { + if t.config == nil { + return nil + } + return t.config.Clone() +} + +func (t *task) Config() *app.Config { if t.process == nil { return nil } @@ -497,9 +484,6 @@ func (t *task) Config() *app.Config { func (t *task) Destroy() { t.Stop() - t.lock.Lock() - defer t.lock.Unlock() - t.valid = false t.process = nil t.config = nil @@ -510,9 +494,6 @@ func (t *task) Destroy() { } func (t *task) Match(id, reference, owner, domain glob.Glob) bool { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - count := 0 matches := 0 @@ -548,9 +529,6 @@ func (t *task) Match(id, reference, owner, domain glob.Glob) bool { } func (t *task) Process() *app.Process { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.process == nil { return nil } @@ -559,9 +537,6 @@ func (t *task) Process() *app.Process { } func (t *task) Order() string { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.process == nil { return "" } @@ -570,9 +545,6 @@ func (t *task) Order() string { } func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.parser == nil { return nil } @@ -581,9 +553,6 @@ func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { } func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { - token := t.lock.RLock() - defer t.lock.RUnlock(token) - if t.parser == nil { return }