From 88f5099972b92365170abdb59d618d686b6ed439 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 14 May 2025 12:53:38 +0200 Subject: [PATCH] Fix/relaxing locking on task to avoid global locking --- restream/manager.go | 4 +- restream/task.go | 177 +++++++++++++++----------------------------- 2 files changed, 60 insertions(+), 121 deletions(-) diff --git a/restream/manager.go b/restream/manager.go index 136a7a2e..ff3549e4 100644 --- a/restream/manager.go +++ b/restream/manager.go @@ -31,9 +31,7 @@ func (m *Storage) Range(onlyValid bool, f func(key app.ProcessID, value *task, t func (m *Storage) Store(id app.ProcessID, t *task) { t, ok := m.tasks.LoadAndStore(id, t) if ok { - t.Lock() t.Destroy() - t.Unlock() } } @@ -67,7 +65,7 @@ func (m *Storage) LoadAndLock(id app.ProcessID) (*task, bool) { return nil, false } - task.lock.Lock() + task.Lock() if !task.IsValid() { task.Unlock() return nil, false diff --git a/restream/task.go b/restream/task.go index cda0da05..8fcee0bf 100644 --- a/restream/task.go +++ b/restream/task.go @@ -2,41 +2,42 @@ package restream import ( "maps" + "sync" + "sync/atomic" "time" "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/math/rand" "github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/restream/app" - - "github.com/puzpuzpuz/xsync/v3" ) type task struct { - valid bool - id string // ID of the task/process - owner string - domain string - reference string - process *app.Process - config *app.Config // Process config with replaced static placeholders - command []string // The actual command parameter for ffmpeg - ffmpeg process.Process - parser parse.Parser - playout map[string]int - logger log.Logger - usesDisk bool // Whether this task uses the disk - hwdevice int // Index of the GPU this task uses - metadata map[string]interface{} + valid *atomic.Bool // Whether the task is valid an can be used + readers *atomic.Int64 // Number of concurrent readers + id string // ID of the task/process + owner string // Owner of the process + domain string // Domain of the process + reference string // reference of the process + process *app.Process // The process definition + config *app.Config // Process config with replaced static placeholders + command []string // The actual command parameter for ffmpeg + ffmpeg process.Process // The OS process + parser parse.Parser // Parser for the OS process' output + playout map[string]int // Port mapping to access playout API + logger log.Logger // Logger + usesDisk bool // Whether this task uses the disk + hwdevice *atomic.Int32 // Index of the GPU this task uses + metadata map[string]interface{} // Metadata of the process - lock *xsync.RBMutex - tokens *xsync.MapOf[string, *xsync.RToken] + lock sync.RWMutex } func NewTask(process *app.Process, logger log.Logger) *task { t := &task{ + valid: &atomic.Bool{}, + readers: &atomic.Int64{}, id: process.ID, owner: process.Owner, domain: process.Domain, @@ -45,9 +46,8 @@ func NewTask(process *app.Process, logger log.Logger) *task { config: process.Config.Clone(), playout: map[string]int{}, logger: logger, + hwdevice: &atomic.Int32{}, metadata: nil, - lock: xsync.NewRBMutex(), - tokens: xsync.NewMapOf[string, *xsync.RToken](), } return t @@ -62,37 +62,20 @@ func (t *task) Unlock() { } func (t *task) RLock() string { - token := "" - for { - token = rand.String(16) - rtoken := t.lock.RLock() - - _, loaded := t.tokens.LoadOrStore(token, rtoken) - if !loaded { - break - } - - t.lock.RUnlock(rtoken) - } - - return token + t.readers.Add(1) + return "" } func (t *task) Release(token string) { - rtoken, ok := t.tokens.LoadAndDelete(token) - if !ok { - return - } - - t.lock.RUnlock(rtoken) + t.readers.Add(-1) } func (t *task) IsValid() bool { - return t.valid + return t.valid.Load() } func (t *task) SetValid(valid bool) { - t.valid = valid + t.valid.Store(valid) } func (t *task) UsesDisk() bool { @@ -112,15 +95,7 @@ func (t *task) String() string { // Restore restores the task's order func (t *task) Restore() error { - if !t.valid { - return ErrInvalidProcessConfig - } - - if t.ffmpeg == nil { - return ErrInvalidProcessConfig - } - - if t.process == nil { + if !t.valid.Load() { return ErrInvalidProcessConfig } @@ -135,18 +110,10 @@ func (t *task) Restore() error { } func (t *task) Start() error { - if !t.valid { + if !t.valid.Load() { return ErrInvalidProcessConfig } - if t.ffmpeg == nil { - return nil - } - - if t.process == nil { - return nil - } - status := t.ffmpeg.Status() if t.process.Order.String() == "start" && status.Order == "start" { @@ -161,12 +128,8 @@ func (t *task) Start() error { } func (t *task) Stop() error { - if t.ffmpeg == nil { - return nil - } - - if t.process == nil { - return nil + if !t.valid.Load() { + return ErrInvalidProcessConfig } status := t.ffmpeg.Status() @@ -184,7 +147,7 @@ func (t *task) Stop() error { // Kill stops a process without changing the tasks order func (t *task) Kill() { - if t.ffmpeg == nil { + if !t.valid.Load() { return } @@ -192,14 +155,10 @@ func (t *task) Kill() { } func (t *task) Restart() error { - if !t.valid { + if !t.valid.Load() { return ErrInvalidProcessConfig } - if t.process == nil { - return nil - } - if t.process.Order.String() == "stop" { return nil } @@ -215,19 +174,7 @@ func (t *task) Restart() error { func (t *task) State() (*app.State, error) { state := &app.State{} - if !t.valid { - return state, nil - } - - if t.ffmpeg == nil { - return state, nil - } - - if t.parser == nil { - return state, nil - } - - if t.process == nil { + if !t.valid.Load() { return state, nil } @@ -318,11 +265,7 @@ func assignConfigID(progress []app.ProgressIO, config []app.ConfigIO) []app.Prog func (t *task) Report() (*app.Report, error) { report := &app.Report{} - if !t.valid { - return report, nil - } - - if t.parser == nil { + if !t.valid.Load() { return report, nil } @@ -346,11 +289,7 @@ func (t *task) Report() (*app.Report, error) { } func (t *task) SetReport(report *app.Report) error { - if !t.valid { - return nil - } - - if t.parser == nil { + if !t.valid.Load() { return nil } @@ -362,12 +301,12 @@ func (t *task) SetReport(report *app.Report) error { } func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult { - if t.parser == nil { - return []app.ReportHistorySearchResult{} - } - result := []app.ReportHistorySearchResult{} + if !t.valid.Load() { + return result + } + presult := t.parser.SearchReportHistory(state, from, to) for _, f := range presult { @@ -435,7 +374,7 @@ func (t *task) ExportMetadata() map[string]interface{} { } func (t *task) Limit(cpu, memory, gpu bool) bool { - if t.ffmpeg == nil { + if !t.valid.Load() { return false } @@ -445,15 +384,15 @@ func (t *task) Limit(cpu, memory, gpu bool) bool { } func (t *task) SetHWDevice(index int) { - t.hwdevice = index + t.hwdevice.Store(int32(index)) } func (t *task) GetHWDevice() int { - return t.hwdevice + return int(t.hwdevice.Load()) } func (t *task) Equal(config *app.Config) bool { - if t.process == nil { + if !t.valid.Load() { return false } @@ -461,7 +400,7 @@ func (t *task) Equal(config *app.Config) bool { } func (t *task) ResolvedConfig() *app.Config { - if t.config == nil { + if !t.valid.Load() { return nil } @@ -469,7 +408,7 @@ func (t *task) ResolvedConfig() *app.Config { } func (t *task) Config() *app.Config { - if t.process == nil { + if !t.valid.Load() { return nil } @@ -479,13 +418,15 @@ func (t *task) Config() *app.Config { func (t *task) Destroy() { t.Stop() - t.valid = false - t.process = nil - t.config = nil - t.command = nil - t.ffmpeg = nil - t.parser = nil - t.metadata = map[string]interface{}{} + t.valid.Store(false) + /* + t.process = nil + t.config = nil + t.command = nil + t.ffmpeg = nil + t.parser = nil + t.metadata = map[string]interface{}{} + */ } func (t *task) Match(id, reference, owner, domain glob.Glob) bool { @@ -524,7 +465,7 @@ func (t *task) Match(id, reference, owner, domain glob.Glob) bool { } func (t *task) Process() *app.Process { - if t.process == nil { + if !t.valid.Load() { return nil } @@ -532,7 +473,7 @@ func (t *task) Process() *app.Process { } func (t *task) Order() string { - if t.process == nil { + if !t.valid.Load() { return "" } @@ -540,7 +481,7 @@ func (t *task) Order() string { } func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { - if t.parser == nil { + if !t.valid.Load() { return nil } @@ -548,7 +489,7 @@ func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { } func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { - if t.parser == nil { + if !t.valid.Load() { return }