diff --git a/app/api/api.go b/app/api/api.go index 2b00cbc8..9e64965c 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -32,6 +32,7 @@ import ( "github.com/datarhei/core/v16/monitor" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/prometheus" + "github.com/datarhei/core/v16/psutil" "github.com/datarhei/core/v16/restream" restreamapp "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/replace" @@ -116,6 +117,8 @@ type api struct { state string undoMaxprocs func() + + process psutil.Process } // ErrConfigReload is an error returned to indicate that a reload of @@ -1322,6 +1325,9 @@ func (a *api) start() error { debug.SetMemoryLimit(math.MaxInt64) } + //p, _ := psutil.NewProcess(int32(os.Getpid()), false) + //a.process = p + // Start the restream processes restream.Start() @@ -1385,6 +1391,11 @@ func (a *api) stop() { a.restream = nil } + if a.process != nil { + a.process.Stop() + a.process = nil + } + // Stop the session tracker if a.sessions != nil { a.sessions.UnregisterAll() diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index b0d671e3..7914b082 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -570,9 +570,20 @@ func (p *parser) parseAVstreamProgress(line string) error { return nil } -func (p *parser) Stop(state string) { +func (p *parser) Stop(state string, pusage process.Usage) { + fmt.Printf("%+v\n", pusage) + usage := Usage{} + + usage.CPU.Average = pusage.CPU.Average + usage.CPU.Max = pusage.CPU.Max + usage.CPU.Limit = pusage.CPU.Limit + + usage.Memory.Average = pusage.Memory.Average + usage.Memory.Max = pusage.Memory.Max + usage.Memory.Limit = pusage.Memory.Limit + // The process stopped. The right moment to store the current state to the log history - p.storeReportHistory(state) + p.storeReportHistory(state, usage) } func (p *parser) Progress() Progress { @@ -806,6 +817,7 @@ type ReportHistoryEntry struct { ExitedAt time.Time ExitState string Progress Progress + Usage Usage } type ReportHistorySearchResult struct { @@ -850,7 +862,7 @@ func (p *parser) SearchReportHistory(state string, from, to *time.Time) []Report return result } -func (p *parser) storeReportHistory(state string) { +func (p *parser) storeReportHistory(state string, usage Usage) { if p.logHistory == nil { return } @@ -868,6 +880,7 @@ func (p *parser) storeReportHistory(state string) { ExitedAt: time.Now(), ExitState: state, Progress: p.Progress(), + Usage: usage, } p.logHistory.Value = h diff --git a/ffmpeg/parse/parser_test.go b/ffmpeg/parse/parser_test.go index 2cbd1540..d5a6bdbf 100644 --- a/ffmpeg/parse/parser_test.go +++ b/ffmpeg/parse/parser_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/datarhei/core/v16/process" "github.com/stretchr/testify/require" ) @@ -165,7 +166,7 @@ func TestParserLogHistory(t *testing.T) { history := parser.ReportHistory() require.Equal(t, 0, len(history)) - parser.Stop("finished") + parser.Stop("finished", process.Usage{}) history = parser.ReportHistory() require.Equal(t, 1, len(history)) @@ -203,7 +204,7 @@ func TestParserLogHistoryLength(t *testing.T) { parser.prelude.done = true parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") - parser.Stop("finished") + parser.Stop("finished", process.Usage{}) } history = parser.ReportHistory() @@ -226,7 +227,7 @@ func TestParserLogMinimalHistoryLength(t *testing.T) { parser.prelude.done = true parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") - parser.Stop("finished") + parser.Stop("finished", process.Usage{}) } history = parser.ReportHistory() @@ -257,7 +258,7 @@ func TestParserLogMinimalHistoryLengthWithoutFullHistory(t *testing.T) { parser.prelude.done = true parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") - parser.Stop("finished") + parser.Stop("finished", process.Usage{}) } history = parser.ReportHistory() @@ -279,7 +280,7 @@ func TestParserLogHistorySearch(t *testing.T) { parser.prelude.done = true parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") - parser.Stop("finished") + parser.Stop("finished", process.Usage{}) parser.ResetStats() @@ -292,7 +293,7 @@ func TestParserLogHistorySearch(t *testing.T) { parser.prelude.done = true parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") - parser.Stop("finished") + parser.Stop("finished", process.Usage{}) parser.ResetStats() @@ -305,7 +306,7 @@ func TestParserLogHistorySearch(t *testing.T) { parser.prelude.done = true parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") - parser.Stop("failed") + parser.Stop("failed", process.Usage{}) res := parser.SearchReportHistory("", nil, nil) require.Equal(t, 3, len(res)) @@ -905,7 +906,7 @@ func TestParserPatterns(t *testing.T) { pp, ok := p.(*parser) require.True(t, ok) - pp.storeReportHistory("something") + pp.storeReportHistory("something", Usage{}) report := p.ReportHistory() require.Equal(t, 1, len(report)) diff --git a/ffmpeg/parse/types.go b/ffmpeg/parse/types.go index a7473285..b19dff8a 100644 --- a/ffmpeg/parse/types.go +++ b/ffmpeg/parse/types.go @@ -321,3 +321,16 @@ type AVstream struct { Duplicating bool GOP string } + +type Usage struct { + CPU struct { + Average float64 + Max float64 + Limit float64 + } + Memory struct { + Average float64 + Max uint64 + Limit uint64 + } +} diff --git a/ffmpeg/probe/prober.go b/ffmpeg/probe/prober.go index cf3fc98d..3ddb8ec2 100644 --- a/ffmpeg/probe/prober.go +++ b/ffmpeg/probe/prober.go @@ -111,7 +111,7 @@ func (p *prober) parseDefault() { } } -func (p *prober) Stop(state string) {} +func (p *prober) Stop(state string, usage process.Usage) {} func (p *prober) Log() []process.Line { return p.data diff --git a/http/api/process.go b/http/api/process.go index 6853f02a..1c2a36a1 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -231,3 +231,18 @@ func (s *ProcessState) Unmarshal(state *app.State) { s.Progress.Unmarshal(&state.Progress) } + +type ProcessUsageCPU struct { + Average json.Number `json:"avg" swaggertype:"number" jsonschema:"type=number"` + Max json.Number `json:"max" swaggertype:"number" jsonschema:"type=number"` +} + +type ProcessUsageMemory struct { + Average json.Number `json:"avg" swaggertype:"number" jsonschema:"type=number"` + Max uint64 `json:"max" format:"uint64"` +} + +type ProcessUsage struct { + CPU ProcessUsageCPU `json:"cpu_usage"` + Memory ProcessUsageMemory `json:"memory_bytes"` +} diff --git a/http/api/report.go b/http/api/report.go index 52497e96..e05d1cb1 100644 --- a/http/api/report.go +++ b/http/api/report.go @@ -8,13 +8,14 @@ import ( // ProcessReportEntry represents the logs of a run of a restream process type ProcessReportEntry struct { - CreatedAt int64 `json:"created_at" format:"int64"` - Prelude []string `json:"prelude,omitempty"` - Log [][2]string `json:"log,omitempty"` - Matches []string `json:"matches,omitempty"` - ExitedAt int64 `json:"exited_at,omitempty" format:"int64"` - ExitState string `json:"exit_state,omitempty"` - Progress *Progress `json:"progress,omitempty"` + CreatedAt int64 `json:"created_at" format:"int64"` + Prelude []string `json:"prelude,omitempty"` + Log [][2]string `json:"log,omitempty"` + Matches []string `json:"matches,omitempty"` + ExitedAt int64 `json:"exited_at,omitempty" format:"int64"` + ExitState string `json:"exit_state,omitempty"` + Progress *Progress `json:"progress,omitempty"` + Resources *ProcessUsage `json:"resources,omitempty"` } type ProcessReportHistoryEntry struct { @@ -52,6 +53,16 @@ func (report *ProcessReport) Unmarshal(l *app.Log) { Matches: h.Matches, ExitedAt: h.ExitedAt.Unix(), ExitState: h.ExitState, + Resources: &ProcessUsage{ + CPU: ProcessUsageCPU{ + Average: toNumber(h.Usage.CPU.Average), + Max: toNumber(h.Usage.CPU.Max), + }, + Memory: ProcessUsageMemory{ + Average: toNumber(h.Usage.Memory.Average), + Max: h.Usage.Memory.Max, + }, + }, } he.Progress = &Progress{} diff --git a/process/limits.go b/process/limits.go index 0c43b3ba..890e79d9 100644 --- a/process/limits.go +++ b/process/limits.go @@ -9,6 +9,21 @@ import ( "github.com/datarhei/core/v16/psutil" ) +type Usage struct { + CPU struct { + Current float64 // percent 0-100 + Average float64 // percent 0-100 + Max float64 // percent 0-100 + Limit float64 // percent 0-100 + } + Memory struct { + Current uint64 // bytes + Average float64 // bytes + Max uint64 // bytes + Limit uint64 // bytes + } +} + type LimitFunc func(cpu float64, memory uint64) type LimiterConfig struct { @@ -28,8 +43,12 @@ type Limiter interface { // Current returns the current CPU and memory values Current() (cpu float64, memory uint64) - // Limits returns the defined CPU and memory limits. Values < 0 means no limit + // Limits returns the defined CPU and memory limits. Values <= 0 means no limit Limits() (cpu float64, memory uint64) + + // Usage returns the current state of the limiter, such as current, average, max, and + // limit values for CPU and memory. + Usage() Usage } type limiter struct { @@ -38,15 +57,23 @@ type limiter struct { cancel context.CancelFunc onLimit LimitFunc - cpu float64 - cpuCurrent float64 - cpuLast float64 - cpuLimitSince time.Time + cpu float64 + cpuCurrent float64 + cpuMax float64 + cpuAvg float64 + cpuAvgCounter uint64 + cpuLast float64 + cpuLimitSince time.Time + memory uint64 memoryCurrent uint64 + memoryMax uint64 + memoryAvg float64 + memoryAvgCounter uint64 memoryLast uint64 memoryLimitSince time.Time - waitFor time.Duration + + waitFor time.Duration } // NewLimiter returns a new Limiter @@ -68,8 +95,15 @@ func NewLimiter(config LimiterConfig) Limiter { func (l *limiter) reset() { l.cpuCurrent = 0 l.cpuLast = 0 + l.cpuAvg = 0 + l.cpuAvgCounter = 0 + l.cpuMax = 0 + l.memoryCurrent = 0 l.memoryLast = 0 + l.memoryAvg = 0 + l.memoryAvgCounter = 0 + l.memoryMax = 0 } func (l *limiter) Start(process psutil.Process) error { @@ -87,7 +121,7 @@ func (l *limiter) Start(process psutil.Process) error { ctx, cancel := context.WithCancel(context.Background()) l.cancel = cancel - go l.ticker(ctx) + go l.ticker(ctx, 500*time.Millisecond) return nil } @@ -108,8 +142,8 @@ func (l *limiter) Stop() { l.reset() } -func (l *limiter) ticker(ctx context.Context) { - ticker := time.NewTicker(time.Second) +func (l *limiter) ticker(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -132,10 +166,26 @@ func (l *limiter) collect(t time.Time) { if mstat, err := l.proc.VirtualMemory(); err == nil { l.memoryLast, l.memoryCurrent = l.memoryCurrent, mstat + + if l.memoryCurrent > l.memoryMax { + l.memoryMax = l.memoryCurrent + } + + l.memoryAvgCounter++ + + l.memoryAvg = ((l.memoryAvg * float64(l.memoryAvgCounter-1)) + float64(l.memoryCurrent)) / float64(l.memoryAvgCounter) } if cpustat, err := l.proc.CPUPercent(); err == nil { l.cpuLast, l.cpuCurrent = l.cpuCurrent, cpustat.System+cpustat.User+cpustat.Other + + if l.cpuCurrent > l.cpuMax { + l.cpuMax = l.cpuCurrent + } + + l.cpuAvgCounter++ + + l.cpuAvg = ((l.cpuAvg * float64(l.cpuAvgCounter-1)) + l.cpuCurrent) / float64(l.cpuAvgCounter) } isLimitExceeded := false @@ -185,6 +235,25 @@ func (l *limiter) Current() (cpu float64, memory uint64) { return } +func (l *limiter) Usage() Usage { + l.lock.Lock() + defer l.lock.Unlock() + + usage := Usage{} + + usage.CPU.Limit = l.cpu + usage.CPU.Current = l.cpuCurrent + usage.CPU.Average = l.cpuAvg + usage.CPU.Max = l.cpuMax + + usage.Memory.Limit = l.memory + usage.Memory.Current = l.memoryCurrent + usage.Memory.Average = l.memoryAvg + usage.Memory.Max = l.memoryMax + + return usage +} + func (l *limiter) Limits() (cpu float64, memory uint64) { return l.cpu, l.memory } diff --git a/process/parser.go b/process/parser.go index 2e7591c4..914289b5 100644 --- a/process/parser.go +++ b/process/parser.go @@ -14,7 +14,7 @@ type Parser interface { // Stop tells the parser that the process stopped and provides // its exit state. - Stop(state string) + Stop(state string, usage Usage) // Reset resets any collected statistics or temporary data. // This is called before the process starts and after the @@ -48,7 +48,7 @@ func NewNullParser() Parser { var _ Parser = &nullParser{} func (p *nullParser) Parse(string) uint64 { return 1 } -func (p *nullParser) Stop(string) {} +func (p *nullParser) Stop(string, Usage) {} func (p *nullParser) ResetStats() {} func (p *nullParser) ResetLog() {} func (p *nullParser) Log() []Line { return []Line{} } diff --git a/process/process.go b/process/process.go index 75d007bb..ff531004 100644 --- a/process/process.go +++ b/process/process.go @@ -72,9 +72,19 @@ type Status struct { Reconnect time.Duration // Reconnect is the time until the next reconnect, negative if no reconnect is scheduled. Duration time.Duration // Duration is the time since the last change of the state Time time.Time // Time is the time of the last change of the state - CPU float64 // Used CPU in percent - Memory uint64 // Used memory in bytes CommandArgs []string // Currently running command arguments + CPU struct { + Current float64 + Average float64 + Max float64 + Limit float64 + } // Used CPU in percent + Memory struct { + Current uint64 + Average float64 + Max uint64 + Limit uint64 + } // Used memory in bytes } // States @@ -275,8 +285,9 @@ func (p *process) initState(state stateType) { // setState sets a new state. It also checks if the transition // of the current state to the new state is allowed. If not, -// the current state will not be changed. -func (p *process) setState(state stateType) error { +// the current state will not be changed. It returns the previous +// state or an error +func (p *process) setState(state stateType) (stateType, error) { p.state.lock.Lock() defer p.state.lock.Unlock() @@ -353,11 +364,11 @@ func (p *process) setState(state stateType) error { failed = true } } else { - return fmt.Errorf("current state is unhandled: %s", p.state.state) + return "", fmt.Errorf("current state is unhandled: %s", p.state.state) } if failed { - return fmt.Errorf("can't change from state %s to %s", p.state.state, state) + return "", fmt.Errorf("can't change from state %s to %s", p.state.state, state) } p.state.time = time.Now() @@ -368,7 +379,7 @@ func (p *process) setState(state stateType) error { } p.callbacks.lock.Unlock() - return nil + return prevState, nil } func (p *process) getState() stateType { @@ -394,7 +405,7 @@ func (p *process) getStateString() string { // Status returns the current status of the process func (p *process) Status() Status { - cpu, memory := p.limits.Current() + usage := p.limits.Usage() p.state.lock.Lock() stateTime := p.state.time @@ -413,8 +424,8 @@ func (p *process) Status() Status { Reconnect: time.Duration(-1), Duration: time.Since(stateTime), Time: stateTime, - CPU: cpu, - Memory: memory, + CPU: usage.CPU, + Memory: usage.Memory, } s.CommandArgs = make([]string, len(p.args)) @@ -489,8 +500,12 @@ func (p *process) start() error { // Stop any restart timer in order to start the process immediately p.unreconnect() + fmt.Printf("q\n") + p.setState(stateStarting) + fmt.Printf("w\n") + args := p.args p.callbacks.lock.Lock() @@ -502,6 +517,8 @@ func (p *process) start() error { } p.callbacks.lock.Unlock() + fmt.Printf("e\n") + // Start the stop timeout if enabled if p.timeout > time.Duration(0) { p.stopTimerLock.Lock() @@ -519,6 +536,8 @@ func (p *process) start() error { p.stopTimerLock.Unlock() } + fmt.Printf("r\n") + p.cmd = exec.Command(p.binary, args...) p.cmd.Env = []string{} @@ -545,7 +564,8 @@ func (p *process) start() error { p.pid = int32(p.cmd.Process.Pid) - if proc, err := psutil.NewProcess(p.pid); err == nil { + if proc, err := psutil.NewProcess(p.pid, false); err == nil { + fmt.Printf("starting limiter\n") p.limits.Start(proc) } @@ -651,9 +671,6 @@ func (p *process) stop(wait bool) error { p.callbacks.onExit = func(string) { wg.Done() - p.callbacks.lock.Lock() - defer p.callbacks.lock.Unlock() - p.callbacks.onExit = nil } } else { @@ -662,9 +679,6 @@ func (p *process) stop(wait bool) error { cb(state) wg.Done() - p.callbacks.lock.Lock() - defer p.callbacks.lock.Unlock() - p.callbacks.onExit = cb } } @@ -878,6 +892,7 @@ func (p *process) waiter() { p.logger.Info().Log("Stopped") p.debuglogger.WithField("log", p.parser.Log()).Debug().Log("Stopped") + pusage := p.limits.Usage() p.limits.Stop() // Stop the stop timer @@ -908,7 +923,7 @@ func (p *process) waiter() { p.stale.lock.Unlock() // Send exit state to the parser - p.parser.Stop(state.String()) + p.parser.Stop(state.String(), pusage) // Reset the parser stats p.parser.ResetStats() diff --git a/psutil/process.go b/psutil/process.go index 1b1ab3bd..c0d5babf 100644 --- a/psutil/process.go +++ b/psutil/process.go @@ -2,6 +2,8 @@ package psutil import ( "context" + "fmt" + "math" "sync" "time" @@ -9,8 +11,14 @@ import ( ) type Process interface { + // CPUPercent returns the current CPU load for this process only. The values + // are normed to the range of 0 to 100. CPUPercent() (*CPUInfoStat, error) + + // VirtualMemory returns the current memory usage in bytes of this process only. VirtualMemory() (uint64, error) + + // Stop will stop collecting CPU and memory data for this process. Stop() } @@ -28,14 +36,17 @@ type process struct { statCurrentTime time.Time statPrevious cpuTimesStat statPreviousTime time.Time + + imposeLimit bool } -func (u *util) Process(pid int32) (Process, error) { +func (u *util) Process(pid int32, limit bool) (Process, error) { p := &process{ - pid: pid, - hasCgroup: u.hasCgroup, - cpuLimit: u.cpuLimit, - ncpu: u.ncpu, + pid: pid, + hasCgroup: u.hasCgroup, + cpuLimit: u.cpuLimit, + ncpu: u.ncpu, + imposeLimit: limit, } proc, err := psprocess.NewProcess(pid) @@ -47,19 +58,23 @@ func (u *util) Process(pid int32) (Process, error) { ctx, cancel := context.WithCancel(context.Background()) p.stopTicker = cancel - go p.tick(ctx) + go p.tick(ctx, 1000*time.Millisecond) return p, nil } -func NewProcess(pid int32) (Process, error) { - return DefaultUtil.Process(pid) +func NewProcess(pid int32, limit bool) (Process, error) { + return DefaultUtil.Process(pid, limit) } -func (p *process) tick(ctx context.Context) { - ticker := time.NewTicker(time.Second) +func (p *process) tick(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) defer ticker.Stop() + if p.imposeLimit { + go p.limit(ctx, interval) + } + for { select { case <-ctx.Done(): @@ -71,6 +86,65 @@ func (p *process) tick(ctx context.Context) { p.statPrevious, p.statCurrent = p.statCurrent, stat p.statPreviousTime, p.statCurrentTime = p.statCurrentTime, t p.lock.Unlock() + + pct, _ := p.CPUPercent() + pcpu := (pct.System + pct.User + pct.Other) / 100 + + fmt.Printf("%d\t%0.2f%%\n", p.pid, pcpu*100*p.ncpu) + } + } +} + +func (p *process) limit(ctx context.Context, interval time.Duration) { + var limit float64 = 50.0 / 100.0 / p.ncpu + var workingrate float64 = -1 + + counter := 0 + + for { + select { + case <-ctx.Done(): + return + default: + pct, _ := p.CPUPercent() + /* + pct.System *= p.ncpu + pct.Idle *= p.ncpu + pct.User *= p.ncpu + pct.Other *= p.ncpu + */ + + pcpu := (pct.System + pct.User + pct.Other) / 100 + + if workingrate < 0 { + workingrate = limit + } else { + workingrate = math.Min(workingrate/pcpu*limit, 1) + } + + worktime := float64(interval.Nanoseconds()) * workingrate + sleeptime := float64(interval.Nanoseconds()) - worktime + /* + if counter%20 == 0 { + fmt.Printf("\nPID\t%%CPU\twork quantum\tsleep quantum\tactive rate\n") + counter = 0 + } + + fmt.Printf("%d\t%0.2f%%\t%.2f us\t%.2f us\t%0.2f%%\n", p.pid, pcpu*100*p.ncpu, worktime/1000, sleeptime/1000, workingrate*100) + */ + if p.imposeLimit { + p.proc.Resume() + } + time.Sleep(time.Duration(worktime) * time.Nanosecond) + + if sleeptime > 0 { + if p.imposeLimit { + p.proc.Suspend() + } + time.Sleep(time.Duration(sleeptime) * time.Nanosecond) + } + + counter++ } } } @@ -104,6 +178,9 @@ func (p *process) cpuTimes() (*cpuTimesStat, error) { } s.other = s.total - s.system - s.user + if s.other < 0.0001 { + s.other = 0 + } return s, nil } diff --git a/psutil/psutil.go b/psutil/psutil.go index 16b306bd..9b08ffee 100644 --- a/psutil/psutil.go +++ b/psutil/psutil.go @@ -46,35 +46,42 @@ func init() { } type MemoryInfoStat struct { - Total uint64 - Available uint64 - Used uint64 + Total uint64 // bytes + Available uint64 // bytes + Used uint64 // bytes } type CPUInfoStat struct { - System float64 - User float64 - Idle float64 - Other float64 + System float64 // percent 0-100 + User float64 // percent 0-100 + Idle float64 // percent 0-100 + Other float64 // percent 0-100 } type cpuTimesStat struct { - total float64 - system float64 - user float64 - idle float64 - other float64 + total float64 // seconds + system float64 // seconds + user float64 // seconds + idle float64 // seconds + other float64 // seconds } type Util interface { Start() Stop() + + // CPUCounts returns the number of cores, either logical or physical. CPUCounts(logical bool) (float64, error) + + // CPUPercent returns the current CPU load in percent. The values range + // from 0 to 100, independently of the number of logical cores. CPUPercent() (*CPUInfoStat, error) DiskUsage(path string) (*disk.UsageStat, error) VirtualMemory() (*MemoryInfoStat, error) NetIOCounters(pernic bool) ([]net.IOCountersStat, error) - Process(pid int32) (Process, error) + + // Process returns a process observer for a process with the given pid. + Process(pid int32, limit bool) (Process, error) } type util struct { @@ -131,7 +138,7 @@ func (u *util) Start() { ctx, cancel := context.WithCancel(context.Background()) u.stopTicker = cancel - go u.tick(ctx, time.Second) + go u.tick(ctx, 100*time.Millisecond) }) } @@ -240,6 +247,9 @@ func (u *util) tick(ctx context.Context, interval time.Duration) { u.statPrevious, u.statCurrent = u.statCurrent, stat u.statPreviousTime, u.statCurrentTime = u.statCurrentTime, t u.lock.Unlock() + + //p, _ := u.CPUPercent() + //fmt.Printf("%+v\n", p) } } } @@ -273,6 +283,7 @@ func CPUCounts(logical bool) (float64, error) { return DefaultUtil.CPUCounts(logical) } +// cpuTimes returns the current cpu usage times in seconds. func (u *util) cpuTimes() (*cpuTimesStat, error) { if u.hasCgroup && u.cpuLimit > 0 { if stat, err := u.cgroupCPUTimes(u.cgroupType); err == nil { @@ -280,7 +291,7 @@ func (u *util) cpuTimes() (*cpuTimesStat, error) { } } - times, err := cpu.Times(false) + times, err := cpu.Times(true) if err != nil { return nil, err } @@ -289,14 +300,19 @@ func (u *util) cpuTimes() (*cpuTimesStat, error) { return nil, errors.New("cpu.Times() returned an empty slice") } - s := &cpuTimesStat{ - total: cpuTotal(×[0]), - system: times[0].System, - user: times[0].User, - idle: times[0].Idle, - } + s := &cpuTimesStat{} - s.other = s.total - s.system - s.user - s.idle + for _, t := range times { + s.total += cpuTotal(&t) + s.system += t.System + s.user += t.User + s.idle += t.Idle + + s.other = s.total - s.system - s.user - s.idle + if s.other < 0.0001 { + s.other = 0 + } + } return s, nil } diff --git a/restream/app/log.go b/restream/app/log.go index 8a4173cf..470db445 100644 --- a/restream/app/log.go +++ b/restream/app/log.go @@ -22,6 +22,7 @@ type LogHistoryEntry struct { ExitedAt time.Time ExitState string Progress Progress + Usage ProcessUsage } type Log struct { diff --git a/restream/app/process.go b/restream/app/process.go index 25d354f6..da7ddddf 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -162,3 +162,20 @@ type State struct { CPU float64 // Current CPU consumption in percent Command []string // ffmpeg command line parameters } + +type ProcessUsageCPU struct { + Average float64 + Max float64 + Limit float64 +} + +type ProcessUsageMemory struct { + Average float64 + Max uint64 + Limit uint64 +} + +type ProcessUsage struct { + CPU ProcessUsageCPU + Memory ProcessUsageMemory +} diff --git a/restream/restream.go b/restream/restream.go index 50ed5df5..ac281dcf 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1281,8 +1281,8 @@ func (r *restream) GetProcessState(id string) (*app.State, error) { state.State = status.State state.States.Marshal(status.States) state.Time = status.Time.Unix() - state.Memory = status.Memory - state.CPU = status.CPU + state.Memory = status.Memory.Current + state.CPU = status.CPU.Current state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds() state.Reconnect = -1 state.Command = status.CommandArgs @@ -1456,6 +1456,18 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { }, ExitedAt: h.ExitedAt, ExitState: h.ExitState, + Usage: app.ProcessUsage{ + CPU: app.ProcessUsageCPU{ + Average: h.Usage.CPU.Average, + Max: h.Usage.CPU.Max, + Limit: h.Usage.CPU.Limit, + }, + Memory: app.ProcessUsageMemory{ + Average: h.Usage.Memory.Average, + Max: h.Usage.Memory.Max, + Limit: h.Usage.Memory.Limit, + }, + }, } convertProgressFromParser(&e.Progress, h.Progress)