diff --git a/docs/docs.go b/docs/docs.go index fbf7f217..cb30b0f3 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1664,14 +1664,14 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "Get the logs and the log history of a process.", + "description": "Get the log history entry of a process at a certain time.", "produces": [ "application/json" ], "tags": [ "v16.?.?" ], - "summary": "Get the logs of a process", + "summary": "Get the log history entry of a process", "operationId": "process-3-get-report-at", "parameters": [ { @@ -1680,13 +1680,20 @@ const docTemplate = `{ "name": "id", "in": "path", "required": true + }, + { + "type": "integer", + "description": "Unix timestamp", + "name": "at", + "in": "path", + "required": true } ], "responses": { "200": { "description": "OK", "schema": { - "$ref": "#/definitions/api.ProcessReport" + "$ref": "#/definitions/api.ProcessReportHistoryEntry" } }, "400": { @@ -1751,6 +1758,73 @@ const docTemplate = `{ } } }, + "/api/v3/report/process": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional.", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Search log history of all processes", + "operationId": "process-3-search-report-history", + "parameters": [ + { + "type": "string", + "description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern.", + "name": "idpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern.", + "name": "refpattern", + "in": "query" + }, + { + "type": "string", + "description": "State of a process, leave empty for any", + "name": "state", + "in": "query" + }, + { + "type": "integer", + "description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any", + "name": "from", + "in": "query" + }, + { + "type": "integer", + "description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any", + "name": "to", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessReportSearchResult" + } + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/rtmp": { "get": { "security": [ @@ -3254,6 +3328,24 @@ const docTemplate = `{ } } }, + "api.ProcessReportSearchResult": { + "type": "object", + "properties": { + "created_at": { + "type": "integer", + "format": "int64" + }, + "exit_state": { + "type": "string" + }, + "id": { + "type": "string" + }, + "reference": { + "type": "string" + } + } + }, "api.ProcessState": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index c1bb0ae7..2b8b4b3f 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1657,14 +1657,14 @@ "ApiKeyAuth": [] } ], - "description": "Get the logs and the log history of a process.", + "description": "Get the log history entry of a process at a certain time.", "produces": [ "application/json" ], "tags": [ "v16.?.?" ], - "summary": "Get the logs of a process", + "summary": "Get the log history entry of a process", "operationId": "process-3-get-report-at", "parameters": [ { @@ -1673,13 +1673,20 @@ "name": "id", "in": "path", "required": true + }, + { + "type": "integer", + "description": "Unix timestamp", + "name": "at", + "in": "path", + "required": true } ], "responses": { "200": { "description": "OK", "schema": { - "$ref": "#/definitions/api.ProcessReport" + "$ref": "#/definitions/api.ProcessReportHistoryEntry" } }, "400": { @@ -1744,6 +1751,73 @@ } } }, + "/api/v3/report/process": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional.", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Search log history of all processes", + "operationId": "process-3-search-report-history", + "parameters": [ + { + "type": "string", + "description": "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern.", + "name": "idpattern", + "in": "query" + }, + { + "type": "string", + "description": "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern.", + "name": "refpattern", + "in": "query" + }, + { + "type": "string", + "description": "State of a process, leave empty for any", + "name": "state", + "in": "query" + }, + { + "type": "integer", + "description": "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any", + "name": "from", + "in": "query" + }, + { + "type": "integer", + "description": "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any", + "name": "to", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessReportSearchResult" + } + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/rtmp": { "get": { "security": [ @@ -3247,6 +3321,24 @@ } } }, + "api.ProcessReportSearchResult": { + "type": "object", + "properties": { + "created_at": { + "type": "integer", + "format": "int64" + }, + "exit_state": { + "type": "string" + }, + "id": { + "type": "string" + }, + "reference": { + "type": "string" + } + } + }, "api.ProcessState": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index cb179598..8443c6c9 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -824,6 +824,18 @@ definitions: progress: $ref: '#/definitions/api.Progress' type: object + api.ProcessReportSearchResult: + properties: + created_at: + format: int64 + type: integer + exit_state: + type: string + id: + type: string + reference: + type: string + type: object api.ProcessState: properties: command: @@ -3010,7 +3022,7 @@ paths: - v16.7.2 /api/v3/process/{id}/report/{at}: get: - description: Get the logs and the log history of a process. + description: Get the log history entry of a process at a certain time. operationId: process-3-get-report-at parameters: - description: Process ID @@ -3018,13 +3030,18 @@ paths: name: id required: true type: string + - description: Unix timestamp + in: path + name: at + required: true + type: integer produces: - application/json responses: "200": description: OK schema: - $ref: '#/definitions/api.ProcessReport' + $ref: '#/definitions/api.ProcessReportHistoryEntry' "400": description: Bad Request schema: @@ -3035,7 +3052,7 @@ paths: $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] - summary: Get the logs of a process + summary: Get the log history entry of a process tags: - v16.?.? /api/v3/process/{id}/state: @@ -3068,6 +3085,54 @@ paths: summary: Get the state of a process tags: - v16.7.2 + /api/v3/report/process: + get: + description: Search log history of all processes by providing patterns for process + IDs and references, a state and a time range. All are optional. + operationId: process-3-search-report-history + parameters: + - description: Glob pattern for process IDs. If empty all IDs will be returned. + Intersected with results from refpattern. + in: query + name: idpattern + type: string + - description: Glob pattern for process references. If empty all IDs will be + returned. Intersected with results from idpattern. + in: query + name: refpattern + type: string + - description: State of a process, leave empty for any + in: query + name: state + type: string + - description: Search range of when the report has been created, older than + this value. Unix timestamp, leave empty for any + in: query + name: from + type: integer + - description: Search range of when the report has been created, younger than + this value. Unix timestamp, leave empty for any + in: query + name: to + type: integer + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/api.ProcessReportSearchResult' + type: array + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Search log history of all processes + tags: + - v16.?.? /api/v3/rtmp: get: description: List all currently publishing RTMP streams. diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 4e35b5d5..bdf664b4 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -33,6 +33,10 @@ type Parser interface { // ReportHistory returns an array of previews logs ReportHistory() []ReportHistoryEntry + // SearchReportHistory returns a list of CreatedAt dates of reports that match the + // provided state and time range. + SearchReportHistory(state string, from, to *time.Time) []ReportHistorySearchResult + // LastLogline returns the last parsed log line LastLogline() string } @@ -732,6 +736,46 @@ type ReportHistoryEntry struct { Progress Progress } +type ReportHistorySearchResult struct { + CreatedAt time.Time + ExitState string +} + +func (p *parser) SearchReportHistory(state string, from, to *time.Time) []ReportHistorySearchResult { + result := []ReportHistorySearchResult{} + + p.logHistory.Do(func(l interface{}) { + if l == nil { + return + } + + e := l.(ReportHistoryEntry) + + if len(state) != 0 && state != e.ExitState { + return + } + + if from != nil { + if e.CreatedAt.Before(*from) { + return + } + } + + if to != nil { + if e.CreatedAt.After(*to) { + return + } + } + + result = append(result, ReportHistorySearchResult{ + CreatedAt: e.CreatedAt, + ExitState: e.ExitState, + }) + }) + + return result +} + func (p *parser) storeReportHistory(state string) { if p.logHistory == nil { return diff --git a/http/api/process.go b/http/api/process.go index e564cc0b..c627a0c6 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -2,7 +2,6 @@ package api import ( "encoding/json" - "strconv" "github.com/datarhei/core/v16/restream/app" "github.com/lithammer/shortuuid/v4" @@ -186,63 +185,6 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) { } } -// ProcessReportEntry represents the logs of a run of a restream process -type ProcessReportEntry struct { - CreatedAt int64 `json:"created_at" format:"int64"` - Prelude []string `json:"prelude"` - Log [][2]string `json:"log"` -} - -type ProcessReportHistoryEntry struct { - ProcessReportEntry - - ExitState string `json:"exit_state"` - Progress Progress `json:"progress"` -} - -// ProcessReport represents the current log and the logs of previous runs of a restream process -type ProcessReport struct { - ProcessReportEntry - History []ProcessReportHistoryEntry `json:"history"` -} - -// Unmarshal converts a restream log to a report -func (report *ProcessReport) Unmarshal(l *app.Log) { - if l == nil { - return - } - - report.CreatedAt = l.CreatedAt.Unix() - report.Prelude = l.Prelude - report.Log = make([][2]string, len(l.Log)) - for i, line := range l.Log { - report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) - report.Log[i][1] = line.Data - } - - report.History = []ProcessReportHistoryEntry{} - - for _, h := range l.History { - he := ProcessReportHistoryEntry{ - ProcessReportEntry: ProcessReportEntry{ - CreatedAt: h.CreatedAt.Unix(), - Prelude: h.Prelude, - Log: make([][2]string, len(h.Log)), - }, - ExitState: h.ExitState, - } - - he.Progress.Unmarshal(&h.Progress) - - for i, line := range h.Log { - he.ProcessReportEntry.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) - he.ProcessReportEntry.Log[i][1] = line.Data - } - - report.History = append(report.History, he) - } -} - // ProcessState represents the current state of an ffmpeg process type ProcessState struct { Order string `json:"order" jsonschema:"enum=start,enum=stop"` diff --git a/http/api/report.go b/http/api/report.go new file mode 100644 index 00000000..a32cc250 --- /dev/null +++ b/http/api/report.go @@ -0,0 +1,71 @@ +package api + +import ( + "strconv" + + "github.com/datarhei/core/v16/restream/app" +) + +// ProcessReportEntry represents the logs of a run of a restream process +type ProcessReportEntry struct { + CreatedAt int64 `json:"created_at" format:"int64"` + Prelude []string `json:"prelude"` + Log [][2]string `json:"log"` +} + +type ProcessReportHistoryEntry struct { + ProcessReportEntry + + ExitState string `json:"exit_state"` + Progress Progress `json:"progress"` +} + +// ProcessReport represents the current log and the logs of previous runs of a restream process +type ProcessReport struct { + ProcessReportEntry + History []ProcessReportHistoryEntry `json:"history"` +} + +// Unmarshal converts a restream log to a report +func (report *ProcessReport) Unmarshal(l *app.Log) { + if l == nil { + return + } + + report.CreatedAt = l.CreatedAt.Unix() + report.Prelude = l.Prelude + report.Log = make([][2]string, len(l.Log)) + for i, line := range l.Log { + report.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) + report.Log[i][1] = line.Data + } + + report.History = []ProcessReportHistoryEntry{} + + for _, h := range l.History { + he := ProcessReportHistoryEntry{ + ProcessReportEntry: ProcessReportEntry{ + CreatedAt: h.CreatedAt.Unix(), + Prelude: h.Prelude, + Log: make([][2]string, len(h.Log)), + }, + ExitState: h.ExitState, + } + + he.Progress.Unmarshal(&h.Progress) + + for i, line := range h.Log { + he.ProcessReportEntry.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) + he.ProcessReportEntry.Log[i][1] = line.Data + } + + report.History = append(report.History, he) + } +} + +type ProcessReportSearchResult struct { + ProcessID string `json:"id"` + Reference string `json:"reference"` + ExitState string `json:"exit_state"` + CreatedAt int64 `json:"created_at" format:"int64"` +} diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 3f743f92..2c4c129a 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -4,6 +4,7 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" @@ -337,14 +338,15 @@ func (h *RestreamHandler) GetReport(c echo.Context) error { return c.JSON(http.StatusOK, report) } -// GetReport return the current log and the log history of a process -// @Summary Get the logs of a process -// @Description Get the logs and the log history of a process. +// GetReportAt return the loh history entry of a process +// @Summary Get the log history entry of a process +// @Description Get the log history entry of a process at a certain time. // @Tags v16.?.? // @ID process-3-get-report-at // @Produce json // @Param id path string true "Process ID" -// @Success 200 {object} api.ProcessReport +// @Param at path integer true "Unix timestamp" +// @Success 200 {object} api.ProcessReportHistoryEntry // @Failure 404 {object} api.Error // @Failure 400 {object} api.Error // @Security ApiKeyAuth @@ -373,6 +375,61 @@ func (h *RestreamHandler) GetReportAt(c echo.Context) error { return api.Err(http.StatusNotFound, "Unknown process report date") } +// SearchReportHistory returns a list of matching report references +// @Summary Search log history of all processes +// @Description Search log history of all processes by providing patterns for process IDs and references, a state and a time range. All are optional. +// @Tags v16.?.? +// @ID process-3-search-report-history +// @Produce json +// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern." +// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern." +// @Param state query string false "State of a process, leave empty for any" +// @Param from query int64 false "Search range of when the report has been created, older than this value. Unix timestamp, leave empty for any" +// @Param to query int64 false "Search range of when the report has been created, younger than this value. Unix timestamp, leave empty for any" +// @Success 200 {array} api.ProcessReportSearchResult +// @Failure 400 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/report/process [get] +func (h *RestreamHandler) SearchReportHistory(c echo.Context) error { + idpattern := util.DefaultQuery(c, "idpattern", "") + refpattern := util.DefaultQuery(c, "refpattern", "") + state := util.DefaultQuery(c, "state", "") + fromUnix := util.DefaultQuery(c, "from", "") + toUnix := util.DefaultQuery(c, "to", "") + + var from, to *time.Time = nil, nil + + if len(fromUnix) != 0 { + if x, err := strconv.ParseInt(fromUnix, 10, 64); err != nil { + return api.Err(http.StatusBadRequest, "Invalid search range", "%s", err) + } else { + t := time.Unix(x, 0) + from = &t + } + } + + if len(toUnix) != 0 { + if x, err := strconv.ParseInt(toUnix, 10, 64); err != nil { + return api.Err(http.StatusBadRequest, "Invalid search range", "%s", err) + } else { + t := time.Unix(x, 0) + to = &t + } + } + + result := h.restream.SearchProcessLogHistory(idpattern, refpattern, state, from, to) + + response := make([]api.ProcessReportSearchResult, len(result)) + for i, b := range result { + response[i].ProcessID = b.ProcessID + response[i].Reference = b.Reference + response[i].ExitState = b.ExitState + response[i].CreatedAt = b.CreatedAt.Unix() + } + + return c.JSON(http.StatusOK, response) +} + // Probe probes a process // @Summary Probe a process // @Description Probe an existing process to get a detailed stream information on the inputs. diff --git a/http/server.go b/http/server.go index 7ce50f6e..321605ba 100644 --- a/http/server.go +++ b/http/server.go @@ -587,6 +587,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.playout.SetStream) } } + + // v3 Report + v3.GET("/report/process", s.v3handler.restream.SearchReportHistory) } // v3 Filesystems diff --git a/restream/app/log.go b/restream/app/log.go index 43640b6c..b5e756cc 100644 --- a/restream/app/log.go +++ b/restream/app/log.go @@ -26,3 +26,10 @@ type Log struct { LogEntry History []LogHistoryEntry } + +type LogHistorySearchResult struct { + ProcessID string + Reference string + ExitState string + CreatedAt time.Time +} diff --git a/restream/restream.go b/restream/restream.go index 9e33c45f..59b1b2ab 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -31,31 +31,32 @@ import ( // The Restreamer interface type Restreamer interface { - ID() string // ID of this instance - Name() string // Arbitrary name of this instance - CreatedAt() time.Time // Time of when this instance has been created - Start() // Start all processes that have a "start" order - Stop() // Stop all running process but keep their "start" order - AddProcess(config *app.Config) error // Add a new process - GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference - DeleteProcess(id string) error // Delete a process - UpdateProcess(id string, config *app.Config) error // Update a process - StartProcess(id string) error // Start a process - StopProcess(id string) error // Stop a process - RestartProcess(id string) error // Restart a process - ReloadProcess(id string) error // Reload a process - GetProcess(id string) (*app.Process, error) // Get a process - GetProcessState(id string) (*app.State, error) // Get the state of a process - GetProcessLog(id string) (*app.Log, error) // Get the logs of a process - GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process - Probe(id string) app.Probe // Probe a process - ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout - Skills() skills.Skills // Get the ffmpeg skills - ReloadSkills() error // Reload the ffmpeg skills - SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process - GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process - SetMetadata(key string, data interface{}) error // Set general metadata - GetMetadata(key string) (interface{}, error) // Get previously set general metadata + ID() string // ID of this instance + Name() string // Arbitrary name of this instance + CreatedAt() time.Time // Time of when this instance has been created + Start() // Start all processes that have a "start" order + Stop() // Stop all running process but keep their "start" order + AddProcess(config *app.Config) error // Add a new process + GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference + DeleteProcess(id string) error // Delete a process + UpdateProcess(id string, config *app.Config) error // Update a process + StartProcess(id string) error // Start a process + StopProcess(id string) error // Stop a process + RestartProcess(id string) error // Restart a process + ReloadProcess(id string) error // Reload a process + GetProcess(id string) (*app.Process, error) // Get a process + GetProcessState(id string) (*app.State, error) // Get the state of a process + GetProcessLog(id string) (*app.Log, error) // Get the logs of a process + SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult // Search the log history of all processes + GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process + Probe(id string) app.Probe // Probe a process + ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout + Skills() skills.Skills // Get the ffmpeg skills + ReloadSkills() error // Reload the ffmpeg skills + SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process + GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process + SetMetadata(key string, data interface{}) error // Set general metadata + GetMetadata(key string) (interface{}, error) // Get previously set general metadata } // Config is the required configuration for a new restreamer instance. @@ -1448,6 +1449,35 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { return log, nil } +func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult { + r.lock.RLock() + defer r.lock.RUnlock() + + result := []app.LogHistorySearchResult{} + + ids := r.GetProcessIDs(idpattern, refpattern) + + for _, id := range ids { + task, ok := r.tasks[id] + if !ok { + continue + } + + presult := task.parser.SearchReportHistory(state, from, to) + + for _, f := range presult { + result = append(result, app.LogHistorySearchResult{ + ProcessID: task.id, + Reference: task.reference, + ExitState: f.ExitState, + CreatedAt: f.CreatedAt, + }) + } + } + + return result +} + func (r *restream) Probe(id string) app.Probe { return r.ProbeWithTimeout(id, 20*time.Second) }