diff --git a/internal/testhelper/sigintwait/sigintwait b/internal/testhelper/sigintwait/sigintwait new file mode 100755 index 00000000..005470f9 Binary files /dev/null and b/internal/testhelper/sigintwait/sigintwait differ diff --git a/internal/testhelper/sigintwait/sigintwait.go b/internal/testhelper/sigintwait/sigintwait.go new file mode 100644 index 00000000..f427a06a --- /dev/null +++ b/internal/testhelper/sigintwait/sigintwait.go @@ -0,0 +1,18 @@ +package main + +import ( + "os" + "os/signal" + "time" +) + +func main() { + // Wait for interrupt signal to gracefully shutdown the app + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt) + <-quit + + time.Sleep(3 * time.Second) + + os.Exit(255) +} diff --git a/process/process.go b/process/process.go index 013ff1d8..3c927e2b 100644 --- a/process/process.go +++ b/process/process.go @@ -33,11 +33,11 @@ type Process interface { // Stop stops the process and will not let it restart // automatically. - Stop() error + Stop(wait bool) error // Kill stops the process such that it will restart // automatically if it is defined to do so. - Kill() error + Kill(wait bool) error // IsRunning returns whether the process is currently // running or not. @@ -190,7 +190,7 @@ type process struct { debuglogger log.Logger callbacks struct { onStart func() - onStop func() + onExit func() onStateChange func(from, to string) } limits Limiter @@ -239,7 +239,7 @@ func New(config Config) (Process, error) { p.stale.timeout = config.StaleTimeout p.callbacks.onStart = config.OnStart - p.callbacks.onStop = config.OnExit + p.callbacks.onExit = config.OnExit p.callbacks.onStateChange = config.OnStateChange p.limits = NewLimiter(LimiterConfig{ @@ -251,7 +251,7 @@ func New(config Config) (Process, error) { "cpu": cpu, "memory": memory, }).Warn().Log("Stopping because limits are exceeded") - p.Kill() + p.Kill(false) }, }) @@ -523,7 +523,7 @@ func (p *process) start() error { } // Stop will stop the process and set the order to "stop" -func (p *process) Stop() error { +func (p *process) Stop(wait bool) error { p.order.lock.Lock() defer p.order.lock.Unlock() @@ -533,7 +533,7 @@ func (p *process) Stop() error { p.order.order = "stop" - err := p.stop() + err := p.stop(wait) if err != nil { p.debuglogger.WithFields(log.Fields{ "state": p.getStateString(), @@ -547,7 +547,7 @@ func (p *process) Stop() error { // Kill will stop the process without changing the order such that it // will restart automatically if enabled. -func (p *process) Kill() error { +func (p *process) Kill(wait bool) error { // If the process is currently not running, we don't need // to do anything. if !p.isRunning() { @@ -557,13 +557,13 @@ func (p *process) Kill() error { p.order.lock.Lock() defer p.order.lock.Unlock() - err := p.stop() + err := p.stop(wait) return err } // stop will stop a process considering the current order and state. -func (p *process) stop() error { +func (p *process) stop(wait bool) error { // If the process is currently not running, stop the restart timer if !p.isRunning() { p.unreconnect() @@ -583,6 +583,26 @@ func (p *process) stop() error { "order": p.order.order, }).Debug().Log("Stopping") + wg := sync.WaitGroup{} + + if wait { + wg.Add(1) + + if p.callbacks.onExit == nil { + p.callbacks.onExit = func() { + wg.Done() + p.callbacks.onExit = nil + } + } else { + cb := p.callbacks.onExit + p.callbacks.onExit = func() { + cb() + wg.Done() + p.callbacks.onExit = cb + } + } + } + var err error if runtime.GOOS == "windows" { // Windows doesn't know the SIGINT @@ -606,6 +626,10 @@ func (p *process) stop() error { } } + if err == nil && wait { + wg.Wait() + } + if err != nil { p.parser.Parse(err.Error()) p.debuglogger.WithFields(log.Fields{ @@ -683,7 +707,7 @@ func (p *process) staler(ctx context.Context) { d := t.Sub(last) if d.Seconds() > timeout.Seconds() { p.logger.Info().Log("Stale timeout after %s (%.2f).", timeout, d.Seconds()) - p.stop() + p.stop(false) return } } @@ -729,7 +753,7 @@ func (p *process) reader() { // be scheduled for a restart. func (p *process) waiter() { if p.getState() == stateFinishing { - p.stop() + p.stop(false) } if err := p.cmd.Wait(); err != nil { @@ -806,8 +830,8 @@ func (p *process) waiter() { p.parser.ResetStats() // Call the onStop callback - if p.callbacks.onStop != nil { - go p.callbacks.onStop() + if p.callbacks.onExit != nil { + go p.callbacks.onExit() } p.order.lock.Lock() diff --git a/process/process_test.go b/process/process_test.go index 5f999a92..3a9a1fa3 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -28,7 +28,7 @@ func TestProcess(t *testing.T) { require.Equal(t, "running", p.Status().State) - p.Stop() + p.Stop(false) time.Sleep(2 * time.Second) @@ -52,7 +52,7 @@ func TestReconnectProcess(t *testing.T) { require.Equal(t, "finished", p.Status().State) - p.Stop() + p.Stop(false) require.Equal(t, "finished", p.Status().State) } @@ -73,7 +73,7 @@ func TestStaleProcess(t *testing.T) { require.Equal(t, "killed", p.Status().State) - p.Stop() + p.Stop(false) require.Equal(t, "killed", p.Status().State) } @@ -94,7 +94,7 @@ func TestStaleReconnectProcess(t *testing.T) { require.Equal(t, "killed", p.Status().State) - p.Stop() + p.Stop(false) require.Equal(t, "killed", p.Status().State) } @@ -116,7 +116,7 @@ func TestNonExistingProcess(t *testing.T) { require.Equal(t, "failed", p.Status().State) - p.Stop() + p.Stop(false) require.Equal(t, "failed", p.Status().State) } @@ -138,7 +138,7 @@ func TestNonExistingReconnectProcess(t *testing.T) { require.Equal(t, "failed", p.Status().State) - p.Stop() + p.Stop(false) require.Equal(t, "failed", p.Status().State) } @@ -157,11 +157,35 @@ func TestProcessFailed(t *testing.T) { time.Sleep(5 * time.Second) - p.Stop() + p.Stop(false) require.Equal(t, "failed", p.Status().State) } +func TestFFmpegWaitStop(t *testing.T) { + binary, err := testhelper.BuildBinary("sigintwait", "../internal/testhelper") + require.NoError(t, err, "Failed to build helper program") + + p, _ := New(Config{ + Binary: binary, + Args: []string{}, + Reconnect: false, + StaleTimeout: 0, + OnExit: func() { + time.Sleep(2 * time.Second) + }, + }) + + err = p.Start() + require.NoError(t, err) + + time.Sleep(4 * time.Second) + + p.Stop(true) + + require.Equal(t, "finished", p.Status().State) +} + func TestFFmpegKill(t *testing.T) { binary, err := testhelper.BuildBinary("sigint", "../internal/testhelper") require.NoError(t, err, "Failed to build helper program") @@ -178,7 +202,7 @@ func TestFFmpegKill(t *testing.T) { time.Sleep(5 * time.Second) - p.Stop() + p.Stop(false) time.Sleep(3 * time.Second) @@ -201,7 +225,7 @@ func TestProcessForceKill(t *testing.T) { time.Sleep(3 * time.Second) - p.Stop() + p.Stop(false) time.Sleep(1 * time.Second) diff --git a/restream/restream.go b/restream/restream.go index 54eb9188..3793a787 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -204,7 +204,7 @@ func (r *restream) Stop() { // Start() they will get restarted. for id, t := range r.tasks { if t.ffmpeg != nil { - t.ffmpeg.Stop() + t.ffmpeg.Stop(true) } r.unsetCleanup(id) @@ -996,7 +996,7 @@ func (r *restream) stopProcess(id string) error { task.process.Order = "stop" - task.ffmpeg.Stop() + task.ffmpeg.Stop(true) r.nProc-- @@ -1024,7 +1024,7 @@ func (r *restream) restartProcess(id string) error { return nil } - task.ffmpeg.Kill() + task.ffmpeg.Kill(true) return nil }