Remove bottlenecks in process handling, still some rough edges

This commit is contained in:
Ingo Oppermann
2024-07-18 17:16:49 +02:00
parent 8a28e2cf96
commit 72883d18d4
19 changed files with 302 additions and 205 deletions

View File

@@ -575,11 +575,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro
ID: "restreamer-ui:ingest:" + cfg.id,
Reference: cfg.id,
CreatedAt: time.Now().Unix(),
Order: "stop",
Order: app.NewOrder("stop"),
}
if v1data.Actions.Ingest == "start" {
process.Order = "start"
process.Order = app.NewOrder("start")
}
config := &app.Config{
@@ -1211,11 +1211,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro
ID: "restreamer-ui:ingest:" + cfg.id + "_snapshot",
Reference: cfg.id,
CreatedAt: time.Now().Unix(),
Order: "stop",
Order: app.NewOrder("stop"),
}
if v1data.Actions.Ingest == "start" {
process.Order = "start"
process.Order = app.NewOrder("start")
}
snapshotConfig := &app.Config{
@@ -1292,11 +1292,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro
ID: egressId,
Reference: cfg.id,
CreatedAt: time.Now().Unix(),
Order: "stop",
Order: app.NewOrder("stop"),
}
if v1data.Actions.Egress == "start" {
process.Order = "start"
process.Order = app.NewOrder("start")
}
egress := restreamerUIEgress{

View File

@@ -634,7 +634,6 @@ func (c *cluster) Shutdown() error {
if c.manager != nil {
c.manager.NodesClear()
c.manager = nil
}
if c.api != nil {
@@ -648,6 +647,9 @@ func (c *cluster) Shutdown() error {
c.raft.Shutdown()
}
c.manager = nil
c.raft = nil
return nil
}

View File

@@ -197,6 +197,9 @@ func relocate(have []node.Process, nodes map[string]node.About, relocateMap map[
haveReferenceAffinity.Move(process.Config.Reference, process.Config.Domain, sourceNodeid, targetNodeid)
relocatedProcessIDs = append(relocatedProcessIDs, processid)
// Move only one process at a time.
break
}
return opStack, resources.Map(), relocatedProcessIDs

View File

@@ -368,6 +368,8 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
err: errNotEnoughResourcesForDeployment,
})
}
//break
}
return opStack, resources.Map(), reality

View File

@@ -796,10 +796,12 @@ func (n *Core) ClusterProcessList() ([]Process, error) {
UpdatedAt: time.Unix(p.UpdatedAt, 0),
}
config, metadata := p.Config.Marshal()
config, _ := p.Config.Marshal()
process.Config = config
process.Metadata = metadata
if p.Metadata != nil {
process.Metadata = p.Metadata.(map[string]interface{})
}
processes = append(processes, process)
}

View File

@@ -154,7 +154,7 @@ func (cfg *ProcessConfig) generateInputOutputIDs(ioconfig []ProcessConfigIO) {
}
// Unmarshal converts a core process config to a process config in API representation
func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
func (cfg *ProcessConfig) Unmarshal(c *app.Config, metadata map[string]interface{}) {
if c == nil {
return
}
@@ -212,6 +212,8 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
cfg.LogPatterns = make([]string, len(c.LogPatterns))
copy(cfg.LogPatterns, c.LogPatterns)
cfg.Metadata = metadata
}
func (p *ProcessConfig) ProcessID() app.ProcessID {

View File

@@ -107,7 +107,7 @@ func TestProcessConfig(t *testing.T) {
}
p := ProcessConfig{}
p.Unmarshal(&original)
p.Unmarshal(&original, nil)
restored, _ := p.Marshal()
require.Equal(t, &original, restored)

View File

@@ -69,8 +69,7 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{})
var buf bytes.Buffer
config := api.ProcessConfig{}
config.Unmarshal(p)
config.Metadata = metadata
config.Unmarshal(p, metadata)
e := json.NewEncoder(&buf)
e.Encode(config)
@@ -87,8 +86,7 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map
var buf bytes.Buffer
config := api.ProcessConfig{}
config.Unmarshal(p)
config.Metadata = metadata
config.Unmarshal(p, metadata)
e := json.NewEncoder(&buf)
e.Encode(config)
@@ -206,7 +204,7 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
var buf bytes.Buffer
config := api.ProcessConfig{}
config.Unmarshal(p)
config.Unmarshal(p, nil)
e := json.NewEncoder(&buf)
e.Encode(config)

View File

@@ -241,7 +241,7 @@ func (h *ClusterHandler) convertStoreProcessToAPIProcess(p store.Process, filter
if filter.config {
config := &api.ProcessConfig{}
config.Unmarshal(p.Config)
config.Unmarshal(p.Config, p.Metadata)
process.Config = config
}
@@ -442,7 +442,7 @@ func (h *ClusterHandler) ProcessUpdate(c echo.Context) error {
}
// Prefill the config with the current values
process.Unmarshal(current.Config)
process.Unmarshal(current.Config, current.Metadata)
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())

View File

@@ -3,7 +3,6 @@ package api
import (
"fmt"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
@@ -156,7 +155,7 @@ func (h *ProcessHandler) GetAll(c echo.Context) error {
wg := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
for i := 0; i < 8; /*runtime.NumCPU()*/ i++ {
wg.Add(1)
go func(idChan <-chan app.ProcessID) {
@@ -316,7 +315,7 @@ func (h *ProcessHandler) Update(c echo.Context) error {
}
// Prefill the config with the current values
process.Unmarshal(current.Config)
process.Unmarshal(current.Config, nil)
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
@@ -446,7 +445,7 @@ func (h *ProcessHandler) GetConfig(c echo.Context) error {
}
config := api.ProcessConfig{}
config.Unmarshal(p.Config)
config.Unmarshal(p.Config, nil)
return c.JSON(http.StatusOK, config)
}
@@ -1062,7 +1061,7 @@ func (h *ProcessHandler) getProcess(id app.ProcessID, filter filter) (api.Proces
if filter.config {
info.Config = &api.ProcessConfig{}
info.Config.Unmarshal(process.Config)
info.Config.Unmarshal(process.Config, nil)
}
if filter.state {

View File

@@ -91,10 +91,13 @@ type limiter struct {
ncpu float64
ncpuFactor float64
proc psutil.Process
lock sync.Mutex
lock sync.RWMutex
cancel context.CancelFunc
onLimit LimitFunc
lastUsage Usage
lastUsageLock sync.RWMutex
cpu float64 // CPU limit
cpuCurrent float64 // Current CPU load of this process
cpuLast float64 // Last CPU load of this process
@@ -150,6 +153,10 @@ func NewLimiter(config LimiterConfig) Limiter {
l.ncpu = ncpu
}
l.lastUsage.CPU.NCPU = l.ncpu
l.lastUsage.CPU.Limit = l.cpu * l.ncpu
l.lastUsage.Memory.Limit = l.memory
l.ncpuFactor = 1
mode := "hard"
@@ -208,7 +215,7 @@ func (l *limiter) Start(process psutil.Process) error {
ctx, cancel := context.WithCancel(context.Background())
l.cancel = cancel
go l.ticker(ctx, 1000*time.Millisecond)
go l.ticker(ctx, time.Second)
if l.mode == LimitModeSoft {
ctx, cancel = context.WithCancel(context.Background())
@@ -255,15 +262,21 @@ func (l *limiter) ticker(ctx context.Context, interval time.Duration) {
}
}
func (l *limiter) collect(t time.Time) {
func (l *limiter) collect(_ time.Time) {
l.lock.Lock()
defer l.lock.Unlock()
proc := l.proc
l.lock.Unlock()
if l.proc == nil {
if proc == nil {
return
}
if mstat, err := l.proc.VirtualMemory(); err == nil {
mstat, merr := proc.VirtualMemory()
cpustat, cerr := proc.CPUPercent()
l.lock.Lock()
if merr == nil {
l.memoryLast, l.memoryCurrent = l.memoryCurrent, mstat
if l.memoryCurrent > l.memoryMax {
@@ -281,7 +294,7 @@ func (l *limiter) collect(t time.Time) {
l.memoryAvg = ((l.memoryAvg * float64(l.memoryAvgCounter-1)) + float64(l.memoryCurrent)) / float64(l.memoryAvgCounter)
}
if cpustat, err := l.proc.CPUPercent(); err == nil {
if cerr == nil {
l.cpuLast, l.cpuCurrent = l.cpuCurrent, (cpustat.System+cpustat.User+cpustat.Other)/100
if l.cpuCurrent > l.cpuMax {
@@ -354,6 +367,19 @@ func (l *limiter) collect(t time.Time) {
if isLimitExceeded {
go l.onLimit(l.cpuCurrent*l.ncpuFactor*100, l.memoryCurrent)
}
l.lastUsageLock.Lock()
l.lastUsage.CPU.Current = l.cpuCurrent * l.ncpu * 100
l.lastUsage.CPU.Average = l.cpuAvg * l.ncpu * 100
l.lastUsage.CPU.Max = l.cpuMax * l.ncpu * 100
l.lastUsage.CPU.IsThrottling = l.cpuThrottling
l.lastUsage.Memory.Current = l.memoryCurrent
l.lastUsage.Memory.Average = l.memoryAvg
l.lastUsage.Memory.Max = l.memoryMax
l.lastUsageLock.Unlock()
l.lock.Unlock()
}
func (l *limiter) Limit(cpu, memory bool) error {
@@ -498,34 +524,20 @@ func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Dur
}
func (l *limiter) Current() (cpu float64, memory uint64) {
l.lock.Lock()
defer l.lock.Unlock()
l.lastUsageLock.RLock()
defer l.lastUsageLock.RUnlock()
cpu = l.cpuCurrent * 100
memory = l.memoryCurrent * 100
cpu = l.lastUsage.CPU.Current / l.ncpu
memory = l.lastUsage.Memory.Current
return
}
func (l *limiter) Usage() Usage {
l.lock.Lock()
defer l.lock.Unlock()
l.lastUsageLock.RLock()
defer l.lastUsageLock.RUnlock()
usage := Usage{}
usage.CPU.NCPU = l.ncpu
usage.CPU.Limit = l.cpu * l.ncpu * 100
usage.CPU.Current = l.cpuCurrent * l.ncpu * 100
usage.CPU.Average = l.cpuAvg * l.ncpu * 100
usage.CPU.Max = l.cpuMax * l.ncpu * 100
usage.CPU.IsThrottling = l.cpuThrottling
usage.Memory.Limit = l.memory
usage.Memory.Current = l.memoryCurrent
usage.Memory.Average = l.memoryAvg
usage.Memory.Max = l.memoryMax
return usage
return l.lastUsage
}
func (l *limiter) Limits() (cpu float64, memory uint64) {

View File

@@ -177,7 +177,7 @@ type process struct {
state stateType
time time.Time
states States
lock sync.Mutex
lock sync.RWMutex
}
order struct {
order string
@@ -401,8 +401,8 @@ func (p *process) setState(state stateType) (stateType, error) {
}
func (p *process) getState() stateType {
p.state.lock.Lock()
defer p.state.lock.Unlock()
p.state.lock.RLock()
defer p.state.lock.RUnlock()
return p.state.state
}
@@ -431,15 +431,15 @@ func (p *process) setOrder(order string) bool {
}
func (p *process) isRunning() bool {
p.state.lock.Lock()
defer p.state.lock.Unlock()
p.state.lock.RLock()
defer p.state.lock.RUnlock()
return p.state.state.IsRunning()
}
func (p *process) getStateString() string {
p.state.lock.Lock()
defer p.state.lock.Unlock()
p.state.lock.RLock()
defer p.state.lock.RUnlock()
return p.state.state.String()
}
@@ -448,11 +448,11 @@ func (p *process) getStateString() string {
func (p *process) Status() Status {
usage := p.limits.Usage()
p.state.lock.Lock()
p.state.lock.RLock()
stateTime := p.state.time
state := p.state.state
states := p.state.states
p.state.lock.Unlock()
p.state.lock.RUnlock()
if state == stateRunning && !p.parser.IsRunning() {
state = stateStarting

View File

@@ -41,6 +41,7 @@ type process struct {
statPrevious cpuTimesStat
statPreviousTime time.Time
nTicks uint64
memRSS uint64
}
func (u *util) Process(pid int32) (Process, error) {
@@ -60,7 +61,8 @@ func (u *util) Process(pid int32) (Process, error) {
ctx, cancel := context.WithCancel(context.Background())
p.stopTicker = cancel
go p.tick(ctx, 1000*time.Millisecond)
go p.tickCPU(ctx, time.Second)
go p.tickMemory(ctx, time.Second)
return p, nil
}
@@ -69,7 +71,7 @@ func NewProcess(pid int32, limit bool) (Process, error) {
return DefaultUtil.Process(pid)
}
func (p *process) tick(ctx context.Context, interval time.Duration) {
func (p *process) tickCPU(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -78,7 +80,7 @@ func (p *process) tick(ctx context.Context, interval time.Duration) {
case <-ctx.Done():
return
case t := <-ticker.C:
stat := p.collect()
stat := p.collectCPU()
p.lock.Lock()
p.statPrevious, p.statCurrent = p.statCurrent, stat
@@ -89,7 +91,7 @@ func (p *process) tick(ctx context.Context, interval time.Duration) {
}
}
func (p *process) collect() cpuTimesStat {
func (p *process) collectCPU() cpuTimesStat {
stat, err := p.cpuTimes()
if err != nil {
return cpuTimesStat{
@@ -101,6 +103,33 @@ func (p *process) collect() cpuTimesStat {
return *stat
}
func (p *process) tickMemory(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
rss := p.collectMemory()
p.lock.Lock()
p.memRSS = rss
p.lock.Unlock()
}
}
}
func (p *process) collectMemory() uint64 {
info, err := p.proc.MemoryInfo()
if err != nil {
return 0
}
return info.RSS
}
func (p *process) Stop() {
p.stopTicker()
}
@@ -178,10 +207,8 @@ func (p *process) CPUPercent() (*CPUInfoStat, error) {
}
func (p *process) VirtualMemory() (uint64, error) {
info, err := p.proc.MemoryInfo()
if err != nil {
return 0, err
}
p.lock.RLock()
defer p.lock.RUnlock()
return info.RSS, nil
return p.memRSS, nil
}

View File

@@ -102,6 +102,7 @@ type util struct {
statPrevious cpuTimesStat
statPreviousTime time.Time
nTicks uint64
mem MemoryInfoStat
}
// New returns a new util, it will be started automatically
@@ -127,6 +128,13 @@ func New(root string) (Util, error) {
}
}
mem, err := u.virtualMemory()
if err != nil {
return nil, fmt.Errorf("unable to determine system memory: %w", err)
}
u.mem = *mem
u.stopOnce.Do(func() {})
u.Start()
@@ -139,7 +147,8 @@ func (u *util) Start() {
ctx, cancel := context.WithCancel(context.Background())
u.stopTicker = cancel
go u.tick(ctx, 1000*time.Millisecond)
go u.tickCPU(ctx, time.Second)
go u.tickMemory(ctx, time.Second)
})
}
@@ -233,7 +242,7 @@ func (u *util) cgroupCPULimit(version int) (uint64, float64) {
return 0, 0
}
func (u *util) tick(ctx context.Context, interval time.Duration) {
func (u *util) tickCPU(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -242,7 +251,7 @@ func (u *util) tick(ctx context.Context, interval time.Duration) {
case <-ctx.Done():
return
case t := <-ticker.C:
stat := u.collect()
stat := u.collectCPU()
u.lock.Lock()
u.statPrevious, u.statCurrent = u.statCurrent, stat
@@ -253,7 +262,7 @@ func (u *util) tick(ctx context.Context, interval time.Duration) {
}
}
func (u *util) collect() cpuTimesStat {
func (u *util) collectCPU() cpuTimesStat {
stat, err := u.cpuTimes()
if err != nil {
return cpuTimesStat{
@@ -265,6 +274,34 @@ func (u *util) collect() cpuTimesStat {
return *stat
}
func (u *util) tickMemory(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stat := u.collectMemory()
if stat != nil {
u.lock.Lock()
u.mem = *stat
u.lock.Unlock()
}
}
}
}
func (u *util) collectMemory() *MemoryInfoStat {
stat, err := u.virtualMemory()
if err != nil {
return nil
}
return stat
}
func (u *util) CPUCounts(logical bool) (float64, error) {
if u.hasCgroup && u.ncpu > 0 {
return u.ncpu, nil
@@ -409,7 +446,7 @@ func DiskUsage(path string) (*disk.UsageStat, error) {
return DefaultUtil.DiskUsage(path)
}
func (u *util) VirtualMemory() (*MemoryInfoStat, error) {
func (u *util) virtualMemory() (*MemoryInfoStat, error) {
info, err := mem.VirtualMemory()
if err != nil {
return nil, err
@@ -431,6 +468,19 @@ func (u *util) VirtualMemory() (*MemoryInfoStat, error) {
}, nil
}
func (u *util) VirtualMemory() (*MemoryInfoStat, error) {
u.lock.RLock()
defer u.lock.RUnlock()
stat := &MemoryInfoStat{
Total: u.mem.Total,
Available: u.mem.Available,
Used: u.mem.Used,
}
return stat, nil
}
func VirtualMemory() (*MemoryInfoStat, error) {
return DefaultUtil.VirtualMemory()
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"strconv"
"strings"
"sync"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/process"
@@ -197,6 +198,37 @@ func (c *Config) ProcessID() ProcessID {
}
}
type order struct {
order string
lock sync.RWMutex
}
func NewOrder(o string) order {
return order{
order: o,
}
}
func (o *order) Clone() order {
return order{
order: o.order,
}
}
func (o *order) String() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.order
}
func (o *order) Set(order string) {
o.lock.Lock()
defer o.lock.Unlock()
o.order = order
}
type Process struct {
ID string
Owner string
@@ -205,7 +237,7 @@ type Process struct {
Config *Config
CreatedAt int64
UpdatedAt int64
Order string
Order order
}
func (process *Process) Clone() *Process {
@@ -217,7 +249,7 @@ func (process *Process) Clone() *Process {
Config: process.Config.Clone(),
CreatedAt: process.CreatedAt,
UpdatedAt: process.UpdatedAt,
Order: process.Order,
Order: process.Order.Clone(),
}
return clone

View File

@@ -26,6 +26,7 @@ import (
"github.com/datarhei/core/v16/restream/store"
"github.com/Masterminds/semver/v3"
"github.com/puzpuzpuz/xsync/v3"
)
// The Restreamer interface
@@ -358,6 +359,7 @@ func (r *restream) load() error {
"domain": p.Process.Domain,
"reference": p.Process.Reference,
}),
lock: xsync.NewRBMutex(),
}
t.metadata = p.Metadata
@@ -452,7 +454,7 @@ func (r *restream) load() error {
}
t.ffmpeg = ffmpeg
t.valid = true
t.Valid(true)
return true
})
@@ -558,14 +560,14 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
Domain: config.Domain,
Reference: config.Reference,
Config: config.Clone(),
Order: "stop",
Order: app.NewOrder("stop"),
CreatedAt: time.Now().Unix(),
}
process.UpdatedAt = process.CreatedAt
if config.Autostart {
process.Order = "start"
process.Order.Set("start")
}
t := &task{
@@ -581,6 +583,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
"reference": process.Reference,
"domain": process.Domain,
}),
lock: xsync.NewRBMutex(),
}
resolveStaticPlaceholders(t.config, r.replace)
@@ -647,7 +650,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t.ffmpeg = ffmpeg
t.valid = true
t.Valid(true)
return t, nil
}
@@ -1125,9 +1128,17 @@ func parseAddressReference(address string) (map[string]string, error) {
}
func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.updateProcess(id, config)
if err != nil {
return err
}
r.save()
return nil
}
func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error {
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
@@ -1152,7 +1163,7 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
}
}
t.process.Order = task.Order()
t.process.Order.Set(task.Order())
if err := r.stopProcess(id); err != nil {
return fmt.Errorf("stop process: %w", err)
@@ -1160,7 +1171,6 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
// This would require a major version jump
//t.process.CreatedAt = task.process.CreatedAt
t.process.UpdatedAt = time.Now().Unix()
// Transfer the report history to the new process
history := task.parser.ReportHistory()
@@ -1180,8 +1190,6 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
t.Restore()
r.save()
return nil
}
@@ -1353,93 +1361,44 @@ func (r *restream) ReloadProcess(id app.ProcessID) error {
return nil
}
func (r *restream) reloadProcess(tid app.ProcessID) error {
t, ok := r.tasks.Load(tid)
func (r *restream) reloadProcess(id app.ProcessID) error {
task, ok := r.tasks.Load(id)
if !ok {
return ErrUnknownProcess
}
t.valid = false
t.config = t.process.Config.Clone()
resolveStaticPlaceholders(t.config, r.replace)
err := r.resolveAddresses(r.tasks, t.config)
t, err := r.createTask(task.Config())
if err != nil {
return err
}
// Validate config with all placeholders replaced. However, we need to take care
// that the config with the task keeps its dynamic placeholders for process starts.
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
tid := t.ID()
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return err
t.process.Order.Set(task.Order())
if err := task.Stop(); err != nil {
return fmt.Errorf("stop process: %w", err)
}
err = r.setPlayoutPorts(t)
if err != nil {
return err
}
t.command = t.config.CreateCommand()
order := "stop"
if t.process.Order == "start" {
order = "start"
r.stopProcess(tid)
}
history := t.parser.ReportHistory()
parser := r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference, t.config.LogPatterns)
// Transfer the report history to the new process
history := task.parser.ReportHistory()
t.parser.ImportReportHistory(history)
t.parser = parser
limitMode := "hard"
if r.enableSoftLimit {
limitMode = "soft"
// Transfer the metadata to the new process
t.metadata = task.metadata
if err := r.deleteProcess(id); err != nil {
return fmt.Errorf("delete process: %w", err)
}
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: limitMode,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error {
if !r.enableSoftLimit {
return nil
}
r.tasks.Store(tid, t)
if err := r.resources.Request(t.config.LimitCPU, t.config.LimitMemory); err != nil {
return err
}
// set filesystem cleanup rules
r.setCleanup(tid, t.Config())
return nil
},
})
if err != nil {
return err
}
t.Restore()
t.ffmpeg = ffmpeg
t.valid = true
if order == "start" {
r.startProcess(tid)
}
r.save()
return nil
}

View File

@@ -186,7 +186,7 @@ func MarshalProcess(a *app.Process) Process {
Config: ProcessConfig{},
CreatedAt: a.CreatedAt,
UpdatedAt: a.UpdatedAt,
Order: a.Order,
Order: a.Order.String(),
}
p.Config.Marshal(a.Config)
@@ -203,7 +203,7 @@ func UnmarshalProcess(p Process) *app.Process {
Config: &app.Config{},
CreatedAt: p.CreatedAt,
UpdatedAt: p.UpdatedAt,
Order: p.Order,
Order: app.NewOrder(p.Order),
}
a.Config = p.Config.Unmarshal()

View File

@@ -73,7 +73,7 @@ func TestStoreLoad(t *testing.T) {
},
CreatedAt: 0,
UpdatedAt: 0,
Order: "stop",
Order: app.NewOrder("stop"),
},
Metadata: map[string]interface{}{
"some": "data",
@@ -112,7 +112,7 @@ func TestStoreLoad(t *testing.T) {
},
CreatedAt: 0,
UpdatedAt: 0,
Order: "stop",
Order: app.NewOrder("stop"),
},
Metadata: map[string]interface{}{
"some-more": "data",

View File

@@ -2,7 +2,6 @@ package restream
import (
"errors"
"sync"
"time"
"github.com/datarhei/core/v16/ffmpeg/parse"
@@ -10,6 +9,8 @@ import (
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/restream/app"
"github.com/puzpuzpuz/xsync/v3"
)
var ErrInvalidProcessConfig = errors.New("invalid process config")
@@ -32,19 +33,26 @@ type task struct {
usesDisk bool // Whether this task uses the disk
metadata map[string]interface{}
lock sync.RWMutex
lock *xsync.RBMutex
}
func (t *task) IsValid() bool {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
return t.valid
}
func (t *task) Valid(valid bool) {
t.lock.Lock()
defer t.lock.Unlock()
t.valid = valid
}
func (t *task) UsesDisk() bool {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
return t.usesDisk
}
@@ -62,8 +70,8 @@ func (t *task) String() string {
// Restore restores the task's order
func (t *task) Restore() error {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if !t.valid {
return ErrInvalidProcessConfig
@@ -73,7 +81,7 @@ func (t *task) Restore() error {
return ErrInvalidProcessConfig
}
if t.process.Order == "start" {
if t.process.Order.String() == "start" {
err := t.ffmpeg.Start()
if err != nil {
return err
@@ -84,8 +92,8 @@ func (t *task) Restore() error {
}
func (t *task) Start() error {
t.lock.Lock()
defer t.lock.Unlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if !t.valid {
return ErrInvalidProcessConfig
@@ -97,19 +105,20 @@ func (t *task) Start() error {
status := t.ffmpeg.Status()
if t.process.Order == "start" && status.Order == "start" {
if t.process.Order.String() == "start" && status.Order == "start" {
return nil
}
t.process.Order = "start"
t.process.Order.Set("start")
t.ffmpeg.Start()
return nil
}
func (t *task) Stop() error {
t.lock.Lock()
defer t.lock.Unlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.ffmpeg == nil {
return nil
@@ -117,11 +126,11 @@ func (t *task) Stop() error {
status := t.ffmpeg.Status()
if t.process.Order == "stop" && status.Order == "stop" {
if t.process.Order.String() == "stop" && status.Order == "stop" {
return nil
}
t.process.Order = "stop"
t.process.Order.Set("stop")
t.ffmpeg.Stop(true)
@@ -130,8 +139,8 @@ func (t *task) Stop() error {
// Kill stops a process without changing the tasks order
func (t *task) Kill() {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.ffmpeg == nil {
return
@@ -141,14 +150,14 @@ func (t *task) Kill() {
}
func (t *task) Restart() error {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if !t.valid {
return ErrInvalidProcessConfig
}
if t.process.Order == "stop" {
if t.process.Order.String() == "stop" {
return nil
}
@@ -161,8 +170,8 @@ func (t *task) Restart() error {
}
func (t *task) State() (*app.State, error) {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
state := &app.State{}
@@ -172,7 +181,7 @@ func (t *task) State() (*app.State, error) {
status := t.ffmpeg.Status()
state.Order = t.process.Order
state.Order = t.process.Order.String()
state.State = status.State
state.States.Marshal(status.States)
state.Time = status.Time.Unix()
@@ -213,8 +222,8 @@ func (t *task) State() (*app.State, error) {
}
func (t *task) Report() (*app.Report, error) {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
report := &app.Report{}
@@ -255,8 +264,8 @@ func (t *task) Report() (*app.Report, error) {
}
func (t *task) SetReport(report *app.Report) error {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if !t.valid {
return nil
@@ -270,8 +279,8 @@ func (t *task) SetReport(report *app.Report) error {
}
func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
result := []app.ReportHistorySearchResult{}
@@ -316,8 +325,8 @@ func (t *task) SetMetadata(key string, data interface{}) error {
}
func (t *task) GetMetadata(key string) (interface{}, error) {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if len(key) == 0 {
return t.metadata, nil
@@ -332,8 +341,8 @@ func (t *task) GetMetadata(key string) (interface{}, error) {
}
func (t *task) Limit(cpu, memory bool) bool {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if !t.valid {
return false
@@ -349,15 +358,15 @@ func (t *task) Limit(cpu, memory bool) bool {
}
func (t *task) Equal(config *app.Config) bool {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
return t.process.Config.Equal(config)
}
func (t *task) Config() *app.Config {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
return t.config.Clone()
}
@@ -378,8 +387,8 @@ func (t *task) Destroy() {
}
func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
count := 0
matches := 0
@@ -416,15 +425,15 @@ func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
}
func (t *task) Process() *app.Process {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
return t.process.Clone()
}
func (t *task) Order() string {
t.lock.RLock()
defer t.lock.RUnlock()
token := t.lock.RLock()
defer t.lock.RUnlock(token)
return t.process.Order
return t.process.Order.String()
}