diff --git a/docs/docs.go b/docs/docs.go index 371f0a6b..cddffdea 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -7817,6 +7817,13 @@ const docTemplate = `{ "type": "integer", "format": "uint64" }, + "tee": { + "description": "Format specific", + "type": "array", + "items": { + "$ref": "#/definitions/api.ProgressIOTee" + } + }, "type": { "type": "string" }, @@ -7840,6 +7847,32 @@ const docTemplate = `{ } } }, + "api.ProgressIOTee": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "fifo_enabled": { + "type": "boolean" + }, + "fifo_revocery_attempts_total": { + "type": "number" + }, + "fifo_state": { + "type": "string" + }, + "format": { + "type": "string" + }, + "id": { + "type": "string" + }, + "state": { + "type": "string" + } + } + }, "api.RTMPChannel": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 7ce79286..5b2f3150 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -7810,6 +7810,13 @@ "type": "integer", "format": "uint64" }, + "tee": { + "description": "Format specific", + "type": "array", + "items": { + "$ref": "#/definitions/api.ProgressIOTee" + } + }, "type": { "type": "string" }, @@ -7833,6 +7840,32 @@ } } }, + "api.ProgressIOTee": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "fifo_enabled": { + "type": "boolean" + }, + "fifo_revocery_attempts_total": { + "type": "number" + }, + "fifo_state": { + "type": "string" + }, + "format": { + "type": "string" + }, + "id": { + "type": "string" + }, + "state": { + "type": "string" + } + } + }, "api.RTMPChannel": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 26785bbe..9b7c2ab6 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1752,6 +1752,11 @@ definitions: stream: format: uint64 type: integer + tee: + description: Format specific + items: + $ref: '#/definitions/api.ProgressIOTee' + type: array type: type: string width: @@ -1767,6 +1772,23 @@ definitions: min: type: number type: object + api.ProgressIOTee: + properties: + address: + type: string + fifo_enabled: + type: boolean + fifo_revocery_attempts_total: + type: number + fifo_state: + type: string + format: + type: string + id: + type: string + state: + type: string + type: object api.RTMPChannel: properties: name: diff --git a/ffmpeg/parse/types.go b/ffmpeg/parse/types.go index 38755688..85ca0597 100644 --- a/ffmpeg/parse/types.go +++ b/ffmpeg/parse/types.go @@ -167,6 +167,12 @@ func (av *ffmpegAVstream) export(trackType string) *AVstream { return avs } +type ffmpegProgressIOTee struct { + State string `json:"state"` + FifoRecoveryAttempts uint64 `json:"fifo_recovery_attempts_total"` + FifoState string `json:"fifo_state"` +} + type ffmpegProgressIO struct { // common Index uint64 `json:"index"` @@ -188,6 +194,9 @@ type ffmpegProgressIO struct { // video Quantizer float64 `json:"q"` + + // format specific + Tee []ffmpegProgressIOTee `json:"tee"` } func (io *ffmpegProgressIO) exportTo(progress *ProgressIO) { @@ -208,6 +217,16 @@ func (io *ffmpegProgressIO) exportTo(progress *ProgressIO) { } else { progress.Size = io.Size } + + if len(progress.Tee) == len(io.Tee) { + for i, t := range io.Tee { + tee := progress.Tee[i] + tee.State = t.State + tee.FifoRecoveryAttempts = t.FifoRecoveryAttempts + tee.FifoState = t.FifoState + progress.Tee[i] = tee + } + } } type ffmpegProgress struct { @@ -262,6 +281,13 @@ func (p *ffmpegProgress) exportTo(progress *Progress) { } } +type ffmpegTee struct { + ID string `json:"id"` + Address string `json:"address"` + Format string `json:"format"` + Fifo bool `json:"fifo_enabled"` +} + type ffmpegProcessIO struct { // common Address string `json:"url"` @@ -285,10 +311,13 @@ type ffmpegProcessIO struct { Sampling uint64 `json:"sampling_hz"` Layout string `json:"layout"` Channels uint64 `json:"channels"` + + // format specific + Tee []ffmpegTee `json:"tee"` } func (io *ffmpegProcessIO) export() ProgressIO { - return ProgressIO{ + pio := ProgressIO{ URL: io.Address, Address: io.Address, Format: io.Format, @@ -307,6 +336,17 @@ func (io *ffmpegProcessIO) export() ProgressIO { Layout: io.Layout, Channels: io.Channels, } + + for _, t := range io.Tee { + pio.Tee = append(pio.Tee, ProgressIOTee{ + ID: t.ID, + Address: t.Address, + Format: t.Format, + Fifo: t.Fifo, + }) + } + + return pio } type ffmpegGraphElement struct { @@ -521,6 +561,16 @@ type ffmpegHLSVariant struct { Streams []int `json:"streams"` } +type ProgressIOTee struct { + ID string + Address string + Format string + State string + Fifo bool + FifoRecoveryAttempts uint64 + FifoState string +} + type ProgressIO struct { URL string Address string @@ -562,6 +612,9 @@ type ProgressIO struct { // avstream AVstream *AVstream + + // Format specific + Tee []ProgressIOTee } type Progress struct { diff --git a/http/api/progress.go b/http/api/progress.go index 0ca0a02e..3707a14b 100644 --- a/http/api/progress.go +++ b/http/api/progress.go @@ -11,6 +11,16 @@ type ProgressIOFramerate struct { Average json.Number `json:"avg" swaggertype:"number" jsonschema:"type=number"` } +type ProgressIOTee struct { + ID string `json:"id"` + Address string `json:"address"` + Format string `json:"format"` + State string `json:"state"` + Fifo bool `json:"fifo_enabled"` + FifoRecoveryAttempts json.Number `json:"fifo_revocery_attempts_total" swaggertype:"number" jsonschema:"type=number"` + FifoState string `json:"fifo_state"` +} + // ProgressIO represents the progress of an ffmpeg input or output type ProgressIO struct { ID string `json:"id" jsonschema:"minLength=1"` @@ -49,6 +59,9 @@ type ProgressIO struct { // avstream AVstream *AVstream `json:"avstream" jsonschema:"anyof_type=null;object"` + + // Format specific + Tee []ProgressIOTee `json:"tee"` } // Unmarshal converts a core ProgressIO to a ProgressIO in API representation @@ -91,6 +104,18 @@ func (i *ProgressIO) Unmarshal(io *app.ProgressIO) { i.AVstream = &AVstream{} i.AVstream.Unmarshal(io.AVstream) } + + for _, t := range io.Tee { + i.Tee = append(i.Tee, ProgressIOTee{ + ID: t.ID, + Address: t.Address, + Format: t.Format, + State: t.State, + Fifo: t.Fifo, + FifoRecoveryAttempts: json.ToNumber(float64(t.FifoRecoveryAttempts)), + FifoState: t.FifoState, + }) + } } func (i *ProgressIO) Marshal() app.ProgressIO { @@ -151,6 +176,23 @@ func (i *ProgressIO) Marshal() app.ProgressIO { p.AVstream = i.AVstream.Marshal() } + for _, t := range i.Tee { + tee := app.ProgressIOTee{ + ID: t.ID, + Address: t.Address, + Format: t.Format, + State: t.State, + Fifo: t.Fifo, + FifoState: t.FifoState, + } + + if x, err := t.FifoRecoveryAttempts.Int64(); err == nil { + tee.FifoRecoveryAttempts = uint64(x) + } + + p.Tee = append(p.Tee, tee) + } + return p } diff --git a/process/process_test.go b/process/process_test.go index 7ce6542d..d4ce18cc 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -365,7 +365,11 @@ func TestProcessDuration(t *testing.T) { px := p.(*process) - require.Nil(t, px.stopTimer) + px.stopTimerLock.Lock() + stopTimer := px.stopTimer + px.stopTimerLock.Unlock() + + require.Nil(t, stopTimer) } func TestProcessSchedulePointInTime(t *testing.T) { diff --git a/restream/app/progress.go b/restream/app/progress.go index 788e868d..bdfb93c4 100644 --- a/restream/app/progress.go +++ b/restream/app/progress.go @@ -8,6 +8,16 @@ type ProgressIOFramerate struct { Average float64 } +type ProgressIOTee struct { + ID string + Address string + Format string + State string + Fifo bool + FifoRecoveryAttempts uint64 + FifoState string +} + type ProgressIO struct { ID string URL string // The original URL as reported by ffmpeg @@ -46,6 +56,9 @@ type ProgressIO struct { // avstream AVstream *AVstream + + // Format specific + Tee []ProgressIOTee } func (p *ProgressIO) UnmarshalParser(pp *parse.ProgressIO) { @@ -83,6 +96,10 @@ func (p *ProgressIO) UnmarshalParser(pp *parse.ProgressIO) { } else { p.AVstream = nil } + + for _, t := range pp.Tee { + p.Tee = append(p.Tee, ProgressIOTee(t)) + } } func (p *ProgressIO) MarshalParser() parse.ProgressIO {