Add event filter support

This commit is contained in:
Ingo Oppermann
2023-03-23 14:50:59 +01:00
parent b84fdddd81
commit 99c571d623
9 changed files with 193 additions and 53 deletions

View File

@@ -387,8 +387,11 @@ func (a *api) start() error {
}
diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: cfg.Storage.Disk.Dir,
Logger: a.log.logger.core.WithComponent("DiskFS"),
Root: cfg.Storage.Disk.Dir,
Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{
"type": "disk",
"name": "disk",
}),
})
if err != nil {
return fmt.Errorf("disk filesystem: %w", err)
@@ -420,7 +423,10 @@ func (a *api) start() error {
if a.memfs == nil {
memfs, _ := fs.NewMemFilesystem(fs.MemConfig{
Logger: a.log.logger.core.WithComponent("MemFS"),
Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{
"type": "mem",
"name": "mem",
}),
})
memfs.SetMetadata("base", baseMemFS.String())
@@ -464,7 +470,10 @@ func (a *api) start() error {
Region: s3.Region,
Bucket: s3.Bucket,
UseSSL: s3.UseSSL,
Logger: a.log.logger.core.WithComponent("FS"),
Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{
"type": "s3",
"name": s3.Name,
}),
})
if err != nil {
return fmt.Errorf("s3 filesystem (%s): %w", s3.Name, err)

View File

@@ -319,12 +319,12 @@ const docTemplate = `{
],
"description": "Stream of event of whats happening in the core",
"consumes": [
"application/x-json-stream",
"text/event-stream"
"text/event-stream",
"application/x-json-stream"
],
"produces": [
"application/x-json-stream",
"text/event-stream"
"text/event-stream",
"application/x-json-stream"
],
"tags": [
"v16.?.?"
@@ -337,10 +337,7 @@ const docTemplate = `{
"name": "filters",
"in": "body",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.EventFilter"
}
"$ref": "#/definitions/api.EventFilters"
}
}
],
@@ -2949,14 +2946,25 @@ const docTemplate = `{
"api.EventFilter": {
"type": "object",
"properties": {
"event": {
"type": "string"
},
"filter": {
"data": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"event": {
"type": "string"
}
}
},
"api.EventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.EventFilter"
}
}
}
},

View File

@@ -312,12 +312,12 @@
],
"description": "Stream of event of whats happening in the core",
"consumes": [
"application/x-json-stream",
"text/event-stream"
"text/event-stream",
"application/x-json-stream"
],
"produces": [
"application/x-json-stream",
"text/event-stream"
"text/event-stream",
"application/x-json-stream"
],
"tags": [
"v16.?.?"
@@ -330,10 +330,7 @@
"name": "filters",
"in": "body",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.EventFilter"
}
"$ref": "#/definitions/api.EventFilters"
}
}
],
@@ -2942,14 +2939,25 @@
"api.EventFilter": {
"type": "object",
"properties": {
"event": {
"type": "string"
},
"filter": {
"data": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"event": {
"type": "string"
}
}
},
"api.EventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.EventFilter"
}
}
}
},

View File

@@ -457,12 +457,19 @@ definitions:
type: object
api.EventFilter:
properties:
event:
type: string
filter:
data:
additionalProperties:
type: string
type: object
event:
type: string
type: object
api.EventFilters:
properties:
filters:
items:
$ref: '#/definitions/api.EventFilter'
type: array
type: object
api.FileInfo:
properties:
@@ -2194,8 +2201,8 @@ paths:
/api/v3/events:
post:
consumes:
- application/x-json-stream
- text/event-stream
- application/x-json-stream
description: Stream of event of whats happening in the core
operationId: events
parameters:
@@ -2203,12 +2210,10 @@ paths:
in: body
name: filters
schema:
items:
$ref: '#/definitions/api.EventFilter'
type: array
$ref: '#/definitions/api.EventFilters'
produces:
- application/x-json-stream
- text/event-stream
- application/x-json-stream
responses:
"200":
description: OK

View File

@@ -188,6 +188,13 @@ func (p *parser) Parse(line string) uint64 {
if p.logStart.IsZero() {
p.lock.log.Lock()
p.logStart = time.Now()
p.logger.WithComponent("ProcessReport").WithFields(log.Fields{
"exec_state": "running",
"report": "created",
"timestamp": p.logStart.Unix(),
}).Info().Log("Created")
p.lock.log.Unlock()
}
@@ -820,6 +827,12 @@ func (p *parser) storeReportHistory(state string) {
}
p.logHistory = p.logHistory.Next()
p.logger.WithComponent("ProcessReport").WithFields(log.Fields{
"exec_state": state,
"report": "exited",
"timestamp": h.ExitedAt.Unix(),
}).Info().Log("Exited")
}
func (p *parser) Report() Report {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"regexp"
"strings"
"github.com/datarhei/core/v16/log"
)
@@ -18,9 +19,9 @@ type Event struct {
}
func (e *Event) Marshal(le *log.Event) {
e.Timestamp = le.Time.UnixMilli()
e.Timestamp = le.Time.Unix()
e.Level = int(le.Level)
e.Component = le.Component
e.Component = strings.ToLower(le.Component)
e.Message = le.Message
e.Data = make(map[string]string)
@@ -50,11 +51,7 @@ func (e *Event) Marshal(le *log.Event) {
}
func (e *Event) Filter(ef *EventFilter) bool {
if e.Component != ef.Component {
return false
}
for k, r := range ef.filter {
for k, r := range ef.data {
v, ok := e.Data[k]
if !ok {
continue
@@ -70,20 +67,24 @@ func (e *Event) Filter(ef *EventFilter) bool {
type EventFilter struct {
Component string `json:"event"`
Filter map[string]string `json:"filter"`
filter map[string]*regexp.Regexp
Data map[string]string `json:"data"`
data map[string]*regexp.Regexp
}
func (ef *EventFilter) compile() error {
ef.filter = make(map[string]*regexp.Regexp)
type EventFilters struct {
Filters []EventFilter `json:"filters"`
}
for k, v := range ef.Filter {
func (ef *EventFilter) Compile() error {
ef.data = make(map[string]*regexp.Regexp)
for k, v := range ef.Data {
r, err := regexp.Compile(v)
if err != nil {
return err
}
ef.filter[k] = r
ef.data[k] = r
}
return nil

45
http/api/event_test.go Normal file
View File

@@ -0,0 +1,45 @@
package api
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEventFilter(t *testing.T) {
event := Event{
Timestamp: 1234,
Level: 0,
Component: "foobar",
Message: "none",
Data: map[string]string{
"foo": "bar",
},
}
foobarfilter := EventFilter{
Component: "foobar",
Data: map[string]string{
"foo": "^b.*$",
},
}
err := foobarfilter.Compile()
require.NoError(t, err)
foobazfilter := EventFilter{
Component: "foobaz",
Data: map[string]string{
"foo": "baz",
},
}
err = foobazfilter.Compile()
require.NoError(t, err)
res := event.Filter(&foobarfilter)
require.True(t, res)
res = event.Filter(&foobazfilter)
require.False(t, res)
}

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/log"
"github.com/labstack/echo/v4"
@@ -34,11 +35,30 @@ func NewEvents(events log.ChannelWriter) *EventsHandler {
// @Accept json-stream
// @Produce text/event-stream
// @Produce json-stream
// @Param filters body []api.EventFilter false "Event filters"
// @Param filters body api.EventFilters false "Event filters"
// @Success 200 {object} api.Event
// @Security ApiKeyAuth
// @Router /api/v3/events [post]
func (h *EventsHandler) Events(c echo.Context) error {
filters := api.EventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
filter := map[string]*api.EventFilter{}
for _, f := range filters.Filters {
f := f
if err := f.Compile(); err != nil {
return api.Err(http.StatusBadRequest, "Invalid filter", "%s: %s", f.Component, err)
}
component := strings.ToLower(f.Component)
filter[component] = &f
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@@ -64,6 +84,19 @@ func (h *EventsHandler) Events(c echo.Context) error {
done := make(chan struct{})
filterEvent := func(event *api.Event) bool {
if len(filter) == 0 {
return true
}
f, ok := filter[event.Component]
if !ok {
return false
}
return event.Filter(f)
}
event := api.Event{}
if contentType == "text/event-stream" {
@@ -79,7 +112,12 @@ func (h *EventsHandler) Events(c echo.Context) error {
res.Flush()
case e := <-evts:
event.Marshal(&e)
res.Write([]byte("event: " + strings.ToLower(event.Component) + "\ndata: "))
if !filterEvent(&event) {
continue
}
res.Write([]byte("event: " + event.Component + "\ndata: "))
if err := enc.Encode(event); err != nil {
close(done)
}
@@ -100,6 +138,11 @@ func (h *EventsHandler) Events(c echo.Context) error {
res.Flush()
case e := <-evts:
event.Marshal(&e)
if !filterEvent(&event) {
continue
}
if err := enc.Encode(event); err != nil {
close(done)
}

View File

@@ -291,7 +291,11 @@ func (r *restream) load() error {
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithField("id", id),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"reference": process.Reference,
},
),
}
// Replace all placeholders in the config
@@ -474,7 +478,11 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithField("id", process.ID),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"reference": process.Reference,
},
),
}
resolveStaticPlaceholders(t.config, r.replace)