Don't unmarshal events from cluster nodes

This commit is contained in:
Ingo Oppermann
2025-12-08 15:48:52 +01:00
parent 29143753f6
commit 3c4e639a3d
11 changed files with 132 additions and 58 deletions

View File

@@ -23,6 +23,7 @@ import (
configstore "github.com/datarhei/core/v16/config/store"
configvars "github.com/datarhei/core/v16/config/vars"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/global"
"github.com/datarhei/core/v16/http"
"github.com/datarhei/core/v16/http/cache"
httpfs "github.com/datarhei/core/v16/http/fs"
@@ -292,6 +293,8 @@ func (a *api) Reload() error {
a.log.buffer = buffer
a.log.events = events
global.SetCoreID(cfg.ID)
return nil
}

View File

@@ -989,13 +989,11 @@ func (n *Core) logEvents(ctx context.Context) {
break innerloop
}
e.CoreID = n.id
n.events.log.Publish(e.Marshal())
n.events.log.Publish(&e)
}
}
n.logger.Info().WithField("source", "process").Log("Reconnecting to event source")
n.logger.Info().WithField("source", "log").Log("Reconnecting to event source")
time.Sleep(5 * time.Second)
}
}
@@ -1045,9 +1043,7 @@ func (n *Core) processEvents(ctx context.Context) {
break innerloop
}
e.CoreID = n.id
n.events.process.Publish(e.Marshal())
n.events.process.Publish(&e)
}
}

View File

@@ -10,6 +10,7 @@ import (
type UnmarshalTypeError = json.UnmarshalTypeError
type SyntaxError = json.SyntaxError
type Number = json.Number
type RawMessage = json.RawMessage
// Unmarshal is a wrapper for json.Unmarshal
func Unmarshal(data []byte, v interface{}) error {

View File

@@ -3,6 +3,8 @@ package event
import (
"maps"
"time"
"github.com/datarhei/core/v16/global"
)
type LogEvent struct {
@@ -38,5 +40,6 @@ func NewLogEvent(ts time.Time, level, component, caller, message string, data ma
Caller: caller,
Message: message,
Data: maps.Clone(data),
CoreID: global.GetCoreID(),
}
}

View File

@@ -2,6 +2,8 @@ package event
import (
"time"
"github.com/datarhei/core/v16/global"
)
type ProcessEvent struct {
@@ -36,6 +38,7 @@ func NewProcessLogEvent(logline string) *ProcessEvent {
Type: "line",
Line: logline,
Timestamp: time.Now(),
CoreID: global.GetCoreID(),
}
}
@@ -44,6 +47,7 @@ func NewProcessProgressEvent(progress *ProcessProgress) *ProcessEvent {
Type: "progress",
Progress: progress,
Timestamp: time.Now(),
CoreID: global.GetCoreID(),
}
}

11
global/global.go Normal file
View File

@@ -0,0 +1,11 @@
package global
var coreid string = ""
func SetCoreID(id string) {
coreid = id
}
func GetCoreID() string {
return coreid
}

View File

@@ -1,6 +1,7 @@
package api
import (
"bytes"
"fmt"
"regexp"
"strings"
@@ -119,6 +120,16 @@ func (e *LogEvent) Filter(ef *LogEventFilter) bool {
return true
}
type LogEventRaw json.RawMessage
func (e *LogEventRaw) Clone() event.Event {
p := bytes.Clone([]byte(*e))
x := LogEventRaw(p)
return &x
}
type LogEventFilter struct {
Component string `json:"event"`
Message string `json:"message"`
@@ -220,6 +231,16 @@ type ProcessEvent struct {
Timestamp int64 `json:"ts"`
}
type ProcessEventRaw json.RawMessage
func (e *ProcessEventRaw) Clone() event.Event {
p := bytes.Clone([]byte(*e))
x := ProcessEventRaw(p)
return &x
}
type ProcessProgressInput struct {
Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"`
FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"`

View File

@@ -57,9 +57,9 @@ type RestClient interface {
FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path}
FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path}
LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events
MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage}
ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) // POST /v3/events/process
LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEventRaw, error) // POST /v3/events
MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage}
ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEventRaw, error) // POST /v3/events/process
ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process
ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process

View File

@@ -1,6 +1,8 @@
package client
import (
"bufio"
"bytes"
"context"
"io"
"net/http"
@@ -11,7 +13,7 @@ import (
"github.com/datarhei/core/v16/mem"
)
func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEventRaw, error) {
buf := mem.Get()
defer mem.Put(buf)
@@ -26,34 +28,32 @@ func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters)
return nil, err
}
channel := make(chan api.LogEvent, 128)
channel := make(chan api.LogEventRaw, 128)
go func(stream io.ReadCloser, ch chan<- api.LogEvent) {
go func(stream io.ReadCloser, ch chan<- api.LogEventRaw) {
defer stream.Close()
defer close(channel)
decoder := json.NewDecoder(stream)
scanner := bufio.NewScanner(stream)
scanner.Split(bufio.ScanLines)
for decoder.More() {
var event api.LogEvent
if err := decoder.Decode(&event); err == io.EOF {
return
} else if err != nil {
event.Component = "error"
event.Message = err.Error()
}
for scanner.Scan() {
data := bytes.Clone(scanner.Bytes())
// Don't emit keepalives
if event.Component == "keepalive" {
continue
}
ch <- event
if event.Component == "" || event.Component == "error" {
return
}
ch <- data
}
/*
decoder := json.NewDecoder(stream)
for decoder.More() {
var event api.LogEventRaw
if err := decoder.Decode(&event); err != nil {
return
}
ch <- event
}
*/
}(stream, channel)
return channel, nil
@@ -105,7 +105,7 @@ func (r *restclient) MediaEvents(ctx context.Context, storage, pattern string) (
return channel, nil
}
func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) {
func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEventRaw, error) {
buf := mem.Get()
defer mem.Put(buf)
@@ -120,34 +120,33 @@ func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEvent
return nil, err
}
channel := make(chan api.ProcessEvent, 128)
channel := make(chan api.ProcessEventRaw, 128)
go func(stream io.ReadCloser, ch chan<- api.ProcessEvent) {
go func(stream io.ReadCloser, ch chan<- api.ProcessEventRaw) {
defer stream.Close()
defer close(channel)
decoder := json.NewDecoder(stream)
scanner := bufio.NewScanner(stream)
scanner.Split(bufio.ScanLines)
for decoder.More() {
var event api.ProcessEvent
if err := decoder.Decode(&event); err == io.EOF {
return
} else if err != nil {
event.Type = "error"
event.Line = err.Error()
}
for scanner.Scan() {
data := bytes.Clone(scanner.Bytes())
// Don't emit keepalives
if event.Type == "keepalive" {
continue
}
ch <- event
if event.Type == "" || event.Type == "error" {
return
}
ch <- data
}
/*
decoder := json.NewDecoder(io.TeeReader(stream, os.Stdout))
for decoder.More() {
var event api.ProcessEventRaw
if err := decoder.Decode(&event); err != nil {
return
}
ch <- event
}
*/
}(stream, channel)
return channel, nil

View File

@@ -135,7 +135,25 @@ func (h *ClusterHandler) LogEvents(c echo.Context) error {
return fmt.Errorf("channel closed")
}
event.Unmarshal(e)
ev, ok := e.(*api.LogEventRaw)
if !ok {
continue
}
var event api.LogEvent
err := json.Unmarshal([]byte(*ev), &event)
if err != nil {
continue
}
//if !event.Unmarshal(e) {
// continue
//}
if event.Component == "keepalive" {
continue
}
if !filterEvent(&event) {
continue
@@ -212,7 +230,7 @@ func (h *ClusterHandler) ProcessEvents(c echo.Context) error {
return goslices.ContainsFunc(filter, event.Filter)
}
event := api.ProcessEvent{}
//event := api.ProcessEvent{}
for {
select {
@@ -229,7 +247,24 @@ func (h *ClusterHandler) ProcessEvents(c echo.Context) error {
return fmt.Errorf("channel closed")
}
if !event.Unmarshal(e) {
ev, ok := e.(*api.ProcessEventRaw)
if !ok {
continue
}
var event api.ProcessEvent
err := json.Unmarshal([]byte(*ev), &event)
if err != nil {
continue
}
//json.RawMessage(ev)
//if !event.Unmarshal(e) {
// continue
//}
if event.Type == "keepalive" {
continue
}

View File

@@ -2,6 +2,7 @@ package api
import (
"fmt"
"io"
"net/http"
goslices "slices"
"strings"
@@ -123,7 +124,7 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
}
defer cancel()
enc := json.NewEncoder(res)
enc := json.NewEncoder(io.MultiWriter(res))
enc.SetIndent("", "")
filterEvent := func(event *api.LogEvent) bool {