mirror of
https://github.com/datarhei/core.git
synced 2025-09-30 22:02:28 +08:00

If a process has some cleanup with purge-on-delete defined, the purge has to wait until the process actually exited. Otherwise it may happen that the process got the signal, files are purged, but the process is still writing some files in order to exit cleanly. This would lead to some artefacts left on the filesystem.
882 lines
21 KiB
Go
882 lines
21 KiB
Go
// Package process is a wrapper of exec.Cmd for controlling a ffmpeg process.
|
|
// It could be used to run other executables but it is tailored to the specifics
|
|
// of ffmpeg, e.g. only stderr is captured, and some exit codes != 0 plus certain
|
|
// signals are still considered as a non-error exit condition.
|
|
package process
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/datarhei/core/v16/log"
|
|
"github.com/datarhei/core/v16/psutil"
|
|
)
|
|
|
|
// Process represents a process and ways to control it
|
|
// and to extract information.
|
|
type Process interface {
|
|
// Status returns the current status of this process
|
|
Status() Status
|
|
|
|
// Start starts the process. If the process stops by itself
|
|
// it will restart automatically if it is defined to do so.
|
|
Start() error
|
|
|
|
// Stop stops the process and will not let it restart
|
|
// automatically.
|
|
Stop(wait bool) error
|
|
|
|
// Kill stops the process such that it will restart
|
|
// automatically if it is defined to do so.
|
|
Kill(wait bool) error
|
|
|
|
// IsRunning returns whether the process is currently
|
|
// running or not.
|
|
IsRunning() bool
|
|
}
|
|
|
|
// Config is the configuration of a process
|
|
type Config struct {
|
|
Binary string // Path to the ffmpeg binary
|
|
Args []string // List of arguments for the binary
|
|
Reconnect bool // Whether to restart the process if it exited
|
|
ReconnectDelay time.Duration // Duration to wait before restarting the process
|
|
StaleTimeout time.Duration // Kill the process after this duration if it doesn't produce any output
|
|
LimitCPU float64 // Kill the process if the CPU usage in percent is above this value
|
|
LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value
|
|
LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration
|
|
Parser Parser // A parser for the output of the process
|
|
OnStart func() // A callback which is called after the process started
|
|
OnExit func() // A callback which is called after the process exited
|
|
OnStateChange func(from, to string) // A callback which is called after a state changed
|
|
Logger log.Logger
|
|
}
|
|
|
|
// Status represents the current status of a process
|
|
type Status struct {
|
|
// State is the current state of the process. See stateType for the known states.
|
|
State string
|
|
|
|
// States is the cumulative history of states the process had.
|
|
States States
|
|
|
|
// Order is the wanted condition of process, either "start" or "stop"
|
|
Order string
|
|
|
|
// Duration is the time since the last change of the state
|
|
Duration time.Duration
|
|
|
|
// Time is the time of the last change of the state
|
|
Time time.Time
|
|
|
|
// Used CPU in percent
|
|
CPU float64
|
|
|
|
// Used memory in bytes
|
|
Memory uint64
|
|
}
|
|
|
|
// States
|
|
//
|
|
// finished - Process has been stopped
|
|
//
|
|
// starting - if process has been actively started or has been waiting for reconnect (order=start, reconnect=any)
|
|
// finished - if process shall not reconnect (order=stop, reconnect=any)
|
|
//
|
|
// starting - Process is about to start
|
|
//
|
|
// finishing - if process should be immediately stopped (order=stop, reconnect=any)
|
|
// running - if process could be started (order=start, reconnect=any)
|
|
// failed - if process couldn't be started (e.g. binary not found) (order=start, reconnect=any)
|
|
//
|
|
// running - Process is running
|
|
//
|
|
// finished - if process exited normally (order=any, reconnect=any)
|
|
// finishing - if process has been actively stopped (order=stop, reconnect=any)
|
|
// failed - if process exited abnormally (order=any, reconnect=any)
|
|
// killed - if process has been actively killed with SIGKILL (order=any, reconnect=any)
|
|
//
|
|
// finishing - Process has been actively stopped and will be killed
|
|
//
|
|
// finished - if process has been actively killed with SIGINT and ffmpeg exited normally (order=stop, reconnect=any)
|
|
// killed - if process has been actively killed with SIGKILL (order=stop, reconnect=any)
|
|
//
|
|
// failed - Process has been failed either by starting or during running
|
|
//
|
|
// starting - if process has been waiting for reconnect (order=start, reconnect=true)
|
|
// failed - if process shall not reconnect (order=any, reconnect=false)
|
|
//
|
|
// killed - Process has been stopped
|
|
//
|
|
// starting - if process has been waiting for reconnect (order=start, reconnect=true)
|
|
// killed - if process shall not reconnect (order=start, reconnect=false)
|
|
type stateType string
|
|
|
|
const (
|
|
stateFinished stateType = "finished"
|
|
stateStarting stateType = "starting"
|
|
stateRunning stateType = "running"
|
|
stateFinishing stateType = "finishing"
|
|
stateFailed stateType = "failed"
|
|
stateKilled stateType = "killed"
|
|
)
|
|
|
|
// String returns a string representation of the state
|
|
func (s stateType) String() string {
|
|
return string(s)
|
|
}
|
|
|
|
// IsRunning returns whether the state is representing a running state
|
|
func (s stateType) IsRunning() bool {
|
|
if s == stateStarting || s == stateRunning || s == stateFinishing {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
type States struct {
|
|
Finished uint64
|
|
Starting uint64
|
|
Running uint64
|
|
Finishing uint64
|
|
Failed uint64
|
|
Killed uint64
|
|
}
|
|
|
|
// Process represents a ffmpeg process
|
|
type process struct {
|
|
binary string
|
|
args []string
|
|
cmd *exec.Cmd
|
|
pid int32
|
|
stdout io.ReadCloser
|
|
lastLine string
|
|
state struct {
|
|
state stateType
|
|
time time.Time
|
|
states States
|
|
lock sync.Mutex
|
|
}
|
|
order struct {
|
|
order string
|
|
lock sync.Mutex
|
|
}
|
|
parser Parser
|
|
stale struct {
|
|
last time.Time
|
|
timeout time.Duration
|
|
cancel context.CancelFunc
|
|
lock sync.Mutex
|
|
}
|
|
reconn struct {
|
|
enable bool
|
|
delay time.Duration
|
|
timer *time.Timer
|
|
lock sync.Mutex
|
|
}
|
|
killTimer *time.Timer
|
|
killTimerLock sync.Mutex
|
|
logger log.Logger
|
|
debuglogger log.Logger
|
|
callbacks struct {
|
|
onStart func()
|
|
onExit func()
|
|
onStateChange func(from, to string)
|
|
}
|
|
limits Limiter
|
|
}
|
|
|
|
var _ Process = &process{}
|
|
|
|
// New creates a new process wrapper
|
|
func New(config Config) (Process, error) {
|
|
p := &process{
|
|
binary: config.Binary,
|
|
args: config.Args,
|
|
cmd: nil,
|
|
parser: config.Parser,
|
|
logger: config.Logger,
|
|
}
|
|
|
|
// This is a loose check on purpose. If the e.g. the binary
|
|
// doesn't exist or it is not executable, it will be
|
|
// reflected in the resulting state.
|
|
if len(p.binary) == 0 {
|
|
return nil, fmt.Errorf("no valid binary given")
|
|
}
|
|
|
|
if p.parser == nil {
|
|
p.parser = NewNullParser()
|
|
}
|
|
|
|
if p.logger == nil {
|
|
p.logger = log.New("Process")
|
|
}
|
|
|
|
p.debuglogger = p.logger.WithFields(log.Fields{
|
|
"binary": p.binary,
|
|
"args": p.args,
|
|
})
|
|
|
|
p.order.order = "stop"
|
|
|
|
p.initState(stateFinished)
|
|
|
|
p.reconn.enable = config.Reconnect
|
|
p.reconn.delay = config.ReconnectDelay
|
|
|
|
p.stale.last = time.Now()
|
|
p.stale.timeout = config.StaleTimeout
|
|
|
|
p.callbacks.onStart = config.OnStart
|
|
p.callbacks.onExit = config.OnExit
|
|
p.callbacks.onStateChange = config.OnStateChange
|
|
|
|
p.limits = NewLimiter(LimiterConfig{
|
|
CPU: config.LimitCPU,
|
|
Memory: config.LimitMemory,
|
|
WaitFor: config.LimitDuration,
|
|
OnLimit: func(cpu float64, memory uint64) {
|
|
p.logger.WithFields(log.Fields{
|
|
"cpu": cpu,
|
|
"memory": memory,
|
|
}).Warn().Log("Stopping because limits are exceeded")
|
|
p.Kill(false)
|
|
},
|
|
})
|
|
|
|
p.logger.Info().Log("Created")
|
|
p.debuglogger.Debug().Log("Created")
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *process) initState(state stateType) {
|
|
p.state.lock.Lock()
|
|
defer p.state.lock.Unlock()
|
|
|
|
p.state.state = state
|
|
p.state.time = time.Now()
|
|
}
|
|
|
|
// setState sets a new state. It also checks if the transition
|
|
// of the current state to the new state is allowed. If not,
|
|
// the current state will not be changed.
|
|
func (p *process) setState(state stateType) error {
|
|
p.state.lock.Lock()
|
|
defer p.state.lock.Unlock()
|
|
|
|
prevState := p.state.state
|
|
failed := false
|
|
|
|
if p.state.state == stateFinished {
|
|
switch state {
|
|
case stateStarting:
|
|
p.state.state = state
|
|
p.state.states.Starting++
|
|
default:
|
|
failed = true
|
|
}
|
|
} else if p.state.state == stateStarting {
|
|
switch state {
|
|
case stateFinishing:
|
|
p.state.state = state
|
|
p.state.states.Finishing++
|
|
case stateRunning:
|
|
p.state.state = state
|
|
p.state.states.Running++
|
|
case stateFailed:
|
|
p.state.state = state
|
|
p.state.states.Failed++
|
|
default:
|
|
failed = true
|
|
}
|
|
} else if p.state.state == stateRunning {
|
|
switch state {
|
|
case stateFinished:
|
|
p.state.state = state
|
|
p.state.states.Finished++
|
|
case stateFinishing:
|
|
p.state.state = state
|
|
p.state.states.Finishing++
|
|
case stateFailed:
|
|
p.state.state = state
|
|
p.state.states.Failed++
|
|
case stateKilled:
|
|
p.state.state = state
|
|
p.state.states.Killed++
|
|
default:
|
|
failed = true
|
|
}
|
|
} else if p.state.state == stateFinishing {
|
|
switch state {
|
|
case stateFinished:
|
|
p.state.state = state
|
|
p.state.states.Finished++
|
|
case stateFailed:
|
|
p.state.state = state
|
|
p.state.states.Failed++
|
|
case stateKilled:
|
|
p.state.state = state
|
|
p.state.states.Killed++
|
|
default:
|
|
failed = true
|
|
}
|
|
} else if p.state.state == stateFailed {
|
|
switch state {
|
|
case stateStarting:
|
|
p.state.state = state
|
|
p.state.states.Starting++
|
|
default:
|
|
failed = true
|
|
}
|
|
} else if p.state.state == stateKilled {
|
|
switch state {
|
|
case stateStarting:
|
|
p.state.state = state
|
|
p.state.states.Starting++
|
|
default:
|
|
failed = true
|
|
}
|
|
} else {
|
|
return fmt.Errorf("current state is unhandled: %s", p.state.state)
|
|
}
|
|
|
|
if failed {
|
|
return fmt.Errorf("can't change from state %s to %s", p.state.state, state)
|
|
}
|
|
|
|
p.state.time = time.Now()
|
|
|
|
if p.callbacks.onStateChange != nil {
|
|
go p.callbacks.onStateChange(prevState.String(), p.state.state.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *process) getState() stateType {
|
|
p.state.lock.Lock()
|
|
defer p.state.lock.Unlock()
|
|
|
|
return p.state.state
|
|
}
|
|
|
|
func (p *process) isRunning() bool {
|
|
p.state.lock.Lock()
|
|
defer p.state.lock.Unlock()
|
|
|
|
return p.state.state.IsRunning()
|
|
}
|
|
|
|
func (p *process) getStateString() string {
|
|
p.state.lock.Lock()
|
|
defer p.state.lock.Unlock()
|
|
|
|
return p.state.state.String()
|
|
}
|
|
|
|
// Status returns the current status of the process
|
|
func (p *process) Status() Status {
|
|
cpu, memory := p.limits.Current()
|
|
|
|
p.state.lock.Lock()
|
|
stateTime := p.state.time
|
|
stateString := p.state.state.String()
|
|
states := p.state.states
|
|
p.state.lock.Unlock()
|
|
|
|
p.order.lock.Lock()
|
|
order := p.order.order
|
|
p.order.lock.Unlock()
|
|
|
|
s := Status{
|
|
State: stateString,
|
|
States: states,
|
|
Order: order,
|
|
Duration: time.Since(stateTime),
|
|
Time: stateTime,
|
|
CPU: cpu,
|
|
Memory: memory,
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
// IsRunning returns whether the process is considered running
|
|
func (p *process) IsRunning() bool {
|
|
return p.isRunning()
|
|
}
|
|
|
|
// Start will start the process and sets the order to "start". If the
|
|
// process has alread the "start" order, nothing will be done. Returns
|
|
// an error if start failed.
|
|
func (p *process) Start() error {
|
|
p.order.lock.Lock()
|
|
defer p.order.lock.Unlock()
|
|
|
|
if p.order.order == "start" {
|
|
return nil
|
|
}
|
|
|
|
p.order.order = "start"
|
|
|
|
err := p.start()
|
|
if err != nil {
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"state": p.getStateString(),
|
|
"order": p.order.order,
|
|
"error": err,
|
|
}).Debug().Log("Starting failed")
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// start will start the process considering the current order. Returns an
|
|
// error in case something goes wrong, and it will try to restart the process.
|
|
func (p *process) start() error {
|
|
var err error
|
|
|
|
// Bail out if the process is already running
|
|
if p.isRunning() {
|
|
return nil
|
|
}
|
|
|
|
p.logger.Info().Log("Starting")
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"state": p.getStateString(),
|
|
"order": p.order.order,
|
|
}).Debug().Log("Starting")
|
|
|
|
// Stop any restart timer in order to start the process immediately
|
|
p.unreconnect()
|
|
|
|
p.setState(stateStarting)
|
|
|
|
p.cmd = exec.Command(p.binary, p.args...)
|
|
p.cmd.Env = []string{}
|
|
|
|
p.stdout, err = p.cmd.StderrPipe()
|
|
if err != nil {
|
|
p.setState(stateFailed)
|
|
|
|
p.parser.Parse(err.Error())
|
|
p.logger.WithError(err).Error().Log("Command failed")
|
|
p.reconnect()
|
|
|
|
return err
|
|
}
|
|
if err := p.cmd.Start(); err != nil {
|
|
p.setState(stateFailed)
|
|
|
|
p.parser.Parse(err.Error())
|
|
p.logger.WithError(err).Error().Log("Command failed")
|
|
p.reconnect()
|
|
|
|
return err
|
|
}
|
|
|
|
p.pid = int32(p.cmd.Process.Pid)
|
|
|
|
if proc, err := psutil.NewProcess(p.pid); err == nil {
|
|
p.limits.Start(proc)
|
|
}
|
|
|
|
p.setState(stateRunning)
|
|
|
|
p.logger.Info().Log("Started")
|
|
p.debuglogger.Debug().Log("Started")
|
|
|
|
if p.callbacks.onStart != nil {
|
|
go p.callbacks.onStart()
|
|
}
|
|
|
|
// Start the reader
|
|
go p.reader()
|
|
|
|
// Wait for the process to finish
|
|
go p.waiter()
|
|
|
|
// Start the stale timeout if enabled
|
|
if p.stale.timeout != 0 {
|
|
var ctx context.Context
|
|
|
|
p.stale.lock.Lock()
|
|
ctx, p.stale.cancel = context.WithCancel(context.Background())
|
|
p.stale.lock.Unlock()
|
|
|
|
go p.staler(ctx)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop will stop the process and set the order to "stop"
|
|
func (p *process) Stop(wait bool) error {
|
|
p.order.lock.Lock()
|
|
defer p.order.lock.Unlock()
|
|
|
|
if p.order.order == "stop" {
|
|
return nil
|
|
}
|
|
|
|
p.order.order = "stop"
|
|
|
|
err := p.stop(wait)
|
|
if err != nil {
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"state": p.getStateString(),
|
|
"order": p.order.order,
|
|
"error": err,
|
|
}).Debug().Log("Stopping failed")
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Kill will stop the process without changing the order such that it
|
|
// will restart automatically if enabled.
|
|
func (p *process) Kill(wait bool) error {
|
|
// If the process is currently not running, we don't need
|
|
// to do anything.
|
|
if !p.isRunning() {
|
|
return nil
|
|
}
|
|
|
|
p.order.lock.Lock()
|
|
defer p.order.lock.Unlock()
|
|
|
|
err := p.stop(wait)
|
|
|
|
return err
|
|
}
|
|
|
|
// stop will stop a process considering the current order and state.
|
|
func (p *process) stop(wait bool) error {
|
|
// If the process is currently not running, stop the restart timer
|
|
if !p.isRunning() {
|
|
p.unreconnect()
|
|
return nil
|
|
}
|
|
|
|
// If the process is already in the finishing state, don't do anything
|
|
if p.getState() == stateFinishing {
|
|
return nil
|
|
}
|
|
|
|
p.setState(stateFinishing)
|
|
|
|
p.logger.Info().Log("Stopping")
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"state": p.getStateString(),
|
|
"order": p.order.order,
|
|
}).Debug().Log("Stopping")
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
if wait {
|
|
wg.Add(1)
|
|
|
|
if p.callbacks.onExit == nil {
|
|
p.callbacks.onExit = func() {
|
|
wg.Done()
|
|
p.callbacks.onExit = nil
|
|
}
|
|
} else {
|
|
cb := p.callbacks.onExit
|
|
p.callbacks.onExit = func() {
|
|
cb()
|
|
wg.Done()
|
|
p.callbacks.onExit = cb
|
|
}
|
|
}
|
|
}
|
|
|
|
var err error
|
|
if runtime.GOOS == "windows" {
|
|
// Windows doesn't know the SIGINT
|
|
err = p.cmd.Process.Kill()
|
|
} else {
|
|
// First try to kill the process gracefully. On a SIGINT ffmpeg will exit
|
|
// normally as if "q" has been pressed.
|
|
err = p.cmd.Process.Signal(os.Interrupt)
|
|
if err != nil {
|
|
// If sending the signal fails, try it the hard way, however this will highly
|
|
// likely also fail because it is simply a shortcut for Signal(Kill).
|
|
err = p.cmd.Process.Kill()
|
|
} else {
|
|
// Set up a timer to kill the process with SIGKILL in case SIGINT didn't have
|
|
// an effect.
|
|
p.killTimerLock.Lock()
|
|
p.killTimer = time.AfterFunc(5*time.Second, func() {
|
|
p.cmd.Process.Kill()
|
|
})
|
|
p.killTimerLock.Unlock()
|
|
}
|
|
}
|
|
|
|
if err == nil && wait {
|
|
wg.Wait()
|
|
}
|
|
|
|
if err != nil {
|
|
p.parser.Parse(err.Error())
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"state": p.getStateString(),
|
|
"order": p.order.order,
|
|
"error": err,
|
|
}).Debug().Log("Stopping failed")
|
|
|
|
p.setState(stateFailed)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// reconnect will setup a timer to restart the process
|
|
func (p *process) reconnect() {
|
|
// If restarting a process is not enabled, don't do anything
|
|
if !p.reconn.enable {
|
|
return
|
|
}
|
|
|
|
// Stop a currently running timer
|
|
p.unreconnect()
|
|
|
|
p.logger.Info().Log("Scheduling restart in %s", p.reconn.delay)
|
|
|
|
p.reconn.lock.Lock()
|
|
defer p.reconn.lock.Unlock()
|
|
|
|
p.reconn.timer = time.AfterFunc(p.reconn.delay, func() {
|
|
p.order.lock.Lock()
|
|
defer p.order.lock.Unlock()
|
|
|
|
p.start()
|
|
})
|
|
}
|
|
|
|
// unreconnect will stop the restart timer
|
|
func (p *process) unreconnect() {
|
|
p.reconn.lock.Lock()
|
|
defer p.reconn.lock.Unlock()
|
|
|
|
if p.reconn.timer == nil {
|
|
return
|
|
}
|
|
|
|
p.reconn.timer.Stop()
|
|
p.reconn.timer = nil
|
|
}
|
|
|
|
// staler checks if the currently running process is stale, i.e. the reader
|
|
// didn't update the time of the last read. If the timeout is reached, the
|
|
// process will be stopped such that it can restart automatically afterwards.
|
|
func (p *process) staler(ctx context.Context) {
|
|
p.stale.lock.Lock()
|
|
p.stale.last = time.Now()
|
|
p.stale.lock.Unlock()
|
|
|
|
p.debuglogger.Debug().Log("Starting stale watcher")
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
p.debuglogger.Debug().Log("Stopping stale watcher")
|
|
return
|
|
case t := <-ticker.C:
|
|
p.stale.lock.Lock()
|
|
last := p.stale.last
|
|
timeout := p.stale.timeout
|
|
p.stale.lock.Unlock()
|
|
|
|
d := t.Sub(last)
|
|
if d.Seconds() > timeout.Seconds() {
|
|
p.logger.Info().Log("Stale timeout after %s (%.2f).", timeout, d.Seconds())
|
|
p.stop(false)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// reader reads the output from the process line by line and gives
|
|
// each line to the parser. The parser returns a postive number to
|
|
// indicate progress. If the returned number is zero, then the time
|
|
// of the last progress will not be updated thus the stale timeout
|
|
// may kick in.
|
|
func (p *process) reader() {
|
|
scanner := bufio.NewScanner(p.stdout)
|
|
scanner.Split(scanLine)
|
|
|
|
// Reset the parser statistics
|
|
p.parser.ResetStats()
|
|
|
|
// Reset the parser logs
|
|
p.parser.ResetLog()
|
|
|
|
var n uint64 = 0
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
|
|
p.lastLine = line
|
|
|
|
// Parse the output line from ffmpeg
|
|
n = p.parser.Parse(line)
|
|
|
|
// Reset the stale progress timer only if the
|
|
// parser reports progress
|
|
if n != 0 {
|
|
p.stale.lock.Lock()
|
|
p.stale.last = time.Now()
|
|
p.stale.lock.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// waiter waits for the process to finish. If enabled, the process will
|
|
// be scheduled for a restart.
|
|
func (p *process) waiter() {
|
|
if p.getState() == stateFinishing {
|
|
p.stop(false)
|
|
}
|
|
|
|
if err := p.cmd.Wait(); err != nil {
|
|
// The process exited abnormally, i.e. the return code is non-zero or a signal
|
|
// has been raised.
|
|
if exiterr, ok := err.(*exec.ExitError); ok {
|
|
// The process exited and the status can be examined
|
|
status := exiterr.Sys().(syscall.WaitStatus)
|
|
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"exited": status.Exited(),
|
|
"signaled": status.Signaled(),
|
|
"status": status.ExitStatus(),
|
|
"exit_code": exiterr.ExitCode(),
|
|
"exit_string": exiterr.String(),
|
|
"signal": status.Signal(),
|
|
}).Debug().Log("Exited")
|
|
|
|
if status.Exited() {
|
|
if status.ExitStatus() == 255 {
|
|
// If ffmpeg has been killed with a SIGINT, SIGTERM, etc., then it exited normally,
|
|
// i.e. closing all stream properly such that all written data is sane.
|
|
p.logger.Info().Log("Finished")
|
|
p.setState(stateFinished)
|
|
} else {
|
|
// The process exited by itself with a non-zero return code
|
|
p.logger.Info().Log("Failed")
|
|
p.setState(stateFailed)
|
|
}
|
|
} else if status.Signaled() {
|
|
// If ffmpeg has been killed the hard way, something went wrong and
|
|
// it can be assumed that any written data is not sane.
|
|
p.logger.Info().Log("Killed")
|
|
p.setState(stateKilled)
|
|
} else {
|
|
// The process exited because of something else (e.g. coredump, ...)
|
|
p.logger.Info().Log("Killed")
|
|
p.setState(stateKilled)
|
|
}
|
|
} else {
|
|
// Some other error regarding I/O triggered during Wait()
|
|
p.logger.Info().Log("Killed")
|
|
p.logger.WithError(err).Debug().Log("Killed")
|
|
p.setState(stateKilled)
|
|
}
|
|
} else {
|
|
// The process exited normally, i.e. the return code is zero and no signal
|
|
// has been raised
|
|
p.setState(stateFinished)
|
|
}
|
|
|
|
p.logger.Info().Log("Stopped")
|
|
p.debuglogger.WithField("log", p.parser.Log()).Debug().Log("Stopped")
|
|
|
|
p.limits.Stop()
|
|
|
|
// Stop the kill timer
|
|
p.killTimerLock.Lock()
|
|
if p.killTimer != nil {
|
|
p.killTimer.Stop()
|
|
p.killTimer = nil
|
|
}
|
|
p.killTimerLock.Unlock()
|
|
|
|
// Stop the stale progress timer
|
|
p.stale.lock.Lock()
|
|
if p.stale.cancel != nil {
|
|
p.stale.cancel()
|
|
p.stale.cancel = nil
|
|
}
|
|
p.stale.lock.Unlock()
|
|
|
|
// Reset the parser stats
|
|
p.parser.ResetStats()
|
|
|
|
// Call the onStop callback
|
|
if p.callbacks.onExit != nil {
|
|
go p.callbacks.onExit()
|
|
}
|
|
|
|
p.order.lock.Lock()
|
|
defer p.order.lock.Unlock()
|
|
|
|
p.debuglogger.WithFields(log.Fields{
|
|
"state": p.getStateString(),
|
|
"order": p.order.order,
|
|
}).Debug().Log("Waiting")
|
|
|
|
// Restart the process
|
|
if p.order.order == "start" {
|
|
p.reconnect()
|
|
}
|
|
}
|
|
|
|
// scanLine splits the data on \r, \n, or \r\n line endings
|
|
func scanLine(data []byte, atEOF bool) (advance int, token []byte, err error) {
|
|
// Skip leading spaces.
|
|
start := 0
|
|
for width := 0; start < len(data); start += width {
|
|
var r rune
|
|
r, width = utf8.DecodeRune(data[start:])
|
|
if r != '\n' && r != '\r' {
|
|
break
|
|
}
|
|
|
|
}
|
|
|
|
// Scan until new line, marking end of line.
|
|
for width, i := 0, start; i < len(data); i += width {
|
|
var r rune
|
|
r, width = utf8.DecodeRune(data[i:])
|
|
if r == '\n' || r == '\r' {
|
|
return i + width, data[start:i], nil
|
|
}
|
|
|
|
}
|
|
|
|
// If we're at EOF, we have a final, non-empty, non-terminated word. Return it.
|
|
if atEOF && len(data) > start {
|
|
return len(data), data[start:], nil
|
|
}
|
|
|
|
// Request more data.
|
|
return start, nil, nil
|
|
}
|