diff --git a/docs/docs.go b/docs/docs.go index 75e9671c..e05d3c84 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -711,7 +711,7 @@ const docTemplate = `{ "v16.7.2" ], "summary": "Remove a file from a filesystem", - "operationId": "filesystem-3-delete-files", + "operationId": "filesystem-3-delete-file", "parameters": [ { "type": "string", @@ -3402,6 +3402,12 @@ const docTemplate = `{ "limits": { "$ref": "#/definitions/api.ProcessConfigLimits" }, + "log_patterns": { + "type": "array", + "items": { + "type": "string" + } + }, "options": { "type": "array", "items": { @@ -3537,6 +3543,12 @@ const docTemplate = `{ } } }, + "matches": { + "type": "array", + "items": { + "type": "string" + } + }, "prelude": { "type": "array", "items": { @@ -3571,6 +3583,12 @@ const docTemplate = `{ } } }, + "matches": { + "type": "array", + "items": { + "type": "string" + } + }, "prelude": { "type": "array", "items": { diff --git a/docs/swagger.json b/docs/swagger.json index de68d59c..24f27481 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -704,7 +704,7 @@ "v16.7.2" ], "summary": "Remove a file from a filesystem", - "operationId": "filesystem-3-delete-files", + "operationId": "filesystem-3-delete-file", "parameters": [ { "type": "string", @@ -3395,6 +3395,12 @@ "limits": { "$ref": "#/definitions/api.ProcessConfigLimits" }, + "log_patterns": { + "type": "array", + "items": { + "type": "string" + } + }, "options": { "type": "array", "items": { @@ -3530,6 +3536,12 @@ } } }, + "matches": { + "type": "array", + "items": { + "type": "string" + } + }, "prelude": { "type": "array", "items": { @@ -3564,6 +3576,12 @@ } } }, + "matches": { + "type": "array", + "items": { + "type": "string" + } + }, "prelude": { "type": "array", "items": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 38fe6224..07c2d816 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -763,6 +763,10 @@ definitions: type: array limits: $ref: '#/definitions/api.ProcessConfigLimits' + log_patterns: + items: + type: string + type: array options: items: type: string @@ -858,6 +862,10 @@ definitions: type: string type: array type: array + matches: + items: + type: string + type: array prelude: items: type: string @@ -881,6 +889,10 @@ definitions: type: string type: array type: array + matches: + items: + type: string + type: array prelude: items: type: string @@ -2372,7 +2384,7 @@ paths: /api/v3/fs/{storage}/{filepath}: delete: description: Remove a file from a filesystem - operationId: filesystem-3-delete-files + operationId: filesystem-3-delete-file parameters: - description: Name of the filesystem in: path diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 8fcf9744..553df06c 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -17,7 +17,7 @@ import ( type FFmpeg interface { New(config ProcessConfig) (process.Process, error) - NewProcessParser(logger log.Logger, id, reference string) parse.Parser + NewProcessParser(logger log.Logger, id, reference string, logpatterns []string) parse.Parser NewProbeParser(logger log.Logger) probe.Parser ValidateInputAddress(address string) bool ValidateOutputAddress(address string) bool @@ -168,11 +168,12 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { return ffmpeg, err } -func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string) parse.Parser { +func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string, logpatterns []string) parse.Parser { p := parse.New(parse.Config{ LogLines: f.logLines, LogHistory: f.historyLength, LogMinimalHistory: f.minimalHistoryLength, + Patterns: logpatterns, Logger: logger, Collector: NewWrappedCollector(id, reference, f.collector), }) diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 7adfd764..cbbea6ab 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -48,6 +48,7 @@ type Config struct { LogMinimalHistory int PreludeHeadLines int PreludeTailLines int + Patterns []string Logger log.Logger Collector session.Collector } @@ -72,6 +73,11 @@ type parser struct { done bool } + logpatterns struct { + matches []string + patterns []*regexp.Regexp + } + log *ring.Ring logLines int logStart time.Time @@ -158,7 +164,7 @@ func New(config Config) Parser { p.lock.prelude.Unlock() p.lock.log.Lock() - p.log = ring.New(config.LogLines) + p.log = ring.New(p.logLines) historyLength := p.logHistoryLength + p.logMinimalHistoryLength @@ -171,6 +177,17 @@ func New(config Config) Parser { } p.logStart = time.Time{} + + for _, pattern := range config.Patterns { + r, err := regexp.Compile(pattern) + if err != nil { + p.logpatterns.matches = append(p.logpatterns.matches, err.Error()) + continue + } + + p.logpatterns.patterns = append(p.logpatterns.patterns, r) + } + p.lock.log.Unlock() p.ResetStats() @@ -185,8 +202,8 @@ func (p *parser) Parse(line string) uint64 { isFFmpegProgress := strings.HasPrefix(line, "ffmpeg.progress:") isAVstreamProgress := strings.HasPrefix(line, "avstream.progress:") + p.lock.log.Lock() if p.logStart.IsZero() { - p.lock.log.Lock() p.logStart = time.Now() p.logger.WithComponent("ProcessReport").WithFields(log.Fields{ @@ -194,9 +211,8 @@ func (p *parser) Parse(line string) uint64 { "report": "created", "timestamp": p.logStart.Unix(), }).Info().Log("Created") - - p.lock.log.Unlock() } + p.lock.log.Unlock() p.lock.prelude.Lock() preludeDone := p.prelude.done @@ -268,6 +284,14 @@ func (p *parser) Parse(line string) uint64 { } p.lock.prelude.Unlock() + p.lock.log.Lock() + for _, pattern := range p.logpatterns.patterns { + if pattern.MatchString(line) { + p.logpatterns.matches = append(p.logpatterns.matches, line) + } + } + p.lock.log.Unlock() + return 0 } @@ -669,6 +693,16 @@ func (p *parser) Log() []process.Line { return log } +func (p *parser) Matches() []string { + p.lock.log.Lock() + defer p.lock.log.Unlock() + + matches := make([]string, len(p.logpatterns.matches)) + copy(matches, p.logpatterns.matches) + + return matches +} + func (p *parser) LastLogline() string { p.lock.log.RLock() defer p.lock.log.RUnlock() @@ -729,6 +763,7 @@ func (p *parser) ResetLog() { p.lock.log.Lock() p.log = ring.New(p.logLines) p.logStart = time.Time{} + p.logpatterns.matches = []string{} p.lock.log.Unlock() } @@ -737,6 +772,7 @@ type Report struct { CreatedAt time.Time Prelude []string Log []process.Line + Matches []string } // ReportHistoryEntry represents an historical log report, including the exit status of the @@ -821,6 +857,7 @@ func (p *parser) storeReportHistory(state string) { history := r.Value.(ReportHistoryEntry) history.Log = nil history.Prelude = nil + history.Matches = nil r.Value = history } @@ -839,6 +876,7 @@ func (p *parser) Report() Report { h := Report{ Prelude: p.Prelude(), Log: p.Log(), + Matches: p.Matches(), } p.lock.log.RLock() diff --git a/ffmpeg/parse/parser_test.go b/ffmpeg/parse/parser_test.go index 36f2f9b1..6ec0968e 100644 --- a/ffmpeg/parse/parser_test.go +++ b/ffmpeg/parse/parser_test.go @@ -878,3 +878,34 @@ func TestParserProgressPlayout(t *testing.T) { Dup: 0, }, progress) } + +func TestParserPatterns(t *testing.T) { + parser := New(Config{ + Patterns: []string{ + "^foobar", + "foobar$", + }, + }) + + parser.Parse("some foobar more") + require.Empty(t, parser.Report().Matches) + + parser.Parse("foobar some more") + require.Equal(t, 1, len(parser.Report().Matches)) + require.Equal(t, "foobar some more", parser.Report().Matches[0]) + + parser.Parse("some more foobar") + require.Equal(t, 2, len(parser.Report().Matches)) + require.Equal(t, "some more foobar", parser.Report().Matches[1]) +} + +func TestParserPatternsError(t *testing.T) { + parser := New(Config{ + Patterns: []string{ + "^foobar", + "foo(bar$", + }, + }) + + require.Equal(t, 1, len(parser.Report().Matches)) +} diff --git a/http/api/process.go b/http/api/process.go index 5db7eb20..942f293b 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -55,6 +55,7 @@ type ProcessConfig struct { StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` Timeout uint64 `json:"runtime_duration_seconds" format:"uint64"` Scheduler string `json:"scheduler"` + LogPatterns []string `json:"log_patterns"` Limits ProcessConfigLimits `json:"limits"` } @@ -107,6 +108,9 @@ func (cfg *ProcessConfig) Marshal() *app.Config { } + p.LogPatterns = make([]string, len(cfg.LogPatterns)) + copy(p.LogPatterns, cfg.LogPatterns) + return p } @@ -190,6 +194,9 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) { cfg.Output = append(cfg.Output, io) } + + cfg.LogPatterns = make([]string, len(c.LogPatterns)) + copy(cfg.LogPatterns, c.LogPatterns) } // ProcessState represents the current state of an ffmpeg process diff --git a/http/api/report.go b/http/api/report.go index 1350367f..f058e1b0 100644 --- a/http/api/report.go +++ b/http/api/report.go @@ -11,6 +11,7 @@ type ProcessReportEntry struct { CreatedAt int64 `json:"created_at" format:"int64"` Prelude []string `json:"prelude,omitempty"` Log [][2]string `json:"log,omitempty"` + Matches []string `json:"matches,omitempty"` ExitedAt int64 `json:"exited_at,omitempty" format:"int64"` ExitState string `json:"exit_state,omitempty"` Progress *Progress `json:"progress,omitempty"` @@ -39,6 +40,7 @@ func (report *ProcessReport) Unmarshal(l *app.Log) { report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) report.Log[i][1] = line.Data } + report.Matches = l.Matches report.History = []ProcessReportEntry{} diff --git a/restream/app/log.go b/restream/app/log.go index a5152446..8a4173cf 100644 --- a/restream/app/log.go +++ b/restream/app/log.go @@ -13,6 +13,7 @@ type LogEntry struct { CreatedAt time.Time Prelude []string Log []LogLine + Matches []string } type LogHistoryEntry struct { diff --git a/restream/app/process.go b/restream/app/process.go index 92b73c16..830e20f7 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -46,6 +46,7 @@ type Config struct { StaleTimeout uint64 `json:"stale_timeout_seconds"` // seconds Timeout uint64 `json:"timeout_seconds"` // seconds Scheduler string `json:"scheduler"` // crontab pattern or RFC3339 timestamp + LogPatterns []string `json:"log_patterns"` LimitCPU float64 `json:"limit_cpu_usage"` // percent LimitMemory uint64 `json:"limit_memory_bytes"` // bytes LimitWaitFor uint64 `json:"limit_waitfor_seconds"` // seconds @@ -80,6 +81,9 @@ func (config *Config) Clone() *Config { clone.Options = make([]string, len(config.Options)) copy(clone.Options, config.Options) + clone.LogPatterns = make([]string, len(config.LogPatterns)) + copy(clone.LogPatterns, config.LogPatterns) + return clone } diff --git a/restream/restream.go b/restream/restream.go index 7948da03..b5403d75 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -358,7 +358,7 @@ func (r *restream) load() error { } t.command = t.config.CreateCommand() - t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) + t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, @@ -510,7 +510,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { } t.command = t.config.CreateCommand() - t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) + t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, @@ -1227,7 +1227,7 @@ func (r *restream) reloadProcess(id string) error { r.stopProcess(id) } - t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) + t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, @@ -1431,6 +1431,7 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { Data: line.Data, } } + log.Matches = current.Matches history := task.parser.ReportHistory() @@ -1439,6 +1440,7 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { LogEntry: app.LogEntry{ CreatedAt: h.CreatedAt, Prelude: h.Prelude, + Matches: h.Matches, }, ExitedAt: h.ExitedAt, ExitState: h.ExitState, diff --git a/restream/restream_test.go b/restream/restream_test.go index 3feaddf4..7bb8c0f8 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -1124,3 +1124,32 @@ func TestProcessReplacer(t *testing.T) { require.Equal(t, process, rs.tasks["314159265359"].config) } + +func TestProcessLogPattern(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process := getDummyProcess() + process.LogPatterns = []string{ + "using cpu capabilities:", + } + process.Autostart = false + process.Reconnect = true + + err = rs.AddProcess(process) + require.NoError(t, err) + + err = rs.StartProcess("process") + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + log, err := rs.GetProcessLog("process") + require.NoError(t, err) + + require.Equal(t, 1, len(log.Matches)) + require.Equal(t, "[libx264 @ 0x7fa96a800600] using cpu capabilities: MMX2 SSE2Fast SSSE3 SSE4.2 AVX FMA3 BMI2 AVX2", log.Matches[0]) + + err = rs.StopProcess("process") + require.NoError(t, err) +}