diff --git a/app/api/api.go b/app/api/api.go index e523a08f..2e19fe12 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -387,8 +387,11 @@ func (a *api) start() error { } diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{ - Root: cfg.Storage.Disk.Dir, - Logger: a.log.logger.core.WithComponent("DiskFS"), + Root: cfg.Storage.Disk.Dir, + Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{ + "type": "disk", + "name": "disk", + }), }) if err != nil { return fmt.Errorf("disk filesystem: %w", err) @@ -420,7 +423,10 @@ func (a *api) start() error { if a.memfs == nil { memfs, _ := fs.NewMemFilesystem(fs.MemConfig{ - Logger: a.log.logger.core.WithComponent("MemFS"), + Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{ + "type": "mem", + "name": "mem", + }), }) memfs.SetMetadata("base", baseMemFS.String()) @@ -464,7 +470,10 @@ func (a *api) start() error { Region: s3.Region, Bucket: s3.Bucket, UseSSL: s3.UseSSL, - Logger: a.log.logger.core.WithComponent("FS"), + Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{ + "type": "s3", + "name": s3.Name, + }), }) if err != nil { return fmt.Errorf("s3 filesystem (%s): %w", s3.Name, err) diff --git a/docs/docs.go b/docs/docs.go index 3f3ba947..53c4e50f 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -319,12 +319,12 @@ const docTemplate = `{ ], "description": "Stream of event of whats happening in the core", "consumes": [ - "application/x-json-stream", - "text/event-stream" + "text/event-stream", + "application/x-json-stream" ], "produces": [ - "application/x-json-stream", - "text/event-stream" + "text/event-stream", + "application/x-json-stream" ], "tags": [ "v16.?.?" @@ -337,10 +337,7 @@ const docTemplate = `{ "name": "filters", "in": "body", "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/api.EventFilter" - } + "$ref": "#/definitions/api.EventFilters" } } ], @@ -2949,14 +2946,25 @@ const docTemplate = `{ "api.EventFilter": { "type": "object", "properties": { - "event": { - "type": "string" - }, - "filter": { + "data": { "type": "object", "additionalProperties": { "type": "string" } + }, + "event": { + "type": "string" + } + } + }, + "api.EventFilters": { + "type": "object", + "properties": { + "filters": { + "type": "array", + "items": { + "$ref": "#/definitions/api.EventFilter" + } } } }, diff --git a/docs/swagger.json b/docs/swagger.json index 6d5c56ee..5e1a887e 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -312,12 +312,12 @@ ], "description": "Stream of event of whats happening in the core", "consumes": [ - "application/x-json-stream", - "text/event-stream" + "text/event-stream", + "application/x-json-stream" ], "produces": [ - "application/x-json-stream", - "text/event-stream" + "text/event-stream", + "application/x-json-stream" ], "tags": [ "v16.?.?" @@ -330,10 +330,7 @@ "name": "filters", "in": "body", "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/api.EventFilter" - } + "$ref": "#/definitions/api.EventFilters" } } ], @@ -2942,14 +2939,25 @@ "api.EventFilter": { "type": "object", "properties": { - "event": { - "type": "string" - }, - "filter": { + "data": { "type": "object", "additionalProperties": { "type": "string" } + }, + "event": { + "type": "string" + } + } + }, + "api.EventFilters": { + "type": "object", + "properties": { + "filters": { + "type": "array", + "items": { + "$ref": "#/definitions/api.EventFilter" + } } } }, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 85c1ff26..18959ee0 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -457,12 +457,19 @@ definitions: type: object api.EventFilter: properties: - event: - type: string - filter: + data: additionalProperties: type: string type: object + event: + type: string + type: object + api.EventFilters: + properties: + filters: + items: + $ref: '#/definitions/api.EventFilter' + type: array type: object api.FileInfo: properties: @@ -2194,8 +2201,8 @@ paths: /api/v3/events: post: consumes: - - application/x-json-stream - text/event-stream + - application/x-json-stream description: Stream of event of whats happening in the core operationId: events parameters: @@ -2203,12 +2210,10 @@ paths: in: body name: filters schema: - items: - $ref: '#/definitions/api.EventFilter' - type: array + $ref: '#/definitions/api.EventFilters' produces: - - application/x-json-stream - text/event-stream + - application/x-json-stream responses: "200": description: OK diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index b335fe14..7adfd764 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -188,6 +188,13 @@ func (p *parser) Parse(line string) uint64 { if p.logStart.IsZero() { p.lock.log.Lock() p.logStart = time.Now() + + p.logger.WithComponent("ProcessReport").WithFields(log.Fields{ + "exec_state": "running", + "report": "created", + "timestamp": p.logStart.Unix(), + }).Info().Log("Created") + p.lock.log.Unlock() } @@ -820,6 +827,12 @@ func (p *parser) storeReportHistory(state string) { } p.logHistory = p.logHistory.Next() + + p.logger.WithComponent("ProcessReport").WithFields(log.Fields{ + "exec_state": state, + "report": "exited", + "timestamp": h.ExitedAt.Unix(), + }).Info().Log("Exited") } func (p *parser) Report() Report { diff --git a/http/api/event.go b/http/api/event.go index 433a1d52..2e6c7448 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "regexp" + "strings" "github.com/datarhei/core/v16/log" ) @@ -18,9 +19,9 @@ type Event struct { } func (e *Event) Marshal(le *log.Event) { - e.Timestamp = le.Time.UnixMilli() + e.Timestamp = le.Time.Unix() e.Level = int(le.Level) - e.Component = le.Component + e.Component = strings.ToLower(le.Component) e.Message = le.Message e.Data = make(map[string]string) @@ -50,11 +51,7 @@ func (e *Event) Marshal(le *log.Event) { } func (e *Event) Filter(ef *EventFilter) bool { - if e.Component != ef.Component { - return false - } - - for k, r := range ef.filter { + for k, r := range ef.data { v, ok := e.Data[k] if !ok { continue @@ -70,20 +67,24 @@ func (e *Event) Filter(ef *EventFilter) bool { type EventFilter struct { Component string `json:"event"` - Filter map[string]string `json:"filter"` - filter map[string]*regexp.Regexp + Data map[string]string `json:"data"` + data map[string]*regexp.Regexp } -func (ef *EventFilter) compile() error { - ef.filter = make(map[string]*regexp.Regexp) +type EventFilters struct { + Filters []EventFilter `json:"filters"` +} - for k, v := range ef.Filter { +func (ef *EventFilter) Compile() error { + ef.data = make(map[string]*regexp.Regexp) + + for k, v := range ef.Data { r, err := regexp.Compile(v) if err != nil { return err } - ef.filter[k] = r + ef.data[k] = r } return nil diff --git a/http/api/event_test.go b/http/api/event_test.go new file mode 100644 index 00000000..b5207e53 --- /dev/null +++ b/http/api/event_test.go @@ -0,0 +1,45 @@ +package api + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEventFilter(t *testing.T) { + event := Event{ + Timestamp: 1234, + Level: 0, + Component: "foobar", + Message: "none", + Data: map[string]string{ + "foo": "bar", + }, + } + + foobarfilter := EventFilter{ + Component: "foobar", + Data: map[string]string{ + "foo": "^b.*$", + }, + } + + err := foobarfilter.Compile() + require.NoError(t, err) + + foobazfilter := EventFilter{ + Component: "foobaz", + Data: map[string]string{ + "foo": "baz", + }, + } + + err = foobazfilter.Compile() + require.NoError(t, err) + + res := event.Filter(&foobarfilter) + require.True(t, res) + + res = event.Filter(&foobazfilter) + require.False(t, res) +} diff --git a/http/handler/api/events.go b/http/handler/api/events.go index c0a514b2..985210a1 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -7,6 +7,7 @@ import ( "time" "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/log" "github.com/labstack/echo/v4" @@ -34,11 +35,30 @@ func NewEvents(events log.ChannelWriter) *EventsHandler { // @Accept json-stream // @Produce text/event-stream // @Produce json-stream -// @Param filters body []api.EventFilter false "Event filters" +// @Param filters body api.EventFilters false "Event filters" // @Success 200 {object} api.Event // @Security ApiKeyAuth // @Router /api/v3/events [post] func (h *EventsHandler) Events(c echo.Context) error { + filters := api.EventFilters{} + + if err := util.ShouldBindJSON(c, &filters); err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + filter := map[string]*api.EventFilter{} + + for _, f := range filters.Filters { + f := f + + if err := f.Compile(); err != nil { + return api.Err(http.StatusBadRequest, "Invalid filter", "%s: %s", f.Component, err) + } + + component := strings.ToLower(f.Component) + filter[component] = &f + } + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -64,6 +84,19 @@ func (h *EventsHandler) Events(c echo.Context) error { done := make(chan struct{}) + filterEvent := func(event *api.Event) bool { + if len(filter) == 0 { + return true + } + + f, ok := filter[event.Component] + if !ok { + return false + } + + return event.Filter(f) + } + event := api.Event{} if contentType == "text/event-stream" { @@ -79,7 +112,12 @@ func (h *EventsHandler) Events(c echo.Context) error { res.Flush() case e := <-evts: event.Marshal(&e) - res.Write([]byte("event: " + strings.ToLower(event.Component) + "\ndata: ")) + + if !filterEvent(&event) { + continue + } + + res.Write([]byte("event: " + event.Component + "\ndata: ")) if err := enc.Encode(event); err != nil { close(done) } @@ -100,6 +138,11 @@ func (h *EventsHandler) Events(c echo.Context) error { res.Flush() case e := <-evts: event.Marshal(&e) + + if !filterEvent(&event) { + continue + } + if err := enc.Encode(event); err != nil { close(done) } diff --git a/restream/restream.go b/restream/restream.go index ff4d9622..7948da03 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -291,7 +291,11 @@ func (r *restream) load() error { reference: process.Reference, process: process, config: process.Config.Clone(), - logger: r.logger.WithField("id", id), + logger: r.logger.WithFields(log.Fields{ + "id": process.ID, + "reference": process.Reference, + }, + ), } // Replace all placeholders in the config @@ -474,7 +478,11 @@ func (r *restream) createTask(config *app.Config) (*task, error) { reference: process.Reference, process: process, config: process.Config.Clone(), - logger: r.logger.WithField("id", process.ID), + logger: r.logger.WithFields(log.Fields{ + "id": process.ID, + "reference": process.Reference, + }, + ), } resolveStaticPlaceholders(t.config, r.replace)