Add config value for how many minimal process reports should be kept in the history

A minimal history entry is a history entry without the log and prelude.

The corresponding config entry is ffmpeg.log.max_minimal_history. Its value is
added on top of the ffmpeg.log.max_history value: the latest max_history
entries keep their log and prelude, while the remaining, older entries don't.
In total, max_history+max_minimal_history history entries are kept per
process.

If you want no history, set both values to 0.
If you want only full history, set max_minimal_history to 0.
If you want only minimal history, set max_history to 0.
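
To make the arithmetic concrete, a small example using the environment-variable names registered further down in this commit (the values themselves are made up):

    CORE_FFMPEG_LOG_MAXHISTORY=3
    CORE_FFMPEG_LOG_MAXMINIMALHISTORY=10

With these settings, up to 13 reports are kept per process: the 3 most recent ones with log and prelude, and up to 10 older ones reduced to everything except log and prelude.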
Author: Ingo Oppermann
Date:   2023-03-16 12:25:06 +01:00
Parent: 5b2b2243bb
Commit: 4ce8a0eaa3
11 changed files with 254 additions and 87 deletions

View File

@@ -491,14 +491,15 @@ func (a *api) start() error {
}
ffmpeg, err := ffmpeg.New(ffmpeg.Config{
Binary: cfg.FFmpeg.Binary,
MaxProc: cfg.FFmpeg.MaxProcesses,
MaxLogLines: cfg.FFmpeg.Log.MaxLines,
LogHistoryLength: cfg.FFmpeg.Log.MaxHistory,
ValidatorInput: validatorIn,
ValidatorOutput: validatorOut,
Portrange: portrange,
Collector: a.sessions.Collector("ffmpeg"),
Binary: cfg.FFmpeg.Binary,
MaxProc: cfg.FFmpeg.MaxProcesses,
MaxLogLines: cfg.FFmpeg.Log.MaxLines,
LogHistoryLength: cfg.FFmpeg.Log.MaxHistory,
LogMinimalHistoryLength: cfg.FFmpeg.Log.MaxMinimalHistory,
ValidatorInput: validatorIn,
ValidatorOutput: validatorOut,
Portrange: portrange,
Collector: a.sessions.Collector("ffmpeg"),
})
if err != nil {
return fmt.Errorf("unable to create ffmpeg: %w", err)

View File

@@ -236,6 +236,7 @@ func (d *Config) init() {
d.vars.Register(value.NewStringList(&d.FFmpeg.Access.Output.Block, []string{}, " "), "ffmpeg.access.output.block", "CORE_FFMPEG_ACCESS_OUTPUT_BLOCK", nil, "List of blocked expression to match against the output addresses", false, false)
d.vars.Register(value.NewInt(&d.FFmpeg.Log.MaxLines, 50), "ffmpeg.log.max_lines", "CORE_FFMPEG_LOG_MAXLINES", nil, "Number of latest log lines to keep for each process", false, false)
d.vars.Register(value.NewInt(&d.FFmpeg.Log.MaxHistory, 3), "ffmpeg.log.max_history", "CORE_FFMPEG_LOG_MAXHISTORY", nil, "Number of latest logs to keep for each process", false, false)
d.vars.Register(value.NewInt(&d.FFmpeg.Log.MaxMinimalHistory, 0), "ffmpeg.log.max_minimal_history", "CORE_FFMPEG_LOG_MAXMINIMALHISTORY", nil, "Number of minimal logs to keep for each process on top of max_history", false, false)
// Playout
d.vars.Register(value.NewBool(&d.Playout.Enable, false), "playout.enable", "CORE_PLAYOUT_ENABLE", nil, "Enable playout proxy where available", false, false)

View File

@@ -127,8 +127,9 @@ type Data struct {
} `json:"output"`
} `json:"access"`
Log struct {
MaxLines int `json:"max_lines" format:"int"`
MaxHistory int `json:"max_history" format:"int"`
MaxLines int `json:"max_lines" format:"int"`
MaxHistory int `json:"max_history" format:"int"`
MaxMinimalHistory int `json:"max_minimal_history" format:"int"`
} `json:"log"`
} `json:"ffmpeg"`
Playout struct {
@@ -190,7 +191,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.API = d.API
data.RTMP = d.RTMP
data.SRT = d.SRT
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Metrics = d.Metrics
data.Sessions = d.Sessions
@@ -210,10 +210,14 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Storage.CORS.Origins = copy.Slice(d.Storage.CORS.Origins)
data.FFmpeg.Binary = d.FFmpeg.Binary
data.FFmpeg.MaxProcesses = d.FFmpeg.MaxProcesses
data.FFmpeg.Access.Input.Allow = copy.Slice(d.FFmpeg.Access.Input.Allow)
data.FFmpeg.Access.Input.Block = copy.Slice(d.FFmpeg.Access.Input.Block)
data.FFmpeg.Access.Output.Allow = copy.Slice(d.FFmpeg.Access.Output.Allow)
data.FFmpeg.Access.Output.Block = copy.Slice(d.FFmpeg.Access.Output.Block)
data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines
data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory
data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList)
@@ -250,6 +254,8 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Storage.S3 = []value.S3Storage{}
data.FFmpeg.Log.MaxMinimalHistory = 0
data.Version = 3
return data, nil
@@ -273,7 +279,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.API = d.API
data.RTMP = d.RTMP
data.SRT = d.SRT
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Metrics = d.Metrics
data.Sessions = d.Sessions
@@ -293,10 +298,14 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.Storage.CORS.Origins = copy.Slice(d.Storage.CORS.Origins)
data.FFmpeg.Binary = d.FFmpeg.Binary
data.FFmpeg.MaxProcesses = d.FFmpeg.MaxProcesses
data.FFmpeg.Access.Input.Allow = copy.Slice(d.FFmpeg.Access.Input.Allow)
data.FFmpeg.Access.Input.Block = copy.Slice(d.FFmpeg.Access.Input.Block)
data.FFmpeg.Access.Output.Allow = copy.Slice(d.FFmpeg.Access.Output.Allow)
data.FFmpeg.Access.Output.Block = copy.Slice(d.FFmpeg.Access.Output.Block)
data.FFmpeg.Log.MaxLines = d.FFmpeg.Log.MaxLines
data.FFmpeg.Log.MaxHistory = d.FFmpeg.Log.MaxHistory
data.Sessions.IPIgnoreList = copy.Slice(d.Sessions.IPIgnoreList)

View File

@@ -1729,7 +1729,7 @@ const docTemplate = `{
"ApiKeyAuth": []
}
],
"description": "Get the log history entry of a process at a certain time.",
"description": "Get the log history entry of a process that finished at a certain time.",
"produces": [
"application/json"
],
@@ -1860,13 +1860,13 @@ const docTemplate = `{
},
{
"type": "integer",
"description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any",
"description": "Search range of when the report has been exited, older than this value. Unix timestamp, leave empty for any",
"name": "from",
"in": "query"
},
{
"type": "integer",
"description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any",
"description": "Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any",
"name": "to",
"in": "query"
}
@@ -2449,6 +2449,10 @@ const docTemplate = `{
"max_lines": {
"type": "integer",
"format": "int"
},
"max_minimal_history": {
"type": "integer",
"format": "int"
}
}
},
@@ -3394,6 +3398,10 @@ const docTemplate = `{
"exit_state": {
"type": "string"
},
"exited_at": {
"type": "integer",
"format": "int64"
},
"log": {
"type": "array",
"items": {
@@ -3424,6 +3432,10 @@ const docTemplate = `{
"exit_state": {
"type": "string"
},
"exited_at": {
"type": "integer",
"format": "int64"
},
"id": {
"type": "string"
},
@@ -4276,6 +4288,10 @@ const docTemplate = `{
"max_lines": {
"type": "integer",
"format": "int"
},
"max_minimal_history": {
"type": "integer",
"format": "int"
}
}
},
@@ -4935,18 +4951,7 @@ const docTemplate = `{
"type": "string"
},
"auth": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"password": {
"type": "string"
},
"username": {
"type": "string"
}
}
"$ref": "#/definitions/value.S3StorageAuth"
},
"bucket": {
"type": "string"
@@ -4970,6 +4975,20 @@ const docTemplate = `{
"type": "boolean"
}
}
},
"value.S3StorageAuth": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"password": {
"type": "string"
},
"username": {
"type": "string"
}
}
}
},
"securityDefinitions": {

View File

@@ -1722,7 +1722,7 @@
"ApiKeyAuth": []
}
],
"description": "Get the log history entry of a process at a certain time.",
"description": "Get the log history entry of a process that finished at a certain time.",
"produces": [
"application/json"
],
@@ -1853,13 +1853,13 @@
},
{
"type": "integer",
"description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any",
"description": "Search range of when the report has been exited, older than this value. Unix timestamp, leave empty for any",
"name": "from",
"in": "query"
},
{
"type": "integer",
"description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any",
"description": "Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any",
"name": "to",
"in": "query"
}
@@ -2442,6 +2442,10 @@
"max_lines": {
"type": "integer",
"format": "int"
},
"max_minimal_history": {
"type": "integer",
"format": "int"
}
}
},
@@ -3387,6 +3391,10 @@
"exit_state": {
"type": "string"
},
"exited_at": {
"type": "integer",
"format": "int64"
},
"log": {
"type": "array",
"items": {
@@ -3417,6 +3425,10 @@
"exit_state": {
"type": "string"
},
"exited_at": {
"type": "integer",
"format": "int64"
},
"id": {
"type": "string"
},
@@ -4269,6 +4281,10 @@
"max_lines": {
"type": "integer",
"format": "int"
},
"max_minimal_history": {
"type": "integer",
"format": "int"
}
}
},
@@ -4928,18 +4944,7 @@
"type": "string"
},
"auth": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"password": {
"type": "string"
},
"username": {
"type": "string"
}
}
"$ref": "#/definitions/value.S3StorageAuth"
},
"bucket": {
"type": "string"
@@ -4963,6 +4968,20 @@
"type": "boolean"
}
}
},
"value.S3StorageAuth": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"password": {
"type": "string"
},
"username": {
"type": "string"
}
}
}
},
"securityDefinitions": {

View File

@@ -189,6 +189,9 @@ definitions:
max_lines:
format: int
type: integer
max_minimal_history:
format: int
type: integer
type: object
max_processes:
format: int64
@@ -825,6 +828,9 @@ definitions:
type: integer
exit_state:
type: string
exited_at:
format: int64
type: integer
log:
items:
items:
@@ -845,6 +851,9 @@ definitions:
type: integer
exit_state:
type: string
exited_at:
format: int64
type: integer
id:
type: string
reference:
@@ -1487,6 +1496,9 @@ definitions:
max_lines:
format: int
type: integer
max_minimal_history:
format: int
type: integer
type: object
max_processes:
format: int64
@@ -1921,14 +1933,7 @@ definitions:
access_key_id:
type: string
auth:
properties:
enable:
type: boolean
password:
type: string
username:
type: string
type: object
$ref: '#/definitions/value.S3StorageAuth'
bucket:
type: string
endpoint:
@@ -1944,6 +1949,15 @@ definitions:
use_ssl:
type: boolean
type: object
value.S3StorageAuth:
properties:
enable:
type: boolean
password:
type: string
username:
type: string
type: object
info:
contact:
email: hello@datarhei.com
@@ -3078,7 +3092,8 @@ paths:
- v16.7.2
/api/v3/process/{id}/report/{at}:
get:
description: Get the log history entry of a process at a certain time.
description: Get the log history entry of a process that finished at a certain
time.
operationId: process-3-get-report-at
parameters:
- description: Process ID
@@ -3161,12 +3176,12 @@ paths:
in: query
name: state
type: string
- description: Search range of when the report has been created, older than
this value. Unix timestamp, leave empty for any
- description: Search range of when the report has been exited, older than this
value. Unix timestamp, leave empty for any
in: query
name: from
type: integer
- description: Search range of when the report has been created, younger than
- description: Search range of when the report has been exited, younger than
this value. Unix timestamp, leave empty for any
in: query
name: to

View File

@@ -44,14 +44,15 @@ type ProcessConfig struct {
// Config is the configuration for ffmpeg that is part of the configuration
// for the restreamer instance.
type Config struct {
Binary string
MaxProc int64
MaxLogLines int
LogHistoryLength int
ValidatorInput Validator
ValidatorOutput Validator
Portrange net.Portranger
Collector session.Collector
Binary string
MaxProc int64
MaxLogLines int
LogHistoryLength int
LogMinimalHistoryLength int
ValidatorInput Validator
ValidatorOutput Validator
Portrange net.Portranger
Collector session.Collector
}
type ffmpeg struct {
@@ -61,8 +62,9 @@ type ffmpeg struct {
portrange net.Portranger
skills skills.Skills
logLines int
historyLength int
logLines int
historyLength int
minimalHistoryLength int
collector session.Collector
@@ -80,6 +82,7 @@ func New(config Config) (FFmpeg, error) {
f.binary = binary
f.historyLength = config.LogHistoryLength
f.minimalHistoryLength = config.LogMinimalHistoryLength
f.logLines = config.MaxLogLines
f.portrange = config.Portrange
@@ -153,10 +156,11 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string) parse.Parser {
p := parse.New(parse.Config{
LogHistory: f.historyLength,
LogLines: f.logLines,
Logger: logger,
Collector: NewWrappedCollector(id, reference, f.collector),
LogLines: f.logLines,
LogHistory: f.historyLength,
LogMinimalHistory: f.minimalHistoryLength,
Logger: logger,
Collector: NewWrappedCollector(id, reference, f.collector),
})
return p

View File

@@ -43,12 +43,13 @@ type Parser interface {
// Config is the config for the Parser implementation
type Config struct {
LogHistory int
LogLines int
PreludeHeadLines int
PreludeTailLines int
Logger log.Logger
Collector session.Collector
LogLines int
LogHistory int
LogMinimalHistory int
PreludeHeadLines int
PreludeTailLines int
Logger log.Logger
Collector session.Collector
}
type parser struct {
@@ -75,8 +76,9 @@ type parser struct {
logLines int
logStart time.Time
logHistory *ring.Ring
logHistoryLength int
logHistory *ring.Ring
logHistoryLength int
logMinimalHistoryLength int
lastLogline string
@@ -117,10 +119,11 @@ type parser struct {
// New returns a Parser that satisfies the Parser interface
func New(config Config) Parser {
p := &parser{
logHistoryLength: config.LogHistory,
logLines: config.LogLines,
logger: config.Logger,
collector: config.Collector,
logLines: config.LogLines,
logHistoryLength: config.LogHistory,
logMinimalHistoryLength: config.LogMinimalHistory,
logger: config.Logger,
collector: config.Collector,
}
if p.logger == nil {
@@ -157,8 +160,10 @@ func New(config Config) Parser {
p.lock.log.Lock()
p.log = ring.New(config.LogLines)
if p.logHistoryLength > 0 {
p.logHistory = ring.New(p.logHistoryLength)
historyLength := p.logHistoryLength + p.logMinimalHistoryLength
if historyLength > 0 {
p.logHistory = ring.New(historyLength)
}
if p.collector == nil {
@@ -798,6 +803,20 @@ func (p *parser) storeReportHistory(state string) {
}
p.logHistory.Value = h
if p.logMinimalHistoryLength > 0 {
// Remove the Log and Prelude from older history entries
r := p.logHistory.Move(-p.logHistoryLength)
if r.Value != nil {
history := r.Value.(ReportHistoryEntry)
history.Log = nil
history.Prelude = nil
r.Value = history
}
}
p.logHistory = p.logHistory.Next()
}
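
For reference, a standalone sketch of the container/ring bookkeeping used above (not part of this commit, field names simplified): the ring is sized logHistoryLength+logMinimalHistoryLength, and before the write cursor advances, the slot logHistoryLength positions behind it holds the oldest entry that still carries a full report, which is then demoted to a minimal one.

package main

import (
	"container/ring"
	"fmt"
)

// entry stands in for ReportHistoryEntry, reduced to what the sketch needs.
type entry struct {
	ID  int
	Log []string // dropped once the entry leaves the "full" window
}

func main() {
	full, minimal := 3, 10 // LogHistory and LogMinimalHistory
	r := ring.New(full + minimal)

	for i := 0; i < 15; i++ {
		r.Value = entry{ID: i, Log: []string{"line"}}

		// The slot `full` positions behind the write cursor holds the oldest
		// entry that still carries a full report; strip it down to metadata.
		// (The real code only does this when the minimal length is > 0.)
		if old := r.Move(-full); old.Value != nil {
			e := old.Value.(entry)
			e.Log = nil
			old.Value = e
		}

		r = r.Next()
	}

	// Oldest to newest: IDs 2..11 print "full: false", IDs 12..14 "full: true",
	// i.e. 13 entries in total, matching the new parser tests.
	r.Do(func(v interface{}) {
		if v != nil {
			e := v.(entry)
			fmt.Println(e.ID, "full:", e.Log != nil)
		}
	})
}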

View File

@@ -129,7 +129,7 @@ func TestParserLog(t *testing.T) {
require.Equal(t, 1, len(log))
}
func TestParserLasLogLine(t *testing.T) {
func TestParserLastLogLine(t *testing.T) {
parser := New(Config{
LogLines: 20,
}).(*parser)
@@ -188,6 +188,86 @@ func TestParserLogHistory(t *testing.T) {
}, history[0].Progress)
}
func TestParserLogHistoryLength(t *testing.T) {
parser := New(Config{
LogLines: 20,
LogHistory: 3,
}).(*parser)
history := parser.ReportHistory()
require.Equal(t, 0, len(history))
for i := 0; i < 5; i++ {
parser.Parse("bla")
parser.prelude.done = true
parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463")
parser.Stop("finished")
}
history = parser.ReportHistory()
require.Equal(t, 3, len(history))
}
func TestParserLogMinimalHistoryLength(t *testing.T) {
parser := New(Config{
LogLines: 20,
LogHistory: 3,
LogMinimalHistory: 10,
}).(*parser)
history := parser.ReportHistory()
require.Equal(t, 0, len(history))
for i := 0; i < 15; i++ {
parser.Parse("bla")
parser.prelude.done = true
parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463")
parser.Stop("finished")
}
history = parser.ReportHistory()
require.Equal(t, 13, len(history))
for i := 0; i < 10; i++ {
require.Empty(t, history[i].Log, i)
}
for i := 10; i < 13; i++ {
require.NotEmpty(t, history[i].Log, i)
}
}
func TestParserLogMinimalHistoryLengthWithoutFullHistory(t *testing.T) {
parser := New(Config{
LogLines: 20,
LogHistory: 0,
LogMinimalHistory: 10,
}).(*parser)
history := parser.ReportHistory()
require.Equal(t, 0, len(history))
for i := 0; i < 15; i++ {
parser.Parse("bla")
parser.prelude.done = true
parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463")
parser.Stop("finished")
}
history = parser.ReportHistory()
require.Equal(t, 10, len(history))
for i := 0; i < 10; i++ {
require.Empty(t, history[i].Log, i)
}
}
func TestParserLogHistorySearch(t *testing.T) {
parser := New(Config{
LogLines: 20,

View File

@@ -9,8 +9,8 @@ import (
// ProcessReportEntry represents the logs of a run of a restream process
type ProcessReportEntry struct {
CreatedAt int64 `json:"created_at" format:"int64"`
Prelude []string `json:"prelude"`
Log [][2]string `json:"log"`
Prelude []string `json:"prelude,omitempty"`
Log [][2]string `json:"log,omitempty"`
}
type ProcessReportHistoryEntry struct {
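
The new omitempty tags are what keep minimal entries small in the API output: once Log and Prelude are nil, the keys are dropped from the JSON entirely. A standalone illustration (not part of this commit, values made up):

package main

import (
	"encoding/json"
	"fmt"
)

// Copy of the struct above, reduced to what the example needs.
type ProcessReportEntry struct {
	CreatedAt int64       `json:"created_at" format:"int64"`
	Prelude   []string    `json:"prelude,omitempty"`
	Log       [][2]string `json:"log,omitempty"`
}

func main() {
	minimal, _ := json.Marshal(ProcessReportEntry{CreatedAt: 1678962306})
	fmt.Println(string(minimal)) // {"created_at":1678962306}

	full, _ := json.Marshal(ProcessReportEntry{
		CreatedAt: 1678962306,
		Prelude:   []string{"ffmpeg version ..."},
		Log:       [][2]string{{"1678962306", "frame=1 ..."}},
	})
	fmt.Println(string(full)) // includes "prelude" and "log"
}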

View File

@@ -384,8 +384,8 @@ func (h *RestreamHandler) GetReportAt(c echo.Context) error {
// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern."
// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern."
// @Param state query string false "State of a process, leave empty for any"
// @Param from query int64 false "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any"
// @Param to query int64 false "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any"
// @Param from query int64 false "Search range of when the report has been exited, older than this value. Unix timestamp, leave empty for any"
// @Param to query int64 false "Search range of when the report has been exited, younger than this value. Unix timestamp, leave empty for any"
// @Success 200 {array} api.ProcessReportSearchResult
// @Failure 400 {object} api.Error
// @Security ApiKeyAuth