Files
core/ffmpeg/ffmpeg.go
2023-04-26 09:49:28 +02:00

214 lines
4.6 KiB
Go

package ffmpeg
import (
"fmt"
"os/exec"
"sync"
"time"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/ffmpeg/probe"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/session"
)
type FFmpeg interface {
New(config ProcessConfig) (process.Process, error)
NewProcessParser(logger log.Logger, id, reference string) parse.Parser
NewProbeParser(logger log.Logger) probe.Parser
ValidateInputAddress(address string) bool
ValidateOutputAddress(address string) bool
Skills() skills.Skills
ReloadSkills() error
GetPort() (int, error)
PutPort(port int)
States() process.States
}
type ProcessConfig struct {
Reconnect bool
ReconnectDelay time.Duration
StaleTimeout time.Duration
LimitCPU float64
LimitMemory uint64
LimitDuration time.Duration
Command []string
Parser process.Parser
Logger log.Logger
OnExit func()
OnStart func()
OnStateChange func(from, to string)
}
// Config is the configuration for ffmpeg that is part of the configuration
// for the restreamer instance.
type Config struct {
Binary string
MaxProc int64
MaxLogLines int
LogHistoryLength int
ValidatorInput Validator
ValidatorOutput Validator
Portrange net.Portranger
Collector session.Collector
}
type ffmpeg struct {
binary string
validatorIn Validator
validatorOut Validator
portrange net.Portranger
skills skills.Skills
logLines int
historyLength int
collector session.Collector
states process.States
statesLock sync.RWMutex
}
func New(config Config) (FFmpeg, error) {
f := &ffmpeg{}
binary, err := exec.LookPath(config.Binary)
if err != nil {
return nil, fmt.Errorf("invalid ffmpeg binary given: %w", err)
}
f.binary = binary
f.historyLength = config.LogHistoryLength
f.logLines = config.MaxLogLines
f.portrange = config.Portrange
if f.portrange == nil {
f.portrange = net.NewDummyPortrange()
}
f.validatorIn = config.ValidatorInput
if f.validatorIn == nil {
f.validatorIn, _ = NewValidator(nil, nil)
}
f.validatorOut = config.ValidatorOutput
if f.validatorOut == nil {
f.validatorOut, _ = NewValidator(nil, nil)
}
f.collector = config.Collector
if f.collector == nil {
f.collector = session.NewNullCollector()
}
s, err := skills.New(f.binary)
if err != nil {
return nil, fmt.Errorf("invalid ffmpeg binary given: %w", err)
}
f.skills = s
return f, nil
}
func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
ffmpeg, err := process.New(process.Config{
Binary: f.binary,
Args: config.Command,
Reconnect: config.Reconnect,
ReconnectDelay: config.ReconnectDelay,
StaleTimeout: config.StaleTimeout,
LimitCPU: config.LimitCPU,
LimitMemory: config.LimitMemory,
LimitDuration: config.LimitDuration,
Parser: config.Parser,
Logger: config.Logger,
OnStart: config.OnStart,
OnExit: config.OnExit,
OnStateChange: func(from, to string) {
f.statesLock.Lock()
switch to {
case "finished":
f.states.Finished++
case "starting":
f.states.Starting++
case "running":
f.states.Running++
case "finishing":
f.states.Finishing++
case "failed":
f.states.Failed++
case "killed":
f.states.Killed++
default:
}
f.statesLock.Unlock()
if config.OnStateChange != nil {
config.OnStateChange(from, to)
}
},
})
return ffmpeg, err
}
func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string) parse.Parser {
p := parse.New(parse.Config{
LogHistory: f.historyLength,
LogLines: f.logLines,
Logger: logger,
Collector: NewWrappedCollector(id, reference, f.collector),
})
return p
}
func (f *ffmpeg) NewProbeParser(logger log.Logger) probe.Parser {
p := probe.New(probe.Config{
Logger: logger,
})
return p
}
func (f *ffmpeg) ValidateInputAddress(address string) bool {
return f.validatorIn.IsValid(address)
}
func (f *ffmpeg) ValidateOutputAddress(address string) bool {
return f.validatorOut.IsValid(address)
}
func (f *ffmpeg) Skills() skills.Skills {
return f.skills
}
func (f *ffmpeg) ReloadSkills() error {
s, err := skills.New(f.binary)
if err != nil {
return fmt.Errorf("invalid ffmpeg binary given: %w", err)
}
f.skills = s
return nil
}
func (f *ffmpeg) GetPort() (int, error) {
return f.portrange.Get()
}
func (f *ffmpeg) PutPort(port int) {
f.portrange.Put(port)
}
func (f *ffmpeg) States() process.States {
f.statesLock.RLock()
defer f.statesLock.RUnlock()
return f.states
}