Fix/relaxing locking on task to avoid global locking

This commit is contained in:
Ingo Oppermann
2025-05-14 12:53:38 +02:00
parent 8807cd2d79
commit 88f5099972
2 changed files with 60 additions and 121 deletions

View File

@@ -31,9 +31,7 @@ func (m *Storage) Range(onlyValid bool, f func(key app.ProcessID, value *task, t
func (m *Storage) Store(id app.ProcessID, t *task) { func (m *Storage) Store(id app.ProcessID, t *task) {
t, ok := m.tasks.LoadAndStore(id, t) t, ok := m.tasks.LoadAndStore(id, t)
if ok { if ok {
t.Lock()
t.Destroy() t.Destroy()
t.Unlock()
} }
} }
@@ -67,7 +65,7 @@ func (m *Storage) LoadAndLock(id app.ProcessID) (*task, bool) {
return nil, false return nil, false
} }
task.lock.Lock() task.Lock()
if !task.IsValid() { if !task.IsValid() {
task.Unlock() task.Unlock()
return nil, false return nil, false

View File

@@ -2,41 +2,42 @@ package restream
import ( import (
"maps" "maps"
"sync"
"sync/atomic"
"time" "time"
"github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/math/rand"
"github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/app"
"github.com/puzpuzpuz/xsync/v3"
) )
type task struct { type task struct {
valid bool valid *atomic.Bool // Whether the task is valid an can be used
readers *atomic.Int64 // Number of concurrent readers
id string // ID of the task/process id string // ID of the task/process
owner string owner string // Owner of the process
domain string domain string // Domain of the process
reference string reference string // reference of the process
process *app.Process process *app.Process // The process definition
config *app.Config // Process config with replaced static placeholders config *app.Config // Process config with replaced static placeholders
command []string // The actual command parameter for ffmpeg command []string // The actual command parameter for ffmpeg
ffmpeg process.Process ffmpeg process.Process // The OS process
parser parse.Parser parser parse.Parser // Parser for the OS process' output
playout map[string]int playout map[string]int // Port mapping to access playout API
logger log.Logger logger log.Logger // Logger
usesDisk bool // Whether this task uses the disk usesDisk bool // Whether this task uses the disk
hwdevice int // Index of the GPU this task uses hwdevice *atomic.Int32 // Index of the GPU this task uses
metadata map[string]interface{} metadata map[string]interface{} // Metadata of the process
lock *xsync.RBMutex lock sync.RWMutex
tokens *xsync.MapOf[string, *xsync.RToken]
} }
func NewTask(process *app.Process, logger log.Logger) *task { func NewTask(process *app.Process, logger log.Logger) *task {
t := &task{ t := &task{
valid: &atomic.Bool{},
readers: &atomic.Int64{},
id: process.ID, id: process.ID,
owner: process.Owner, owner: process.Owner,
domain: process.Domain, domain: process.Domain,
@@ -45,9 +46,8 @@ func NewTask(process *app.Process, logger log.Logger) *task {
config: process.Config.Clone(), config: process.Config.Clone(),
playout: map[string]int{}, playout: map[string]int{},
logger: logger, logger: logger,
hwdevice: &atomic.Int32{},
metadata: nil, metadata: nil,
lock: xsync.NewRBMutex(),
tokens: xsync.NewMapOf[string, *xsync.RToken](),
} }
return t return t
@@ -62,37 +62,20 @@ func (t *task) Unlock() {
} }
func (t *task) RLock() string { func (t *task) RLock() string {
token := "" t.readers.Add(1)
for { return ""
token = rand.String(16)
rtoken := t.lock.RLock()
_, loaded := t.tokens.LoadOrStore(token, rtoken)
if !loaded {
break
}
t.lock.RUnlock(rtoken)
}
return token
} }
func (t *task) Release(token string) { func (t *task) Release(token string) {
rtoken, ok := t.tokens.LoadAndDelete(token) t.readers.Add(-1)
if !ok {
return
}
t.lock.RUnlock(rtoken)
} }
func (t *task) IsValid() bool { func (t *task) IsValid() bool {
return t.valid return t.valid.Load()
} }
func (t *task) SetValid(valid bool) { func (t *task) SetValid(valid bool) {
t.valid = valid t.valid.Store(valid)
} }
func (t *task) UsesDisk() bool { func (t *task) UsesDisk() bool {
@@ -112,15 +95,7 @@ func (t *task) String() string {
// Restore restores the task's order // Restore restores the task's order
func (t *task) Restore() error { func (t *task) Restore() error {
if !t.valid { if !t.valid.Load() {
return ErrInvalidProcessConfig
}
if t.ffmpeg == nil {
return ErrInvalidProcessConfig
}
if t.process == nil {
return ErrInvalidProcessConfig return ErrInvalidProcessConfig
} }
@@ -135,18 +110,10 @@ func (t *task) Restore() error {
} }
func (t *task) Start() error { func (t *task) Start() error {
if !t.valid { if !t.valid.Load() {
return ErrInvalidProcessConfig return ErrInvalidProcessConfig
} }
if t.ffmpeg == nil {
return nil
}
if t.process == nil {
return nil
}
status := t.ffmpeg.Status() status := t.ffmpeg.Status()
if t.process.Order.String() == "start" && status.Order == "start" { if t.process.Order.String() == "start" && status.Order == "start" {
@@ -161,12 +128,8 @@ func (t *task) Start() error {
} }
func (t *task) Stop() error { func (t *task) Stop() error {
if t.ffmpeg == nil { if !t.valid.Load() {
return nil return ErrInvalidProcessConfig
}
if t.process == nil {
return nil
} }
status := t.ffmpeg.Status() status := t.ffmpeg.Status()
@@ -184,7 +147,7 @@ func (t *task) Stop() error {
// Kill stops a process without changing the tasks order // Kill stops a process without changing the tasks order
func (t *task) Kill() { func (t *task) Kill() {
if t.ffmpeg == nil { if !t.valid.Load() {
return return
} }
@@ -192,14 +155,10 @@ func (t *task) Kill() {
} }
func (t *task) Restart() error { func (t *task) Restart() error {
if !t.valid { if !t.valid.Load() {
return ErrInvalidProcessConfig return ErrInvalidProcessConfig
} }
if t.process == nil {
return nil
}
if t.process.Order.String() == "stop" { if t.process.Order.String() == "stop" {
return nil return nil
} }
@@ -215,19 +174,7 @@ func (t *task) Restart() error {
func (t *task) State() (*app.State, error) { func (t *task) State() (*app.State, error) {
state := &app.State{} state := &app.State{}
if !t.valid { if !t.valid.Load() {
return state, nil
}
if t.ffmpeg == nil {
return state, nil
}
if t.parser == nil {
return state, nil
}
if t.process == nil {
return state, nil return state, nil
} }
@@ -318,11 +265,7 @@ func assignConfigID(progress []app.ProgressIO, config []app.ConfigIO) []app.Prog
func (t *task) Report() (*app.Report, error) { func (t *task) Report() (*app.Report, error) {
report := &app.Report{} report := &app.Report{}
if !t.valid { if !t.valid.Load() {
return report, nil
}
if t.parser == nil {
return report, nil return report, nil
} }
@@ -346,11 +289,7 @@ func (t *task) Report() (*app.Report, error) {
} }
func (t *task) SetReport(report *app.Report) error { func (t *task) SetReport(report *app.Report) error {
if !t.valid { if !t.valid.Load() {
return nil
}
if t.parser == nil {
return nil return nil
} }
@@ -362,12 +301,12 @@ func (t *task) SetReport(report *app.Report) error {
} }
func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult { func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult {
if t.parser == nil {
return []app.ReportHistorySearchResult{}
}
result := []app.ReportHistorySearchResult{} result := []app.ReportHistorySearchResult{}
if !t.valid.Load() {
return result
}
presult := t.parser.SearchReportHistory(state, from, to) presult := t.parser.SearchReportHistory(state, from, to)
for _, f := range presult { for _, f := range presult {
@@ -435,7 +374,7 @@ func (t *task) ExportMetadata() map[string]interface{} {
} }
func (t *task) Limit(cpu, memory, gpu bool) bool { func (t *task) Limit(cpu, memory, gpu bool) bool {
if t.ffmpeg == nil { if !t.valid.Load() {
return false return false
} }
@@ -445,15 +384,15 @@ func (t *task) Limit(cpu, memory, gpu bool) bool {
} }
func (t *task) SetHWDevice(index int) { func (t *task) SetHWDevice(index int) {
t.hwdevice = index t.hwdevice.Store(int32(index))
} }
func (t *task) GetHWDevice() int { func (t *task) GetHWDevice() int {
return t.hwdevice return int(t.hwdevice.Load())
} }
func (t *task) Equal(config *app.Config) bool { func (t *task) Equal(config *app.Config) bool {
if t.process == nil { if !t.valid.Load() {
return false return false
} }
@@ -461,7 +400,7 @@ func (t *task) Equal(config *app.Config) bool {
} }
func (t *task) ResolvedConfig() *app.Config { func (t *task) ResolvedConfig() *app.Config {
if t.config == nil { if !t.valid.Load() {
return nil return nil
} }
@@ -469,7 +408,7 @@ func (t *task) ResolvedConfig() *app.Config {
} }
func (t *task) Config() *app.Config { func (t *task) Config() *app.Config {
if t.process == nil { if !t.valid.Load() {
return nil return nil
} }
@@ -479,13 +418,15 @@ func (t *task) Config() *app.Config {
func (t *task) Destroy() { func (t *task) Destroy() {
t.Stop() t.Stop()
t.valid = false t.valid.Store(false)
/*
t.process = nil t.process = nil
t.config = nil t.config = nil
t.command = nil t.command = nil
t.ffmpeg = nil t.ffmpeg = nil
t.parser = nil t.parser = nil
t.metadata = map[string]interface{}{} t.metadata = map[string]interface{}{}
*/
} }
func (t *task) Match(id, reference, owner, domain glob.Glob) bool { func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
@@ -524,7 +465,7 @@ func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
} }
func (t *task) Process() *app.Process { func (t *task) Process() *app.Process {
if t.process == nil { if !t.valid.Load() {
return nil return nil
} }
@@ -532,7 +473,7 @@ func (t *task) Process() *app.Process {
} }
func (t *task) Order() string { func (t *task) Order() string {
if t.process == nil { if !t.valid.Load() {
return "" return ""
} }
@@ -540,7 +481,7 @@ func (t *task) Order() string {
} }
func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry {
if t.parser == nil { if !t.valid.Load() {
return nil return nil
} }
@@ -548,7 +489,7 @@ func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry {
} }
func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) {
if t.parser == nil { if !t.valid.Load() {
return return
} }