Add exit state and last progress data to process report history

This commit is contained in:
Ingo Oppermann
2023-03-01 15:28:28 +01:00
parent 3cad139952
commit 86b3c053f1
15 changed files with 372 additions and 88 deletions

View File

@@ -1,5 +1,4 @@
// Package docs GENERATED BY SWAG; DO NOT EDIT
// This file was generated by swaggo/swag
// Code generated by swaggo/swag. DO NOT EDIT
package docs
import "github.com/swaggo/swag"
@@ -3191,6 +3190,9 @@ const docTemplate = `{
"type": "integer",
"format": "int64"
},
"exit_state": {
"type": "string"
},
"log": {
"type": "array",
"items": {
@@ -3205,6 +3207,9 @@ const docTemplate = `{
"items": {
"type": "string"
}
},
"progress": {
"$ref": "#/definitions/api.Progress"
}
}
},

View File

@@ -3183,6 +3183,9 @@
"type": "integer",
"format": "int64"
},
"exit_state": {
"type": "string"
},
"log": {
"type": "array",
"items": {
@@ -3197,6 +3200,9 @@
"items": {
"type": "string"
}
},
"progress": {
"$ref": "#/definitions/api.Progress"
}
}
},

View File

@@ -809,6 +809,8 @@ definitions:
created_at:
format: int64
type: integer
exit_state:
type: string
log:
items:
items:
@@ -819,6 +821,8 @@ definitions:
items:
type: string
type: array
progress:
$ref: '#/definitions/api.Progress'
type: object
api.ProcessState:
properties:

View File

@@ -36,7 +36,7 @@ type ProcessConfig struct {
Parser process.Parser
Logger log.Logger
OnArgs func([]string) []string
OnExit func()
OnExit func(state string)
OnStart func()
OnStateChange func(from, to string)
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net/url"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/session"
)
@@ -23,7 +22,7 @@ type Parser interface {
process.Parser
// Progress returns the current progress information of the process
Progress() app.Progress
Progress() Progress
// Prelude returns an array of the lines before the progress information started
Prelude() []string
@@ -32,7 +31,7 @@ type Parser interface {
Report() Report
// ReportHistory returns an array of previews logs
ReportHistory() []Report
ReportHistory() []ReportHistoryEntry
}
// Config is the config for the Parser implementation
@@ -116,7 +115,7 @@ func New(config Config) Parser {
}
if p.logger == nil {
p.logger = log.New("Parser")
p.logger = log.New("")
}
if p.logLines <= 0 {
@@ -503,7 +502,12 @@ func (p *parser) parseAVstreamProgress(line string) error {
return nil
}
func (p *parser) Progress() app.Progress {
func (p *parser) Stop(state string) {
// The process stopped. The right moment to store the current state to the log history
p.storeReportHistory(state)
}
func (p *parser) Progress() Progress {
p.lock.progress.RLock()
defer p.lock.progress.RUnlock()
@@ -685,8 +689,6 @@ func (p *parser) ResetStats() {
}
func (p *parser) ResetLog() {
p.storeLogHistory()
p.lock.prelude.Lock()
p.prelude.data = []string{}
p.prelude.tail = ring.New(p.prelude.tailLines)
@@ -700,25 +702,41 @@ func (p *parser) ResetLog() {
p.lock.log.Unlock()
}
// Report represents a log report, including the prelude and the last log lines
// of the process.
// Report represents a log report, including the prelude and the last log lines of the process.
type Report struct {
CreatedAt time.Time
Prelude []string
Log []process.Line
}
func (p *parser) storeLogHistory() {
// ReportHistoryEntry represents an historical log report, including the exit status of the
// process and the last progress data.
type ReportHistoryEntry struct {
Report
ExitState string
Progress Progress
}
func (p *parser) storeReportHistory(state string) {
if p.logHistory == nil {
return
}
h := p.Report()
report := p.Report()
if len(report.Prelude) == 0 {
return
}
h := ReportHistoryEntry{
Report: report,
ExitState: state,
Progress: p.Progress(),
}
if len(h.Prelude) != 0 {
p.logHistory.Value = h
p.logHistory = p.logHistory.Next()
}
}
func (p *parser) Report() Report {
@@ -734,15 +752,15 @@ func (p *parser) Report() Report {
return h
}
func (p *parser) ReportHistory() []Report {
var history = []Report{}
func (p *parser) ReportHistory() []ReportHistoryEntry {
var history = []ReportHistoryEntry{}
p.logHistory.Do(func(l interface{}) {
if l == nil {
return
}
history = append(history, l.(Report))
history = append(history, l.(ReportHistoryEntry))
})
return history

View File

@@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/datarhei/core/v16/restream/app"
"github.com/stretchr/testify/require"
)
@@ -19,7 +18,7 @@ func TestParserProgress(t *testing.T) {
parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463")
d, _ := time.ParseDuration("3m58s440ms")
wantP := app.Progress{
wantP := Progress{
Frame: 5968,
FPS: 25,
Quantizer: 19.4,

View File

@@ -4,8 +4,6 @@ import (
"encoding/json"
"errors"
"time"
"github.com/datarhei/core/v16/restream/app"
)
// Duration represents a time.Duration
@@ -49,8 +47,8 @@ type ffmpegAVstreamIO struct {
Size uint64 `json:"size_kb"`
}
func (avio *ffmpegAVstreamIO) export() app.AVstreamIO {
return app.AVstreamIO{
func (avio *ffmpegAVstreamIO) export() AVstreamIO {
return AVstreamIO{
State: avio.State,
Packet: avio.Packet,
Time: avio.Time,
@@ -74,8 +72,8 @@ type ffmpegAVstream struct {
GOP string `json:"gop"`
}
func (av *ffmpegAVstream) export() *app.AVstream {
return &app.AVstream{
func (av *ffmpegAVstream) export() *AVstream {
return &AVstream{
Aqueue: av.Aqueue,
Queue: av.Queue,
Drop: av.Drop,
@@ -104,7 +102,7 @@ type ffmpegProgressIO struct {
Quantizer float64 `json:"q"`
}
func (io *ffmpegProgressIO) exportTo(progress *app.ProgressIO) {
func (io *ffmpegProgressIO) exportTo(progress *ProgressIO) {
progress.Index = io.Index
progress.Stream = io.Stream
progress.Frame = io.Frame
@@ -132,7 +130,7 @@ type ffmpegProgress struct {
Dup uint64 `json:"dup"`
}
func (p *ffmpegProgress) exportTo(progress *app.Progress) {
func (p *ffmpegProgress) exportTo(progress *Progress) {
progress.Frame = p.Frame
progress.Packet = p.Packet
progress.FPS = p.FPS
@@ -184,8 +182,8 @@ type ffmpegProcessIO struct {
Channels uint64 `json:"channels"`
}
func (io *ffmpegProcessIO) export() app.ProgressIO {
return app.ProgressIO{
func (io *ffmpegProcessIO) export() ProgressIO {
return ProgressIO{
Address: io.Address,
Format: io.Format,
Index: io.Index,
@@ -207,8 +205,8 @@ type ffmpegProcess struct {
output []ffmpegProcessIO
}
func (p *ffmpegProcess) export() app.Progress {
progress := app.Progress{}
func (p *ffmpegProcess) export() Progress {
progress := Progress{}
for _, io := range p.input {
aio := io.export()
@@ -224,3 +222,71 @@ func (p *ffmpegProcess) export() app.Progress {
return progress
}
type ProgressIO struct {
Address string
// General
Index uint64
Stream uint64
Format string
Type string
Codec string
Coder string
Frame uint64
FPS float64
Packet uint64
PPS float64
Size uint64 // bytes
Bitrate float64 // bit/s
// Video
Pixfmt string
Quantizer float64
Width uint64
Height uint64
// Audio
Sampling uint64
Layout string
Channels uint64
// avstream
AVstream *AVstream
}
type Progress struct {
Input []ProgressIO
Output []ProgressIO
Frame uint64
Packet uint64
FPS float64
PPS float64
Quantizer float64
Size uint64 // bytes
Time float64
Bitrate float64 // bit/s
Speed float64
Drop uint64
Dup uint64
}
type AVstreamIO struct {
State string
Packet uint64
Time uint64
Size uint64
}
type AVstream struct {
Input AVstreamIO
Output AVstreamIO
Aqueue uint64
Queue uint64
Dup uint64
Drop uint64
Enc uint64
Looping bool
Duplicating bool
GOP string
}

View File

@@ -8,13 +8,12 @@ import (
"github.com/datarhei/core/v16/ffmpeg/prelude"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/restream/app"
)
type Parser interface {
process.Parser
Probe() app.Probe
Probe() Probe
}
type Config struct {
@@ -40,8 +39,8 @@ func New(config Config) Parser {
return p
}
func (p *prober) Probe() app.Probe {
probe := app.Probe{}
func (p *prober) Probe() Probe {
probe := Probe{}
for _, io := range p.inputs {
probe.Streams = append(probe.Streams, io.export())
@@ -112,6 +111,8 @@ func (p *prober) parseDefault() {
}
}
func (p *prober) Stop(state string) {}
func (p *prober) Log() []process.Line {
return p.data
}

View File

@@ -1,9 +1,5 @@
package probe
import (
"github.com/datarhei/core/v16/restream/app"
)
type probeIO struct {
// common
Address string `json:"url"`
@@ -29,8 +25,8 @@ type probeIO struct {
Channels uint64 `json:"channels"`
}
func (io *probeIO) export() app.ProbeIO {
return app.ProbeIO{
func (io *probeIO) export() ProbeIO {
return ProbeIO{
Address: io.Address,
Format: io.Format,
Index: io.Index,
@@ -50,3 +46,34 @@ func (io *probeIO) export() app.ProbeIO {
Channels: io.Channels,
}
}
type ProbeIO struct {
Address string
// General
Index uint64
Stream uint64
Language string
Format string
Type string
Codec string
Coder string
Bitrate float64 // kbit/s
Duration float64
// Video
Pixfmt string
Width uint64
Height uint64
FPS float64
// Audio
Sampling uint64
Layout string
Channels uint64
}
type Probe struct {
Streams []ProbeIO
Log []string
}

View File

@@ -186,16 +186,23 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
}
}
// ProcessReportHistoryEntry represents the logs of a run of a restream process
type ProcessReportHistoryEntry struct {
// ProcessReportEntry represents the logs of a run of a restream process
type ProcessReportEntry struct {
CreatedAt int64 `json:"created_at" format:"int64"`
Prelude []string `json:"prelude"`
Log [][2]string `json:"log"`
}
type ProcessReportHistoryEntry struct {
ProcessReportEntry
ExitState string `json:"exit_state"`
Progress Progress `json:"progress"`
}
// ProcessReport represents the current log and the logs of previous runs of a restream process
type ProcessReport struct {
ProcessReportHistoryEntry
ProcessReportEntry
History []ProcessReportHistoryEntry `json:"history"`
}
@@ -217,14 +224,19 @@ func (report *ProcessReport) Unmarshal(l *app.Log) {
for _, h := range l.History {
he := ProcessReportHistoryEntry{
ProcessReportEntry: ProcessReportEntry{
CreatedAt: h.CreatedAt.Unix(),
Prelude: h.Prelude,
Log: make([][2]string, len(h.Log)),
},
ExitState: h.ExitState,
}
he.Progress.Unmarshal(&h.Progress)
for i, line := range h.Log {
he.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
he.Log[i][1] = line.Data
he.ProcessReportEntry.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
he.ProcessReportEntry.Log[i][1] = line.Data
}
report.History = append(report.History, he)

View File

@@ -12,6 +12,10 @@ type Parser interface {
// or previous line, ...)
Parse(line string) uint64
// Stop tells the parser that the process stopped and provides
// its exit state.
Stop(state string)
// Reset resets any collected statistics or temporary data.
// This is called before the process starts and after the
// process stopped. The stats are meant to be collected
@@ -43,10 +47,8 @@ func NewNullParser() Parser {
var _ Parser = &nullParser{}
func (p *nullParser) Parse(line string) uint64 { return 1 }
func (p *nullParser) Log() []Line { return []Line{} }
func (p *nullParser) Parse(string) uint64 { return 1 }
func (p *nullParser) Stop(string) {}
func (p *nullParser) ResetStats() {}
func (p *nullParser) ResetLog() {}
func (p *nullParser) Log() []Line { return []Line{} }

View File

@@ -57,7 +57,7 @@ type Config struct {
Parser Parser // A parser for the output of the process
OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args
OnStart func() // A callback which is called after the process started
OnExit func() // A callback which is called after the process exited
OnExit func(state string) // A callback which is called after the process exited with the exit state
OnStateChange func(from, to string) // A callback which is called after a state changed
Logger log.Logger
}
@@ -192,7 +192,7 @@ type process struct {
callbacks struct {
onArgs func(args []string) []string
onStart func()
onExit func()
onExit func(state string)
onStateChange func(from, to string)
lock sync.Mutex
}
@@ -602,14 +602,14 @@ func (p *process) stop(wait bool) error {
p.callbacks.lock.Lock()
if p.callbacks.onExit == nil {
p.callbacks.onExit = func() {
p.callbacks.onExit = func(string) {
wg.Done()
p.callbacks.onExit = nil
}
} else {
cb := p.callbacks.onExit
p.callbacks.onExit = func() {
cb()
p.callbacks.onExit = func(state string) {
cb(state)
wg.Done()
p.callbacks.onExit = cb
}
@@ -770,6 +770,10 @@ func (p *process) waiter() {
p.stop(false)
}
// The process exited normally, i.e. the return code is zero and no signal
// has been raised
state := stateFinished
if err := p.cmd.Wait(); err != nil {
// The process exited abnormally, i.e. the return code is non-zero or a signal
// has been raised.
@@ -791,34 +795,32 @@ func (p *process) waiter() {
// If ffmpeg has been killed with a SIGINT, SIGTERM, etc., then it exited normally,
// i.e. closing all stream properly such that all written data is sane.
p.logger.Info().Log("Finished")
p.setState(stateFinished)
state = stateFinished
} else {
// The process exited by itself with a non-zero return code
p.logger.Info().Log("Failed")
p.setState(stateFailed)
state = stateFailed
}
} else if status.Signaled() {
// If ffmpeg has been killed the hard way, something went wrong and
// it can be assumed that any written data is not sane.
p.logger.Info().Log("Killed")
p.setState(stateKilled)
state = stateKilled
} else {
// The process exited because of something else (e.g. coredump, ...)
p.logger.Info().Log("Killed")
p.setState(stateKilled)
state = stateKilled
}
} else {
// Some other error regarding I/O triggered during Wait()
p.logger.Info().Log("Killed")
p.logger.WithError(err).Debug().Log("Killed")
p.setState(stateKilled)
state = stateKilled
}
} else {
// The process exited normally, i.e. the return code is zero and no signal
// has been raised
p.setState(stateFinished)
}
p.setState(state)
p.logger.Info().Log("Stopped")
p.debuglogger.WithField("log", p.parser.Log()).Debug().Log("Stopped")
@@ -840,13 +842,16 @@ func (p *process) waiter() {
}
p.stale.lock.Unlock()
// Send exit state to the parser
p.parser.Stop(state.String())
// Reset the parser stats
p.parser.ResetStats()
// Call the onExit callback
p.callbacks.lock.Lock()
if p.callbacks.onExit != nil {
go p.callbacks.onExit()
go p.callbacks.onExit(state.String())
}
p.callbacks.lock.Unlock()

View File

@@ -171,7 +171,7 @@ func TestFFmpegWaitStop(t *testing.T) {
Args: []string{},
Reconnect: false,
StaleTimeout: 0,
OnExit: func() {
OnExit: func(state string) {
time.Sleep(2 * time.Second)
},
})

View File

@@ -4,18 +4,25 @@ import (
"time"
)
type LogEntry struct {
type LogLine struct {
Timestamp time.Time
Data string
}
type LogHistoryEntry struct {
type LogEntry struct {
CreatedAt time.Time
Prelude []string
Log []LogEntry
Log []LogLine
}
type LogHistoryEntry struct {
LogEntry
ExitState string
Progress Progress
}
type Log struct {
LogHistoryEntry
LogEntry
History []LogHistoryEntry
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/ffmpeg/probe"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/io/fs"
@@ -1266,7 +1267,7 @@ func (r *restream) GetProcessState(id string) (*app.State, error) {
}
}
state.Progress = task.parser.Progress()
convertProgressFromParser(&state.Progress, task.parser.Progress())
for i, p := range state.Progress.Input {
if int(p.Index) >= len(task.process.Config.Input) {
@@ -1293,6 +1294,103 @@ func (r *restream) GetProcessState(id string) (*app.State, error) {
return state, nil
}
func convertProgressFromParser(progress *app.Progress, pprogress parse.Progress) {
progress.Frame = pprogress.Frame
progress.Packet = pprogress.Packet
progress.FPS = pprogress.FPS
progress.PPS = pprogress.PPS
progress.Quantizer = pprogress.Quantizer
progress.Size = pprogress.Size
progress.Time = pprogress.Time
progress.Bitrate = pprogress.Bitrate
progress.Speed = pprogress.Speed
progress.Drop = pprogress.Drop
progress.Dup = pprogress.Dup
for _, pinput := range pprogress.Input {
input := app.ProgressIO{
Address: pinput.Address,
Index: pinput.Index,
Stream: pinput.Stream,
Format: pinput.Format,
Type: pinput.Type,
Codec: pinput.Codec,
Coder: pinput.Coder,
Frame: pinput.Frame,
FPS: pinput.FPS,
Packet: pinput.Packet,
PPS: pinput.PPS,
Size: pinput.Size,
Bitrate: pinput.Bitrate,
Pixfmt: pinput.Pixfmt,
Quantizer: pinput.Quantizer,
Width: pinput.Width,
Height: pinput.Height,
Sampling: pinput.Sampling,
Layout: pinput.Layout,
Channels: pinput.Channels,
AVstream: nil,
}
if pinput.AVstream != nil {
avstream := &app.AVstream{
Input: app.AVstreamIO{
State: pinput.AVstream.Input.State,
Packet: pinput.AVstream.Input.Packet,
Time: pinput.AVstream.Input.Time,
Size: pinput.AVstream.Input.Size,
},
Output: app.AVstreamIO{
State: pinput.AVstream.Output.State,
Packet: pinput.AVstream.Output.Packet,
Time: pinput.AVstream.Output.Time,
Size: pinput.AVstream.Output.Size,
},
Aqueue: pinput.AVstream.Aqueue,
Queue: pinput.AVstream.Queue,
Dup: pinput.AVstream.Dup,
Drop: pinput.AVstream.Drop,
Enc: pinput.AVstream.Enc,
Looping: pinput.AVstream.Looping,
Duplicating: pinput.AVstream.Duplicating,
GOP: pinput.AVstream.GOP,
}
input.AVstream = avstream
}
progress.Input = append(progress.Input, input)
}
for _, poutput := range pprogress.Output {
output := app.ProgressIO{
Address: poutput.Address,
Index: poutput.Index,
Stream: poutput.Stream,
Format: poutput.Format,
Type: poutput.Type,
Codec: poutput.Codec,
Coder: poutput.Coder,
Frame: poutput.Frame,
FPS: poutput.FPS,
Packet: poutput.Packet,
PPS: poutput.PPS,
Size: poutput.Size,
Bitrate: poutput.Bitrate,
Pixfmt: poutput.Pixfmt,
Quantizer: poutput.Quantizer,
Width: poutput.Width,
Height: poutput.Height,
Sampling: poutput.Sampling,
Layout: poutput.Layout,
Channels: poutput.Channels,
AVstream: nil,
}
progress.Output = append(progress.Output, output)
}
}
func (r *restream) GetProcessLog(id string) (*app.Log, error) {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -1312,9 +1410,9 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
log.CreatedAt = current.CreatedAt
log.Prelude = current.Prelude
log.Log = make([]app.LogEntry, len(current.Log))
log.Log = make([]app.LogLine, len(current.Log))
for i, line := range current.Log {
log.Log[i] = app.LogEntry{
log.Log[i] = app.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
@@ -1324,13 +1422,18 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
for _, h := range history {
e := app.LogHistoryEntry{
LogEntry: app.LogEntry{
CreatedAt: h.CreatedAt,
Prelude: h.Prelude,
},
ExitState: h.ExitState,
}
e.Log = make([]app.LogEntry, len(h.Log))
convertProgressFromParser(&e.Progress, h.Progress)
e.LogEntry.Log = make([]app.LogLine, len(h.Log))
for i, line := range h.Log {
e.Log[i] = app.LogEntry{
e.LogEntry.Log[i] = app.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
@@ -1388,7 +1491,7 @@ func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe
Args: command,
Parser: prober,
Logger: task.logger,
OnExit: func() {
OnExit: func(string) {
wg.Done()
},
})
@@ -1402,11 +1505,40 @@ func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe
wg.Wait()
appprobe = prober.Probe()
convertProbeFromProber(&appprobe, prober.Probe())
return appprobe
}
func convertProbeFromProber(appprobe *app.Probe, pprobe probe.Probe) {
appprobe.Log = make([]string, len(pprobe.Log))
copy(appprobe.Log, pprobe.Log)
for _, s := range pprobe.Streams {
stream := app.ProbeIO{
Address: s.Address,
Index: s.Index,
Stream: s.Stream,
Language: s.Language,
Format: s.Format,
Type: s.Type,
Codec: s.Codec,
Coder: s.Coder,
Bitrate: s.Bitrate,
Duration: s.Duration,
Pixfmt: s.Pixfmt,
Width: s.Width,
Height: s.Height,
FPS: s.FPS,
Sampling: s.Sampling,
Layout: s.Layout,
Channels: s.Channels,
}
appprobe.Streams = append(appprobe.Streams, stream)
}
}
func (r *restream) Skills() skills.Skills {
return r.ffmpeg.Skills()
}