mirror of
https://github.com/datarhei/core.git
synced 2025-09-27 04:16:25 +08:00
214 lines
4.6 KiB
Go
214 lines
4.6 KiB
Go
package ffmpeg
|
|
|
|
import (
|
|
"fmt"
|
|
"os/exec"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/datarhei/core/v16/ffmpeg/parse"
|
|
"github.com/datarhei/core/v16/ffmpeg/probe"
|
|
"github.com/datarhei/core/v16/ffmpeg/skills"
|
|
"github.com/datarhei/core/v16/log"
|
|
"github.com/datarhei/core/v16/net"
|
|
"github.com/datarhei/core/v16/process"
|
|
"github.com/datarhei/core/v16/session"
|
|
)
|
|
|
|
type FFmpeg interface {
|
|
New(config ProcessConfig) (process.Process, error)
|
|
NewProcessParser(logger log.Logger, id, reference string) parse.Parser
|
|
NewProbeParser(logger log.Logger) probe.Parser
|
|
ValidateInputAddress(address string) bool
|
|
ValidateOutputAddress(address string) bool
|
|
Skills() skills.Skills
|
|
ReloadSkills() error
|
|
GetPort() (int, error)
|
|
PutPort(port int)
|
|
States() process.States
|
|
}
|
|
|
|
type ProcessConfig struct {
|
|
Reconnect bool
|
|
ReconnectDelay time.Duration
|
|
StaleTimeout time.Duration
|
|
LimitCPU float64
|
|
LimitMemory uint64
|
|
LimitDuration time.Duration
|
|
Command []string
|
|
Parser process.Parser
|
|
Logger log.Logger
|
|
OnExit func()
|
|
OnStart func()
|
|
OnStateChange func(from, to string)
|
|
}
|
|
|
|
// Config is the configuration for ffmpeg that is part of the configuration
|
|
// for the restreamer instance.
|
|
type Config struct {
|
|
Binary string
|
|
MaxProc int64
|
|
MaxLogLines int
|
|
LogHistoryLength int
|
|
ValidatorInput Validator
|
|
ValidatorOutput Validator
|
|
Portrange net.Portranger
|
|
Collector session.Collector
|
|
}
|
|
|
|
type ffmpeg struct {
|
|
binary string
|
|
validatorIn Validator
|
|
validatorOut Validator
|
|
portrange net.Portranger
|
|
skills skills.Skills
|
|
|
|
logLines int
|
|
historyLength int
|
|
|
|
collector session.Collector
|
|
|
|
states process.States
|
|
statesLock sync.RWMutex
|
|
}
|
|
|
|
func New(config Config) (FFmpeg, error) {
|
|
f := &ffmpeg{}
|
|
|
|
binary, err := exec.LookPath(config.Binary)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid ffmpeg binary given: %w", err)
|
|
}
|
|
|
|
f.binary = binary
|
|
f.historyLength = config.LogHistoryLength
|
|
f.logLines = config.MaxLogLines
|
|
|
|
f.portrange = config.Portrange
|
|
if f.portrange == nil {
|
|
f.portrange = net.NewDummyPortrange()
|
|
}
|
|
|
|
f.validatorIn = config.ValidatorInput
|
|
if f.validatorIn == nil {
|
|
f.validatorIn, _ = NewValidator(nil, nil)
|
|
}
|
|
|
|
f.validatorOut = config.ValidatorOutput
|
|
if f.validatorOut == nil {
|
|
f.validatorOut, _ = NewValidator(nil, nil)
|
|
}
|
|
|
|
f.collector = config.Collector
|
|
if f.collector == nil {
|
|
f.collector = session.NewNullCollector()
|
|
}
|
|
|
|
s, err := skills.New(f.binary)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid ffmpeg binary given: %w", err)
|
|
}
|
|
f.skills = s
|
|
|
|
return f, nil
|
|
}
|
|
|
|
func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
|
|
ffmpeg, err := process.New(process.Config{
|
|
Binary: f.binary,
|
|
Args: config.Command,
|
|
Reconnect: config.Reconnect,
|
|
ReconnectDelay: config.ReconnectDelay,
|
|
StaleTimeout: config.StaleTimeout,
|
|
LimitCPU: config.LimitCPU,
|
|
LimitMemory: config.LimitMemory,
|
|
LimitDuration: config.LimitDuration,
|
|
Parser: config.Parser,
|
|
Logger: config.Logger,
|
|
OnStart: config.OnStart,
|
|
OnExit: config.OnExit,
|
|
OnStateChange: func(from, to string) {
|
|
f.statesLock.Lock()
|
|
switch to {
|
|
case "finished":
|
|
f.states.Finished++
|
|
case "starting":
|
|
f.states.Starting++
|
|
case "running":
|
|
f.states.Running++
|
|
case "finishing":
|
|
f.states.Finishing++
|
|
case "failed":
|
|
f.states.Failed++
|
|
case "killed":
|
|
f.states.Killed++
|
|
default:
|
|
}
|
|
f.statesLock.Unlock()
|
|
|
|
if config.OnStateChange != nil {
|
|
config.OnStateChange(from, to)
|
|
}
|
|
},
|
|
})
|
|
|
|
return ffmpeg, err
|
|
}
|
|
|
|
func (f *ffmpeg) NewProcessParser(logger log.Logger, id, reference string) parse.Parser {
|
|
p := parse.New(parse.Config{
|
|
LogHistory: f.historyLength,
|
|
LogLines: f.logLines,
|
|
Logger: logger,
|
|
Collector: NewWrappedCollector(id, reference, f.collector),
|
|
})
|
|
|
|
return p
|
|
}
|
|
|
|
func (f *ffmpeg) NewProbeParser(logger log.Logger) probe.Parser {
|
|
p := probe.New(probe.Config{
|
|
Logger: logger,
|
|
})
|
|
|
|
return p
|
|
}
|
|
|
|
func (f *ffmpeg) ValidateInputAddress(address string) bool {
|
|
return f.validatorIn.IsValid(address)
|
|
}
|
|
|
|
func (f *ffmpeg) ValidateOutputAddress(address string) bool {
|
|
return f.validatorOut.IsValid(address)
|
|
}
|
|
|
|
func (f *ffmpeg) Skills() skills.Skills {
|
|
return f.skills
|
|
}
|
|
|
|
func (f *ffmpeg) ReloadSkills() error {
|
|
s, err := skills.New(f.binary)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid ffmpeg binary given: %w", err)
|
|
}
|
|
|
|
f.skills = s
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *ffmpeg) GetPort() (int, error) {
|
|
return f.portrange.Get()
|
|
}
|
|
|
|
func (f *ffmpeg) PutPort(port int) {
|
|
f.portrange.Put(port)
|
|
}
|
|
|
|
func (f *ffmpeg) States() process.States {
|
|
f.statesLock.RLock()
|
|
defer f.statesLock.RUnlock()
|
|
|
|
return f.states
|
|
}
|