Introduce task level locks

This commit is contained in:
Ingo Oppermann
2025-04-10 16:17:01 +02:00
parent ba77ec0ecf
commit 5845f3bf0f
4 changed files with 256 additions and 183 deletions

View File

@@ -183,7 +183,9 @@ func (r *restream) Start() {
go r.resourceObserver(ctx, r.resources, time.Second)
}
r.tasks.Range(func(id app.ProcessID, t *task) bool {
r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool {
defer t.Release(token)
t.Restore()
// The filesystem cleanup rules can be set
@@ -214,9 +216,10 @@ func (r *restream) Stop() {
// Stop the currently running processes without altering their order such that on a subsequent
// Start() they will get restarted.
r.tasks.Range(func(_ app.ProcessID, t *task) bool {
r.tasks.Range(true, func(_ app.ProcessID, t *task, token string) bool {
wg.Add(1)
go func(t *task) {
defer t.Release(token)
defer wg.Done()
t.Kill()
}(t)
@@ -226,7 +229,8 @@ func (r *restream) Stop() {
wg.Wait()
r.tasks.Range(func(id app.ProcessID, _ *task) bool {
r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool {
defer t.Release(token)
r.unsetCleanup(id)
return true
})
@@ -259,7 +263,8 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int
if isFull {
// Stop all tasks that write to this filesystem
r.tasks.Range(func(id app.ProcessID, t *task) bool {
r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool {
defer t.Release(token)
if !t.UsesDisk() {
return true
}
@@ -315,7 +320,8 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
break
}
r.tasks.Range(func(id app.ProcessID, t *task) bool {
r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool {
defer t.Release(token)
limitGPU := false
gpuindex := t.GetHWDevice()
if gpuindex >= 0 {
@@ -375,7 +381,7 @@ func (r *restream) load() error {
// Replace all placeholders in the config
resolveStaticPlaceholders(t.config, r.replace)
tasks.LoadOrStore(t.ID(), t)
tasks.Store(t.ID(), t)
}
}
@@ -383,7 +389,9 @@ func (r *restream) load() error {
// replaced, we can resolve references and validate the
// inputs and outputs.
tasks.Range(func(_ app.ProcessID, t *task) bool {
tasks.Range(false, func(_ app.ProcessID, t *task, token string) bool {
defer t.Release(token)
// Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version
if c, err := semver.NewConstraint(t.config.FFVersion); err == nil {
if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil {
@@ -460,7 +468,7 @@ func (r *restream) load() error {
}
t.ffmpeg = ffmpeg
t.Valid(true)
t.SetValid(true)
return true
})
@@ -480,7 +488,13 @@ func (r *restream) save() {
data := store.NewData()
r.tasks.Range(func(tid app.ProcessID, t *task) bool {
r.tasks.Range(true, func(tid app.ProcessID, t *task, token string) bool {
defer t.Release(token)
if !t.IsValid() {
return true
}
domain := data.Process[tid.Domain]
if domain == nil {
domain = map[string]store.Process{}
@@ -520,7 +534,6 @@ var ErrForbidden = errors.New("forbidden")
func (r *restream) AddProcess(config *app.Config) error {
t, err := r.createTask(config)
if err != nil {
return err
}
@@ -529,6 +542,7 @@ func (r *restream) AddProcess(config *app.Config) error {
_, ok := r.tasks.LoadOrStore(tid, t)
if ok {
t.Destroy()
return ErrProcessExists
}
@@ -538,6 +552,7 @@ func (r *restream) AddProcess(config *app.Config) error {
err = t.Restore()
if err != nil {
r.tasks.Delete(tid)
t.Destroy()
return err
}
@@ -645,7 +660,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t.ffmpeg = ffmpeg
t.Valid(true)
t.SetValid(true)
return t, nil
}
@@ -673,8 +688,9 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro
selectedGPU = 0
}
if t, hasTask := r.tasks.Load(cfg.ProcessID()); hasTask {
if t, token, hasTask := r.tasks.Load(cfg.ProcessID()); hasTask {
t.SetHWDevice(selectedGPU)
t.Release(token)
}
config := cfg.Clone()
@@ -742,12 +758,12 @@ func (r *restream) unsetCleanup(id app.ProcessID) {
}
}
func (r *restream) setPlayoutPorts(t *task) error {
r.unsetPlayoutPorts(t)
func (r *restream) setPlayoutPorts(task *task) error {
r.unsetPlayoutPorts(task)
t.playout = make(map[string]int)
task.playout = make(map[string]int)
for i, input := range t.config.Input {
for i, input := range task.config.Input {
if !strings.HasPrefix(input.Address, "avstream:") && !strings.HasPrefix(input.Address, "playout:") {
continue
}
@@ -771,18 +787,18 @@ func (r *restream) setPlayoutPorts(t *task) error {
if port, err := r.ffmpeg.GetPort(); err == nil {
options = append(options, "-playout_httpport", strconv.Itoa(port))
t.logger.WithFields(log.Fields{
task.logger.WithFields(log.Fields{
"port": port,
"input": input.ID,
}).Debug().Log("Assinging playout port")
t.playout[input.ID] = port
task.playout[input.ID] = port
} else if err != net.ErrNoPortrangerProvided {
return err
}
input.Options = options
t.config.Input[i] = input
task.config.Input[i] = input
}
return nil
@@ -1029,13 +1045,16 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e
}
var t *task = nil
var ttoken string = ""
tasks.Range(func(_ app.ProcessID, tsk *task) bool {
if tsk.id == matches["id"] && tsk.domain == matches["domain"] {
t = tsk
tasks.Range(true, func(_ app.ProcessID, task *task, token string) bool {
if task.id == matches["id"] && task.domain == matches["domain"] {
t = task
ttoken = token
return false
}
task.Release(token)
return true
})
@@ -1043,6 +1062,8 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e
return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address)
}
defer t.Release(ttoken)
teeOptions := regexp.MustCompile(`^\[[^\]]*\]`)
for _, x := range t.config.Output {
@@ -1149,7 +1170,15 @@ func parseAddressReference(address string) (map[string]string, error) {
}
func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
err := r.updateProcess(id, config)
task, ok := r.tasks.LoadAndLock(id)
if !ok {
return ErrUnknownProcess
}
err := r.updateProcess(task, config)
task.Unlock()
if err != nil {
return err
}
@@ -1159,12 +1188,7 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
return nil
}
func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error {
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
func (r *restream) updateProcess(task *task, config *app.Config) error {
// If the new config has the same hash as the current config, do nothing.
if task.Equal(config) {
return nil
@@ -1177,16 +1201,23 @@ func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error {
tid := t.ID()
if !tid.Equal(id) {
_, ok := r.tasks.Load(tid)
if ok {
if !tid.Equal(task.ID()) {
if r.tasks.Has(tid) {
t.Destroy()
return ErrProcessExists
}
}
t.process.Order.Set(task.Order())
order := task.Order()
if len(order) == 0 {
t.Destroy()
return ErrUnknownProcess
}
if err := r.stopProcess(id); err != nil {
t.process.Order.Set(order)
if err := r.stopProcess(task); err != nil {
t.Destroy()
return fmt.Errorf("stop process: %w", err)
}
@@ -1199,7 +1230,8 @@ func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error {
// Transfer the metadata to the new process
t.ImportMetadata(task.ExportMetadata())
if err := r.deleteProcess(id); err != nil {
if err := r.deleteProcess(task); err != nil {
t.Destroy()
return fmt.Errorf("delete process: %w", err)
}
@@ -1240,7 +1272,9 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt
if idglob == nil && refglob == nil && ownerglob == nil && domainglob == nil {
ids = make([]app.ProcessID, 0, r.tasks.Size())
r.tasks.Range(func(id app.ProcessID, t *task) bool {
r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool {
defer t.Release(token)
ids = append(ids, id)
return true
@@ -1248,7 +1282,9 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt
} else {
ids = []app.ProcessID{}
r.tasks.Range(func(id app.ProcessID, t *task) bool {
r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool {
defer t.Release(token)
if !t.Match(idglob, refglob, ownerglob, domainglob) {
return true
}
@@ -1263,16 +1299,25 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt
}
func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) {
task, ok := r.tasks.Load(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return &app.Process{}, ErrUnknownProcess
}
defer task.Release(token)
return task.Process(), nil
}
func (r *restream) DeleteProcess(id app.ProcessID) error {
err := r.deleteProcess(id)
task, ok := r.tasks.LoadAndLock(id)
if !ok {
return ErrUnknownProcess
}
err := r.deleteProcess(task)
task.Unlock()
if err != nil {
return err
}
@@ -1282,26 +1327,31 @@ func (r *restream) DeleteProcess(id app.ProcessID) error {
return nil
}
func (r *restream) deleteProcess(tid app.ProcessID) error {
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
func (r *restream) deleteProcess(task *task) error {
if task.Order() != "stop" {
return fmt.Errorf("the process with the ID '%s' is still running", tid)
return fmt.Errorf("the process with the ID '%s' is still running", task.String())
}
r.unsetPlayoutPorts(task)
r.unsetCleanup(tid)
r.unsetCleanup(task.ID())
r.tasks.Delete(tid)
r.tasks.Delete(task.ID())
task.Destroy()
return nil
}
func (r *restream) StartProcess(id app.ProcessID) error {
err := r.startProcess(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
err := r.startProcess(task)
task.Release(token)
if err != nil {
return err
}
@@ -1311,12 +1361,7 @@ func (r *restream) StartProcess(id app.ProcessID) error {
return nil
}
func (r *restream) startProcess(tid app.ProcessID) error {
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
func (r *restream) startProcess(task *task) error {
err := task.Start()
if err != nil {
return err
@@ -1328,7 +1373,15 @@ func (r *restream) startProcess(tid app.ProcessID) error {
}
func (r *restream) StopProcess(id app.ProcessID) error {
err := r.stopProcess(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
err := r.stopProcess(task)
task.Release(token)
if err != nil {
return err
}
@@ -1338,12 +1391,7 @@ func (r *restream) StopProcess(id app.ProcessID) error {
return nil
}
func (r *restream) stopProcess(tid app.ProcessID) error {
task, ok := r.tasks.Load(tid)
if !ok {
return ErrUnknownProcess
}
func (r *restream) stopProcess(task *task) error {
// TODO: aufpassen mit nProc und nil error. In task.Stop() noch einen error einführen, falls der process nicht läuft.
err := task.Stop()
if err != nil {
@@ -1356,22 +1404,31 @@ func (r *restream) stopProcess(tid app.ProcessID) error {
}
func (r *restream) RestartProcess(id app.ProcessID) error {
return r.restartProcess(id)
}
func (r *restream) restartProcess(tid app.ProcessID) error {
task, ok := r.tasks.Load(tid)
task, token, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
defer task.Release(token)
return r.restartProcess(task)
}
func (r *restream) restartProcess(task *task) error {
task.Restart()
return nil
}
func (r *restream) ReloadProcess(id app.ProcessID) error {
err := r.reloadProcess(id)
task, ok := r.tasks.LoadAndLock(id)
if !ok {
return ErrUnknownProcess
}
err := r.reloadProcess(task)
task.Unlock()
if err != nil {
return err
}
@@ -1381,12 +1438,7 @@ func (r *restream) ReloadProcess(id app.ProcessID) error {
return nil
}
func (r *restream) reloadProcess(id app.ProcessID) error {
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
func (r *restream) reloadProcess(task *task) error {
t, err := r.createTask(task.Config())
if err != nil {
return err
@@ -1394,20 +1446,27 @@ func (r *restream) reloadProcess(id app.ProcessID) error {
tid := t.ID()
t.process.Order.Set(task.Order())
order := task.Order()
if len(order) == 0 {
t.Destroy()
return ErrUnknownProcess
}
t.process.Order.Set(order)
if err := task.Stop(); err != nil {
t.Destroy()
return fmt.Errorf("stop process: %w", err)
}
// Transfer the report history to the new process
history := task.parser.ReportHistory()
t.parser.ImportReportHistory(history)
t.parser.ImportReportHistory(task.parser.ReportHistory())
// Transfer the metadata to the new process
t.metadata = task.metadata
if err := r.deleteProcess(id); err != nil {
if err := r.deleteProcess(task); err != nil {
t.Destroy()
return fmt.Errorf("delete process: %w", err)
}
@@ -1418,18 +1477,17 @@ func (r *restream) reloadProcess(id app.ProcessID) error {
t.Restore()
r.save()
return nil
}
func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) {
state := &app.State{}
task, ok := r.tasks.Load(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return state, ErrUnknownProcess
}
defer task.Release(token)
return task.State()
}
@@ -1437,19 +1495,21 @@ func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) {
func (r *restream) GetProcessReport(id app.ProcessID) (*app.Report, error) {
report := &app.Report{}
task, ok := r.tasks.Load(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return report, ErrUnknownProcess
}
defer task.Release(token)
return task.Report()
}
func (r *restream) SetProcessReport(id app.ProcessID, report *app.Report) error {
task, ok := r.tasks.Load(id)
task, ok := r.tasks.LoadAndLock(id)
if !ok {
return ErrUnknownProcess
}
defer task.Unlock()
return task.SetReport(report)
}
@@ -1460,14 +1520,15 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string,
ids := r.GetProcessIDs(idpattern, refpattern, "", "")
for _, id := range ids {
task, ok := r.tasks.Load(id)
task, token, ok := r.tasks.Load(id)
if !ok {
continue
}
presult := task.SearchReportHistory(state, from, to)
result = append(result, presult...)
task.Release(token)
}
return result
@@ -1561,12 +1622,13 @@ func (r *restream) ReloadSkills() error {
}
func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) {
task, ok := r.tasks.Load(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return "", ErrUnknownProcess
}
defer task.Release(token)
if !task.valid {
if !task.IsValid() {
return "", fmt.Errorf("invalid process definition")
}
@@ -1579,12 +1641,15 @@ func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error)
}
func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interface{}) error {
task, ok := r.tasks.Load(id)
task, ok := r.tasks.LoadAndLock(id)
if !ok {
return ErrUnknownProcess
}
err := task.SetMetadata(key, data)
task.Unlock()
if err != nil {
return err
}
@@ -1595,10 +1660,11 @@ func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interfa
}
func (r *restream) GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) {
task, ok := r.tasks.Load(id)
task, token, ok := r.tasks.Load(id)
if !ok {
return nil, ErrUnknownProcess
}
defer task.Release(token)
return task.GetMetadata(key)
}