Add process report API

This commit is contained in:
Ingo Oppermann
2023-03-02 11:15:57 +01:00
parent cceb39192a
commit 0dedcddece
10 changed files with 499 additions and 96 deletions

View File

@@ -1664,14 +1664,14 @@ const docTemplate = `{
"ApiKeyAuth": [] "ApiKeyAuth": []
} }
], ],
"description": "Get the logs and the log history of a process.", "description": "Get the log history entry of a process at a certain time.",
"produces": [ "produces": [
"application/json" "application/json"
], ],
"tags": [ "tags": [
"v16.?.?" "v16.?.?"
], ],
"summary": "Get the logs of a process", "summary": "Get the log history entry of a process",
"operationId": "process-3-get-report-at", "operationId": "process-3-get-report-at",
"parameters": [ "parameters": [
{ {
@@ -1680,13 +1680,20 @@ const docTemplate = `{
"name": "id", "name": "id",
"in": "path", "in": "path",
"required": true "required": true
},
{
"type": "integer",
"description": "Unix timestamp",
"name": "at",
"in": "path",
"required": true
} }
], ],
"responses": { "responses": {
"200": { "200": {
"description": "OK", "description": "OK",
"schema": { "schema": {
"$ref": "#/definitions/api.ProcessReport" "$ref": "#/definitions/api.ProcessReportHistoryEntry"
} }
}, },
"400": { "400": {
@@ -1751,6 +1758,73 @@ const docTemplate = `{
} }
} }
}, },
"/api/v3/report/process": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Search log history of all processes",
"operationId": "process-3-search-report-history",
"parameters": [
{
"type": "string",
"description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern.",
"name": "idpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern.",
"name": "refpattern",
"in": "query"
},
{
"type": "string",
"description": "State of a process, leave empty for any",
"name": "state",
"in": "query"
},
{
"type": "integer",
"description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any",
"name": "from",
"in": "query"
},
{
"type": "integer",
"description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any",
"name": "to",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessReportSearchResult"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/rtmp": { "/api/v3/rtmp": {
"get": { "get": {
"security": [ "security": [
@@ -3254,6 +3328,24 @@ const docTemplate = `{
} }
} }
}, },
"api.ProcessReportSearchResult": {
"type": "object",
"properties": {
"created_at": {
"type": "integer",
"format": "int64"
},
"exit_state": {
"type": "string"
},
"id": {
"type": "string"
},
"reference": {
"type": "string"
}
}
},
"api.ProcessState": { "api.ProcessState": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -1657,14 +1657,14 @@
"ApiKeyAuth": [] "ApiKeyAuth": []
} }
], ],
"description": "Get the logs and the log history of a process.", "description": "Get the log history entry of a process at a certain time.",
"produces": [ "produces": [
"application/json" "application/json"
], ],
"tags": [ "tags": [
"v16.?.?" "v16.?.?"
], ],
"summary": "Get the logs of a process", "summary": "Get the log history entry of a process",
"operationId": "process-3-get-report-at", "operationId": "process-3-get-report-at",
"parameters": [ "parameters": [
{ {
@@ -1673,13 +1673,20 @@
"name": "id", "name": "id",
"in": "path", "in": "path",
"required": true "required": true
},
{
"type": "integer",
"description": "Unix timestamp",
"name": "at",
"in": "path",
"required": true
} }
], ],
"responses": { "responses": {
"200": { "200": {
"description": "OK", "description": "OK",
"schema": { "schema": {
"$ref": "#/definitions/api.ProcessReport" "$ref": "#/definitions/api.ProcessReportHistoryEntry"
} }
}, },
"400": { "400": {
@@ -1744,6 +1751,73 @@
} }
} }
}, },
"/api/v3/report/process": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Search log history of all processes",
"operationId": "process-3-search-report-history",
"parameters": [
{
"type": "string",
"description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern.",
"name": "idpattern",
"in": "query"
},
{
"type": "string",
"description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern.",
"name": "refpattern",
"in": "query"
},
{
"type": "string",
"description": "State of a process, leave empty for any",
"name": "state",
"in": "query"
},
{
"type": "integer",
"description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any",
"name": "from",
"in": "query"
},
{
"type": "integer",
"description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any",
"name": "to",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessReportSearchResult"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/rtmp": { "/api/v3/rtmp": {
"get": { "get": {
"security": [ "security": [
@@ -3247,6 +3321,24 @@
} }
} }
}, },
"api.ProcessReportSearchResult": {
"type": "object",
"properties": {
"created_at": {
"type": "integer",
"format": "int64"
},
"exit_state": {
"type": "string"
},
"id": {
"type": "string"
},
"reference": {
"type": "string"
}
}
},
"api.ProcessState": { "api.ProcessState": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -824,6 +824,18 @@ definitions:
progress: progress:
$ref: '#/definitions/api.Progress' $ref: '#/definitions/api.Progress'
type: object type: object
api.ProcessReportSearchResult:
properties:
created_at:
format: int64
type: integer
exit_state:
type: string
id:
type: string
reference:
type: string
type: object
api.ProcessState: api.ProcessState:
properties: properties:
command: command:
@@ -3010,7 +3022,7 @@ paths:
- v16.7.2 - v16.7.2
/api/v3/process/{id}/report/{at}: /api/v3/process/{id}/report/{at}:
get: get:
description: Get the logs and the log history of a process. description: Get the log history entry of a process at a certain time.
operationId: process-3-get-report-at operationId: process-3-get-report-at
parameters: parameters:
- description: Process ID - description: Process ID
@@ -3018,13 +3030,18 @@ paths:
name: id name: id
required: true required: true
type: string type: string
- description: Unix timestamp
in: path
name: at
required: true
type: integer
produces: produces:
- application/json - application/json
responses: responses:
"200": "200":
description: OK description: OK
schema: schema:
$ref: '#/definitions/api.ProcessReport' $ref: '#/definitions/api.ProcessReportHistoryEntry'
"400": "400":
description: Bad Request description: Bad Request
schema: schema:
@@ -3035,7 +3052,7 @@ paths:
$ref: '#/definitions/api.Error' $ref: '#/definitions/api.Error'
security: security:
- ApiKeyAuth: [] - ApiKeyAuth: []
summary: Get the logs of a process summary: Get the log history entry of a process
tags: tags:
- v16.?.? - v16.?.?
/api/v3/process/{id}/state: /api/v3/process/{id}/state:
@@ -3068,6 +3085,54 @@ paths:
summary: Get the state of a process summary: Get the state of a process
tags: tags:
- v16.7.2 - v16.7.2
/api/v3/report/process:
get:
description: Search log history of all processes by providing patterns for process
IDs and references, a state and a time range. All are optional.
operationId: process-3-search-report-history
parameters:
- description: Glob pattern for process IDs. If empty all IDs will be returned.
Intersected with results from refpattern.
in: query
name: idpattern
type: string
- description: Glob pattern for process references. If empty all IDs will be
returned. Intersected with results from idpattern.
in: query
name: refpattern
type: string
- description: State of a process, leave empty for any
in: query
name: state
type: string
- description: Search range of when the report has been created, older than
this value. Unix timestamp, leave empty for any
in: query
name: from
type: integer
- description: Search range of when the report has been created, younger than
this value. Unix timestamp, leave empty for any
in: query
name: to
type: integer
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/api.ProcessReportSearchResult'
type: array
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Search log history of all processes
tags:
- v16.?.?
/api/v3/rtmp: /api/v3/rtmp:
get: get:
description: List all currently publishing RTMP streams. description: List all currently publishing RTMP streams.

View File

@@ -33,6 +33,10 @@ type Parser interface {
// ReportHistory returns an array of previews logs // ReportHistory returns an array of previews logs
ReportHistory() []ReportHistoryEntry ReportHistory() []ReportHistoryEntry
// SearchReportHistory returns a list of CreatedAt dates of reports that match the
// provided state and time range.
SearchReportHistory(state string, from, to *time.Time) []ReportHistorySearchResult
// LastLogline returns the last parsed log line // LastLogline returns the last parsed log line
LastLogline() string LastLogline() string
} }
@@ -732,6 +736,46 @@ type ReportHistoryEntry struct {
Progress Progress Progress Progress
} }
type ReportHistorySearchResult struct {
CreatedAt time.Time
ExitState string
}
func (p *parser) SearchReportHistory(state string, from, to *time.Time) []ReportHistorySearchResult {
result := []ReportHistorySearchResult{}
p.logHistory.Do(func(l interface{}) {
if l == nil {
return
}
e := l.(ReportHistoryEntry)
if len(state) != 0 && state != e.ExitState {
return
}
if from != nil {
if e.CreatedAt.Before(*from) {
return
}
}
if to != nil {
if e.CreatedAt.After(*to) {
return
}
}
result = append(result, ReportHistorySearchResult{
CreatedAt: e.CreatedAt,
ExitState: e.ExitState,
})
})
return result
}
func (p *parser) storeReportHistory(state string) { func (p *parser) storeReportHistory(state string) {
if p.logHistory == nil { if p.logHistory == nil {
return return

View File

@@ -2,7 +2,6 @@ package api
import ( import (
"encoding/json" "encoding/json"
"strconv"
"github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/app"
"github.com/lithammer/shortuuid/v4" "github.com/lithammer/shortuuid/v4"
@@ -186,63 +185,6 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
} }
} }
// ProcessReportEntry represents the logs of a run of a restream process
type ProcessReportEntry struct {
CreatedAt int64 `json:"created_at" format:"int64"`
Prelude []string `json:"prelude"`
Log [][2]string `json:"log"`
}
type ProcessReportHistoryEntry struct {
ProcessReportEntry
ExitState string `json:"exit_state"`
Progress Progress `json:"progress"`
}
// ProcessReport represents the current log and the logs of previous runs of a restream process
type ProcessReport struct {
ProcessReportEntry
History []ProcessReportHistoryEntry `json:"history"`
}
// Unmarshal converts a restream log to a report
func (report *ProcessReport) Unmarshal(l *app.Log) {
if l == nil {
return
}
report.CreatedAt = l.CreatedAt.Unix()
report.Prelude = l.Prelude
report.Log = make([][2]string, len(l.Log))
for i, line := range l.Log {
report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
report.Log[i][1] = line.Data
}
report.History = []ProcessReportHistoryEntry{}
for _, h := range l.History {
he := ProcessReportHistoryEntry{
ProcessReportEntry: ProcessReportEntry{
CreatedAt: h.CreatedAt.Unix(),
Prelude: h.Prelude,
Log: make([][2]string, len(h.Log)),
},
ExitState: h.ExitState,
}
he.Progress.Unmarshal(&h.Progress)
for i, line := range h.Log {
he.ProcessReportEntry.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
he.ProcessReportEntry.Log[i][1] = line.Data
}
report.History = append(report.History, he)
}
}
// ProcessState represents the current state of an ffmpeg process // ProcessState represents the current state of an ffmpeg process
type ProcessState struct { type ProcessState struct {
Order string `json:"order" jsonschema:"enum=start,enum=stop"` Order string `json:"order" jsonschema:"enum=start,enum=stop"`

71
http/api/report.go Normal file
View File

@@ -0,0 +1,71 @@
package api
import (
"strconv"
"github.com/datarhei/core/v16/restream/app"
)
// ProcessReportEntry represents the logs of a run of a restream process
type ProcessReportEntry struct {
CreatedAt int64 `json:"created_at" format:"int64"`
Prelude []string `json:"prelude"`
Log [][2]string `json:"log"`
}
type ProcessReportHistoryEntry struct {
ProcessReportEntry
ExitState string `json:"exit_state"`
Progress Progress `json:"progress"`
}
// ProcessReport represents the current log and the logs of previous runs of a restream process
type ProcessReport struct {
ProcessReportEntry
History []ProcessReportHistoryEntry `json:"history"`
}
// Unmarshal converts a restream log to a report
func (report *ProcessReport) Unmarshal(l *app.Log) {
if l == nil {
return
}
report.CreatedAt = l.CreatedAt.Unix()
report.Prelude = l.Prelude
report.Log = make([][2]string, len(l.Log))
for i, line := range l.Log {
report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
report.Log[i][1] = line.Data
}
report.History = []ProcessReportHistoryEntry{}
for _, h := range l.History {
he := ProcessReportHistoryEntry{
ProcessReportEntry: ProcessReportEntry{
CreatedAt: h.CreatedAt.Unix(),
Prelude: h.Prelude,
Log: make([][2]string, len(h.Log)),
},
ExitState: h.ExitState,
}
he.Progress.Unmarshal(&h.Progress)
for i, line := range h.Log {
he.ProcessReportEntry.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10)
he.ProcessReportEntry.Log[i][1] = line.Data
}
report.History = append(report.History, he)
}
}
type ProcessReportSearchResult struct {
ProcessID string `json:"id"`
Reference string `json:"reference"`
ExitState string `json:"exit_state"`
CreatedAt int64 `json:"created_at" format:"int64"`
}

View File

@@ -4,6 +4,7 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/http/handler/util"
@@ -337,14 +338,15 @@ func (h *RestreamHandler) GetReport(c echo.Context) error {
return c.JSON(http.StatusOK, report) return c.JSON(http.StatusOK, report)
} }
// GetReport return the current log and the log history of a process // GetReportAt return the loh history entry of a process
// @Summary Get the logs of a process // @Summary Get the log history entry of a process
// @Description Get the logs and the log history of a process. // @Description Get the log history entry of a process at a certain time.
// @Tags v16.?.? // @Tags v16.?.?
// @ID process-3-get-report-at // @ID process-3-get-report-at
// @Produce json // @Produce json
// @Param id path string true "Process ID" // @Param id path string true "Process ID"
// @Success 200 {object} api.ProcessReport // @Param at path integer true "Unix timestamp"
// @Success 200 {object} api.ProcessReportHistoryEntry
// @Failure 404 {object} api.Error // @Failure 404 {object} api.Error
// @Failure 400 {object} api.Error // @Failure 400 {object} api.Error
// @Security ApiKeyAuth // @Security ApiKeyAuth
@@ -373,6 +375,61 @@ func (h *RestreamHandler) GetReportAt(c echo.Context) error {
return api.Err(http.StatusNotFound, "Unknown process report date") return api.Err(http.StatusNotFound, "Unknown process report date")
} }
// SearchReportHistory returns a list of matching report references
// @Summary Search log history of all processes
// @Description Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional.
// @Tags v16.?.?
// @ID process-3-search-report-history
// @Produce json
// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern."
// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern."
// @Param state query string false "State of a process, leave empty for any"
// @Param from query int64 false "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any"
// @Param to query int64 false "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any"
// @Success 200 {array} api.ProcessReportSearchResult
// @Failure 400 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/report/process [get]
func (h *RestreamHandler) SearchReportHistory(c echo.Context) error {
idpattern := util.DefaultQuery(c, "idpattern", "")
refpattern := util.DefaultQuery(c, "refpattern", "")
state := util.DefaultQuery(c, "state", "")
fromUnix := util.DefaultQuery(c, "from", "")
toUnix := util.DefaultQuery(c, "to", "")
var from, to *time.Time = nil, nil
if len(fromUnix) != 0 {
if x, err := strconv.ParseInt(fromUnix, 10, 64); err != nil {
return api.Err(http.StatusBadRequest, "Invalid search range", "%s", err)
} else {
t := time.Unix(x, 0)
from = &t
}
}
if len(toUnix) != 0 {
if x, err := strconv.ParseInt(toUnix, 10, 64); err != nil {
return api.Err(http.StatusBadRequest, "Invalid search range", "%s", err)
} else {
t := time.Unix(x, 0)
to = &t
}
}
result := h.restream.SearchProcessLogHistory(idpattern, refpattern, state, from, to)
response := make([]api.ProcessReportSearchResult, len(result))
for i, b := range result {
response[i].ProcessID = b.ProcessID
response[i].Reference = b.Reference
response[i].ExitState = b.ExitState
response[i].CreatedAt = b.CreatedAt.Unix()
}
return c.JSON(http.StatusOK, response)
}
// Probe probes a process // Probe probes a process
// @Summary Probe a process // @Summary Probe a process
// @Description Probe an existing process to get a detailed stream information on the inputs. // @Description Probe an existing process to get a detailed stream information on the inputs.

View File

@@ -587,6 +587,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.playout.SetStream) v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.playout.SetStream)
} }
} }
// v3 Report
v3.GET("/report/process", s.v3handler.restream.SearchReportHistory)
} }
// v3 Filesystems // v3 Filesystems

View File

@@ -26,3 +26,10 @@ type Log struct {
LogEntry LogEntry
History []LogHistoryEntry History []LogHistoryEntry
} }
type LogHistorySearchResult struct {
ProcessID string
Reference string
ExitState string
CreatedAt time.Time
}

View File

@@ -47,6 +47,7 @@ type Restreamer interface {
GetProcess(id string) (*app.Process, error) // Get a process GetProcess(id string) (*app.Process, error) // Get a process
GetProcessState(id string) (*app.State, error) // Get the state of a process GetProcessState(id string) (*app.State, error) // Get the state of a process
GetProcessLog(id string) (*app.Log, error) // Get the logs of a process GetProcessLog(id string) (*app.Log, error) // Get the logs of a process
SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult // Search the log history of all processes
GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process
Probe(id string) app.Probe // Probe a process Probe(id string) app.Probe // Probe a process
ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout
@@ -1448,6 +1449,35 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
return log, nil return log, nil
} }
func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult {
r.lock.RLock()
defer r.lock.RUnlock()
result := []app.LogHistorySearchResult{}
ids := r.GetProcessIDs(idpattern, refpattern)
for _, id := range ids {
task, ok := r.tasks[id]
if !ok {
continue
}
presult := task.parser.SearchReportHistory(state, from, to)
for _, f := range presult {
result = append(result, app.LogHistorySearchResult{
ProcessID: task.id,
Reference: task.reference,
ExitState: f.ExitState,
CreatedAt: f.CreatedAt,
})
}
}
return result
}
func (r *restream) Probe(id string) app.Probe { func (r *restream) Probe(id string) app.Probe {
return r.ProbeWithTimeout(id, 20*time.Second) return r.ProbeWithTimeout(id, 20*time.Second)
} }