From 7e90bb87ce15d1c3ca94d827d687080e34f1216d Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 10 Jul 2024 16:46:49 +0200 Subject: [PATCH] Allow to import report history for a process --- app/version.go | 4 +- cluster/docs/ClusterAPI_docs.go | 29 +- cluster/docs/ClusterAPI_swagger.json | 29 +- cluster/docs/ClusterAPI_swagger.yaml | 20 +- cluster/leader.go | 30 ++ cluster/node/core.go | 24 +- cluster/node/manager.go | 27 ++ docs/docs.go | 94 +++++-- docs/swagger.json | 94 +++++-- docs/swagger.yaml | 64 ++++- ffmpeg/parse/parser.go | 59 +--- ffmpeg/parse/parser_test.go | 63 ++++- ffmpeg/parse/types.go | 52 +++- http/api/avstream.go | 30 ++ http/api/avstream_test.go | 57 ++++ http/api/event.go | 2 +- http/api/json.go | 19 -- http/api/probe.go | 8 +- http/api/process.go | 92 +++++- http/api/process_test.go | 114 ++++++++ http/api/progress.go | 280 ++++++++++++++----- http/api/progress_test.go | 331 ++++++++++++++++++++++ http/api/report.go | 142 ++++++---- http/api/report_test.go | 403 +++++++++++++++++++++++++++ http/api/session.go | 12 +- http/client/client.go | 5 + http/client/process.go | 20 ++ http/graph/resolver/resolver.go | 2 +- http/handler/api/cluster_process.go | 8 +- http/handler/api/events.go | 4 +- http/handler/api/process.go | 69 ++++- http/server.go | 61 ++-- restream/app/avstream.go | 79 ++++++ restream/app/probe.go | 74 +++++ restream/app/process.go | 49 ++++ restream/app/progress.go | 227 ++++++++++++++- restream/app/report.go | 80 ++++++ restream/restream.go | 284 ++++--------------- restream/restream_test.go | 20 +- 39 files changed, 2488 insertions(+), 573 deletions(-) create mode 100644 http/api/avstream_test.go delete mode 100644 http/api/json.go create mode 100644 http/api/process_test.go create mode 100644 http/api/progress_test.go create mode 100644 http/api/report_test.go diff --git a/app/version.go b/app/version.go index 9c48d880..fb14c6d7 100644 --- a/app/version.go +++ b/app/version.go @@ -29,8 +29,8 @@ func (v versionInfo) MinorString() string { // Version of the app var Version = versionInfo{ Major: 16, - Minor: 19, - Patch: 1, + Minor: 20, + Patch: 0, } // Commit is the git commit the app is build from. It should be filled in during compilation diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index 90240e9d..be2fd046 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -51,6 +51,33 @@ const docTemplateClusterAPI = `{ } } }, + "/v1/about": { + "get": { + "description": "The cluster version", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "The cluster version", + "operationId": "cluster-1-about", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, "/v1/barrier/{name}": { "get": { "description": "Has the barrier already has been passed", @@ -535,7 +562,7 @@ const docTemplateClusterAPI = `{ "operationId": "cluster-1-lock", "parameters": [ { - "description": "Lock request", + "description": "LockCreate request", "name": "data", "in": "body", "required": true, diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index 711c3c88..70cf3f4e 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -43,6 +43,33 @@ } } }, + "/v1/about": { + "get": { + "description": "The cluster version", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "The cluster version", + "operationId": "cluster-1-about", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, "/v1/barrier/{name}": { "get": { "description": "Has the barrier already has been passed", @@ -527,7 +554,7 @@ "operationId": "cluster-1-lock", "parameters": [ { - "description": "Lock request", + "description": "LockCreate request", "name": "data", "in": "body", "required": true, diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index e9052e6e..c0628aca 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -896,6 +896,24 @@ paths: summary: The cluster version tags: - v1.0.0 + /v1/about: + get: + description: The cluster version + operationId: cluster-1-about + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/cluster.Error' + summary: The cluster version + tags: + - v1.0.0 /v1/barrier/{name}: get: description: Has the barrier already has been passed @@ -1216,7 +1234,7 @@ paths: description: Acquire a named lock operationId: cluster-1-lock parameters: - - description: Lock request + - description: LockCreate request in: body name: data required: true diff --git a/cluster/leader.go b/cluster/leader.go index 2f3c8e96..c45f05f9 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -594,6 +594,36 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError { break } + // Transfer report with best effort, it's ok if it fails. + err = c.manager.ProcessCommand(v.fromNodeid, v.config.ProcessID(), "stop") + if err == nil { + process, err := c.manager.ProcessGet(v.fromNodeid, v.config.ProcessID(), []string{"report"}) + if err != nil { + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, get process report") + } + if process.Report != nil && err == nil { + report := process.Report.Marshal() + err = c.manager.ProcessReportSet(v.toNodeid, v.config.ProcessID(), &report) + if err != nil { + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, set process report") + } + } + } else { + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, stopping process") + } + err = c.manager.ProcessDelete(v.fromNodeid, v.config.ProcessID()) if err != nil { opErr = processOpError{ diff --git a/cluster/node/core.go b/cluster/node/core.go index 94a919fa..5f158805 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -312,7 +312,6 @@ func (n *Core) About() (CoreAbout, error) { } func (n *Core) ProcessAdd(config *app.Config, metadata map[string]interface{}) error { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -325,7 +324,6 @@ func (n *Core) ProcessAdd(config *app.Config, metadata map[string]interface{}) e } func (n *Core) ProcessCommand(id app.ProcessID, command string) error { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -338,7 +336,6 @@ func (n *Core) ProcessCommand(id app.ProcessID, command string) error { } func (n *Core) ProcessDelete(id app.ProcessID) error { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -351,7 +348,6 @@ func (n *Core) ProcessDelete(id app.ProcessID) error { } func (n *Core) ProcessUpdate(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -363,8 +359,19 @@ func (n *Core) ProcessUpdate(id app.ProcessID, config *app.Config, metadata map[ return client.ProcessUpdate(id, config, metadata) } -func (n *Core) ProcessProbe(id app.ProcessID) (api.Probe, error) { +func (n *Core) ProcessReportSet(id app.ProcessID, report *app.Report) error { + n.lock.RLock() + client := n.client + n.lock.RUnlock() + if client == nil { + return ErrNoPeer + } + + return client.ProcessReportSet(id, report) +} + +func (n *Core) ProcessProbe(id app.ProcessID) (api.Probe, error) { n.lock.RLock() client := n.client n.lock.RUnlock() @@ -384,7 +391,6 @@ func (n *Core) ProcessProbe(id app.ProcessID) (api.Probe, error) { } func (n *Core) ProcessProbeConfig(config *app.Config) (api.Probe, error) { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -404,7 +410,6 @@ func (n *Core) ProcessProbeConfig(config *app.Config) (api.Probe, error) { } func (n *Core) ProcessList(options client.ProcessListOptions) ([]api.Process, error) { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -417,7 +422,6 @@ func (n *Core) ProcessList(options client.ProcessListOptions) ([]api.Process, er } func (n *Core) FilesystemList(storage, pattern string) ([]api.FileInfo, error) { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -439,7 +443,6 @@ func (n *Core) FilesystemList(storage, pattern string) ([]api.FileInfo, error) { } func (n *Core) FilesystemDeleteFile(storage, path string) error { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -452,7 +455,6 @@ func (n *Core) FilesystemDeleteFile(storage, path string) error { } func (n *Core) FilesystemPutFile(storage, path string, data io.Reader) error { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -465,7 +467,6 @@ func (n *Core) FilesystemPutFile(storage, path string, data io.Reader) error { } func (n *Core) FilesystemGetFileInfo(storage, path string) (int64, time.Time, error) { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -487,7 +488,6 @@ func (n *Core) FilesystemGetFileInfo(storage, path string) (int64, time.Time, er } func (n *Core) FilesystemGetFile(storage, path string, offset int64) (io.ReadCloser, error) { - n.lock.RLock() client := n.client n.lock.RUnlock() diff --git a/cluster/node/manager.go b/cluster/node/manager.go index d0c345b7..4b63c2dc 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -540,6 +540,24 @@ func (p *Manager) ProcessList(options client.ProcessListOptions) []api.Process { return processList } +func (p *Manager) ProcessGet(nodeid string, id app.ProcessID, filter []string) (api.Process, error) { + node, err := p.NodeGet(nodeid) + if err != nil { + return api.Process{}, fmt.Errorf("node not found: %w", err) + } + + list, err := node.Core().ProcessList(client.ProcessListOptions{ + ID: []string{id.ID}, + Filter: filter, + Domain: id.Domain, + }) + if err != nil { + return api.Process{}, err + } + + return list[0], nil +} + func (p *Manager) ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error { node, err := p.NodeGet(nodeid) if err != nil { @@ -567,6 +585,15 @@ func (p *Manager) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Con return node.Core().ProcessUpdate(id, config, metadata) } +func (p *Manager) ProcessReportSet(nodeid string, id app.ProcessID, report *app.Report) error { + node, err := p.NodeGet(nodeid) + if err != nil { + return fmt.Errorf("node not found: %w", err) + } + + return node.Core().ProcessReportSet(id, report) +} + func (p *Manager) ProcessCommand(nodeid string, id app.ProcessID, command string) error { node, err := p.NodeGet(nodeid) if err != nil { diff --git a/docs/docs.go b/docs/docs.go index 5f9ab7e8..ee197903 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -381,7 +381,7 @@ const docTemplate = `{ "v16.?.?" ], "summary": "List of identities in the cluster", - "operationId": "cluster-3-db-list-identity", + "operationId": "cluster-3-db-get-identity", "responses": { "200": { "description": "OK", @@ -3061,6 +3061,12 @@ const docTemplate = `{ "type": "string" } }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "403": { "description": "Forbidden", "schema": { @@ -4479,6 +4485,75 @@ const docTemplate = `{ } } } + }, + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Set the report history a process", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Set the report history a process", + "operationId": "process-3-set-report", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + }, + { + "description": "Process report", + "name": "report", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ProcessReport" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } } }, "/api/v3/process/{id}/state": { @@ -6849,17 +6924,10 @@ const docTemplate = `{ "type": "integer", "format": "int64" }, - "exit_state": { - "type": "string" - }, - "exited_at": { - "type": "integer", - "format": "int64" - }, "history": { "type": "array", "items": { - "$ref": "#/definitions/api.ProcessReportEntry" + "$ref": "#/definitions/api.ProcessReportHistoryEntry" } }, "log": { @@ -6882,16 +6950,10 @@ const docTemplate = `{ "items": { "type": "string" } - }, - "progress": { - "$ref": "#/definitions/api.Progress" - }, - "resources": { - "$ref": "#/definitions/api.ProcessUsage" } } }, - "api.ProcessReportEntry": { + "api.ProcessReportHistoryEntry": { "type": "object", "properties": { "created_at": { diff --git a/docs/swagger.json b/docs/swagger.json index 06ac2d39..ba1fbbe4 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -373,7 +373,7 @@ "v16.?.?" ], "summary": "List of identities in the cluster", - "operationId": "cluster-3-db-list-identity", + "operationId": "cluster-3-db-get-identity", "responses": { "200": { "description": "OK", @@ -3053,6 +3053,12 @@ "type": "string" } }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "403": { "description": "Forbidden", "schema": { @@ -4471,6 +4477,75 @@ } } } + }, + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Set the report history a process", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Set the report history a process", + "operationId": "process-3-set-report", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + }, + { + "description": "Process report", + "name": "report", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ProcessReport" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } } }, "/api/v3/process/{id}/state": { @@ -6841,17 +6916,10 @@ "type": "integer", "format": "int64" }, - "exit_state": { - "type": "string" - }, - "exited_at": { - "type": "integer", - "format": "int64" - }, "history": { "type": "array", "items": { - "$ref": "#/definitions/api.ProcessReportEntry" + "$ref": "#/definitions/api.ProcessReportHistoryEntry" } }, "log": { @@ -6874,16 +6942,10 @@ "items": { "type": "string" } - }, - "progress": { - "$ref": "#/definitions/api.Progress" - }, - "resources": { - "$ref": "#/definitions/api.ProcessUsage" } } }, - "api.ProcessReportEntry": { + "api.ProcessReportHistoryEntry": { "type": "object", "properties": { "created_at": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 25f3cb48..61e9d8b2 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1272,14 +1272,9 @@ definitions: created_at: format: int64 type: integer - exit_state: - type: string - exited_at: - format: int64 - type: integer history: items: - $ref: '#/definitions/api.ProcessReportEntry' + $ref: '#/definitions/api.ProcessReportHistoryEntry' type: array log: items: @@ -1295,12 +1290,8 @@ definitions: items: type: string type: array - progress: - $ref: '#/definitions/api.Progress' - resources: - $ref: '#/definitions/api.ProcessUsage' type: object - api.ProcessReportEntry: + api.ProcessReportHistoryEntry: properties: created_at: format: int64 @@ -2826,7 +2817,7 @@ paths: /api/v3/cluster/db/user/{name}: get: description: List of identities in the cluster - operationId: cluster-3-db-list-identity + operationId: cluster-3-db-get-identity produces: - application/json responses: @@ -4502,6 +4493,10 @@ paths: description: OK schema: type: string + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' "403": description: Forbidden schema: @@ -5503,6 +5498,51 @@ paths: summary: Get the logs of a process tags: - v16.7.2 + put: + consumes: + - application/json + description: Set the report history a process + operationId: process-3-set-report + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + - description: Domain to act on + in: query + name: domain + type: string + - description: Process report + in: body + name: report + required: true + schema: + $ref: '#/definitions/api.ProcessReport' + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Set the report history a process + tags: + - v16.?.? /api/v3/process/{id}/state: get: description: Get the state and progress data of a process. diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 9f3f8fd3..5c8b9549 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -40,8 +40,8 @@ type Parser interface { // LastLogline returns the last parsed log line LastLogline() string - // TransferReportHistory transfers the report history to another parser - TransferReportHistory(Parser) error + // ImportReportHistory imports a report history from another parser + ImportReportHistory([]ReportHistoryEntry) } // Config is the config for the Parser implementation @@ -862,31 +862,6 @@ func (p *parser) ResetLog() { p.lock.log.Unlock() } -// Report represents a log report, including the prelude and the last log lines of the process. -type Report struct { - CreatedAt time.Time - Prelude []string - Log []process.Line - Matches []string -} - -// ReportHistoryEntry represents an historical log report, including the exit status of the -// process and the last progress data. -type ReportHistoryEntry struct { - Report - - ExitedAt time.Time - ExitState string - Progress Progress - Usage Usage -} - -type ReportHistorySearchResult struct { - CreatedAt time.Time - ExitedAt time.Time - ExitState string -} - func (p *parser) SearchReportHistory(state string, from, to *time.Time) []ReportHistorySearchResult { p.lock.logHistory.RLock() defer p.lock.logHistory.RUnlock() @@ -1006,26 +981,20 @@ func (p *parser) ReportHistory() []ReportHistoryEntry { return history } -func (p *parser) TransferReportHistory(dst Parser) error { - p.lock.logHistory.RLock() - defer p.lock.logHistory.RUnlock() +func (p *parser) ImportReportHistory(history []ReportHistoryEntry) { + p.lock.logHistory.Lock() + defer p.lock.logHistory.Unlock() - pp, ok := dst.(*parser) - if !ok { - return fmt.Errorf("the target parser is not of the required type") + historyLength := p.logHistoryLength + p.logMinimalHistoryLength + + if historyLength <= 0 { + return } - pp.lock.logHistory.Lock() - defer pp.lock.logHistory.Unlock() + p.logHistory = ring.New(historyLength) - p.logHistory.Do(func(l interface{}) { - if l == nil { - return - } - - pp.logHistory.Value = l - pp.logHistory = pp.logHistory.Next() - }) - - return nil + for _, r := range history { + p.logHistory.Value = r + p.logHistory = p.logHistory.Next() + } } diff --git a/ffmpeg/parse/parser_test.go b/ffmpeg/parse/parser_test.go index ee67ee26..43cec2f1 100644 --- a/ffmpeg/parse/parser_test.go +++ b/ffmpeg/parse/parser_test.go @@ -22,11 +22,9 @@ func TestParserProgress(t *testing.T) { d, _ := time.ParseDuration("3m58s440ms") wantP := Progress{ Frame: 5968, - FPS: 25, Quantizer: 19.4, Size: 453632, Time: d.Seconds(), - Bitrate: 5632, Speed: 0.999, Drop: 3522, Dup: 87463, @@ -202,6 +200,67 @@ func TestParserLogHistory(t *testing.T) { } } +func TestParserImportLogHistory(t *testing.T) { + parser := New(Config{ + LogLines: 20, + LogHistory: 5, + }).(*parser) + + for i := 0; i < 7; i++ { + parser.Parse("bla") + + parser.prelude.done = true + parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") + + history := parser.ReportHistory() + require.Equal(t, int(math.Min(float64(i), 5)), len(history)) + + parser.Stop("finished", process.Usage{}) + parser.ResetStats() + + time.Sleep(time.Second) + } + + history := parser.ReportHistory() + + for i, h := range history { + h.Prelude[0] = "blubb" + h.ExitState = "nothing" + h.Progress.Frame = 42 + + history[i] = h + } + + parser.ImportReportHistory(history[:3]) + + history = parser.ReportHistory() + require.Equal(t, 3, len(history)) + + for i := 0; i < 3; i++ { + require.Equal(t, "nothing", history[i].ExitState) + require.Equal(t, "bla", history[i].Log[0].Data) + require.Equal(t, "blubb", history[i].Prelude[0]) + + d, _ := time.ParseDuration("3m58s440ms") + require.Equal(t, Progress{ + Started: true, + Frame: 42, + FPS: 0, // is calculated with averager + Quantizer: 19.4, + Size: 453632, + Time: d.Seconds(), + Bitrate: 0, // is calculated with averager + Speed: 0.999, + Drop: 3522, + Dup: 87463, + }, history[i].Progress) + + if i != 0 { + require.Greater(t, history[i].CreatedAt, history[i-1].ExitedAt) + } + } +} + func TestParserLogHistoryLength(t *testing.T) { parser := New(Config{ LogLines: 20, diff --git a/ffmpeg/parse/types.go b/ffmpeg/parse/types.go index 3f7910cd..01ffcc78 100644 --- a/ffmpeg/parse/types.go +++ b/ffmpeg/parse/types.go @@ -5,6 +5,7 @@ import ( "time" "github.com/datarhei/core/v16/encoding/json" + "github.com/datarhei/core/v16/process" ) // Duration represents a time.Duration @@ -500,17 +501,21 @@ type AVstream struct { } type Usage struct { - CPU struct { - NCPU float64 - Average float64 - Max float64 - Limit float64 - } - Memory struct { - Average float64 - Max uint64 - Limit uint64 - } + CPU UsageCPU + Memory UsageMemory +} + +type UsageCPU struct { + NCPU float64 + Average float64 + Max float64 + Limit float64 +} + +type UsageMemory struct { + Average float64 + Max uint64 + Limit uint64 } type GraphElement struct { @@ -542,3 +547,28 @@ type StreamMapping struct { Graphs []GraphElement Mapping []GraphMapping } + +// Report represents a log report, including the prelude and the last log lines of the process. +type Report struct { + CreatedAt time.Time + Prelude []string + Log []process.Line + Matches []string +} + +// ReportHistoryEntry represents an historical log report, including the exit status of the +// process and the last progress data. +type ReportHistoryEntry struct { + Report + + ExitedAt time.Time + ExitState string + Progress Progress + Usage Usage +} + +type ReportHistorySearchResult struct { + CreatedAt time.Time + ExitedAt time.Time + ExitState string +} diff --git a/http/api/avstream.go b/http/api/avstream.go index 7cb55769..5af4e90d 100644 --- a/http/api/avstream.go +++ b/http/api/avstream.go @@ -22,6 +22,17 @@ func (i *AVstreamIO) Unmarshal(io *app.AVstreamIO) { i.Size = io.Size } +func (i *AVstreamIO) Marshal() app.AVstreamIO { + io := app.AVstreamIO{ + State: i.State, + Packet: i.Packet, + Time: i.Time, + Size: i.Size, + } + + return io +} + type AVstream struct { Input AVstreamIO `json:"input"` Output AVstreamIO `json:"output"` @@ -56,3 +67,22 @@ func (a *AVstream) Unmarshal(av *app.AVstream) { a.Input.Unmarshal(&av.Input) a.Output.Unmarshal(&av.Output) } + +func (a *AVstream) Marshal() *app.AVstream { + av := &app.AVstream{ + Input: a.Input.Marshal(), + Output: a.Output.Marshal(), + Aqueue: a.Aqueue, + Queue: a.Queue, + Dup: a.Dup, + Drop: a.Drop, + Enc: a.Enc, + Looping: a.Looping, + LoopingRuntime: a.LoopingRuntime, + Duplicating: a.Duplicating, + GOP: a.GOP, + Mode: a.Mode, + } + + return av +} diff --git a/http/api/avstream_test.go b/http/api/avstream_test.go new file mode 100644 index 00000000..f8bb4d73 --- /dev/null +++ b/http/api/avstream_test.go @@ -0,0 +1,57 @@ +package api + +import ( + "testing" + + "github.com/datarhei/core/v16/restream/app" + + "github.com/stretchr/testify/require" +) + +func TestAVStreamIO(t *testing.T) { + original := app.AVstreamIO{ + State: "xxx", + Packet: 100, + Time: 42, + Size: 95744, + } + + p := AVstreamIO{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestAVStream(t *testing.T) { + original := app.AVstream{ + Input: app.AVstreamIO{ + State: "xxx", + Packet: 100, + Time: 42, + Size: 95744, + }, + Output: app.AVstreamIO{ + State: "yyy", + Packet: 7473, + Time: 57634, + Size: 363, + }, + Aqueue: 3829, + Queue: 4398, + Dup: 47, + Drop: 85, + Enc: 4578, + Looping: true, + LoopingRuntime: 483, + Duplicating: true, + GOP: "gop", + Mode: "mode", + } + + p := AVstream{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, &original, restored) +} diff --git a/http/api/event.go b/http/api/event.go index e48f256a..416e37d5 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -19,7 +19,7 @@ type Event struct { Data map[string]string `json:"data"` } -func (e *Event) Marshal(le *log.Event) { +func (e *Event) Unmarshal(le *log.Event) { e.Timestamp = le.Time.Unix() e.Level = int(le.Level) e.Component = strings.ToLower(le.Component) diff --git a/http/api/json.go b/http/api/json.go deleted file mode 100644 index f10b339b..00000000 --- a/http/api/json.go +++ /dev/null @@ -1,19 +0,0 @@ -package api - -import ( - "fmt" - - "github.com/datarhei/core/v16/encoding/json" -) - -func ToNumber(f float64) json.Number { - var s string - - if f == float64(int64(f)) { - s = fmt.Sprintf("%.0f", f) // 0 decimal if integer - } else { - s = fmt.Sprintf("%.3f", f) // max. 3 decimal if float - } - - return json.Number(s) -} diff --git a/http/api/probe.go b/http/api/probe.go index 5f16054b..7f077400 100644 --- a/http/api/probe.go +++ b/http/api/probe.go @@ -44,10 +44,10 @@ func (i *ProbeIO) Unmarshal(io *app.ProbeIO) { i.Type = io.Type i.Codec = io.Codec i.Coder = io.Coder - i.Bitrate = ToNumber(io.Bitrate) - i.Duration = ToNumber(io.Duration) + i.Bitrate = json.ToNumber(io.Bitrate) + i.Duration = json.ToNumber(io.Duration) - i.FPS = ToNumber(io.FPS) + i.FPS = json.ToNumber(io.FPS) i.Pixfmt = io.Pixfmt i.Width = io.Width i.Height = io.Height @@ -64,7 +64,7 @@ type Probe struct { Log []string `json:"log"` } -// Unmarshal converts a restreamer Probe to a Probe in API representation +// Unmarshal converts a core Probe to a Probe in API representation func (probe *Probe) Unmarshal(p *app.Probe) { if p == nil { return diff --git a/http/api/process.go b/http/api/process.go index a4b5c710..29fa57dd 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -70,7 +70,7 @@ type ProcessConfig struct { Metadata map[string]interface{} `json:"metadata,omitempty"` } -// Marshal converts a process config in API representation to a restreamer process config and metadata +// Marshal converts a process config in API representation to a core process config and metadata func (cfg *ProcessConfig) Marshal() (*app.Config, map[string]interface{}) { p := &app.Config{ ID: cfg.ID, @@ -153,7 +153,7 @@ func (cfg *ProcessConfig) generateInputOutputIDs(ioconfig []ProcessConfigIO) { } } -// Unmarshal converts a restream process config to a process config in API representation +// Unmarshal converts a core process config to a process config in API representation func (cfg *ProcessConfig) Unmarshal(c *app.Config) { if c == nil { return @@ -236,7 +236,7 @@ type ProcessState struct { Command []string `json:"command"` } -// Unmarshal converts a restreamer ffmpeg process state to a state in API representation +// Unmarshal converts a core ffmpeg process state to a state in API representation func (s *ProcessState) Unmarshal(state *app.State) { if state == nil { return @@ -249,19 +249,19 @@ func (s *ProcessState) Unmarshal(state *app.State) { s.LastLog = state.LastLog s.Progress = &Progress{} s.Memory = state.Memory - s.CPU = ToNumber(state.CPU) + s.CPU = json.ToNumber(state.CPU) s.LimitMode = state.LimitMode s.Resources.CPU = ProcessUsageCPU{ - NCPU: ToNumber(state.Resources.CPU.NCPU), - Current: ToNumber(state.Resources.CPU.Current), - Average: ToNumber(state.Resources.CPU.Average), - Max: ToNumber(state.Resources.CPU.Max), - Limit: ToNumber(state.Resources.CPU.Limit), + NCPU: json.ToNumber(state.Resources.CPU.NCPU), + Current: json.ToNumber(state.Resources.CPU.Current), + Average: json.ToNumber(state.Resources.CPU.Average), + Max: json.ToNumber(state.Resources.CPU.Max), + Limit: json.ToNumber(state.Resources.CPU.Limit), IsThrottling: state.Resources.CPU.IsThrottling, } s.Resources.Memory = ProcessUsageMemory{ Current: state.Resources.Memory.Current, - Average: ToNumber(state.Resources.Memory.Average), + Average: json.ToNumber(state.Resources.Memory.Average), Max: state.Resources.Memory.Max, Limit: state.Resources.Memory.Limit, } @@ -279,6 +279,43 @@ type ProcessUsageCPU struct { IsThrottling bool `json:"throttling"` } +func (p *ProcessUsageCPU) Unmarshal(pp *app.ProcessUsageCPU) { + p.NCPU = json.ToNumber(pp.NCPU) + p.Current = json.ToNumber(pp.Current) + p.Average = json.ToNumber(pp.Average) + p.Max = json.ToNumber(pp.Max) + p.Limit = json.ToNumber(pp.Limit) + p.IsThrottling = pp.IsThrottling +} + +func (p *ProcessUsageCPU) Marshal() app.ProcessUsageCPU { + pp := app.ProcessUsageCPU{ + IsThrottling: p.IsThrottling, + } + + if x, err := p.NCPU.Float64(); err == nil { + pp.NCPU = x + } + + if x, err := p.Current.Float64(); err == nil { + pp.Current = x + } + + if x, err := p.Average.Float64(); err == nil { + pp.Average = x + } + + if x, err := p.Max.Float64(); err == nil { + pp.Max = x + } + + if x, err := p.Limit.Float64(); err == nil { + pp.Limit = x + } + + return pp +} + type ProcessUsageMemory struct { Current uint64 `json:"cur" format:"uint64"` Average json.Number `json:"avg" swaggertype:"number" jsonschema:"type=number"` @@ -286,7 +323,42 @@ type ProcessUsageMemory struct { Limit uint64 `json:"limit" format:"uint64"` } +func (p *ProcessUsageMemory) Unmarshal(pp *app.ProcessUsageMemory) { + p.Current = pp.Current + p.Average = json.ToNumber(pp.Average) + p.Max = pp.Max + p.Limit = pp.Limit +} + +func (p *ProcessUsageMemory) Marshal() app.ProcessUsageMemory { + pp := app.ProcessUsageMemory{ + Current: p.Current, + Max: p.Max, + Limit: p.Limit, + } + + if x, err := p.Average.Float64(); err == nil { + pp.Average = x + } + + return pp +} + type ProcessUsage struct { CPU ProcessUsageCPU `json:"cpu_usage"` Memory ProcessUsageMemory `json:"memory_bytes"` } + +func (p *ProcessUsage) Unmarshal(pp *app.ProcessUsage) { + p.CPU.Unmarshal(&pp.CPU) + p.Memory.Unmarshal(&pp.Memory) +} + +func (p *ProcessUsage) Marshal() app.ProcessUsage { + pp := app.ProcessUsage{ + CPU: p.CPU.Marshal(), + Memory: p.Memory.Marshal(), + } + + return pp +} diff --git a/http/api/process_test.go b/http/api/process_test.go new file mode 100644 index 00000000..4ddc1dc9 --- /dev/null +++ b/http/api/process_test.go @@ -0,0 +1,114 @@ +package api + +import ( + "testing" + + "github.com/datarhei/core/v16/restream/app" + + "github.com/stretchr/testify/require" +) + +func TestProcessUsageCPU(t *testing.T) { + original := app.ProcessUsageCPU{ + NCPU: 1.5, + Current: 0.7, + Average: 0.9, + Max: 1.3, + Limit: 100, + IsThrottling: true, + } + + p := ProcessUsageCPU{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProcessUsageMemory(t *testing.T) { + original := app.ProcessUsageMemory{ + Current: 100, + Average: 72, + Max: 150, + Limit: 200, + } + + p := ProcessUsageMemory{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProcessUsage(t *testing.T) { + original := app.ProcessUsage{ + CPU: app.ProcessUsageCPU{ + NCPU: 1.5, + Current: 0.7, + Average: 0.9, + Max: 1.3, + Limit: 100, + IsThrottling: true, + }, + Memory: app.ProcessUsageMemory{ + Current: 100, + Average: 72, + Max: 150, + Limit: 200, + }, + } + + p := ProcessUsage{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProcessConfig(t *testing.T) { + original := app.Config{ + ID: "foobar", + Reference: "none", + Owner: "me", + Domain: "all", + Input: []app.ConfigIO{ + { + ID: "in", + Address: "example_in", + Options: []string{"io1", "io2"}, + }, + }, + Output: []app.ConfigIO{ + { + ID: "out", + Address: "example_out", + Options: []string{"oo1", "oo2", "oo3"}, + Cleanup: []app.ConfigIOCleanup{ + { + Pattern: "xxxx", + MaxFiles: 5, + MaxFileAge: 100, + PurgeOnDelete: true, + }, + }, + }, + }, + Options: []string{"o1", "o2", "o3"}, + Reconnect: true, + ReconnectDelay: 20, + Autostart: true, + StaleTimeout: 50, + Timeout: 60, + Scheduler: "xxx", + LogPatterns: []string{"bla", "blubb"}, + LimitCPU: 10, + LimitMemory: 100 * 1024 * 1024, + LimitWaitFor: 20, + } + + p := ProcessConfig{} + p.Unmarshal(&original) + restored, _ := p.Marshal() + + require.Equal(t, &original, restored) +} diff --git a/http/api/progress.go b/http/api/progress.go index e7b48116..93671b57 100644 --- a/http/api/progress.go +++ b/http/api/progress.go @@ -1,8 +1,6 @@ package api import ( - "fmt" - "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/restream/app" ) @@ -50,7 +48,7 @@ type ProgressIO struct { AVstream *AVstream `json:"avstream" jsonschema:"anyof_type=null;object"` } -// Unmarshal converts a restreamer ProgressIO to a ProgressIO in API representation +// Unmarshal converts a core ProgressIO to a ProgressIO in API representation func (i *ProgressIO) Unmarshal(io *app.ProgressIO) { if io == nil { return @@ -66,17 +64,17 @@ func (i *ProgressIO) Unmarshal(io *app.ProgressIO) { i.Coder = io.Coder i.Frame = io.Frame i.Keyframe = io.Keyframe - i.Framerate.Min = json.Number(fmt.Sprintf("%.3f", io.Framerate.Min)) - i.Framerate.Max = json.Number(fmt.Sprintf("%.3f", io.Framerate.Max)) - i.Framerate.Average = json.Number(fmt.Sprintf("%.3f", io.Framerate.Average)) - i.FPS = json.Number(fmt.Sprintf("%.3f", io.FPS)) + i.Framerate.Min = json.ToNumber(io.Framerate.Min) + i.Framerate.Max = json.ToNumber(io.Framerate.Max) + i.Framerate.Average = json.ToNumber(io.Framerate.Average) + i.FPS = json.ToNumber(io.FPS) i.Packet = io.Packet - i.PPS = json.Number(fmt.Sprintf("%.3f", io.PPS)) + i.PPS = json.ToNumber(io.PPS) i.Size = io.Size / 1024 - i.Bitrate = json.Number(fmt.Sprintf("%.3f", io.Bitrate/1024)) + i.Bitrate = json.ToNumber(io.Bitrate / 1024) i.Extradata = io.Extradata i.Pixfmt = io.Pixfmt - i.Quantizer = json.Number(fmt.Sprintf("%.3f", io.Quantizer)) + i.Quantizer = json.ToNumber(io.Quantizer) i.Width = io.Width i.Height = io.Height i.Sampling = io.Sampling @@ -89,6 +87,64 @@ func (i *ProgressIO) Unmarshal(io *app.ProgressIO) { } } +func (i *ProgressIO) Marshal() app.ProgressIO { + p := app.ProgressIO{ + ID: i.ID, + Address: i.Address, + Index: i.Index, + Stream: i.Stream, + Format: i.Format, + Type: i.Type, + Codec: i.Codec, + Coder: i.Coder, + Frame: i.Frame, + Keyframe: i.Keyframe, + Packet: i.Packet, + Size: i.Size * 1024, + Extradata: i.Extradata, + Pixfmt: i.Pixfmt, + Width: i.Width, + Height: i.Height, + Sampling: i.Sampling, + Layout: i.Layout, + Channels: i.Channels, + } + + if x, err := i.Framerate.Min.Float64(); err == nil { + p.Framerate.Min = x + } + + if x, err := i.Framerate.Max.Float64(); err == nil { + p.Framerate.Max = x + } + + if x, err := i.Framerate.Average.Float64(); err == nil { + p.Framerate.Average = x + } + + if x, err := i.FPS.Float64(); err == nil { + p.FPS = x + } + + if x, err := i.PPS.Float64(); err == nil { + p.PPS = x + } + + if x, err := i.Bitrate.Float64(); err == nil { + p.Bitrate = x * 1024 + } + + if x, err := i.Quantizer.Float64(); err == nil { + p.Quantizer = x + } + + if i.AVstream != nil { + p.AVstream = i.AVstream.Marshal() + } + + return p +} + // Progress represents the progress of an ffmpeg process type Progress struct { Started bool `json:"started"` @@ -107,38 +163,82 @@ type Progress struct { Dup uint64 `json:"dup" format:"uint64"` } -// Unmarshal converts a restreamer Progress to a Progress in API representation -func (progress *Progress) Unmarshal(p *app.Progress) { - progress.Input = []ProgressIO{} - progress.Output = []ProgressIO{} +// Unmarshal converts a core Progress to a Progress in API representation +func (p *Progress) Unmarshal(pp *app.Progress) { + p.Input = []ProgressIO{} + p.Output = []ProgressIO{} - if p == nil { + if pp == nil { return } - progress.Started = p.Started - progress.Input = make([]ProgressIO, len(p.Input)) - progress.Output = make([]ProgressIO, len(p.Output)) - progress.Frame = p.Frame - progress.Packet = p.Packet - progress.FPS = ToNumber(p.FPS) - progress.Quantizer = ToNumber(p.Quantizer) - progress.Size = p.Size / 1024 - progress.Time = ToNumber(p.Time) - progress.Bitrate = ToNumber(p.Bitrate / 1024) - progress.Speed = ToNumber(p.Speed) - progress.Drop = p.Drop - progress.Dup = p.Dup + p.Started = pp.Started + p.Input = make([]ProgressIO, len(pp.Input)) + p.Output = make([]ProgressIO, len(pp.Output)) + p.Frame = pp.Frame + p.Packet = pp.Packet + p.FPS = json.ToNumber(pp.FPS) + p.Quantizer = json.ToNumber(pp.Quantizer) + p.Size = pp.Size / 1024 + p.Time = json.ToNumber(pp.Time) + p.Bitrate = json.ToNumber(pp.Bitrate / 1024) + p.Speed = json.ToNumber(pp.Speed) + p.Drop = pp.Drop + p.Dup = pp.Dup - for i, io := range p.Input { - progress.Input[i].Unmarshal(&io) + for i, io := range pp.Input { + p.Input[i].Unmarshal(&io) } - for i, io := range p.Output { - progress.Output[i].Unmarshal(&io) + for i, io := range pp.Output { + p.Output[i].Unmarshal(&io) } - progress.Mapping.Unmarshal(&p.Mapping) + p.Mapping.Unmarshal(&pp.Mapping) +} + +func (p *Progress) Marshal() app.Progress { + pp := app.Progress{ + Started: p.Started, + Input: make([]app.ProgressIO, 0, len(p.Input)), + Output: make([]app.ProgressIO, 0, len(p.Output)), + Mapping: p.Mapping.Marshal(), + Frame: p.Frame, + Packet: p.Packet, + Size: p.Size * 1024, + Drop: p.Drop, + Dup: p.Dup, + } + + if x, err := p.FPS.Float64(); err == nil { + pp.FPS = x + } + + if x, err := p.Quantizer.Float64(); err == nil { + pp.Quantizer = x + } + + if x, err := p.Time.Float64(); err == nil { + pp.Time = x + } + + if x, err := p.Bitrate.Float64(); err == nil { + pp.Bitrate = x * 1024 + } + + if x, err := p.Speed.Float64(); err == nil { + pp.Speed = x + } + + for _, io := range p.Input { + pp.Input = append(pp.Input, io.Marshal()) + } + + for _, io := range p.Output { + pp.Output = append(pp.Output, io.Marshal()) + } + + return pp } type GraphElement struct { @@ -158,6 +258,44 @@ type GraphElement struct { Height uint64 `json:"height"` } +func (g *GraphElement) Unmarshal(a *app.GraphElement) { + g.Index = a.Index + g.Name = a.Name + g.Filter = a.Filter + g.DstName = a.DstName + g.DstFilter = a.DstFilter + g.Inpad = a.Inpad + g.Outpad = a.Outpad + g.Timebase = a.Timebase + g.Type = a.Type + g.Format = a.Format + g.Sampling = a.Sampling + g.Layout = a.Layout + g.Width = a.Width + g.Height = a.Height +} + +func (g *GraphElement) Marshal() app.GraphElement { + a := app.GraphElement{ + Index: g.Index, + Name: g.Name, + Filter: g.Filter, + DstName: g.DstName, + DstFilter: g.DstFilter, + Inpad: g.Inpad, + Outpad: g.Outpad, + Timebase: g.Timebase, + Type: g.Type, + Format: g.Format, + Sampling: g.Sampling, + Layout: g.Layout, + Width: g.Width, + Height: g.Height, + } + + return a +} + type GraphMapping struct { Input int `json:"input"` Output int `json:"output"` @@ -166,45 +304,57 @@ type GraphMapping struct { Copy bool `json:"copy"` } +func (g *GraphMapping) Unmarshal(a *app.GraphMapping) { + g.Input = a.Input + g.Output = a.Output + g.Index = a.Index + g.Name = a.Name + g.Copy = a.Copy +} + +func (g *GraphMapping) Marshal() app.GraphMapping { + a := app.GraphMapping{ + Input: g.Input, + Output: g.Output, + Index: g.Index, + Name: g.Name, + Copy: g.Copy, + } + + return a +} + type StreamMapping struct { Graphs []GraphElement `json:"graphs"` Mapping []GraphMapping `json:"mapping"` } -// Unmarshal converts a restreamer StreamMapping to a StreamMapping in API representation +// Unmarshal converts a core StreamMapping to a StreamMapping in API representation func (s *StreamMapping) Unmarshal(m *app.StreamMapping) { - s.Graphs = make([]GraphElement, 0, len(m.Graphs)) - for _, mge := range m.Graphs { - ge := GraphElement{ - Index: mge.Index, - Name: mge.Name, - Filter: mge.Filter, - DstName: mge.DstName, - DstFilter: mge.DstFilter, - Inpad: mge.Inpad, - Outpad: mge.Outpad, - Timebase: mge.Timebase, - Type: mge.Type, - Format: mge.Format, - Sampling: mge.Sampling, - Layout: mge.Layout, - Width: mge.Width, - Height: mge.Height, - } - - s.Graphs = append(s.Graphs, ge) + s.Graphs = make([]GraphElement, len(m.Graphs)) + for i, graph := range m.Graphs { + s.Graphs[i].Unmarshal(&graph) } - s.Mapping = make([]GraphMapping, 0, len(m.Mapping)) - for _, mmapping := range m.Mapping { - mapping := GraphMapping{ - Input: mmapping.Input, - Output: mmapping.Output, - Index: mmapping.Index, - Name: mmapping.Name, - Copy: mmapping.Copy, - } - - s.Mapping = append(s.Mapping, mapping) + s.Mapping = make([]GraphMapping, len(m.Mapping)) + for i, mapping := range m.Mapping { + s.Mapping[i].Unmarshal(&mapping) } } + +func (s *StreamMapping) Marshal() app.StreamMapping { + m := app.StreamMapping{ + Graphs: make([]app.GraphElement, 0, len(s.Graphs)), + Mapping: make([]app.GraphMapping, 0, len(s.Mapping)), + } + + for _, graph := range s.Graphs { + m.Graphs = append(m.Graphs, graph.Marshal()) + } + + for _, mapping := range s.Mapping { + m.Mapping = append(m.Mapping, mapping.Marshal()) + } + + return m +} diff --git a/http/api/progress_test.go b/http/api/progress_test.go new file mode 100644 index 00000000..812e434c --- /dev/null +++ b/http/api/progress_test.go @@ -0,0 +1,331 @@ +package api + +import ( + "testing" + + "github.com/datarhei/core/v16/restream/app" + + "github.com/stretchr/testify/require" +) + +func TestGraphMapping(t *testing.T) { + original := app.GraphMapping{ + Input: 1, + Output: 3, + Index: 39, + Name: "foobar", + Copy: true, + } + + p := GraphMapping{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestGraphElement(t *testing.T) { + original := app.GraphElement{ + Index: 5, + Name: "foobar", + Filter: "infilter", + DstName: "outfilter_", + DstFilter: "outfilter", + Inpad: "inpad", + Outpad: "outpad", + Timebase: "100", + Type: "video", + Format: "yuv420p", + Sampling: 39944, + Layout: "atmos", + Width: 1029, + Height: 463, + } + + p := GraphElement{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestStreamMapping(t *testing.T) { + original := app.StreamMapping{ + Graphs: []app.GraphElement{ + { + Index: 5, + Name: "foobar", + Filter: "infilter", + DstName: "outfilter_", + DstFilter: "outfilter", + Inpad: "inpad", + Outpad: "outpad", + Timebase: "100", + Type: "video", + Format: "yuv420p", + Sampling: 39944, + Layout: "atmos", + Width: 1029, + Height: 463, + }, + }, + Mapping: []app.GraphMapping{ + { + Input: 1, + Output: 3, + Index: 39, + Name: "foobar", + Copy: true, + }, + }, + } + + p := StreamMapping{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProgressIO(t *testing.T) { + original := app.ProgressIO{ + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: nil, + } + + p := ProgressIO{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProgressIOAVstream(t *testing.T) { + original := app.ProgressIO{ + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: &app.AVstream{ + Input: app.AVstreamIO{ + State: "xxx", + Packet: 100, + Time: 42, + Size: 95744, + }, + Output: app.AVstreamIO{ + State: "yyy", + Packet: 7473, + Time: 57634, + Size: 363, + }, + Aqueue: 3829, + Queue: 4398, + Dup: 47, + Drop: 85, + Enc: 4578, + Looping: true, + LoopingRuntime: 483, + Duplicating: true, + GOP: "gop", + Mode: "mode", + }, + } + + p := ProgressIO{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProgress(t *testing.T) { + original := app.Progress{ + Started: true, + Input: []app.ProgressIO{ + { + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: &app.AVstream{ + Input: app.AVstreamIO{ + State: "xxx", + Packet: 100, + Time: 42, + Size: 95744, + }, + Output: app.AVstreamIO{ + State: "yyy", + Packet: 7473, + Time: 57634, + Size: 363, + }, + Aqueue: 3829, + Queue: 4398, + Dup: 47, + Drop: 85, + Enc: 4578, + Looping: true, + LoopingRuntime: 483, + Duplicating: true, + GOP: "gop", + Mode: "mode", + }, + }, + }, + Output: []app.ProgressIO{ + { + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: nil, + }, + }, + Mapping: app.StreamMapping{ + Graphs: []app.GraphElement{ + { + Index: 5, + Name: "foobar", + Filter: "infilter", + DstName: "outfilter_", + DstFilter: "outfilter", + Inpad: "inpad", + Outpad: "outpad", + Timebase: "100", + Type: "video", + Format: "yuv420p", + Sampling: 39944, + Layout: "atmos", + Width: 1029, + Height: 463, + }, + }, + Mapping: []app.GraphMapping{ + { + Input: 1, + Output: 3, + Index: 39, + Name: "foobar", + Copy: true, + }, + }, + }, + Frame: 329, + Packet: 4343, + FPS: 84.2, + Quantizer: 234.2, + Size: 339393 * 1024, + Time: 494, + Bitrate: 33848.2 * 1024, + Speed: 293.2, + Drop: 2393, + Dup: 5958, + } + + p := Progress{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} diff --git a/http/api/report.go b/http/api/report.go index 49c67a74..025f0f94 100644 --- a/http/api/report.go +++ b/http/api/report.go @@ -2,82 +2,122 @@ package api import ( "strconv" + "time" "github.com/datarhei/core/v16/restream/app" ) -// ProcessReportEntry represents the logs of a run of a restream process +// ProcessReportEntry represents the logs of a run of a core process type ProcessReportEntry struct { - CreatedAt int64 `json:"created_at" format:"int64"` - Prelude []string `json:"prelude,omitempty"` - Log [][2]string `json:"log,omitempty"` - Matches []string `json:"matches,omitempty"` + CreatedAt int64 `json:"created_at" format:"int64"` + Prelude []string `json:"prelude,omitempty"` + Log [][2]string `json:"log,omitempty"` + Matches []string `json:"matches,omitempty"` +} + +func (r *ProcessReportEntry) Unmarshal(p *app.ReportEntry) { + r.CreatedAt = p.CreatedAt.Unix() + r.Prelude = p.Prelude + r.Log = make([][2]string, len(p.Log)) + for i, line := range p.Log { + r.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) + r.Log[i][1] = line.Data + } + r.Matches = p.Matches +} + +func (r *ProcessReportEntry) Marshal() app.ReportEntry { + p := app.ReportEntry{ + CreatedAt: time.Unix(r.CreatedAt, 0), + Prelude: r.Prelude, + Log: make([]app.LogLine, 0, len(r.Log)), + Matches: r.Matches, + } + + for _, l := range r.Log { + ts, _ := strconv.ParseInt(l[0], 10, 64) + p.Log = append(p.Log, app.LogLine{ + Timestamp: time.Unix(ts, 0), + Data: l[1], + }) + } + + return p +} + +type ProcessReportHistoryEntry struct { + ProcessReportEntry + ExitedAt int64 `json:"exited_at,omitempty" format:"int64"` ExitState string `json:"exit_state,omitempty"` Progress *Progress `json:"progress,omitempty"` Resources *ProcessUsage `json:"resources,omitempty"` } -type ProcessReportHistoryEntry struct { - ProcessReportEntry +func (r *ProcessReportHistoryEntry) Unmarshal(p *app.ReportHistoryEntry) { + r.ProcessReportEntry.Unmarshal(&p.ReportEntry) + + r.ExitedAt = p.ExitedAt.Unix() + r.ExitState = p.ExitState + + r.Resources = &ProcessUsage{} + r.Resources.Unmarshal(&p.Usage) + + r.Progress = &Progress{} + r.Progress.Unmarshal(&p.Progress) +} + +func (r *ProcessReportHistoryEntry) Marshal() app.ReportHistoryEntry { + p := app.ReportHistoryEntry{ + ReportEntry: r.ProcessReportEntry.Marshal(), + ExitedAt: time.Unix(r.ExitedAt, 0), + ExitState: r.ExitState, + Progress: app.Progress{}, + Usage: app.ProcessUsage{}, + } + + if r.Progress != nil { + p.Progress = r.Progress.Marshal() + } + + if r.Resources != nil { + p.Usage = r.Resources.Marshal() + } + + return p } // ProcessReport represents the current log and the logs of previous runs of a restream process type ProcessReport struct { ProcessReportEntry - History []ProcessReportEntry `json:"history"` + History []ProcessReportHistoryEntry `json:"history"` } -// Unmarshal converts a restream log to a report -func (report *ProcessReport) Unmarshal(l *app.Report) { +// Unmarshal converts a core report to a report +func (r *ProcessReport) Unmarshal(l *app.Report) { if l == nil { return } - report.CreatedAt = l.CreatedAt.Unix() - report.Prelude = l.Prelude - report.Log = make([][2]string, len(l.Log)) - for i, line := range l.Log { - report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) - report.Log[i][1] = line.Data + r.ProcessReportEntry.Unmarshal(&l.ReportEntry) + r.History = make([]ProcessReportHistoryEntry, len(l.History)) + + for i, h := range l.History { + r.History[i].Unmarshal(&h) } - report.Matches = l.Matches +} - report.History = []ProcessReportEntry{} - - for _, h := range l.History { - he := ProcessReportEntry{ - CreatedAt: h.CreatedAt.Unix(), - Prelude: h.Prelude, - Log: make([][2]string, len(h.Log)), - Matches: h.Matches, - ExitedAt: h.ExitedAt.Unix(), - ExitState: h.ExitState, - Resources: &ProcessUsage{ - CPU: ProcessUsageCPU{ - NCPU: ToNumber(h.Usage.CPU.NCPU), - Average: ToNumber(h.Usage.CPU.Average), - Max: ToNumber(h.Usage.CPU.Max), - Limit: ToNumber(h.Usage.CPU.Limit), - }, - Memory: ProcessUsageMemory{ - Average: ToNumber(h.Usage.Memory.Average), - Max: h.Usage.Memory.Max, - Limit: h.Usage.Memory.Limit, - }, - }, - } - - he.Progress = &Progress{} - he.Progress.Unmarshal(&h.Progress) - - for i, line := range h.Log { - he.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) - he.Log[i][1] = line.Data - } - - report.History = append(report.History, he) +func (r *ProcessReport) Marshal() app.Report { + p := app.Report{ + ReportEntry: r.ProcessReportEntry.Marshal(), + History: make([]app.ReportHistoryEntry, 0, len(r.History)), } + + for _, h := range r.History { + p.History = append(p.History, h.Marshal()) + } + + return p } type ProcessReportSearchResult struct { diff --git a/http/api/report_test.go b/http/api/report_test.go new file mode 100644 index 00000000..6347c8a4 --- /dev/null +++ b/http/api/report_test.go @@ -0,0 +1,403 @@ +package api + +import ( + "testing" + "time" + + "github.com/datarhei/core/v16/restream/app" + + "github.com/stretchr/testify/require" +) + +func TestProcessReportEntry(t *testing.T) { + original := app.ReportEntry{ + CreatedAt: time.Unix(12345, 0), + Prelude: []string{"lalala", "lululu"}, + Log: []app.LogLine{ + { + Timestamp: time.Unix(123, 0), + Data: "xxx", + }, + { + Timestamp: time.Unix(124, 0), + Data: "yyy", + }, + }, + Matches: []string{"match1", "match2", "match3"}, + } + + p := ProcessReportEntry{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProcessReportHistoryEntry(t *testing.T) { + original := app.ReportHistoryEntry{ + ReportEntry: app.ReportEntry{ + CreatedAt: time.Unix(12345, 0), + Prelude: []string{"lalala", "lululu"}, + Log: []app.LogLine{ + { + Timestamp: time.Unix(123, 0), + Data: "xxx", + }, + { + Timestamp: time.Unix(124, 0), + Data: "yyy", + }, + }, + Matches: []string{"match1", "match2", "match3"}, + }, + ExitedAt: time.Unix(394949, 0), + ExitState: "kaputt", + Progress: app.Progress{ + Started: true, + Input: []app.ProgressIO{ + { + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: &app.AVstream{ + Input: app.AVstreamIO{ + State: "xxx", + Packet: 100, + Time: 42, + Size: 95744, + }, + Output: app.AVstreamIO{ + State: "yyy", + Packet: 7473, + Time: 57634, + Size: 363, + }, + Aqueue: 3829, + Queue: 4398, + Dup: 47, + Drop: 85, + Enc: 4578, + Looping: true, + LoopingRuntime: 483, + Duplicating: true, + GOP: "gop", + Mode: "mode", + }, + }, + }, + Output: []app.ProgressIO{ + { + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: nil, + }, + }, + Mapping: app.StreamMapping{ + Graphs: []app.GraphElement{ + { + Index: 5, + Name: "foobar", + Filter: "infilter", + DstName: "outfilter_", + DstFilter: "outfilter", + Inpad: "inpad", + Outpad: "outpad", + Timebase: "100", + Type: "video", + Format: "yuv420p", + Sampling: 39944, + Layout: "atmos", + Width: 1029, + Height: 463, + }, + }, + Mapping: []app.GraphMapping{ + { + Input: 1, + Output: 3, + Index: 39, + Name: "foobar", + Copy: true, + }, + }, + }, + Frame: 329, + Packet: 4343, + FPS: 84.2, + Quantizer: 234.2, + Size: 339393 * 1024, + Time: 494, + Bitrate: 33848.2 * 1024, + Speed: 293.2, + Drop: 2393, + Dup: 5958, + }, + Usage: app.ProcessUsage{ + CPU: app.ProcessUsageCPU{ + NCPU: 1.5, + Current: 0.7, + Average: 0.9, + Max: 1.3, + Limit: 100, + IsThrottling: true, + }, + Memory: app.ProcessUsageMemory{ + Current: 100, + Average: 72, + Max: 150, + Limit: 200, + }, + }, + } + + p := ProcessReportHistoryEntry{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} + +func TestProcessReport(t *testing.T) { + original := app.Report{ + ReportEntry: app.ReportEntry{ + CreatedAt: time.Unix(12345, 0), + Prelude: []string{"lalala", "lululu"}, + Log: []app.LogLine{ + { + Timestamp: time.Unix(123, 0), + Data: "xxx", + }, + { + Timestamp: time.Unix(124, 0), + Data: "yyy", + }, + }, + Matches: []string{"match1", "match2", "match3"}, + }, + History: []app.ReportHistoryEntry{ + { + ReportEntry: app.ReportEntry{ + CreatedAt: time.Unix(12345, 0), + Prelude: []string{"lalala", "lululu"}, + Log: []app.LogLine{ + { + Timestamp: time.Unix(123, 0), + Data: "xxx", + }, + { + Timestamp: time.Unix(124, 0), + Data: "yyy", + }, + }, + Matches: []string{"match1", "match2", "match3"}, + }, + ExitedAt: time.Unix(394949, 0), + ExitState: "kaputt", + Progress: app.Progress{ + Started: true, + Input: []app.ProgressIO{ + { + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: &app.AVstream{ + Input: app.AVstreamIO{ + State: "xxx", + Packet: 100, + Time: 42, + Size: 95744, + }, + Output: app.AVstreamIO{ + State: "yyy", + Packet: 7473, + Time: 57634, + Size: 363, + }, + Aqueue: 3829, + Queue: 4398, + Dup: 47, + Drop: 85, + Enc: 4578, + Looping: true, + LoopingRuntime: 483, + Duplicating: true, + GOP: "gop", + Mode: "mode", + }, + }, + }, + Output: []app.ProgressIO{ + { + ID: "id", + Address: "jfdk", + Index: 4, + Stream: 7, + Format: "rtmp", + Type: "video", + Codec: "x", + Coder: "y", + Frame: 133, + Keyframe: 39, + Framerate: app.ProgressIOFramerate{ + Min: 12.5, + Max: 30.0, + Average: 25.9, + }, + FPS: 25.3, + Packet: 442, + PPS: 45.5, + Size: 45944 * 1024, + Bitrate: 5848.22 * 1024, + Extradata: 34, + Pixfmt: "yuv420p", + Quantizer: 494.2, + Width: 10393, + Height: 4933, + Sampling: 58483, + Layout: "atmos", + Channels: 4944, + AVstream: nil, + }, + }, + Mapping: app.StreamMapping{ + Graphs: []app.GraphElement{ + { + Index: 5, + Name: "foobar", + Filter: "infilter", + DstName: "outfilter_", + DstFilter: "outfilter", + Inpad: "inpad", + Outpad: "outpad", + Timebase: "100", + Type: "video", + Format: "yuv420p", + Sampling: 39944, + Layout: "atmos", + Width: 1029, + Height: 463, + }, + }, + Mapping: []app.GraphMapping{ + { + Input: 1, + Output: 3, + Index: 39, + Name: "foobar", + Copy: true, + }, + }, + }, + Frame: 329, + Packet: 4343, + FPS: 84.2, + Quantizer: 234.2, + Size: 339393 * 1024, + Time: 494, + Bitrate: 33848.2 * 1024, + Speed: 293.2, + Drop: 2393, + Dup: 5958, + }, + Usage: app.ProcessUsage{ + CPU: app.ProcessUsageCPU{ + NCPU: 1.5, + Current: 0.7, + Average: 0.9, + Max: 1.3, + Limit: 100, + IsThrottling: true, + }, + Memory: app.ProcessUsageMemory{ + Current: 100, + Average: 72, + Max: 150, + Limit: 200, + }, + }, + }, + }, + } + + p := ProcessReport{} + p.Unmarshal(&original) + restored := p.Marshal() + + require.Equal(t, original, restored) +} diff --git a/http/api/session.go b/http/api/session.go index 3ce69839..3ee588df 100644 --- a/http/api/session.go +++ b/http/api/session.go @@ -42,8 +42,8 @@ func (s *Session) Unmarshal(sess session.Session) { s.Extra = sess.Extra s.RxBytes = sess.RxBytes s.TxBytes = sess.TxBytes - s.RxBitrate = ToNumber(sess.RxBitrate / 1024) - s.TxBitrate = ToNumber(sess.TxBitrate / 1024) + s.RxBitrate = json.ToNumber(sess.RxBitrate / 1024) + s.TxBitrate = json.ToNumber(sess.TxBitrate / 1024) } // SessionSummaryActive represents the currently active sessions @@ -80,12 +80,12 @@ type SessionsActive map[string][]Session // Unmarshal creates a new SessionSummary from a session.Summary func (summary *SessionSummary) Unmarshal(sum session.Summary) { summary.Active.MaxSessions = sum.MaxSessions - summary.Active.MaxRxBitrate = ToNumber(sum.MaxRxBitrate / 1024 / 1024) - summary.Active.MaxTxBitrate = ToNumber(sum.MaxTxBitrate / 1024 / 1024) + summary.Active.MaxRxBitrate = json.ToNumber(sum.MaxRxBitrate / 1024 / 1024) + summary.Active.MaxTxBitrate = json.ToNumber(sum.MaxTxBitrate / 1024 / 1024) summary.Active.Sessions = sum.CurrentSessions - summary.Active.RxBitrate = ToNumber(sum.CurrentRxBitrate / 1024 / 1024) - summary.Active.TxBitrate = ToNumber(sum.CurrentTxBitrate / 1024 / 1024) + summary.Active.RxBitrate = json.ToNumber(sum.CurrentRxBitrate / 1024 / 1024) + summary.Active.TxBitrate = json.ToNumber(sum.CurrentTxBitrate / 1024 / 1024) summary.Active.SessionList = make([]Session, len(sum.Active)) diff --git a/http/client/client.go b/http/client/client.go index 7d8abc05..d73fab98 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -69,6 +69,7 @@ type RestClient interface { ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config ProcessReport(id app.ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report + ProcessReportSet(id app.ProcessID, report *app.Report) error // PUT /v3/process/{id}/report ProcessState(id app.ProcessID) (api.ProcessState, error) // GET /v3/process/{id}/state ProcessMetadata(id app.ProcessID, key string) (api.Metadata, error) // GET /v3/process/{id}/metadata/{key} ProcessMetadataSet(id app.ProcessID, key string, metadata api.Metadata) error // PUT /v3/process/{id}/metadata/{key} @@ -454,6 +455,10 @@ func New(config Config) (RestClient, error) { path: mustNewGlob("/v3/cluster/node/*/state"), constraint: mustNewConstraint("^16.14.0"), }, + { + path: mustNewGlob("/v3/process/*/report"), + constraint: mustNewConstraint("^16.20.0"), + }, }, "DELETE": { { diff --git a/http/client/process.go b/http/client/process.go index 76fd1883..87938a49 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -104,6 +104,26 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map return nil } +func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) error { + var buf bytes.Buffer + + data := api.ProcessReport{} + data.Unmarshal(report) + + e := json.NewEncoder(&buf) + e.Encode(data) + + query := &url.Values{} + query.Set("domain", id.Domain) + + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", &buf) + if err != nil { + return err + } + + return nil +} + func (r *restclient) ProcessDelete(id app.ProcessID) error { query := &url.Values{} query.Set("domain", id.Domain) diff --git a/http/graph/resolver/resolver.go b/http/graph/resolver/resolver.go index 22f628d6..76aa3240 100644 --- a/http/graph/resolver/resolver.go +++ b/http/graph/resolver/resolver.go @@ -36,7 +36,7 @@ func (r *queryResolver) getProcess(id app.ProcessID) (*models.Process, error) { return nil, err } - report, err := r.Restream.GetProcessLog(id) + report, err := r.Restream.GetProcessReport(id) if err != nil { return nil, err } diff --git a/http/handler/api/cluster_process.go b/http/handler/api/cluster_process.go index d4c12931..247a5a4c 100644 --- a/http/handler/api/cluster_process.go +++ b/http/handler/api/cluster_process.go @@ -252,8 +252,8 @@ func (h *ClusterHandler) convertStoreProcessToAPIProcess(p store.Process, filter LastLog: p.Error, Resources: api.ProcessUsage{ CPU: api.ProcessUsageCPU{ - NCPU: api.ToNumber(1), - Limit: api.ToNumber(p.Config.LimitCPU), + NCPU: json.ToNumber(1), + Limit: json.ToNumber(p.Config.LimitCPU), }, Memory: api.ProcessUsageMemory{ Limit: p.Config.LimitMemory, @@ -284,8 +284,8 @@ func (h *ClusterHandler) convertStoreProcessToAPIProcess(p store.Process, filter process.Report.Log = [][2]string{ {strconv.FormatInt(p.CreatedAt.Unix(), 10), p.Error}, } - process.Report.ExitedAt = p.CreatedAt.Unix() - process.Report.ExitState = "failed" + //process.Report.ExitedAt = p.CreatedAt.Unix() + //process.Report.ExitState = "failed" } } diff --git a/http/handler/api/events.go b/http/handler/api/events.go index decb146d..bec1b2e2 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -113,7 +113,7 @@ func (h *EventsHandler) Events(c echo.Context) error { res.Write([]byte(":keepalive\n\n")) res.Flush() case e := <-evts: - event.Marshal(&e) + event.Unmarshal(&e) if !filterEvent(&event) { continue @@ -141,7 +141,7 @@ func (h *EventsHandler) Events(c echo.Context) error { res.Write([]byte("{\"event\": \"keepalive\"}\n")) res.Flush() case e := <-evts: - event.Marshal(&e) + event.Unmarshal(&e) if !filterEvent(&event) { continue diff --git a/http/handler/api/process.go b/http/handler/api/process.go index 0ec907f1..f362bda4 100644 --- a/http/handler/api/process.go +++ b/http/handler/api/process.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "net/http" "runtime" "sort" @@ -333,11 +334,6 @@ func (h *ProcessHandler) Update(c echo.Context) error { config, metadata := process.Marshal() - tid = app.ProcessID{ - ID: id, - Domain: domain, - } - if err := h.restream.UpdateProcess(tid, config); err != nil { if err == restream.ErrUnknownProcess { return api.Err(http.StatusNotFound, "", "process not found: %s", id) @@ -545,7 +541,7 @@ func (h *ProcessHandler) GetReport(c echo.Context) error { Domain: domain, } - l, err := h.restream.GetProcessLog(tid) + l, err := h.restream.GetProcessReport(tid) if err != nil { return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) } @@ -560,13 +556,15 @@ func (h *ProcessHandler) GetReport(c echo.Context) error { filteredReport := api.ProcessReport{} // Add the current report as a fake history entry - report.History = append(report.History, api.ProcessReportEntry{ - CreatedAt: report.CreatedAt, - Prelude: report.Prelude, - Log: report.Log, + report.History = append(report.History, api.ProcessReportHistoryEntry{ + ProcessReportEntry: api.ProcessReportEntry{ + CreatedAt: report.CreatedAt, + Prelude: report.Prelude, + Log: report.Log, + }, }) - entries := []api.ProcessReportEntry{} + entries := []api.ProcessReportHistoryEntry{} for _, r := range report.History { if createdAt != nil && exitedAt == nil { @@ -606,6 +604,53 @@ func (h *ProcessHandler) GetReport(c echo.Context) error { return c.JSON(http.StatusOK, filteredReport) } +// SetReport sets the report history of a process +// @Summary Set the report history a process +// @Description Set the report history a process +// @Tags v16.?.? +// @ID process-3-set-report +// @Accept json +// @Produce json +// @Param id path string true "Process ID" +// @Param domain query string false "Domain to act on" +// @Param report body api.ProcessReport true "Process report" +// @Success 200 {string} string +// @Failure 400 {object} api.Error +// @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/process/{id}/report [put] +func (h *ProcessHandler) SetReport(c echo.Context) error { + ctxuser := util.DefaultContext(c, "user", "") + domain := util.DefaultQuery(c, "domain", "") + id := util.PathParam(c, "id") + + fmt.Printf("entering SetReport handler\n") + + if !h.iam.Enforce(ctxuser, domain, "process", id, "write") { + return api.Err(http.StatusForbidden, "", "You are not allowed to write this process: %s", id) + } + + tid := app.ProcessID{ + ID: id, + Domain: domain, + } + + report := api.ProcessReport{} + + if err := util.ShouldBindJSON(c, &report); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + appreport := report.Marshal() + + if err := h.restream.SetProcessReport(tid, &appreport); err != nil { + return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} + // SearchReportHistory returns a list of matching report references // @Summary Search log history of all processes // @Description Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional. @@ -1031,7 +1076,7 @@ func (h *ProcessHandler) getProcess(id app.ProcessID, filter filter) (api.Proces } if filter.report { - log, err := h.restream.GetProcessLog(id) + log, err := h.restream.GetProcessReport(id) if err != nil { return api.Process{}, err } diff --git a/http/server.go b/http/server.go index f58b04d0..adde251a 100644 --- a/http/server.go +++ b/http/server.go @@ -121,7 +121,7 @@ type server struct { v3handler struct { log *api.LogHandler events *api.EventsHandler - restream *api.ProcessHandler + process *api.ProcessHandler rtmp *api.RTMPHandler srt *api.SRTHandler config *api.ConfigHandler @@ -267,7 +267,7 @@ func NewServer(config Config) (serverhandler.Server, error) { ) if config.Restream != nil { - s.v3handler.restream = api.NewProcess( + s.v3handler.process = api.NewProcess( config.Restream, config.IAM, ) @@ -618,49 +618,50 @@ func (s *server) setRoutesV3(v3 *echo.Group) { } // v3 Restreamer - if s.v3handler.restream != nil { - v3.GET("/skills", s.v3handler.restream.Skills) - v3.GET("/skills/reload", s.v3handler.restream.ReloadSkills) + if s.v3handler.process != nil { + v3.GET("/skills", s.v3handler.process.Skills) + v3.GET("/skills/reload", s.v3handler.process.ReloadSkills) - v3.GET("/process", s.v3handler.restream.GetAll) - v3.GET("/process/:id", s.v3handler.restream.Get) + v3.GET("/process", s.v3handler.process.GetAll) + v3.GET("/process/:id", s.v3handler.process.Get) - v3.GET("/process/:id/config", s.v3handler.restream.GetConfig) - v3.GET("/process/:id/state", s.v3handler.restream.GetState) - v3.GET("/process/:id/report", s.v3handler.restream.GetReport) + v3.GET("/process/:id/config", s.v3handler.process.GetConfig) + v3.GET("/process/:id/state", s.v3handler.process.GetState) + v3.GET("/process/:id/report", s.v3handler.process.GetReport) - v3.GET("/process/:id/metadata", s.v3handler.restream.GetProcessMetadata) - v3.GET("/process/:id/metadata/:key", s.v3handler.restream.GetProcessMetadata) + v3.GET("/process/:id/metadata", s.v3handler.process.GetProcessMetadata) + v3.GET("/process/:id/metadata/:key", s.v3handler.process.GetProcessMetadata) - v3.GET("/metadata", s.v3handler.restream.GetMetadata) - v3.GET("/metadata/:key", s.v3handler.restream.GetMetadata) + v3.GET("/metadata", s.v3handler.process.GetMetadata) + v3.GET("/metadata/:key", s.v3handler.process.GetMetadata) if !s.readOnly { - v3.POST("/process/probe", s.v3handler.restream.ProbeConfig) - v3.GET("/process/:id/probe", s.v3handler.restream.Probe) - v3.POST("/process", s.v3handler.restream.Add) - v3.PUT("/process/:id", s.v3handler.restream.Update) - v3.DELETE("/process/:id", s.v3handler.restream.Delete) - v3.PUT("/process/:id/command", s.v3handler.restream.Command) - v3.PUT("/process/:id/metadata/:key", s.v3handler.restream.SetProcessMetadata) - v3.PUT("/metadata/:key", s.v3handler.restream.SetMetadata) + v3.POST("/process/probe", s.v3handler.process.ProbeConfig) + v3.GET("/process/:id/probe", s.v3handler.process.Probe) + v3.POST("/process", s.v3handler.process.Add) + v3.PUT("/process/:id", s.v3handler.process.Update) + v3.DELETE("/process/:id", s.v3handler.process.Delete) + v3.PUT("/process/:id/command", s.v3handler.process.Command) + v3.PUT("/process/:id/report", s.v3handler.process.SetReport) + v3.PUT("/process/:id/metadata/:key", s.v3handler.process.SetProcessMetadata) + v3.PUT("/metadata/:key", s.v3handler.process.SetMetadata) } // v3 Playout - v3.GET("/process/:id/playout/:inputid/status", s.v3handler.restream.PlayoutStatus) - v3.GET("/process/:id/playout/:inputid/reopen", s.v3handler.restream.PlayoutReopenInput) - v3.GET("/process/:id/playout/:inputid/keyframe/*", s.v3handler.restream.PlayoutKeyframe) - v3.GET("/process/:id/playout/:inputid/errorframe/encode", s.v3handler.restream.PlayoutEncodeErrorframe) + v3.GET("/process/:id/playout/:inputid/status", s.v3handler.process.PlayoutStatus) + v3.GET("/process/:id/playout/:inputid/reopen", s.v3handler.process.PlayoutReopenInput) + v3.GET("/process/:id/playout/:inputid/keyframe/*", s.v3handler.process.PlayoutKeyframe) + v3.GET("/process/:id/playout/:inputid/errorframe/encode", s.v3handler.process.PlayoutEncodeErrorframe) if !s.readOnly { - v3.PUT("/process/:id/playout/:inputid/errorframe/*", s.v3handler.restream.PlayoutSetErrorframe) - v3.POST("/process/:id/playout/:inputid/errorframe/*", s.v3handler.restream.PlayoutSetErrorframe) + v3.PUT("/process/:id/playout/:inputid/errorframe/*", s.v3handler.process.PlayoutSetErrorframe) + v3.POST("/process/:id/playout/:inputid/errorframe/*", s.v3handler.process.PlayoutSetErrorframe) - v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.restream.PlayoutSetStream) + v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.process.PlayoutSetStream) } // v3 Report - v3.GET("/report/process", s.v3handler.restream.SearchReportHistory) + v3.GET("/report/process", s.v3handler.process.SearchReportHistory) } // v3 Filesystems diff --git a/restream/app/avstream.go b/restream/app/avstream.go index 3a780b7a..2a5deb5e 100644 --- a/restream/app/avstream.go +++ b/restream/app/avstream.go @@ -1,5 +1,7 @@ package app +import "github.com/datarhei/core/v16/ffmpeg/parse" + type AVstreamIO struct { State string Packet uint64 // counter @@ -7,6 +9,24 @@ type AVstreamIO struct { Size uint64 // bytes } +func (a *AVstreamIO) UnmarshalParser(p *parse.AVstreamIO) { + a.State = p.State + a.Packet = p.Packet + a.Time = p.Time + a.Size = p.Size +} + +func (a *AVstreamIO) MarshalParser() parse.AVstreamIO { + p := parse.AVstreamIO{ + State: a.State, + Packet: a.Packet, + Time: a.Time, + Size: a.Size, + } + + return p +} + type AVStreamSwap struct { URL string Status string @@ -14,6 +34,24 @@ type AVStreamSwap struct { LastError string } +func (a *AVStreamSwap) UnmarshalParser(p *parse.AVStreamSwap) { + a.URL = p.URL + a.Status = p.Status + a.LastURL = p.LastURL + a.LastError = p.LastError +} + +func (a *AVStreamSwap) MarshalParser() parse.AVStreamSwap { + p := parse.AVStreamSwap{ + URL: a.URL, + Status: a.Status, + LastURL: a.LastURL, + LastError: a.LastError, + } + + return p +} + type AVstream struct { Input AVstreamIO Output AVstreamIO @@ -30,3 +68,44 @@ type AVstream struct { Debug interface{} Swap AVStreamSwap } + +func (a *AVstream) UnmarshalParser(p *parse.AVstream) { + if p == nil { + return + } + + a.Aqueue = p.Aqueue + a.Queue = p.Queue + a.Dup = p.Dup + a.Drop = p.Drop + a.Enc = p.Enc + a.Looping = p.Looping + a.LoopingRuntime = p.LoopingRuntime + a.Duplicating = p.Duplicating + a.GOP = p.GOP + a.Mode = p.Mode + a.Swap.UnmarshalParser(&p.Swap) + a.Input.UnmarshalParser(&p.Input) + a.Output.UnmarshalParser(&p.Output) +} + +func (a *AVstream) MarshalParser() *parse.AVstream { + p := &parse.AVstream{ + Input: a.Input.MarshalParser(), + Output: a.Output.MarshalParser(), + Aqueue: a.Aqueue, + Queue: a.Queue, + Dup: a.Dup, + Drop: a.Drop, + Enc: a.Enc, + Looping: a.Looping, + LoopingRuntime: a.LoopingRuntime, + Duplicating: a.Duplicating, + GOP: a.GOP, + Mode: a.Mode, + Debug: a.Debug, + Swap: a.Swap.MarshalParser(), + } + + return p +} diff --git a/restream/app/probe.go b/restream/app/probe.go index e2e9f3da..dcca51e8 100644 --- a/restream/app/probe.go +++ b/restream/app/probe.go @@ -1,5 +1,7 @@ package app +import "github.com/datarhei/core/v16/ffmpeg/probe" + type ProbeIO struct { Address string @@ -26,7 +28,79 @@ type ProbeIO struct { Channels uint64 } +func (p *ProbeIO) UnmarshalProber(pp *probe.ProbeIO) { + p.Address = pp.Address + p.Index = pp.Index + p.Stream = pp.Stream + p.Language = pp.Language + p.Format = pp.Format + p.Type = pp.Type + p.Codec = pp.Codec + p.Coder = pp.Coder + p.Bitrate = pp.Bitrate + p.Duration = pp.Duration + p.Pixfmt = pp.Pixfmt + p.Width = pp.Width + p.Height = pp.Height + p.FPS = pp.FPS + p.Sampling = pp.Sampling + p.Layout = pp.Layout + p.Channels = pp.Channels +} + +/* + func (app *ProbeIO) MarshallAPI() api.ProbeIO { + return api.ProbeIO{ + Address: app.Address, + Format: app.Format, + Index: app.Index, + Stream: app.Stream, + Language: app.Language, + Type: app.Type, + Codec: app.Codec, + Coder: app.Coder, + Bitrate: json.ToNumber(app.Bitrate), + Duration: json.ToNumber(app.Duration), + FPS: json.ToNumber(app.FPS), + Pixfmt: app.Pixfmt, + Width: app.Width, + Height: app.Height, + Sampling: app.Sampling, + Layout: app.Layout, + Channels: app.Channels, + } + } +*/ type Probe struct { Streams []ProbeIO Log []string } + +func (p *Probe) UnmarshalProber(pp *probe.Probe) { + p.Log = make([]string, len(pp.Log)) + copy(p.Log, pp.Log) + + p.Streams = make([]ProbeIO, len(pp.Streams)) + + for i, s := range pp.Streams { + p.Streams[i].UnmarshalProber(&s) + } +} + +/* +// Unmarshal converts a restreamer Probe to a Probe in API representation +func (app *Probe) MarshallAPI() api.Probe { + p := api.Probe{ + Streams: make([]api.ProbeIO, len(app.Streams)), + Log: make([]string, len(app.Log)), + } + + for i, io := range app.Streams { + p.Streams[i] = io.MarshallAPI() + } + + copy(p.Log, app.Log) + + return p +} +*/ diff --git a/restream/app/process.go b/restream/app/process.go index f48bec38..9db4083e 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/process" ) @@ -272,6 +273,24 @@ type ProcessUsageCPU struct { IsThrottling bool } +func (p *ProcessUsageCPU) UnmarshalParser(pp *parse.UsageCPU) { + p.NCPU = pp.NCPU + p.Average = pp.Average + p.Max = pp.Max + p.Limit = pp.Limit +} + +func (p *ProcessUsageCPU) MarshalParser() parse.UsageCPU { + pp := parse.UsageCPU{ + NCPU: p.NCPU, + Average: p.Average, + Max: p.Max, + Limit: p.Limit, + } + + return pp +} + type ProcessUsageMemory struct { Current uint64 // bytes Average float64 // bytes @@ -279,11 +298,41 @@ type ProcessUsageMemory struct { Limit uint64 // bytes } +func (p *ProcessUsageMemory) UnmarshalParser(pp *parse.UsageMemory) { + p.Average = pp.Average + p.Max = pp.Max + p.Limit = pp.Limit +} + +func (p *ProcessUsageMemory) MarshalParser() parse.UsageMemory { + pp := parse.UsageMemory{ + Average: p.Average, + Max: p.Max, + Limit: p.Limit, + } + + return pp +} + type ProcessUsage struct { CPU ProcessUsageCPU Memory ProcessUsageMemory } +func (p *ProcessUsage) UnmarshalParser(pp *parse.Usage) { + p.CPU.UnmarshalParser(&pp.CPU) + p.Memory.UnmarshalParser(&pp.Memory) +} + +func (p *ProcessUsage) MarshalParser() parse.Usage { + pp := parse.Usage{ + CPU: p.CPU.MarshalParser(), + Memory: p.Memory.MarshalParser(), + } + + return pp +} + type ProcessID struct { ID string Domain string diff --git a/restream/app/progress.go b/restream/app/progress.go index b59b5db8..7982aff8 100644 --- a/restream/app/progress.go +++ b/restream/app/progress.go @@ -1,5 +1,13 @@ package app +import "github.com/datarhei/core/v16/ffmpeg/parse" + +type ProgressIOFramerate struct { + Min float64 + Max float64 + Average float64 +} + type ProgressIO struct { ID string Address string @@ -13,11 +21,7 @@ type ProgressIO struct { Coder string Frame uint64 // counter Keyframe uint64 // counter - Framerate struct { - Min float64 - Max float64 - Average float64 - } + Framerate ProgressIOFramerate FPS float64 // rate, frames per second Packet uint64 // counter PPS float64 // rate, packets per second @@ -40,6 +44,67 @@ type ProgressIO struct { AVstream *AVstream } +func (p *ProgressIO) UnmarshalParser(pp *parse.ProgressIO) { + p.Address = pp.Address + p.Index = pp.Index + p.Stream = pp.Stream + p.Format = pp.Format + p.Type = pp.Type + p.Codec = pp.Codec + p.Coder = pp.Coder + p.Frame = pp.Frame + p.Keyframe = pp.Keyframe + p.Framerate = pp.Framerate + p.FPS = pp.FPS + p.Packet = pp.Packet + p.PPS = pp.PPS + p.Size = pp.Size + p.Bitrate = pp.Bitrate + p.Extradata = pp.Extradata + p.Pixfmt = pp.Pixfmt + p.Quantizer = pp.Quantizer + p.Width = pp.Width + p.Height = pp.Height + p.Sampling = pp.Sampling + p.Layout = pp.Layout + p.Channels = pp.Channels + p.AVstream.UnmarshalParser(pp.AVstream) +} + +func (p *ProgressIO) MarshalParser() parse.ProgressIO { + pp := parse.ProgressIO{ + Address: p.Address, + Index: p.Index, + Stream: p.Stream, + Format: p.Format, + Type: p.Type, + Codec: p.Codec, + Coder: p.Coder, + Frame: p.Frame, + Keyframe: p.Keyframe, + Framerate: p.Framerate, + FPS: p.FPS, + Packet: p.Packet, + PPS: p.PPS, + Size: p.Size, + Bitrate: p.Bitrate, + Extradata: p.Extradata, + Pixfmt: p.Pixfmt, + Quantizer: p.Quantizer, + Width: p.Width, + Height: p.Height, + Sampling: p.Sampling, + Layout: p.Layout, + Channels: p.Channels, + } + + if p.AVstream != nil { + pp.AVstream = p.AVstream.MarshalParser() + } + + return pp +} + type Progress struct { Started bool Input []ProgressIO @@ -58,6 +123,69 @@ type Progress struct { Dup uint64 // counter } +func (p *Progress) UnmarshalParser(pp *parse.Progress) { + p.Started = pp.Started + p.Frame = pp.Frame + p.Packet = pp.Packet + p.FPS = pp.FPS + p.PPS = pp.PPS + p.Quantizer = pp.Quantizer + p.Size = pp.Size + p.Time = pp.Time + p.Bitrate = pp.Bitrate + p.Speed = pp.Speed + p.Drop = pp.Drop + p.Dup = pp.Dup + + p.Input = make([]ProgressIO, len(pp.Input)) + + for i, pinput := range pp.Input { + p.Input[i].UnmarshalParser(&pinput) + } + + p.Output = make([]ProgressIO, len(pp.Output)) + + for i, poutput := range pp.Output { + p.Output[i].UnmarshalParser(&poutput) + } + + p.Mapping.UnmarshalParser(&pp.Mapping) +} + +func (p *Progress) MarshalParser() parse.Progress { + pp := parse.Progress{ + Started: p.Started, + Input: []parse.ProgressIO{}, + Output: []parse.ProgressIO{}, + Mapping: p.Mapping.MarshalParser(), + Frame: p.Frame, + Packet: p.Packet, + FPS: p.FPS, + PPS: p.PPS, + Quantizer: p.Quantizer, + Size: p.Size, + Time: p.Time, + Bitrate: p.Bitrate, + Speed: p.Speed, + Drop: p.Drop, + Dup: p.Dup, + } + + pp.Input = make([]parse.ProgressIO, len(p.Input)) + + for i, pinput := range p.Input { + pp.Input[i] = pinput.MarshalParser() + } + + pp.Output = make([]parse.ProgressIO, len(p.Output)) + + for i, poutput := range p.Output { + pp.Output[i] = poutput.MarshalParser() + } + + return pp +} + type GraphElement struct { Index int Name string @@ -75,6 +203,44 @@ type GraphElement struct { Height uint64 } +func (g *GraphElement) UnmarshalParser(p *parse.GraphElement) { + g.Index = p.Index + g.Name = p.Name + g.Filter = p.Filter + g.DstName = p.DstName + g.DstFilter = p.DstFilter + g.Inpad = p.Inpad + g.Outpad = p.Outpad + g.Timebase = p.Timebase + g.Type = p.Type + g.Format = p.Format + g.Sampling = p.Sampling + g.Layout = p.Layout + g.Width = p.Width + g.Height = p.Height +} + +func (g *GraphElement) MarshalParser() parse.GraphElement { + p := parse.GraphElement{ + Index: g.Index, + Name: g.Name, + Filter: g.Filter, + DstName: g.DstName, + DstFilter: g.DstFilter, + Inpad: g.Inpad, + Outpad: g.Outpad, + Timebase: g.Timebase, + Type: g.Type, + Format: g.Format, + Sampling: g.Sampling, + Layout: g.Layout, + Width: g.Width, + Height: g.Height, + } + + return p +} + type GraphMapping struct { Input int // Index of input stream, negative if output element Output int // Index of output stream, negative if input element @@ -83,7 +249,58 @@ type GraphMapping struct { Copy bool // Whether it's a streamcopy i.e. there's no graph } +func (g *GraphMapping) UnmarshalParser(p *parse.GraphMapping) { + g.Input = p.Input + g.Output = p.Output + g.Index = p.Index + g.Name = p.Name + g.Copy = p.Copy +} + +func (g *GraphMapping) MarshalParser() parse.GraphMapping { + p := parse.GraphMapping{ + Input: g.Input, + Output: g.Output, + Index: g.Index, + Name: g.Name, + Copy: g.Copy, + } + + return p +} + type StreamMapping struct { Graphs []GraphElement Mapping []GraphMapping } + +func (s *StreamMapping) UnmarshalParser(p *parse.StreamMapping) { + s.Graphs = make([]GraphElement, len(p.Graphs)) + + for i, graph := range p.Graphs { + s.Graphs[i].UnmarshalParser(&graph) + } + + s.Mapping = make([]GraphMapping, len(p.Mapping)) + + for i, mapping := range p.Mapping { + s.Mapping[i].UnmarshalParser(&mapping) + } +} + +func (s *StreamMapping) MarshalParser() parse.StreamMapping { + p := parse.StreamMapping{ + Graphs: make([]parse.GraphElement, len(s.Graphs)), + Mapping: make([]parse.GraphMapping, len(s.Mapping)), + } + + for i, graph := range s.Graphs { + p.Graphs[i] = graph.MarshalParser() + } + + for i, mapping := range s.Mapping { + p.Mapping[i] = mapping.MarshalParser() + } + + return p +} diff --git a/restream/app/report.go b/restream/app/report.go index 8bc80848..3bf69500 100644 --- a/restream/app/report.go +++ b/restream/app/report.go @@ -2,6 +2,10 @@ package app import ( "time" + + "github.com/datarhei/core/v16/ffmpeg/parse" + "github.com/datarhei/core/v16/process" + "github.com/datarhei/core/v16/slices" ) type LogLine struct { @@ -9,6 +13,20 @@ type LogLine struct { Data string } +func (l *LogLine) UnmarshalProcess(p *process.Line) { + l.Timestamp = p.Timestamp + l.Data = p.Data +} + +func (l *LogLine) MarshalProcess() process.Line { + p := process.Line{ + Timestamp: l.Timestamp, + Data: l.Data, + } + + return p +} + type ReportEntry struct { CreatedAt time.Time Prelude []string @@ -16,6 +34,32 @@ type ReportEntry struct { Matches []string } +func (r *ReportEntry) UnmarshalParser(p *parse.Report) { + r.CreatedAt = p.CreatedAt + r.Prelude = slices.Copy(p.Prelude) + r.Matches = slices.Copy(p.Matches) + + r.Log = make([]LogLine, len(p.Log)) + for i, line := range p.Log { + r.Log[i].UnmarshalProcess(&line) + } +} + +func (r *ReportEntry) MarshalParser() parse.Report { + p := parse.Report{ + CreatedAt: r.CreatedAt, + Prelude: slices.Copy(r.Prelude), + Matches: slices.Copy(r.Matches), + } + + p.Log = make([]process.Line, len(r.Log)) + for i, line := range r.Log { + p.Log[i] = line.MarshalProcess() + } + + return p +} + type ReportHistoryEntry struct { ReportEntry @@ -25,11 +69,47 @@ type ReportHistoryEntry struct { Usage ProcessUsage } +func (r *ReportHistoryEntry) UnmarshalParser(p *parse.ReportHistoryEntry) { + r.ReportEntry.UnmarshalParser(&p.Report) + + r.ExitedAt = p.ExitedAt + r.ExitState = p.ExitState + r.Usage.UnmarshalParser(&p.Usage) + r.Progress.UnmarshalParser(&p.Progress) +} + +func (r *ReportHistoryEntry) MarshalParser() parse.ReportHistoryEntry { + p := parse.ReportHistoryEntry{ + Report: r.ReportEntry.MarshalParser(), + ExitedAt: r.ExitedAt, + ExitState: r.ExitState, + Progress: r.Progress.MarshalParser(), + Usage: r.Usage.MarshalParser(), + } + + return p +} + type Report struct { ReportEntry History []ReportHistoryEntry } +func (r *Report) UnmarshalParser(p *parse.Report) { + r.ReportEntry.UnmarshalParser(p) +} + +func (r *Report) MarshalParser() (parse.Report, []parse.ReportHistoryEntry) { + report := r.ReportEntry.MarshalParser() + history := make([]parse.ReportHistoryEntry, 0, len(r.History)) + + for _, h := range r.History { + history = append(history, h.MarshalParser()) + } + + return report, history +} + type ReportHistorySearchResult struct { ProcessID string Reference string diff --git a/restream/restream.go b/restream/restream.go index 365959c4..97dab80f 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -13,7 +13,6 @@ import ( "github.com/datarhei/core/v16/ffmpeg" "github.com/datarhei/core/v16/ffmpeg/parse" - "github.com/datarhei/core/v16/ffmpeg/probe" "github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/io/fs" @@ -55,7 +54,8 @@ type Restreamer interface { ReloadProcess(id app.ProcessID) error // Reload a process GetProcess(id app.ProcessID) (*app.Process, error) // Get a process GetProcessState(id app.ProcessID) (*app.State, error) // Get the state of a process - GetProcessLog(id app.ProcessID) (*app.Report, error) // Get the logs of a process + GetProcessReport(id app.ProcessID) (*app.Report, error) // Get the logs of a process + SetProcessReport(id app.ProcessID, report *app.Report) error // Set the log history of a process SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.ReportHistorySearchResult // Search the log history of all processes GetPlayout(id app.ProcessID, inputid string) (string, error) // Get the URL of the playout API for a process SetProcessMetadata(id app.ProcessID, key string, data interface{}) error // Set metatdata to a process @@ -1211,7 +1211,8 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { t.process.UpdatedAt = time.Now().Unix() // Transfer the report history to the new process - task.parser.TransferReportHistory(t.parser) + history := task.parser.ReportHistory() + t.parser.ImportReportHistory(history) // Transfer the metadata to the new process t.metadata = task.metadata @@ -1532,8 +1533,10 @@ func (r *restream) reloadProcess(tid app.ProcessID) error { r.stopProcess(tid) } + history := t.parser.ReportHistory() + parser := r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference, t.config.LogPatterns) - t.parser.TransferReportHistory(parser) + t.parser.ImportReportHistory(history) t.parser = parser limitMode := "hard" @@ -1616,7 +1619,8 @@ func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds() } - convertProgressFromParser(&state.Progress, task.parser.Progress()) + progress := task.parser.Progress() + state.Progress.UnmarshalParser(&progress) for i, p := range state.Progress.Input { if int(p.Index) >= len(task.process.Config.Input) { @@ -1637,201 +1641,32 @@ func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { return state, nil } -// convertProgressFromParser converts a ffmpeg/parse.Progress type into a restream/app.Progress type. -func convertProgressFromParser(progress *app.Progress, pprogress parse.Progress) { - progress.Started = pprogress.Started - progress.Frame = pprogress.Frame - progress.Packet = pprogress.Packet - progress.FPS = pprogress.FPS - progress.PPS = pprogress.PPS - progress.Quantizer = pprogress.Quantizer - progress.Size = pprogress.Size - progress.Time = pprogress.Time - progress.Bitrate = pprogress.Bitrate - progress.Speed = pprogress.Speed - progress.Drop = pprogress.Drop - progress.Dup = pprogress.Dup - - for _, pinput := range pprogress.Input { - input := app.ProgressIO{ - Address: pinput.Address, - Index: pinput.Index, - Stream: pinput.Stream, - Format: pinput.Format, - Type: pinput.Type, - Codec: pinput.Codec, - Coder: pinput.Coder, - Frame: pinput.Frame, - Keyframe: pinput.Keyframe, - Framerate: pinput.Framerate, - FPS: pinput.FPS, - Packet: pinput.Packet, - PPS: pinput.PPS, - Size: pinput.Size, - Bitrate: pinput.Bitrate, - Extradata: pinput.Extradata, - Pixfmt: pinput.Pixfmt, - Quantizer: pinput.Quantizer, - Width: pinput.Width, - Height: pinput.Height, - Sampling: pinput.Sampling, - Layout: pinput.Layout, - Channels: pinput.Channels, - AVstream: nil, - } - - if pinput.AVstream != nil { - avstream := &app.AVstream{ - Input: app.AVstreamIO{ - State: pinput.AVstream.Input.State, - Packet: pinput.AVstream.Input.Packet, - Time: pinput.AVstream.Input.Time, - Size: pinput.AVstream.Input.Size, - }, - Output: app.AVstreamIO{ - State: pinput.AVstream.Output.State, - Packet: pinput.AVstream.Output.Packet, - Time: pinput.AVstream.Output.Time, - Size: pinput.AVstream.Output.Size, - }, - Aqueue: pinput.AVstream.Aqueue, - Queue: pinput.AVstream.Queue, - Dup: pinput.AVstream.Dup, - Drop: pinput.AVstream.Drop, - Enc: pinput.AVstream.Enc, - Looping: pinput.AVstream.Looping, - LoopingRuntime: pinput.AVstream.LoopingRuntime, - Duplicating: pinput.AVstream.Duplicating, - GOP: pinput.AVstream.GOP, - Mode: pinput.AVstream.Mode, - } - - input.AVstream = avstream - } - - progress.Input = append(progress.Input, input) - } - - for _, poutput := range pprogress.Output { - output := app.ProgressIO{ - Address: poutput.Address, - Index: poutput.Index, - Stream: poutput.Stream, - Format: poutput.Format, - Type: poutput.Type, - Codec: poutput.Codec, - Coder: poutput.Coder, - Frame: poutput.Frame, - Keyframe: poutput.Keyframe, - Framerate: poutput.Framerate, - FPS: poutput.FPS, - Packet: poutput.Packet, - PPS: poutput.PPS, - Size: poutput.Size, - Bitrate: poutput.Bitrate, - Extradata: poutput.Extradata, - Pixfmt: poutput.Pixfmt, - Quantizer: poutput.Quantizer, - Width: poutput.Width, - Height: poutput.Height, - Sampling: poutput.Sampling, - Layout: poutput.Layout, - Channels: poutput.Channels, - AVstream: nil, - } - - progress.Output = append(progress.Output, output) - } - - for _, pgraph := range pprogress.Mapping.Graphs { - graph := app.GraphElement{ - Index: pgraph.Index, - Name: pgraph.Name, - Filter: pgraph.Filter, - DstName: pgraph.DstName, - DstFilter: pgraph.DstFilter, - Inpad: pgraph.Inpad, - Outpad: pgraph.Outpad, - Timebase: pgraph.Timebase, - Type: pgraph.Type, - Format: pgraph.Format, - Sampling: pgraph.Sampling, - Layout: pgraph.Layout, - Width: pgraph.Width, - Height: pgraph.Height, - } - - progress.Mapping.Graphs = append(progress.Mapping.Graphs, graph) - } - - for _, pmapping := range pprogress.Mapping.Mapping { - mapping := app.GraphMapping{ - Input: pmapping.Input, - Output: pmapping.Output, - Index: pmapping.Index, - Name: pmapping.Name, - Copy: pmapping.Copy, - } - - progress.Mapping.Mapping = append(progress.Mapping.Mapping, mapping) - } -} - -func (r *restream) GetProcessLog(id app.ProcessID) (*app.Report, error) { - log := &app.Report{} +func (r *restream) GetProcessReport(id app.ProcessID) (*app.Report, error) { + report := &app.Report{} r.lock.RLock() defer r.lock.RUnlock() task, ok := r.tasks[id] if !ok { - return log, ErrUnknownProcess + return report, ErrUnknownProcess } if !task.valid { - return log, nil + return report, nil } current := task.parser.Report() - log.CreatedAt = current.CreatedAt - log.Prelude = current.Prelude - log.Log = make([]app.LogLine, len(current.Log)) - for i, line := range current.Log { - log.Log[i] = app.LogLine{ - Timestamp: line.Timestamp, - Data: line.Data, - } - } - log.Matches = current.Matches + report.UnmarshalParser(¤t) history := task.parser.ReportHistory() - for _, h := range history { - e := app.ReportHistoryEntry{ - ReportEntry: app.ReportEntry{ - CreatedAt: h.CreatedAt, - Prelude: h.Prelude, - Matches: h.Matches, - }, - ExitedAt: h.ExitedAt, - ExitState: h.ExitState, - Usage: app.ProcessUsage{ - CPU: app.ProcessUsageCPU{ - NCPU: h.Usage.CPU.NCPU, - Average: h.Usage.CPU.Average, - Max: h.Usage.CPU.Max, - Limit: h.Usage.CPU.Limit, - }, - Memory: app.ProcessUsageMemory{ - Average: h.Usage.Memory.Average, - Max: h.Usage.Memory.Max, - Limit: h.Usage.Memory.Limit, - }, - }, - } + report.History = make([]app.ReportHistoryEntry, len(history)) - convertProgressFromParser(&e.Progress, h.Progress) + for i, h := range history { + report.History[i].UnmarshalParser(&h) + e := &report.History[i] for i, p := range e.Progress.Input { if int(p.Index) >= len(task.process.Config.Input) { @@ -1848,19 +1683,29 @@ func (r *restream) GetProcessLog(id app.ProcessID) (*app.Report, error) { e.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID } - - e.ReportEntry.Log = make([]app.LogLine, len(h.Log)) - for i, line := range h.Log { - e.ReportEntry.Log[i] = app.LogLine{ - Timestamp: line.Timestamp, - Data: line.Data, - } - } - - log.History = append(log.History, e) } - return log, nil + return report, nil +} + +func (r *restream) SetProcessReport(id app.ProcessID, report *app.Report) error { + r.lock.RLock() + defer r.lock.RUnlock() + + task, ok := r.tasks[id] + if !ok { + return ErrUnknownProcess + } + + if !task.valid { + return nil + } + + _, history := report.MarshalParser() + + task.parser.ImportReportHistory(history) + + return nil } func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.ReportHistorySearchResult { @@ -1894,7 +1739,7 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, } func (r *restream) Probe(config *app.Config, timeout time.Duration) app.Probe { - appprobe := app.Probe{} + probe := app.Probe{} config = config.Clone() @@ -1902,16 +1747,16 @@ func (r *restream) Probe(config *app.Config, timeout time.Duration) app.Probe { err := r.resolveAddresses(r.tasks, config) if err != nil { - appprobe.Log = append(appprobe.Log, err.Error()) - return appprobe + probe.Log = append(probe.Log, err.Error()) + return probe } resolveDynamicPlaceholder(config, r.replace) _, err = validateConfig(config, r.fs.list, r.ffmpeg) if err != nil { - appprobe.Log = append(appprobe.Log, err.Error()) - return appprobe + probe.Log = append(probe.Log, err.Error()) + return probe } var command []string @@ -1950,51 +1795,22 @@ func (r *restream) Probe(config *app.Config, timeout time.Duration) app.Probe { formatter := log.NewConsoleFormatter(false) for _, e := range logbuffer.Events() { - appprobe.Log = append(appprobe.Log, strings.TrimSpace(formatter.String(e))) + probe.Log = append(probe.Log, strings.TrimSpace(formatter.String(e))) } - appprobe.Log = append(appprobe.Log, err.Error()) + probe.Log = append(probe.Log, err.Error()) - return appprobe + return probe } ffmpeg.Start() wg.Wait() - convertProbeFromProber(&appprobe, prober.Probe()) + p := prober.Probe() + probe.UnmarshalProber(&p) - return appprobe -} - -// convertProbeFromProber converts a ffmpeg/probe.Probe type into an restream/app.Probe type. -func convertProbeFromProber(appprobe *app.Probe, pprobe probe.Probe) { - appprobe.Log = make([]string, len(pprobe.Log)) - copy(appprobe.Log, pprobe.Log) - - for _, s := range pprobe.Streams { - stream := app.ProbeIO{ - Address: s.Address, - Index: s.Index, - Stream: s.Stream, - Language: s.Language, - Format: s.Format, - Type: s.Type, - Codec: s.Codec, - Coder: s.Coder, - Bitrate: s.Bitrate, - Duration: s.Duration, - Pixfmt: s.Pixfmt, - Width: s.Width, - Height: s.Height, - FPS: s.FPS, - Sampling: s.Sampling, - Layout: s.Layout, - Channels: s.Channels, - } - - appprobe.Streams = append(appprobe.Streams, stream) - } + return probe } func (r *restream) Skills() skills.Skills { diff --git a/restream/restream_test.go b/restream/restream_test.go index 3932b6a7..a4b9da65 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -354,7 +354,7 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) { return state.State == "running" }, 10*time.Second, time.Second) - log, err := rs.GetProcessLog(tid1) + log, err := rs.GetProcessReport(tid1) require.NoError(t, err) require.Equal(t, 0, len(log.History)) @@ -379,7 +379,7 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) { return state.State == "running" }, 10*time.Second, time.Second) - log, err = rs.GetProcessLog(tid2) + log, err = rs.GetProcessReport(tid2) require.NoError(t, err) require.Equal(t, 1, len(log.History)) @@ -617,7 +617,7 @@ func TestParseProcessPattern(t *testing.T) { rs.StopProcess(tid) - log, err := rs.GetProcessLog(tid) + log, err := rs.GetProcessReport(tid) require.NoError(t, err) require.Equal(t, 1, len(log.History)) @@ -684,10 +684,10 @@ func TestLog(t *testing.T) { rs.AddProcess(process) - _, err = rs.GetProcessLog(app.ProcessID{ID: "foobar"}) + _, err = rs.GetProcessReport(app.ProcessID{ID: "foobar"}) require.Error(t, err) - log, err := rs.GetProcessLog(tid) + log, err := rs.GetProcessReport(tid) require.NoError(t, err) require.Equal(t, 0, len(log.Prelude)) require.Equal(t, 0, len(log.Log)) @@ -698,7 +698,7 @@ func TestLog(t *testing.T) { time.Sleep(3 * time.Second) - log, _ = rs.GetProcessLog(tid) + log, _ = rs.GetProcessReport(tid) require.NotEqual(t, 0, len(log.Prelude)) require.NotEqual(t, 0, len(log.Log)) @@ -707,7 +707,7 @@ func TestLog(t *testing.T) { rs.StopProcess(tid) - log, _ = rs.GetProcessLog(tid) + log, _ = rs.GetProcessReport(tid) require.Equal(t, 0, len(log.Prelude)) require.Equal(t, 0, len(log.Log)) @@ -728,14 +728,14 @@ func TestLogTransfer(t *testing.T) { time.Sleep(3 * time.Second) rs.StopProcess(tid) - log, _ := rs.GetProcessLog(tid) + log, _ := rs.GetProcessReport(tid) require.Equal(t, 1, len(log.History)) err = rs.UpdateProcess(tid, process) require.NoError(t, err) - log, _ = rs.GetProcessLog(tid) + log, _ = rs.GetProcessReport(tid) require.Equal(t, 1, len(log.History)) } @@ -1493,7 +1493,7 @@ func TestProcessLogPattern(t *testing.T) { time.Sleep(5 * time.Second) - log, err := rs.GetProcessLog(tid) + log, err := rs.GetProcessReport(tid) require.NoError(t, err) require.Equal(t, 1, len(log.Matches))