diff --git a/app/api/api.go b/app/api/api.go index 58663d4b..a0821eb9 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -491,14 +491,15 @@ func (a *api) start() error { } ffmpeg, err := ffmpeg.New(ffmpeg.Config{ - Binary: cfg.FFmpeg.Binary, - MaxProc: cfg.FFmpeg.MaxProcesses, - MaxLogLines: cfg.FFmpeg.Log.MaxLines, - LogHistoryLength: cfg.FFmpeg.Log.MaxHistory, - ValidatorInput: validatorIn, - ValidatorOutput: validatorOut, - Portrange: portrange, - Collector: a.sessions.Collector("ffmpeg"), + Binary: cfg.FFmpeg.Binary, + MaxProc: cfg.FFmpeg.MaxProcesses, + MaxLogLines: cfg.FFmpeg.Log.MaxLines, + LogHistoryLength: cfg.FFmpeg.Log.MaxHistory, + LogMinimalHistoryLength: cfg.FFmpeg.Log.MaxMinimalHistory, + ValidatorInput: validatorIn, + ValidatorOutput: validatorOut, + Portrange: portrange, + Collector: a.sessions.Collector("ffmpeg"), }) if err != nil { return fmt.Errorf("unable to create ffmpeg: %w", err) diff --git a/config/config.go b/config/config.go index b6eab656..bf00c386 100644 --- a/config/config.go +++ b/config/config.go @@ -236,6 +236,7 @@ func (d *Config) init() { d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Output.Block, []string{}, " "), "ffmpeg.access.output.block", "CORE_FFMPEG_ACCESS_OUTPUT_BLOCK", nil, "List of blocked expression to match against the output addresses", false, false) d.vars.Register(value.NewInt(&d.FFmpeg.Log.MaxLines, 50), "ffmpeg.log.max_lines", "CORE_FFMPEG_LOG_MAXLINES", nil, "Number of latest log lines to keep for each process", false, false) d.vars.Register(value.NewInt(&d.FFmpeg.Log.MaxHistory, 3), "ffmpeg.log.max_history", "CORE_FFMPEG_LOG_MAXHISTORY", nil, "Number of latest logs to keep for each process", false, false) + d.vars.Register(value.NewInt(&d.FFmpeg.Log.MaxMinimalHistory, 0), "ffmpeg.log.max_minimal_history", "CORE_FFMPEG_LOG_MAXMINIMALHISTORY", nil, "Number of minimal logs to keep for each process on top of max_history", false, false) // Playout d.vars.Register(value.NewBool(&d.Playout.Enable, false), "playout.enable", "CORE_PLAYOUT_ENABLE", nil, "Enable playout proxy where available", false, false) diff --git a/config/data.go b/config/data.go index 35507888..2ebda017 100644 --- a/config/data.go +++ b/config/data.go @@ -127,8 +127,9 @@ type Data struct { } `json:"output"` } `json:"access"` Log struct { - MaxLines int `json:"max_lines" format:"int"` - MaxHistory int `json:"max_history" format:"int"` + MaxLines int `json:"max_lines" format:"int"` + MaxHistory int `json:"max_history" format:"int"` + MaxMinimalHistory int `json:"max_minimal_history" format:"int"` } `json:"log"` } `json:"ffmpeg"` Playout struct { @@ -190,7 +191,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.API = d.API data.RTMP = d.RTMP data.SRT = d.SRT - data.FFmpeg = d.FFmpeg data.Playout = d.Playout data.Metrics = d.Metrics data.Sessions = d.Sessions @@ -210,10 +210,14 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.Storage.CORS.Origins = copy.Slice(d.Storage.CORS.Origins) + data.FFmpeg.Binary = d.FFmpeg.Binary + data.FFmpeg.MaxProcesses = d.FFmpeg.MaxProcesses data.FFmpeg.Access.Input.Allow = copy.Slice(d.FFmpeg.Access.Input.Allow) data.FFmpeg.Access.Input.Block = copy.Slice(d.FFmpeg.Access.Input.Block) data.FFmpeg.Access.Output.Allow = copy.Slice(d.FFmpeg.Access.Output.Allow) data.FFmpeg.Access.Output.Block = copy.Slice(d.FFmpeg.Access.Output.Block) + data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines + data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList) @@ -250,6 +254,8 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.Storage.S3 = []value.S3Storage{} + data.FFmpeg.Log.MaxMinimalHistory = 0 + data.Version = 3 return data, nil @@ -273,7 +279,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { data.API = d.API data.RTMP = d.RTMP data.SRT = d.SRT - data.FFmpeg = d.FFmpeg data.Playout = d.Playout data.Metrics = d.Metrics data.Sessions = d.Sessions @@ -293,10 +298,14 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { data.Storage.CORS.Origins = copy.Slice(d.Storage.CORS.Origins) + data.FFmpeg.Binary = d.FFmpeg.Binary + data.FFmpeg.MaxProcesses = d.FFmpeg.MaxProcesses data.FFmpeg.Access.Input.Allow = copy.Slice(d.FFmpeg.Access.Input.Allow) data.FFmpeg.Access.Input.Block = copy.Slice(d.FFmpeg.Access.Input.Block) data.FFmpeg.Access.Output.Allow = copy.Slice(d.FFmpeg.Access.Output.Allow) data.FFmpeg.Access.Output.Block = copy.Slice(d.FFmpeg.Access.Output.Block) + data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines + data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList) diff --git a/docs/docs.go b/docs/docs.go index b59002ef..88707d2f 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1729,7 +1729,7 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "Get the log history entry of a process at a certain time.", + "description": "Get the log history entry of a process that finished at a certain time.", "produces": [ "application/json" ], @@ -1860,13 +1860,13 @@ const docTemplate = `{ }, { "type": "integer", - "description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any", + "description": "Search range of when the report has been exited, older than this value. Unix timestamp, leave empty for any", "name": "from", "in": "query" }, { "type": "integer", - "description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any", + "description": "Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any", "name": "to", "in": "query" } @@ -2449,6 +2449,10 @@ const docTemplate = `{ "max_lines": { "type": "integer", "format": "int" + }, + "max_minimal_history": { + "type": "integer", + "format": "int" } } }, @@ -3394,6 +3398,10 @@ const docTemplate = `{ "exit_state": { "type": "string" }, + "exited_at": { + "type": "integer", + "format": "int64" + }, "log": { "type": "array", "items": { @@ -3424,6 +3432,10 @@ const docTemplate = `{ "exit_state": { "type": "string" }, + "exited_at": { + "type": "integer", + "format": "int64" + }, "id": { "type": "string" }, @@ -4276,6 +4288,10 @@ const docTemplate = `{ "max_lines": { "type": "integer", "format": "int" + }, + "max_minimal_history": { + "type": "integer", + "format": "int" } } }, @@ -4935,18 +4951,7 @@ const docTemplate = `{ "type": "string" }, "auth": { - "type": "object", - "properties": { - "enable": { - "type": "boolean" - }, - "password": { - "type": "string" - }, - "username": { - "type": "string" - } - } + "$ref": "#/definitions/value.S3StorageAuth" }, "bucket": { "type": "string" @@ -4970,6 +4975,20 @@ const docTemplate = `{ "type": "boolean" } } + }, + "value.S3StorageAuth": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "password": { + "type": "string" + }, + "username": { + "type": "string" + } + } } }, "securityDefinitions": { diff --git a/docs/swagger.json b/docs/swagger.json index a9a706b2..004a63f2 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1722,7 +1722,7 @@ "ApiKeyAuth": [] } ], - "description": "Get the log history entry of a process at a certain time.", + "description": "Get the log history entry of a process that finished at a certain time.", "produces": [ "application/json" ], @@ -1853,13 +1853,13 @@ }, { "type": "integer", - "description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any", + "description": "Search range of when the report has been exited, older than this value. Unix timestamp, leave empty for any", "name": "from", "in": "query" }, { "type": "integer", - "description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any", + "description": "Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any", "name": "to", "in": "query" } @@ -2442,6 +2442,10 @@ "max_lines": { "type": "integer", "format": "int" + }, + "max_minimal_history": { + "type": "integer", + "format": "int" } } }, @@ -3387,6 +3391,10 @@ "exit_state": { "type": "string" }, + "exited_at": { + "type": "integer", + "format": "int64" + }, "log": { "type": "array", "items": { @@ -3417,6 +3425,10 @@ "exit_state": { "type": "string" }, + "exited_at": { + "type": "integer", + "format": "int64" + }, "id": { "type": "string" }, @@ -4269,6 +4281,10 @@ "max_lines": { "type": "integer", "format": "int" + }, + "max_minimal_history": { + "type": "integer", + "format": "int" } } }, @@ -4928,18 +4944,7 @@ "type": "string" }, "auth": { - "type": "object", - "properties": { - "enable": { - "type": "boolean" - }, - "password": { - "type": "string" - }, - "username": { - "type": "string" - } - } + "$ref": "#/definitions/value.S3StorageAuth" }, "bucket": { "type": "string" @@ -4963,6 +4968,20 @@ "type": "boolean" } } + }, + "value.S3StorageAuth": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "password": { + "type": "string" + }, + "username": { + "type": "string" + } + } } }, "securityDefinitions": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 99622021..3cfb54d9 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -189,6 +189,9 @@ definitions: max_lines: format: int type: integer + max_minimal_history: + format: int + type: integer type: object max_processes: format: int64 @@ -825,6 +828,9 @@ definitions: type: integer exit_state: type: string + exited_at: + format: int64 + type: integer log: items: items: @@ -845,6 +851,9 @@ definitions: type: integer exit_state: type: string + exited_at: + format: int64 + type: integer id: type: string reference: @@ -1487,6 +1496,9 @@ definitions: max_lines: format: int type: integer + max_minimal_history: + format: int + type: integer type: object max_processes: format: int64 @@ -1921,14 +1933,7 @@ definitions: access_key_id: type: string auth: - properties: - enable: - type: boolean - password: - type: string - username: - type: string - type: object + $ref: '#/definitions/value.S3StorageAuth' bucket: type: string endpoint: @@ -1944,6 +1949,15 @@ definitions: use_ssl: type: boolean type: object + value.S3StorageAuth: + properties: + enable: + type: boolean + password: + type: string + username: + type: string + type: object info: contact: email: hello@datarhei.com @@ -3078,7 +3092,8 @@ paths: - v16.7.2 /api/v3/process/{id}/report/{at}: get: - description: Get the log history entry of a process at a certain time. + description: Get the log history entry of a process that finished at a certain + time. operationId: process-3-get-report-at parameters: - description: Process ID @@ -3161,12 +3176,12 @@ paths: in: query name: state type: string - - description: Search range of when the report has been created, older than - this value. Unix timestamp, leave empty for any + - description: Search range of when the report has been exited, older than this + value. Unix timestamp, leave empty for any in: query name: from type: integer - - description: Search range of when the report has been created, younger than + - description: Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any in: query name: to diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 345e4a46..2e0acc99 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -44,14 +44,15 @@ type ProcessConfig struct { // Config is the configuration for ffmpeg that is part of the configuration // for the restreamer instance. type Config struct { - Binary string - MaxProc int64 - MaxLogLines int - LogHistoryLength int - ValidatorInput Validator - ValidatorOutput Validator - Portrange net.Portranger - Collector session.Collector + Binary string + MaxProc int64 + MaxLogLines int + LogHistoryLength int + LogMinimalHistoryLength int + ValidatorInput Validator + ValidatorOutput Validator + Portrange net.Portranger + Collector session.Collector } type ffmpeg struct { @@ -61,8 +62,9 @@ type ffmpeg struct { portrange net.Portranger skills skills.Skills - logLines int - historyLength int + logLines int + historyLength int + minimalHistoryLength int collector session.Collector @@ -80,6 +82,7 @@ func New(config Config) (FFmpeg, error) { f.binary = binary f.historyLength = config.LogHistoryLength + f.minimalHistoryLength = config.LogMinimalHistoryLength f.logLines = config.MaxLogLines f.portrange = config.Portrange @@ -153,10 +156,11 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string) parse.Parser { p := parse.New(parse.Config{ - LogHistory: f.historyLength, - LogLines: f.logLines, - Logger: logger, - Collector: NewWrappedCollector(id, reference, f.collector), + LogLines: f.logLines, + LogHistory: f.historyLength, + LogMinimalHistory: f.minimalHistoryLength, + Logger: logger, + Collector: NewWrappedCollector(id, reference, f.collector), }) return p diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 07090285..25ff4b3b 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -43,12 +43,13 @@ type Parser interface { // Config is the config for the Parser implementation type Config struct { - LogHistory int - LogLines int - PreludeHeadLines int - PreludeTailLines int - Logger log.Logger - Collector session.Collector + LogLines int + LogHistory int + LogMinimalHistory int + PreludeHeadLines int + PreludeTailLines int + Logger log.Logger + Collector session.Collector } type parser struct { @@ -75,8 +76,9 @@ type parser struct { logLines int logStart time.Time - logHistory *ring.Ring - logHistoryLength int + logHistory *ring.Ring + logHistoryLength int + logMinimalHistoryLength int lastLogline string @@ -117,10 +119,11 @@ type parser struct { // New returns a Parser that satisfies the Parser interface func New(config Config) Parser { p := &parser{ - logHistoryLength: config.LogHistory, - logLines: config.LogLines, - logger: config.Logger, - collector: config.Collector, + logLines: config.LogLines, + logHistoryLength: config.LogHistory, + logMinimalHistoryLength: config.LogMinimalHistory, + logger: config.Logger, + collector: config.Collector, } if p.logger == nil { @@ -157,8 +160,10 @@ func New(config Config) Parser { p.lock.log.Lock() p.log = ring.New(config.LogLines) - if p.logHistoryLength > 0 { - p.logHistory = ring.New(p.logHistoryLength) + historyLength := p.logHistoryLength + p.logMinimalHistoryLength + + if historyLength > 0 { + p.logHistory = ring.New(historyLength) } if p.collector == nil { @@ -798,6 +803,20 @@ func (p *parser) storeReportHistory(state string) { } p.logHistory.Value = h + + if p.logMinimalHistoryLength > 0 { + // Remove the Log and Prelude from older history entries + r := p.logHistory.Move(-p.logHistoryLength) + + if r.Value != nil { + history := r.Value.(ReportHistoryEntry) + history.Log = nil + history.Prelude = nil + + r.Value = history + } + } + p.logHistory = p.logHistory.Next() } diff --git a/ffmpeg/parse/parser_test.go b/ffmpeg/parse/parser_test.go index af02e645..36f2f9b1 100644 --- a/ffmpeg/parse/parser_test.go +++ b/ffmpeg/parse/parser_test.go @@ -129,7 +129,7 @@ func TestParserLog(t *testing.T) { require.Equal(t, 1, len(log)) } -func TestParserLasLogLine(t *testing.T) { +func TestParserLastLogLine(t *testing.T) { parser := New(Config{ LogLines: 20, }).(*parser) @@ -188,6 +188,86 @@ func TestParserLogHistory(t *testing.T) { }, history[0].Progress) } +func TestParserLogHistoryLength(t *testing.T) { + parser := New(Config{ + LogLines: 20, + LogHistory: 3, + }).(*parser) + + history := parser.ReportHistory() + require.Equal(t, 0, len(history)) + + for i := 0; i < 5; i++ { + parser.Parse("bla") + + parser.prelude.done = true + parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") + + parser.Stop("finished") + } + + history = parser.ReportHistory() + require.Equal(t, 3, len(history)) +} + +func TestParserLogMinimalHistoryLength(t *testing.T) { + parser := New(Config{ + LogLines: 20, + LogHistory: 3, + LogMinimalHistory: 10, + }).(*parser) + + history := parser.ReportHistory() + require.Equal(t, 0, len(history)) + + for i := 0; i < 15; i++ { + parser.Parse("bla") + + parser.prelude.done = true + parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") + + parser.Stop("finished") + } + + history = parser.ReportHistory() + require.Equal(t, 13, len(history)) + + for i := 0; i < 10; i++ { + require.Empty(t, history[i].Log, i) + } + + for i := 10; i < 13; i++ { + require.NotEmpty(t, history[i].Log, i) + } +} + +func TestParserLogMinimalHistoryLengthWithoutFullHistory(t *testing.T) { + parser := New(Config{ + LogLines: 20, + LogHistory: 0, + LogMinimalHistory: 10, + }).(*parser) + + history := parser.ReportHistory() + require.Equal(t, 0, len(history)) + + for i := 0; i < 15; i++ { + parser.Parse("bla") + + parser.prelude.done = true + parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") + + parser.Stop("finished") + } + + history = parser.ReportHistory() + require.Equal(t, 10, len(history)) + + for i := 0; i < 10; i++ { + require.Empty(t, history[i].Log, i) + } +} + func TestParserLogHistorySearch(t *testing.T) { parser := New(Config{ LogLines: 20, diff --git a/http/api/report.go b/http/api/report.go index 3ac1ea3c..6fbddac9 100644 --- a/http/api/report.go +++ b/http/api/report.go @@ -9,8 +9,8 @@ import ( // ProcessReportEntry represents the logs of a run of a restream process type ProcessReportEntry struct { CreatedAt int64 `json:"created_at" format:"int64"` - Prelude []string `json:"prelude"` - Log [][2]string `json:"log"` + Prelude []string `json:"prelude,omitempty"` + Log [][2]string `json:"log,omitempty"` } type ProcessReportHistoryEntry struct { diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 0f1fe1b3..12cb0161 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -384,8 +384,8 @@ func (h *RestreamHandler) GetReportAt(c echo.Context) error { // @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern." // @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern." // @Param state query string false "State of a process, leave empty for any" -// @Param from query int64 false "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any" -// @Param to query int64 false "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any" +// @Param from query int64 false "Search range of when the report has been exited, older than this value. Unix timestamp, leave empty for any" +// @Param to query int64 false "Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any" // @Success 200 {array} api.ProcessReportSearchResult // @Failure 400 {object} api.Error // @Security ApiKeyAuth