Add ID and type to process progress events for each in/output

This commit is contained in:
Ingo Oppermann
2025-12-09 13:39:19 +01:00
parent 3b52145b4d
commit 6657f8d723
11 changed files with 156 additions and 77 deletions

View File

@@ -1,4 +1,4 @@
ARG GOLANG_IMAGE=golang:1.24-alpine3.21
ARG GOLANG_IMAGE=golang:1.25-alpine3.21
ARG BUILD_IMAGE=alpine:3.21
# Cross-Compilation

View File

@@ -343,7 +343,7 @@ func (n *Core) connect() error {
func (n *Core) mediaEvents(ctx context.Context, storage string) {
defer func() {
n.logger.Warn().WithField("storage", storage).Log("Disconnected from event source")
n.logger.Warn().WithField("source", storage).Log("Disconnected from event source")
}()
m := &Media{}
@@ -360,7 +360,7 @@ func (n *Core) mediaEvents(ctx context.Context, storage string) {
n.lock.RUnlock()
if client == nil {
n.logger.Error().WithField("storage", storage).Log("Failed to connect to event source, client not connected")
n.logger.Error().WithField("source", storage).Log("Failed to connect to event source, client not connected")
time.Sleep(5 * time.Second)
continue
}
@@ -374,12 +374,12 @@ func (n *Core) mediaEvents(ctx context.Context, storage string) {
n.media[storage] = m
n.mediaLock.Unlock()
n.logger.Error().WithField("storage", storage).WithError(err).Log("Failed to connect to event source")
n.logger.Error().WithField("source", storage).WithError(err).Log("Failed to connect to event source")
time.Sleep(5 * time.Second)
continue
}
n.logger.Info().WithField("storage", storage).Log("Connected to event source")
n.logger.Info().WithField("source", storage).Log("Connected to event source")
m.available = true
m.media = map[string]int64{}
@@ -407,7 +407,7 @@ func (n *Core) mediaEvents(ctx context.Context, storage string) {
}
}
n.logger.Info().WithField("storage", storage).Log("Reconnecting to event source")
n.logger.Info().WithField("source", storage).Log("Reconnecting to event source")
time.Sleep(5 * time.Second)
}
}

View File

@@ -158,10 +158,8 @@ func (w *PubSub) broadcast() {
case e := <-w.publisher:
w.subscriberLock.Lock()
for _, c := range w.subscriber {
pp := e.Clone()
select {
case c <- pp:
case c <- e.Clone():
default:
}
}

View File

@@ -52,6 +52,9 @@ func NewProcessProgressEvent(progress *ProcessProgress) *ProcessEvent {
}
type ProcessProgressInput struct {
ID string
URL string
Type string
Bitrate float64
FPS float64
AVstream ProcessProgressInputAVstream
@@ -59,6 +62,9 @@ type ProcessProgressInput struct {
func (p *ProcessProgressInput) Clone() ProcessProgressInput {
c := ProcessProgressInput{
ID: p.ID,
URL: p.URL,
Type: p.Type,
Bitrate: p.Bitrate,
FPS: p.FPS,
AVstream: p.AVstream.Clone(),
@@ -68,6 +74,7 @@ func (p *ProcessProgressInput) Clone() ProcessProgressInput {
}
type ProcessProgressInputAVstream struct {
Enabled bool
Looping bool
Enc uint64
Drop uint64
@@ -77,6 +84,7 @@ type ProcessProgressInputAVstream struct {
func (p *ProcessProgressInputAVstream) Clone() ProcessProgressInputAVstream {
c := ProcessProgressInputAVstream{
Enabled: p.Enabled,
Looping: p.Looping,
Enc: p.Enc,
Drop: p.Drop,
@@ -88,12 +96,18 @@ func (p *ProcessProgressInputAVstream) Clone() ProcessProgressInputAVstream {
}
type ProcessProgressOutput struct {
ID string
URL string
Type string
Bitrate float64
FPS float64
}
func (p *ProcessProgressOutput) Clone() ProcessProgressOutput {
c := ProcessProgressOutput{
ID: p.ID,
URL: p.URL,
Type: p.Type,
Bitrate: p.Bitrate,
FPS: p.FPS,
}

View File

@@ -474,18 +474,24 @@ func (p *parser) Parse(line []byte) uint64 {
for _, io := range progress.Input {
input := event.ProcessProgressInput{
ID: "",
URL: io.URL,
Type: io.Type,
Bitrate: io.Bitrate,
FPS: io.FPS,
}
if io.AVstream != nil {
input.AVstream = event.ProcessProgressInputAVstream{
Enabled: true,
Looping: io.AVstream.Looping,
Enc: io.AVstream.Enc,
Drop: io.AVstream.Drop,
Dup: io.AVstream.Dup,
Time: io.AVstream.Input.Time,
}
} else {
input.AVstream = event.ProcessProgressInputAVstream{}
}
evt.Input = append(evt.Input, input)
@@ -493,6 +499,9 @@ func (p *parser) Parse(line []byte) uint64 {
for _, io := range progress.Output {
evt.Output = append(evt.Output, event.ProcessProgressOutput{
ID: "",
URL: io.URL,
Type: io.Type,
Bitrate: io.Bitrate,
FPS: io.FPS,
})

View File

@@ -242,6 +242,8 @@ func (e *ProcessEventRaw) Clone() event.Event {
}
type ProcessProgressInput struct {
ID string `json:"id"`
Type string `json:"type"`
Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"`
FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"`
AVstream ProcessProgressInputAVstream `json:"avstream"`
@@ -264,6 +266,7 @@ func (p *ProcessProgressInput) Marshal() event.ProcessProgressInput {
}
type ProcessProgressInputAVstream struct {
Enabled bool `json:"enabled"`
Looping bool `json:"looping"`
Enc uint64 `json:"enc"`
Drop uint64 `json:"drop"`
@@ -273,6 +276,7 @@ type ProcessProgressInputAVstream struct {
func (p *ProcessProgressInputAVstream) Marshal() event.ProcessProgressInputAVstream {
o := event.ProcessProgressInputAVstream{
Enabled: p.Enabled,
Looping: p.Looping,
Enc: p.Enc,
Drop: p.Drop,
@@ -283,13 +287,27 @@ func (p *ProcessProgressInputAVstream) Marshal() event.ProcessProgressInputAVstr
return o
}
func (p *ProcessProgressInputAVstream) Unmarshal(e event.ProcessProgressInputAVstream) {
p.Enabled = e.Enabled
p.Looping = e.Looping
p.Enc = e.Enc
p.Drop = e.Drop
p.Dup = e.Dup
p.Time = e.Time
}
type ProcessProgressOutput struct {
ID string `json:"id"`
Type string `json:"type"`
Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"`
FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"`
}
func (p *ProcessProgressOutput) Marshal() event.ProcessProgressOutput {
o := event.ProcessProgressOutput{}
o := event.ProcessProgressOutput{
ID: p.ID,
Type: p.Type,
}
if x, err := p.Bitrate.Float64(); err == nil {
o.Bitrate = x
@@ -310,21 +328,21 @@ type ProcessProgress struct {
func (p *ProcessProgress) Unmarshal(e *event.ProcessProgress) {
for _, io := range e.Input {
p.Input = append(p.Input, ProcessProgressInput{
x := ProcessProgressInput{
ID: io.ID,
Type: io.Type,
Bitrate: json.ToNumber(io.Bitrate),
FPS: json.ToNumber(io.FPS),
AVstream: ProcessProgressInputAVstream{
Looping: io.AVstream.Looping,
Enc: io.AVstream.Enc,
Drop: io.AVstream.Drop,
Dup: io.AVstream.Dup,
Time: io.AVstream.Time,
},
})
}
x.AVstream.Unmarshal(io.AVstream)
p.Input = append(p.Input, x)
}
for _, io := range e.Output {
p.Output = append(p.Output, ProcessProgressOutput{
ID: io.ID,
Type: io.Type,
Bitrate: json.ToNumber(io.Bitrate),
FPS: json.ToNumber(io.FPS),
})

View File

@@ -42,18 +42,6 @@ func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters)
ch <- data
}
/*
decoder := json.NewDecoder(stream)
for decoder.More() {
var event api.LogEventRaw
if err := decoder.Decode(&event); err != nil {
return
}
ch <- event
}
*/
}(stream, channel)
return channel, nil
@@ -134,19 +122,6 @@ func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEvent
ch <- data
}
/*
decoder := json.NewDecoder(io.TeeReader(stream, os.Stdout))
for decoder.More() {
var event api.ProcessEventRaw
if err := decoder.Decode(&event); err != nil {
return
}
ch <- event
}
*/
}(stream, channel)
return channel, nil

View File

@@ -151,7 +151,10 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
case <-reqctx.Done():
return nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
_, err := res.Write([]byte(":keepalive\n\n"))
if err != nil {
return err
}
res.Flush()
case e, ok := <-evts:
if !ok {
@@ -181,7 +184,10 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
case <-reqctx.Done():
return nil
case <-ticker.C:
res.Write([]byte("{\"event\": \"keepalive\"}\n"))
_, err := res.Write([]byte("{\"event\": \"keepalive\"}\n"))
if err != nil {
return err
}
res.Flush()
case e, ok := <-evts:
if !ok {
@@ -297,12 +303,21 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
event := api.MediaEvent{}
_, err = res.Write([]byte("{\"action\":\"keepalive\"}\n"))
if err != nil {
return err
}
res.Flush()
for {
select {
case <-reqctx.Done():
return nil
case <-keepaliveTicker.C:
res.Write([]byte("{\"action\":\"keepalive\"}\n"))
_, err := res.Write([]byte("{\"action\":\"keepalive\"}\n"))
if err != nil {
return err
}
res.Flush()
case <-listTicker.C:
if err := enc.Encode(createList()); err != nil {
@@ -396,6 +411,12 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error {
event := api.ProcessEvent{}
_, err = res.Write([]byte("{\"type\":\"keepalive\"}\n"))
if err != nil {
return err
}
res.Flush()
for {
select {
case <-reqctx.Done():

View File

@@ -216,6 +216,26 @@ func (c *Config) ProcessID() ProcessID {
}
}
func (c *Config) InputIDFromAddress(address string) string {
for _, input := range c.Input {
if input.Address == address {
return input.ID
}
}
return ""
}
func (c *Config) OutputIDFromAddress(address string) string {
for _, output := range c.Output {
if output.Address == address {
return output.ID
}
}
return ""
}
type order struct {
order string
lock sync.RWMutex

View File

@@ -675,17 +675,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t.ffmpeg = ffmpeg
r.events.Consume(t.parser, func(e event.Event) event.Event {
pe, ok := e.(*event.ProcessEvent)
if !ok {
return e
}
pe.ProcessID = t.process.ID
pe.Domain = t.process.Domain
return pe
})
r.events.Consume(t.parser, t.RewriteEvent)
return t, nil
}

View File

@@ -14,21 +14,21 @@ import (
)
type task struct {
readers *atomic.Int64 // Number of concurrent readers
id string // ID of the task/process
owner string // Owner of the process
domain string // Domain of the process
reference string // reference of the process
process *app.Process // The process definition
config *app.Config // Process config with replaced static placeholders
command []string // The actual command parameter for ffmpeg
ffmpeg process.Process // The OS process
parser parse.Parser // Parser for the OS process' output
playout map[string]int // Port mapping to access playout API
logger log.Logger // Logger
usesDisk bool // Whether this task uses the disk
hwdevice *atomic.Int32 // Index of the GPU this task uses
metadata map[string]interface{} // Metadata of the process
readers *atomic.Int64 // Number of concurrent readers
id string // ID of the task/process
owner string // Owner of the process
domain string // Domain of the process
reference string // reference of the process
process *app.Process // The process definition
config *app.Config // Process config with replaced static placeholders
command []string // The actual command parameter for ffmpeg
ffmpeg process.Process // The OS process
parser parse.Parser // Parser for the OS process' output
playout map[string]int // Port mapping to access playout API
logger log.Logger // Logger
usesDisk bool // Whether this task uses the disk
hwdevice *atomic.Int32 // Index of the GPU this task uses
metadata map[string]any // Metadata of the process
}
func NewTask(process *app.Process, logger log.Logger) *task {
@@ -175,8 +175,13 @@ func (t *task) State() (*app.State, error) {
progress := t.parser.Progress()
state.Progress.UnmarshalParser(&progress)
state.Progress.Input = assignConfigID(state.Progress.Input, t.config.Input)
state.Progress.Output = assignConfigID(state.Progress.Output, t.config.Output)
for i, io := range state.Progress.Input {
state.Progress.Input[i].ID = t.config.InputIDFromAddress(io.URL)
}
for i, io := range state.Progress.Output {
state.Progress.Output[i].ID = t.config.OutputIDFromAddress(io.URL)
}
state.PID = status.PID
@@ -214,8 +219,13 @@ func (t *task) Report() (*app.Report, error) {
report.History[i].UnmarshalParser(&h)
e := &report.History[i]
e.Progress.Input = assignConfigID(e.Progress.Input, t.config.Input)
e.Progress.Output = assignConfigID(e.Progress.Output, t.config.Input)
for i, io := range e.Progress.Input {
e.Progress.Input[i].ID = t.config.InputIDFromAddress(io.URL)
}
for i, io := range e.Progress.Output {
e.Progress.Output[i].ID = t.config.OutputIDFromAddress(io.URL)
}
}
return report, nil
@@ -385,3 +395,27 @@ func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) {
func (t *task) Events() (<-chan event.Event, event.CancelFunc, error) {
return t.parser.Events()
}
func (t *task) RewriteEvent(e event.Event) event.Event {
pe, ok := e.(*event.ProcessEvent)
if !ok {
return e
}
pe.ProcessID = t.process.ID
pe.Domain = t.process.Domain
if pe.Progress != nil {
for i, io := range pe.Progress.Input {
pe.Progress.Input[i].ID = t.config.InputIDFromAddress(io.URL)
pe.Progress.Input[i].URL = ""
}
for i, io := range pe.Progress.Output {
pe.Progress.Output[i].ID = t.config.OutputIDFromAddress(io.URL)
pe.Progress.Output[i].URL = ""
}
}
return pe
}