mirror of
https://github.com/datarhei/core.git
synced 2025-10-05 07:57:13 +08:00
Add log_patterns to process config
log_patterns allow to filter the FFmpeg log messages based on regular expressions. Each entry of log_patterns is interpreted as regular expression and matched against every non-progress log line emitted from FFmpeg. All matching lines are returned in the matches array of the report.
This commit is contained in:
20
docs/docs.go
20
docs/docs.go
@@ -711,7 +711,7 @@ const docTemplate = `{
|
||||
"v16.7.2"
|
||||
],
|
||||
"summary": "Remove a file from a filesystem",
|
||||
"operationId": "filesystem-3-delete-files",
|
||||
"operationId": "filesystem-3-delete-file",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
@@ -3402,6 +3402,12 @@ const docTemplate = `{
|
||||
"limits": {
|
||||
"$ref": "#/definitions/api.ProcessConfigLimits"
|
||||
},
|
||||
"log_patterns": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -3537,6 +3543,12 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"matches": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"prelude": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -3571,6 +3583,12 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"matches": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"prelude": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
|
@@ -704,7 +704,7 @@
|
||||
"v16.7.2"
|
||||
],
|
||||
"summary": "Remove a file from a filesystem",
|
||||
"operationId": "filesystem-3-delete-files",
|
||||
"operationId": "filesystem-3-delete-file",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
@@ -3395,6 +3395,12 @@
|
||||
"limits": {
|
||||
"$ref": "#/definitions/api.ProcessConfigLimits"
|
||||
},
|
||||
"log_patterns": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -3530,6 +3536,12 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"matches": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"prelude": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -3564,6 +3576,12 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"matches": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"prelude": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
|
@@ -763,6 +763,10 @@ definitions:
|
||||
type: array
|
||||
limits:
|
||||
$ref: '#/definitions/api.ProcessConfigLimits'
|
||||
log_patterns:
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
options:
|
||||
items:
|
||||
type: string
|
||||
@@ -858,6 +862,10 @@ definitions:
|
||||
type: string
|
||||
type: array
|
||||
type: array
|
||||
matches:
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
prelude:
|
||||
items:
|
||||
type: string
|
||||
@@ -881,6 +889,10 @@ definitions:
|
||||
type: string
|
||||
type: array
|
||||
type: array
|
||||
matches:
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
prelude:
|
||||
items:
|
||||
type: string
|
||||
@@ -2372,7 +2384,7 @@ paths:
|
||||
/api/v3/fs/{storage}/{filepath}:
|
||||
delete:
|
||||
description: Remove a file from a filesystem
|
||||
operationId: filesystem-3-delete-files
|
||||
operationId: filesystem-3-delete-file
|
||||
parameters:
|
||||
- description: Name of the filesystem
|
||||
in: path
|
||||
|
@@ -17,7 +17,7 @@ import (
|
||||
|
||||
type FFmpeg interface {
|
||||
New(config ProcessConfig) (process.Process, error)
|
||||
NewProcessParser(logger log.Logger, id, reference string) parse.Parser
|
||||
NewProcessParser(logger log.Logger, id, reference string, logpatterns []string) parse.Parser
|
||||
NewProbeParser(logger log.Logger) probe.Parser
|
||||
ValidateInputAddress(address string) bool
|
||||
ValidateOutputAddress(address string) bool
|
||||
@@ -168,11 +168,12 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
|
||||
return ffmpeg, err
|
||||
}
|
||||
|
||||
func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string) parse.Parser {
|
||||
func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string, logpatterns []string) parse.Parser {
|
||||
p := parse.New(parse.Config{
|
||||
LogLines: f.logLines,
|
||||
LogHistory: f.historyLength,
|
||||
LogMinimalHistory: f.minimalHistoryLength,
|
||||
Patterns: logpatterns,
|
||||
Logger: logger,
|
||||
Collector: NewWrappedCollector(id, reference, f.collector),
|
||||
})
|
||||
|
@@ -48,6 +48,7 @@ type Config struct {
|
||||
LogMinimalHistory int
|
||||
PreludeHeadLines int
|
||||
PreludeTailLines int
|
||||
Patterns []string
|
||||
Logger log.Logger
|
||||
Collector session.Collector
|
||||
}
|
||||
@@ -72,6 +73,11 @@ type parser struct {
|
||||
done bool
|
||||
}
|
||||
|
||||
logpatterns struct {
|
||||
matches []string
|
||||
patterns []*regexp.Regexp
|
||||
}
|
||||
|
||||
log *ring.Ring
|
||||
logLines int
|
||||
logStart time.Time
|
||||
@@ -158,7 +164,7 @@ func New(config Config) Parser {
|
||||
p.lock.prelude.Unlock()
|
||||
|
||||
p.lock.log.Lock()
|
||||
p.log = ring.New(config.LogLines)
|
||||
p.log = ring.New(p.logLines)
|
||||
|
||||
historyLength := p.logHistoryLength + p.logMinimalHistoryLength
|
||||
|
||||
@@ -171,6 +177,17 @@ func New(config Config) Parser {
|
||||
}
|
||||
|
||||
p.logStart = time.Time{}
|
||||
|
||||
for _, pattern := range config.Patterns {
|
||||
r, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
p.logpatterns.matches = append(p.logpatterns.matches, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
p.logpatterns.patterns = append(p.logpatterns.patterns, r)
|
||||
}
|
||||
|
||||
p.lock.log.Unlock()
|
||||
|
||||
p.ResetStats()
|
||||
@@ -185,8 +202,8 @@ func (p *parser) Parse(line string) uint64 {
|
||||
isFFmpegProgress := strings.HasPrefix(line, "ffmpeg.progress:")
|
||||
isAVstreamProgress := strings.HasPrefix(line, "avstream.progress:")
|
||||
|
||||
if p.logStart.IsZero() {
|
||||
p.lock.log.Lock()
|
||||
if p.logStart.IsZero() {
|
||||
p.logStart = time.Now()
|
||||
|
||||
p.logger.WithComponent("ProcessReport").WithFields(log.Fields{
|
||||
@@ -194,9 +211,8 @@ func (p *parser) Parse(line string) uint64 {
|
||||
"report": "created",
|
||||
"timestamp": p.logStart.Unix(),
|
||||
}).Info().Log("Created")
|
||||
|
||||
p.lock.log.Unlock()
|
||||
}
|
||||
p.lock.log.Unlock()
|
||||
|
||||
p.lock.prelude.Lock()
|
||||
preludeDone := p.prelude.done
|
||||
@@ -268,6 +284,14 @@ func (p *parser) Parse(line string) uint64 {
|
||||
}
|
||||
p.lock.prelude.Unlock()
|
||||
|
||||
p.lock.log.Lock()
|
||||
for _, pattern := range p.logpatterns.patterns {
|
||||
if pattern.MatchString(line) {
|
||||
p.logpatterns.matches = append(p.logpatterns.matches, line)
|
||||
}
|
||||
}
|
||||
p.lock.log.Unlock()
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -669,6 +693,16 @@ func (p *parser) Log() []process.Line {
|
||||
return log
|
||||
}
|
||||
|
||||
func (p *parser) Matches() []string {
|
||||
p.lock.log.Lock()
|
||||
defer p.lock.log.Unlock()
|
||||
|
||||
matches := make([]string, len(p.logpatterns.matches))
|
||||
copy(matches, p.logpatterns.matches)
|
||||
|
||||
return matches
|
||||
}
|
||||
|
||||
func (p *parser) LastLogline() string {
|
||||
p.lock.log.RLock()
|
||||
defer p.lock.log.RUnlock()
|
||||
@@ -729,6 +763,7 @@ func (p *parser) ResetLog() {
|
||||
p.lock.log.Lock()
|
||||
p.log = ring.New(p.logLines)
|
||||
p.logStart = time.Time{}
|
||||
p.logpatterns.matches = []string{}
|
||||
p.lock.log.Unlock()
|
||||
}
|
||||
|
||||
@@ -737,6 +772,7 @@ type Report struct {
|
||||
CreatedAt time.Time
|
||||
Prelude []string
|
||||
Log []process.Line
|
||||
Matches []string
|
||||
}
|
||||
|
||||
// ReportHistoryEntry represents an historical log report, including the exit status of the
|
||||
@@ -821,6 +857,7 @@ func (p *parser) storeReportHistory(state string) {
|
||||
history := r.Value.(ReportHistoryEntry)
|
||||
history.Log = nil
|
||||
history.Prelude = nil
|
||||
history.Matches = nil
|
||||
|
||||
r.Value = history
|
||||
}
|
||||
@@ -839,6 +876,7 @@ func (p *parser) Report() Report {
|
||||
h := Report{
|
||||
Prelude: p.Prelude(),
|
||||
Log: p.Log(),
|
||||
Matches: p.Matches(),
|
||||
}
|
||||
|
||||
p.lock.log.RLock()
|
||||
|
@@ -878,3 +878,34 @@ func TestParserProgressPlayout(t *testing.T) {
|
||||
Dup: 0,
|
||||
}, progress)
|
||||
}
|
||||
|
||||
func TestParserPatterns(t *testing.T) {
|
||||
parser := New(Config{
|
||||
Patterns: []string{
|
||||
"^foobar",
|
||||
"foobar$",
|
||||
},
|
||||
})
|
||||
|
||||
parser.Parse("some foobar more")
|
||||
require.Empty(t, parser.Report().Matches)
|
||||
|
||||
parser.Parse("foobar some more")
|
||||
require.Equal(t, 1, len(parser.Report().Matches))
|
||||
require.Equal(t, "foobar some more", parser.Report().Matches[0])
|
||||
|
||||
parser.Parse("some more foobar")
|
||||
require.Equal(t, 2, len(parser.Report().Matches))
|
||||
require.Equal(t, "some more foobar", parser.Report().Matches[1])
|
||||
}
|
||||
|
||||
func TestParserPatternsError(t *testing.T) {
|
||||
parser := New(Config{
|
||||
Patterns: []string{
|
||||
"^foobar",
|
||||
"foo(bar$",
|
||||
},
|
||||
})
|
||||
|
||||
require.Equal(t, 1, len(parser.Report().Matches))
|
||||
}
|
||||
|
@@ -55,6 +55,7 @@ type ProcessConfig struct {
|
||||
StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"`
|
||||
Timeout uint64 `json:"runtime_duration_seconds" format:"uint64"`
|
||||
Scheduler string `json:"scheduler"`
|
||||
LogPatterns []string `json:"log_patterns"`
|
||||
Limits ProcessConfigLimits `json:"limits"`
|
||||
}
|
||||
|
||||
@@ -107,6 +108,9 @@ func (cfg *ProcessConfig) Marshal() *app.Config {
|
||||
|
||||
}
|
||||
|
||||
p.LogPatterns = make([]string, len(cfg.LogPatterns))
|
||||
copy(p.LogPatterns, cfg.LogPatterns)
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -190,6 +194,9 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
|
||||
|
||||
cfg.Output = append(cfg.Output, io)
|
||||
}
|
||||
|
||||
cfg.LogPatterns = make([]string, len(c.LogPatterns))
|
||||
copy(cfg.LogPatterns, c.LogPatterns)
|
||||
}
|
||||
|
||||
// ProcessState represents the current state of an ffmpeg process
|
||||
|
@@ -11,6 +11,7 @@ type ProcessReportEntry struct {
|
||||
CreatedAt int64 `json:"created_at" format:"int64"`
|
||||
Prelude []string `json:"prelude,omitempty"`
|
||||
Log [][2]string `json:"log,omitempty"`
|
||||
Matches []string `json:"matches,omitempty"`
|
||||
ExitedAt int64 `json:"exited_at,omitempty" format:"int64"`
|
||||
ExitState string `json:"exit_state,omitempty"`
|
||||
Progress *Progress `json:"progress,omitempty"`
|
||||
@@ -39,6 +40,7 @@ func (report *ProcessReport) Unmarshal(l *app.Log) {
|
||||
report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
|
||||
report.Log[i][1] = line.Data
|
||||
}
|
||||
report.Matches = l.Matches
|
||||
|
||||
report.History = []ProcessReportEntry{}
|
||||
|
||||
|
@@ -13,6 +13,7 @@ type LogEntry struct {
|
||||
CreatedAt time.Time
|
||||
Prelude []string
|
||||
Log []LogLine
|
||||
Matches []string
|
||||
}
|
||||
|
||||
type LogHistoryEntry struct {
|
||||
|
@@ -46,6 +46,7 @@ type Config struct {
|
||||
StaleTimeout uint64 `json:"stale_timeout_seconds"` // seconds
|
||||
Timeout uint64 `json:"timeout_seconds"` // seconds
|
||||
Scheduler string `json:"scheduler"` // crontab pattern or RFC3339 timestamp
|
||||
LogPatterns []string `json:"log_patterns"`
|
||||
LimitCPU float64 `json:"limit_cpu_usage"` // percent
|
||||
LimitMemory uint64 `json:"limit_memory_bytes"` // bytes
|
||||
LimitWaitFor uint64 `json:"limit_waitfor_seconds"` // seconds
|
||||
@@ -80,6 +81,9 @@ func (config *Config) Clone() *Config {
|
||||
clone.Options = make([]string, len(config.Options))
|
||||
copy(clone.Options, config.Options)
|
||||
|
||||
clone.LogPatterns = make([]string, len(config.LogPatterns))
|
||||
copy(clone.LogPatterns, config.LogPatterns)
|
||||
|
||||
return clone
|
||||
}
|
||||
|
||||
|
@@ -358,7 +358,7 @@ func (r *restream) load() error {
|
||||
}
|
||||
|
||||
t.command = t.config.CreateCommand()
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
|
||||
|
||||
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||
Reconnect: t.config.Reconnect,
|
||||
@@ -510,7 +510,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
|
||||
}
|
||||
|
||||
t.command = t.config.CreateCommand()
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
|
||||
|
||||
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||
Reconnect: t.config.Reconnect,
|
||||
@@ -1227,7 +1227,7 @@ func (r *restream) reloadProcess(id string) error {
|
||||
r.stopProcess(id)
|
||||
}
|
||||
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
|
||||
|
||||
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||
Reconnect: t.config.Reconnect,
|
||||
@@ -1431,6 +1431,7 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
|
||||
Data: line.Data,
|
||||
}
|
||||
}
|
||||
log.Matches = current.Matches
|
||||
|
||||
history := task.parser.ReportHistory()
|
||||
|
||||
@@ -1439,6 +1440,7 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
|
||||
LogEntry: app.LogEntry{
|
||||
CreatedAt: h.CreatedAt,
|
||||
Prelude: h.Prelude,
|
||||
Matches: h.Matches,
|
||||
},
|
||||
ExitedAt: h.ExitedAt,
|
||||
ExitState: h.ExitState,
|
||||
|
@@ -1124,3 +1124,32 @@ func TestProcessReplacer(t *testing.T) {
|
||||
|
||||
require.Equal(t, process, rs.tasks["314159265359"].config)
|
||||
}
|
||||
|
||||
func TestProcessLogPattern(t *testing.T) {
|
||||
rs, err := getDummyRestreamer(nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
process := getDummyProcess()
|
||||
process.LogPatterns = []string{
|
||||
"using cpu capabilities:",
|
||||
}
|
||||
process.Autostart = false
|
||||
process.Reconnect = true
|
||||
|
||||
err = rs.AddProcess(process)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = rs.StartProcess("process")
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
log, err := rs.GetProcessLog("process")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(log.Matches))
|
||||
require.Equal(t, "[libx264 @ 0x7fa96a800600] using cpu capabilities: MMX2 SSE2Fast SSSE3 SSE4.2 AVX FMA3 BMI2 AVX2", log.Matches[0])
|
||||
|
||||
err = rs.StopProcess("process")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user