diff --git a/LICENSE b/LICENSE index 4967ccf..fdaf7ea 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -The MIT License (MIT) +MIT License -Copyright (c) 2015 Kontera Technologies +Copyright (c) 2018 Kontera Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -19,4 +19,3 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - diff --git a/README.md b/README.md deleted file mode 100644 index 3fe99f8..0000000 --- a/README.md +++ /dev/null @@ -1,138 +0,0 @@ -# go-supervisor (V1) - -Small library for supervising child processes in `Go`, it exposes `Stdout`,`Stderr` and `Stdin` in the "Go way" using channels... - -## Example -`example.bash` print stuff to stdout and stderr and quit after 5 seconds... -```bash -#!/bin/bash -echo "STDOUT MESSAGE" -echo "STDERR MESSAGE" 1>&2 -sleep 5 -``` - -`supervisor-exapmle.go` spawn and supervise the bash program... -```go -package main - -import ( - "github.com/kontera-technologies/go-supervisor" - "log" -) - -func main() { - p, err := supervisor.Supervise("example.bash", supervisor.Options{ - Args: []string{}, // argumets to pass ( default is none ) - SpawnAttempts: 4, // attempts before giving up ( default 10 ) - AttemptsBeforeTerminate: 10, // on Stop() terminate process after X interrupt attempts (default is 10) - Debug: true, // print events to stdout ( default false ) - Dir: "/tmp", // run dir ( default is current dir ) - Id: "example", // will be added to every log print ( default is "NOID") - MaxSpawns: 10, // Max spawn limit ( default is 1 ) - StdoutIdleTime: 10, // stop worker if we didn't recived stdout message in X seconds ( default is 0 - disbaled ) - StderrIdleTime: 10, // stop worker if we didn't recived stderr message in X seconds ( default is 0 - disbaled ) - - // function that calculate sleep time based in the current sleep time - // useful for exponential backoff ( default is this function ) - DelayBetweenSpawns: func(currentSleep int) (sleepTime int) { - return currentSleep * 2 - }, - }) - - if err != nil { - log.Printf("failed to start process: %s", err) - return - } - - exit := make(chan bool) - done := p.NotifyDone(make(chan bool)) // process is done... - events := p.NotifyEvents(make(chan *supervisor.Event,1000)) - - // read stuff - go func() { - for { - select { - case msg := <-p.Stdout: - log.Printf("Recived STDOUT message %s", msg) - case msg := <-p.Stderr: - log.Printf("Recived STDERR message %s", msg) - case event := <- events: - switch event.Code { - case 22: - log.Printf("instance respawned, new pid is: %v...", p.Pid()) - default: - log.Printf("[%d] %s", event.Code, event.Message) - } - case <-done: // process quit - log.Printf("Closing loop we are done....") - exit <- true - return - } - } - }() - - <-exit -} -``` - -running the program should produce this output -``` -2015/02/12 13:02:14 Recived STDERR message &STDERR MESSAGE -2015/02/12 13:02:14 Recived STDOUT message &STDOUT MESSAGE -2015/02/12 13:02:20 [1] [example] can't read from stderr: EOF -2015/02/12 13:02:20 [1] [example] can't read from stdout: EOF -2015/02/12 13:02:20 [7] [example] instance crashed... -2015/02/12 13:02:20 [10] [example] going to sleep for 2 seconds... -2015/02/12 13:02:20 [20] [example] going to kill process.. -2015/02/12 13:02:20 [5] [example] stderr goroutine is done... -2015/02/12 13:02:20 [5] [example] stdout goroutine is done... -2015/02/12 13:02:20 [19] [example] closing stdin handler... -2015/02/12 13:02:20 [5] [example] stdin goroutine is done... -2015/02/12 13:02:20 [29] [example] entering sleep stage... -2015/02/12 13:02:22 [8] [example] starting instance... -2015/02/12 13:02:22 [0] [example] opening stdin handler... -2015/02/12 13:02:22 [0] [example] opening stdout handler... -2015/02/12 13:02:22 [0] [example] opening stderr handler... -2015/02/12 13:02:22 instance respawned, new pid is: 58312... -2015/02/12 13:02:22 Recived STDOUT message &STDOUT MESSAGE -2015/02/12 13:02:22 Recived STDERR message &STDERR MESSAGE -2015/02/12 13:02:27 [1] [example] can't read from stdout: EOF -2015/02/12 13:02:27 [1] [example] can't read from stderr: EOF -2015/02/12 13:02:27 [7] [example] instance crashed... -2015/02/12 13:02:27 [10] [example] going to sleep for 4 seconds... -2015/02/12 13:02:27 [20] [example] going to kill process.. -2015/02/12 13:02:27 [5] [example] stderr goroutine is done... -2015/02/12 13:02:27 [5] [example] stdout goroutine is done... -2015/02/12 13:02:27 [19] [example] closing stdin handler... -2015/02/12 13:02:27 [5] [example] stdin goroutine is done... -2015/02/12 13:02:27 [29] [example] entering sleep stage... -2015/02/12 13:02:31 [8] [example] starting instance... -2015/02/12 13:02:31 [0] [example] opening stdin handler... -2015/02/12 13:02:31 [0] [example] opening stdout handler... -2015/02/12 13:02:31 [0] [example] opening stderr handler... -2015/02/12 13:02:31 instance respawned, new pid is: 58323... -2015/02/12 13:02:31 Recived STDERR message &STDERR MESSAGE -2015/02/12 13:02:31 Recived STDOUT message &STDOUT MESSAGE -2015/02/12 13:02:36 [1] [example] can't read from stderr: EOF -2015/02/12 13:02:36 [1] [example] can't read from stdout: EOF -2015/02/12 13:02:36 [7] [example] instance crashed... -2015/02/12 13:02:36 [10] [example] going to sleep for 8 seconds... -2015/02/12 13:02:36 [20] [example] going to kill process.. -2015/02/12 13:02:36 [5] [example] stderr goroutine is done... -2015/02/12 13:02:36 [5] [example] stdout goroutine is done... -2015/02/12 13:02:36 [19] [example] closing stdin handler... -2015/02/12 13:02:36 [5] [example] stdin goroutine is done... -2015/02/12 13:02:36 [29] [example] entering sleep stage... -2015/02/12 13:02:45 [8] [example] starting instance... -2015/02/12 13:02:45 [0] [example] opening stdin handler... -2015/02/12 13:02:45 [0] [example] opening stdout handler... -2015/02/12 13:02:45 [0] [example] opening stderr handler... -2015/02/12 13:02:45 instance respawned, new pid is: 58338... -2015/02/12 13:02:45 Recived STDOUT message &STDOUT MESSAGE -2015/02/12 13:02:45 Recived STDERR message &STDERR MESSAGE -2015/02/12 13:02:50 [1] [example] can't read from stderr: EOF -2015/02/12 13:02:50 [1] [example] can't read from stdout: EOF -2015/02/12 13:02:50 [7] [example] instance crashed... -2015/02/12 13:02:50 [9] [example] giving up, instance failed to start... -2015/02/12 13:02:50 Closing loop we are done.... -``` diff --git a/go.mod b/go.mod index 1bb695d..370e527 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ -module github.com/kontera-technologies/go-supervisor +module github.com/kontera-technologies/go-supervisor/v2 go 1.14 + +require github.com/fortytw2/leaktest v1.3.0 diff --git a/v2/go.sum b/go.sum similarity index 100% rename from v2/go.sum rename to go.sum diff --git a/v2/parsers.go b/parsers.go similarity index 100% rename from v2/parsers.go rename to parsers.go diff --git a/v2/process-options.go b/process-options.go similarity index 100% rename from v2/process-options.go rename to process-options.go diff --git a/supervisor.go b/supervisor.go index 832554c..d560dd0 100644 --- a/supervisor.go +++ b/supervisor.go @@ -1,548 +1,688 @@ package supervisor import ( - "bufio" + "errors" "fmt" "io" "log" + "math" + "math/rand" "os" "os/exec" "sync" "sync/atomic" + "syscall" "time" ) -type ( - Event struct { - Code int - Message string - Time time.Time - } - - Process struct { - // communication - Stdout chan *[]byte - Stderr chan *[]byte - Stdin chan *[]byte - - // internal usage - closeHandlers func() bool - isdone int32 - stopping int32 - killed int32 - stopped int32 - - command string - options *Options - - // safe variables - mu sync.Mutex - cmd *exec.Cmd - pid int - needToNotifyDone bool - needToSendEvents bool - doneChannel chan bool - eventsChannel chan *Event - lastError error - } - - Options struct { - Args []string // argumets to pass - SpawnAttempts int // attempts before giving up - AttemptsBeforeTerminate int // on Stop() terminate process after X interrupt attempts - Debug bool // print events to stdout - Dir string // run dir - Id string // will be added to every log print - MaxSpawns int // Max spawn limit - StdoutIdleTime int // stop worker if we didn't recived stdout message in X seconds - StderrIdleTime int // stop worker if we didn't recived stderr message in X seconds - Env []string // see os.Cmd Env attribute - InheritEnv bool // take parent process environment variables - - DelayBetweenSpawns func(currentSleep int) (sleep int) // in seconds - } +const ( + defaultMaxSpawns = 1 + defaultMaxSpawnAttempts = 10 + defaultMaxSpawnBackOff = time.Minute + defaultMaxRespawnBackOff = time.Second + defaultMaxInterruptAttempts = 5 + defaultMaxTerminateAttempts = 5 + defaultNotifyEventTimeout = time.Millisecond + defaultParserBufferSize = 4096 + defaultIdleTimeout = 10 * time.Second + defaultTerminationGraceTimeout = time.Second + defaultEventTimeFormat = time.RFC3339Nano ) -// public +var EnsureClosedTimeout = time.Second -func Supervise(command string, opt ...Options) (p *Process, err error) { - options := &Options{} - if len(opt) > 0 { - options = &opt[0] +type Event struct { + Id string + Code string + Message string + Time time.Time + TimeFormat string +} + +func (ev Event) String() string { + if len(ev.Message) == 0 { + return fmt.Sprintf("[%30s][%s] %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code) } + return fmt.Sprintf("[%s][%30s] %s - %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code, ev.Message) +} - if options.Args == nil { - options.Args = make([]string, 0) +const ( + ready uint32 = 1 << iota + running + respawning + stopped + errored +) + +func phaseString(s uint32) string { + str := "unknown" + switch s { + case ready: + str = "ready" + case running: + str = "running" + case respawning: + str = "respawning" + case stopped: + str = "stopped" + case errored: + str = "errored" } + return fmt.Sprintf("%s(%d)", str, s) +} - if options.AttemptsBeforeTerminate == 0 { - options.AttemptsBeforeTerminate = 10 - } +type ProduceFn func() (*interface{}, bool) - if options.DelayBetweenSpawns == nil { - options.DelayBetweenSpawns = func(currentSleep int) (sleepTime int) { - if currentSleep > 500 { - sleepTime = 1 - } else { - sleepTime = currentSleep * 2 +type Process struct { + cmd *exec.Cmd + pid int64 + spawnCount int64 + stopC chan bool + ensureAllClosed func() + + phase uint32 + phaseMu sync.Mutex + + lastError atomic.Value + lastProcessState atomic.Value + + opts *ProcessOptions + + eventTimer *time.Timer + eventNotifierMu sync.Mutex + + doneNotifier chan bool + rand *rand.Rand + stopSleep chan bool +} + +func (p *Process) Input() chan<- []byte { + return p.opts.In +} + +// EmptyInput empties all messages from the Input channel. +func (p *Process) EmptyInput() { + for { + select { + case _, ok := <-p.opts.In: + if !ok { + return } - return sleepTime + default: + return } } +} - if options.Id == "" { - options.Id = "ID" - } +func (p *Process) Stdout() <-chan *interface{} { + return p.opts.Out +} - if options.SpawnAttempts == 0 { - options.SpawnAttempts = 10 - } +func (p *Process) Stderr() <-chan *interface{} { + return p.opts.Err +} - if options.MaxSpawns == 0 { - options.MaxSpawns = 1 +func (p *Process) LastProcessState() *os.ProcessState { + v := p.lastProcessState.Load() + if v == nil { + return nil } - - p = &Process{ - command: command, - options: options, - Stdout: make(chan *[]byte), - Stderr: make(chan *[]byte), - Stdin: make(chan *[]byte), - } - - if err := p.start(); err != nil { - return p, err - } - go p.watch() - return p, nil + return v.(*os.ProcessState) } func (p *Process) LastError() error { - p.mu.Lock() - defer p.mu.Unlock() - return p.lastError + v := p.lastError.Load() + if v == nil { + return nil + } + if x, ok := v.(error); ok { + return x + } + return nil } func (p *Process) Pid() int { - p.mu.Lock() - defer p.mu.Unlock() - return p.pid + return int(atomic.LoadInt64(&p.pid)) } -func (p *Process) NotifyEvents(c chan *Event) (channel chan *Event) { - p.mu.Lock() - defer p.mu.Unlock() - p.needToSendEvents = true - p.eventsChannel = c - return c -} - -func (p *Process) NotifyDone(c chan bool) (channel chan bool) { - p.mu.Lock() - defer p.mu.Unlock() - p.needToNotifyDone = true - p.doneChannel = c - return c -} - -func (p *Process) Running() bool { - if p.cmd == nil { - return false - } else if p.isKilled() { - return false - } else if p.cmd.ProcessState != nil { - return !p.cmd.ProcessState.Exited() - } else { - return true +func (p *Process) Start() (err error) { + p.phaseMu.Lock() + defer p.phaseMu.Unlock() + if p.phase != ready && p.phase != respawning { + return fmt.Errorf(`process phase is "%s" and not "ready" or "respawning"`, phaseString(p.phase)) } + + for attempt := 0; p.opts.MaxSpawnAttempts == -1 || attempt < p.opts.MaxSpawnAttempts; attempt++ { + err = p.unprotectedStart() + if err == nil { + p.phase = running + return + } + if !p.sleep(p.CalcBackOff(attempt, time.Second, p.opts.MaxSpawnBackOff)) { + break + } + } + + p.phase = errored + p.notifyDone() + return } -func (p *Process) Stop() { - if p.isDone(true) { - p.isStopping(true) - defer p.isStopping(false) - done := make(chan bool) - p.stop() +func (p *Process) unprotectedStart() error { + p.cmd = newCommand(p.opts) - go func() { - if p.needToNotifyDone { - p.doneChannel <- true + inPipe, err := p.cmd.StdinPipe() + if err != nil { + return fmt.Errorf("failed to fetch stdin pipe: %s", err) + } + + outPipe, err := p.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to fetch stdout pipe: %s", err) + } + + errPipe, err := p.cmd.StderrPipe() + if err != nil { + return fmt.Errorf("failed to fetch stderr pipe: %s", err) + } + + if p.opts.OutputParser == nil { + return errors.New("missing output streamer") + } + + if p.opts.ErrorParser == nil { + return errors.New("missing error streamer") + } + + if err = p.cmd.Start(); err != nil { + return err + } + + atomic.AddInt64(&p.spawnCount, 1) + atomic.StoreInt64(&p.pid, int64(p.cmd.Process.Pid)) + + p.stopC = make(chan bool) + heartbeat, isMonitorClosed, isInClosed, isOutClosed, isErrClosed := make(chan bool), make(chan bool), make(chan bool), make(chan bool), make(chan bool) + + go chanToWriter(p.opts.In, inPipe, p.notifyEvent, isInClosed, p.stopC, heartbeat) + go readerToChan(p.opts.OutputParser(outPipe, p.opts.ParserBufferSize), p.opts.Out, isOutClosed, p.stopC, heartbeat) + go readerToChan(p.opts.ErrorParser(errPipe, p.opts.ParserBufferSize), p.opts.Err, isErrClosed, p.stopC, nil) + + go monitorHeartBeat(p.opts.IdleTimeout, heartbeat, isMonitorClosed, p.stopC, p.Restart, p.notifyEvent) + + var ensureOnce sync.Once + p.ensureAllClosed = func() { + ensureOnce.Do(func() { + if cErr := ensureClosed("stdin", isInClosed, inPipe.Close); cErr != nil { + log.Printf("[%s] Possible memory leak, stdin go-routine not closed. Error: %s", p.opts.Id, cErr) } - <-time.After(time.Second) - p.closeChannels() - done <- true - }() + if cErr := ensureClosed("stdout", isOutClosed, outPipe.Close); cErr != nil { + log.Printf("[%s] Possible memory leak, stdout go-routine not closed. Error: %s", p.opts.Id, cErr) + } + if cErr := ensureClosed("stderr", isErrClosed, errPipe.Close); cErr != nil { + log.Printf("[%s] Possible memory leak, stderr go-routine not closed. Error: %s", p.opts.Id, cErr) + } + if cErr := ensureClosed("heartbeat monitor", isMonitorClosed, nil); cErr != nil { + log.Printf("[%s] Possible memory leak, monitoring go-routine not closed. Error: %s", p.opts.Id, cErr) + } + }) + } - <-done + go p.waitAndNotify() + + p.notifyEvent("ProcessStart", fmt.Sprintf("pid: %d", p.Pid())) + return nil +} + +func chanToWriter(in <-chan []byte, out io.Writer, notifyEvent func(string, ...interface{}), closeWhenDone, stopC, heartbeat chan bool) { + defer close(closeWhenDone) + for { + select { + case <-stopC: + return + case raw, chanOpen := <-in: + if !chanOpen { + notifyEvent("Error", "Input channel closed unexpectedly.") + return + } + + _, err := out.Write(raw) + if err != nil { + notifyEvent("WriteError", err.Error()) + return + } + heartbeat <- true + } } } -func (p *Process) IsDone() bool { - return p.isDone() && !p.isStopping() -} +func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, stopC, heartbeat chan bool) { + defer close(closeWhenDone) -// private -func (p *Process) closeChannels() { - //close(p.Stdin) - close(p.Stderr) - close(p.Stdout) - if p.needToSendEvents { - close(p.eventsChannel) + cleanPipe := func() { + for { + if res, eof := producer(); res != nil { + out <- res + } else if eof { + return + } + } } - if p.needToNotifyDone { - close(p.doneChannel) + + for { + if res,eof := producer(); res != nil { + select { + case out <- res: + select { + case heartbeat <- true: + default: + } + case <-stopC: + cleanPipe() + return + } + } else if eof { + return + } + + select { + case <-stopC: + cleanPipe() + return + default: + } } } -func (p *Process) start() error { - p.mu.Lock() - defer p.mu.Unlock() +// monitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a +// positive heartbeat, or if a negative heartbeat is passed. +// +// isMonitorClosed will be closed when this function exists. +// +// When stopC closes, this function will exit immediately. +func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) { + defer close(isMonitorClosed) + t := time.NewTimer(idleTimeout) + defer t.Stop() - if p.isDone() { + for alive := true; alive; { + select { + case <-stopC: + notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.") + return + + case alive = <-heartbeat: + if alive { + if !t.Stop() { + <-t.C + } + t.Reset(idleTimeout) + } else { + notifyEvent("NegativeHeartbeat", "Stopping process.") + } + + case <-t.C: + alive = false + notifyEvent("MissingHeartbeat", "Stopping process.") + } + } + + if err := stop(); err != nil { + notifyEvent("StopError", err.Error()) + } +} + +func (p *Process) waitAndNotify() { + state, waitErr := p.cmd.Process.Wait() + + p.phaseMu.Lock() + automaticUnlock := true + defer func() { + if automaticUnlock { + p.phaseMu.Unlock() + } + }() + + p.lastProcessState.Store(state) + + if p.phase == stopped { + return + } else if p.phase != running && p.phase != respawning { + p.notifyEvent("RespawnError", fmt.Sprintf(`process phase is "%s" and not "running" or "respawning"`, phaseString(p.phase))) + } + + p.phase = stopped + + if waitErr != nil { + p.notifyEvent("WaitError", fmt.Sprintf("os.Process.Wait returned an error - %s", waitErr.Error())) + p.phase = errored + return + } + + if state.Success() { + p.notifyEvent("ProcessDone", state.String()) + } else { + p.notifyEvent("ProcessCrashed", state.String()) + p.lastError.Store(errors.New(state.String())) + } + + // Cleanup resources + select { + case <-p.stopC: + default: + close(p.stopC) + } + p.ensureAllClosed() + + if state.Success() { + p.notifyEvent("ProcessStopped", "Process existed successfully.") + p.notifyDone() + return + } + + if !p.canRespawn() { + p.notifyEvent("RespawnError", "Max number of respawns reached.") + p.notifyDone() + return + } + + sleepFor := p.CalcBackOff(int(atomic.LoadInt64(&p.spawnCount))-1, time.Second, p.opts.MaxRespawnBackOff) + p.notifyEvent("Sleep", fmt.Sprintf("Sleeping for %s before respwaning instance.", sleepFor.String())) + if !p.sleep(sleepFor) { + return + } + + p.phase = respawning + p.notifyEvent("ProcessRespawn", "Trying to respawn instance.") + + automaticUnlock = false + p.phaseMu.Unlock() + err := p.Start() + + if err != nil { + p.notifyEvent("RespawnError", err.Error()) + } +} + +func (p *Process) sleep(d time.Duration) bool { + t := time.NewTimer(d) + select { + case <-t.C: + return true + case <-p.stopSleep: + t.Stop() + return false + } +} + +func (p *Process) canRespawn() bool { + return p.opts.MaxSpawns == -1 || atomic.LoadInt64(&p.spawnCount) < int64(p.opts.MaxSpawns) +} + +// Stop tries to stop the process. +// Entering this function will change the phase from "running" to "stopping" (any other initial phase will cause an error +// to be returned). +// +// This function will call notifyDone when it is done. +// +// If it fails to stop the process, the phase will change to errored and an error will be returned. +// Otherwise, the phase changes to stopped. +func (p *Process) Stop() error { + select { + case <-p.stopSleep: + default: + close(p.stopSleep) + } + p.phaseMu.Lock() + defer p.phaseMu.Unlock() + defer p.notifyDone() + err := p.unprotectedStop() + if err != nil { + p.phase = errored + return err + } + p.phase = stopped + return nil +} + +func (p *Process) unprotectedStop() (err error) { + p.notifyEvent("ProcessStop") + + select { + case <-p.stopC: + default: + close(p.stopC) + } + defer p.ensureAllClosed() + + if !p.IsAlive() { return nil } - var err error - - p.cmd = exec.Command(p.command, p.options.Args...) - env := make([]string, 0) - - if p.options.InheritEnv { - env = os.Environ() - } - - if p.options.Env != nil { - p.cmd.Env = append(env, p.options.Env...) - } - - if p.options.Dir != "" { - p.cmd.Dir = p.options.Dir - } - - stdout, stderr, stdin, err := p.openPipes() - if err != nil { - return err - } - - p.isStopped(false) - p.isKilled(false) - - closeIn := p.handleIn(stdin, p.Stdin) - closeOut := p.handleOut("stdout", stdout, p.Stdout, p.options.StdoutIdleTime) - closeErr := p.handleOut("stderr", stderr, p.Stderr, p.options.StderrIdleTime) - - p.closeHandlers = func() bool { - for k, v := range map[string]chan bool{ - "stdin": closeIn, - "stdout": closeOut, - "stderr": closeErr, - } { - p.event(5, "closing %s handler...", k) - select { - case v <- true: - <-v - case <-time.After(time.Second): - p.event(6, "%s is still open... memory leak...", k) - } + attempt := 0 + for ; attempt < p.opts.MaxInterruptAttempts; attempt++ { + p.notifyEvent("Interrupt", fmt.Sprintf("sending intterupt signal to %d - attempt #%d", -p.Pid(), attempt+1)) + err = p.interrupt() + if err == nil { + return nil } - return false + } + if p.opts.MaxInterruptAttempts > 0 { + p.notifyEvent("InterruptError", fmt.Sprintf("interrupt signal failed - %d attempts", attempt)) } - p.event(8, "starting instance...") - err = p.cmd.Start() + err = nil + for attempt = 0; attempt < p.opts.MaxTerminateAttempts; attempt++ { + p.notifyEvent("Terminate", fmt.Sprintf("sending terminate signal to %d - attempt #%d", -p.Pid(), attempt+1)) + err = p.terminate() + if err == nil { + return nil + } + } + if p.opts.MaxTerminateAttempts > 0 { + p.notifyEvent("TerminateError", fmt.Sprintf("terminate signal failed - %d attempts", attempt)) + } + + p.notifyEvent("Killing", fmt.Sprintf("sending kill signal to %d", p.Pid())) + err = syscall.Kill(-p.Pid(), syscall.SIGKILL) + if err != nil { + p.notifyEvent("KillError", err.Error()) return err } - p.pid = p.cmd.Process.Pid - - p.event(22, "instance ready...") - return nil } -// run in its own goroutine -func (p *Process) watch() { - attempt := 1 - currentSleep := 1 - numSpawns := 1 - for { - start := time.Now() - p.lastError = p.cmd.Wait() - time.Sleep(time.Second) - if p.isDone() { - break - } - - if p.lastError == nil { - p.event(12, "instance exited with exit code 0") - } else { - p.event(7, "instance crashed: %q", p.lastError.Error()) - } - - if numSpawns >= p.options.MaxSpawns { - p.event(13, "reached max spawns...") - p.Stop() // cleanup - break - } else { - numSpawns += 1 - } - - if (time.Now().Sub(start).Seconds()) > 60 { - attempt = 1 - currentSleep = 1 - } else { - attempt += 1 - currentSleep = p.options.DelayBetweenSpawns(currentSleep) - } - if attempt > p.options.SpawnAttempts { - p.event(9, "giving up, instance failed to start...") - p.Stop() // shutting down instance and send done notification... - break - } - - p.event(10, "going to sleep for %d seconds...", currentSleep) - p.stop() // cleanup - p.event(29, "entering sleep stage...") - - milliseconds := currentSleep * 1000 - waited := 0 - for waited < milliseconds { - time.Sleep(10 * time.Millisecond) - waited += 10 - if p.isDone() { - break - } - } - - p.start() +// Restart tries to stop and start the process. +// Entering this function will change the phase from running to respawning (any other initial phase will cause an error +// to be returned). +// +// If it fails to stop the process the phase will change to errored and notifyDone will be called. +// If there are no more allowed respawns the phase will change to stopped and notifyDone will be called. +// +// This function calls Process.Start to start the process which will change the phase to "running" (or "errored" if it +// fails) +// If Start fails, notifyDone will be called. +func (p *Process) Restart() error { + p.phaseMu.Lock() + defer p.phaseMu.Unlock() + if p.phase != running { + return fmt.Errorf(`process phase is "%s" and not "running"`, phaseString(p.phase)) } - p.event(11, "watch daemon is off...") + p.phase = respawning + err := p.unprotectedStop() + + if err != nil { + p.phase = errored + p.notifyDone() + return err + } + + if !p.canRespawn() { + p.phase = stopped + p.notifyDone() + return errors.New("max number of respawns reached") + } + + return nil } -func (p *Process) Restart() { - p.stop() +func (p *Process) IsAlive() bool { + err := syscall.Kill(-p.Pid(), syscall.Signal(0)) + if errno, ok := err.(syscall.Errno); ok { + return errno != syscall.ESRCH + } + return true } -func (p *Process) stop() { - p.mu.Lock() - defer p.mu.Unlock() +func (p *Process) IsDone() bool { + select { + case <-p.doneNotifier: + return true + default: + return false + } +} - if p.isStopped() { +func (p *Process) DoneNotifier() <-chan bool { + return p.doneNotifier +} + +// notifyDone closes the DoneNotifier channel (if it isn't already closed). +func (p *Process) notifyDone() { + select { + case <-p.doneNotifier: + default: + close(p.doneNotifier) + } +} + +// EventNotifier returns the eventNotifier channel (and creates one if none exists). +// +// It is protected by Process.eventNotifierMu. +func (p *Process) EventNotifier() chan Event { + p.eventNotifierMu.Lock() + defer p.eventNotifierMu.Unlock() + + if p.opts.EventNotifier == nil { + p.opts.EventNotifier = make(chan Event) + } + + return p.opts.EventNotifier +} + +// notifyEvent creates and passes an event struct from an event code string and an optional event message. +// fmt.Sprint will be called on the message slice. +// +// It is protected by Process.eventNotifierMu. +func (p *Process) notifyEvent(code string, message ...interface{}) { + // Create the event before calling Lock. + ev := Event{ + Id: p.opts.Id, + Code: code, + Message: fmt.Sprint(message...), + Time: time.Now(), + TimeFormat: p.opts.EventTimeFormat, + } + + // Log the event before calling Lock. + if p.opts.Debug { + fmt.Println(ev) + } + + p.eventNotifierMu.Lock() + defer p.eventNotifierMu.Unlock() + + if notifier := p.opts.EventNotifier; notifier != nil { + if p.eventTimer == nil { + p.eventTimer = time.NewTimer(p.opts.NotifyEventTimeout) + } else { + p.eventTimer.Reset(p.opts.NotifyEventTimeout) + } + + select { + case notifier <- ev: + if !p.eventTimer.Stop() { + <-p.eventTimer.C + } + case <-p.eventTimer.C: + log.Printf("Failed to sent %#v. EventNotifier is set, but isn't accepting any events.", ev) + } + } +} + +func (p *Process) interrupt() (err error) { + err = syscall.Kill(-p.Pid(), syscall.SIGINT) + if err != nil { return } - defer p.isStopped(true) - - p.event(20, "going to kill process..") - - attempts := 0 - - for p.Running() && p.cmd != nil && p.cmd.Process != nil { - attempts++ - if attempts < p.options.AttemptsBeforeTerminate { - p.event(3, "sending interrupt to process - attempt %d", attempts) - p.cmd.Process.Signal(os.Interrupt) - time.Sleep(time.Second) - } else { - p.event(4, "refuse to quit, kill it (pid %d)...", p.cmd.Process.Pid) - p.cmd.Process.Kill() - p.cmd.Process.Signal(os.Kill) - p.isKilled(true) - time.Sleep(time.Second) - break - } + time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end. + if p.IsAlive() { + err = errors.New("interrupt signal failed") } - - p.event(98, "closing handlers...") - p.closeHandlers() + return } -// runs in its own goroutine -func (p *Process) handleIn(in io.WriteCloser, channel chan *[]byte) chan bool { - p.event(0, "opening stdin handler...") - c := make(chan bool) - - go func() { - defer p.event(0, "stdin handler is now closed...") - for { - select { - case message := <-channel: - if _, err := in.Write(append(*message, '\n')); err != nil { - select { - case <-c: - c <- true - return - } - } - case <-c: - c <- true - return - } - } - }() - - return c -} - -func (p *Process) getHeartbeater(name string, seconds int) chan bool { - c := make(chan bool, 1000) - - go func() { - for { - t := time.NewTimer(time.Second * time.Duration(seconds)) - - select { - case msg := <-c: - if !msg { - return - } - case <-t.C: - p.event(15, "%s - reached timeout, restarting instance...", name) - p.stop() - return - } - - t.Stop() - } - }() - - return c -} - -// runs in its own goroutine -func (p *Process) handleOut(name string, out *bufio.Reader, channel chan *[]byte, heartbeat int) chan bool { - p.event(0, "opening %v handler...", name) - - c := make(chan bool) - - go func() { - defer p.event(0, "%v handler is now closed...", name) - var heartbeatChannel chan bool - shouldHeartbeat := heartbeat > 0 - - if shouldHeartbeat { - heartbeatChannel = p.getHeartbeater(name, heartbeat) - } - beat := func(k bool) { - if shouldHeartbeat { - heartbeatChannel <- k - } - } - - defer func() { - err := recover() - - if p != nil { - defer beat(false) - if err != nil { - p.event(90, "%s handler: %s , recovering...", name, err) - if !p.isDone() { - select { - case <-c: - c <- true - return - } - } - } - } - }() - - for { - select { - case <-c: - c <- true - return - default: - line, err := out.ReadBytes('\n') - beat(true) - - if err != nil { - p.event(1, "can't read from %s: %s", name, err) - select { - case <-c: - c <- true - return - } - } - - select { - case channel <- &line: - case <-c: - c <- true - return - } - } - } - - }() - - return c -} - -func (p *Process) event(code int, message string, format ...interface{}) { - msg := &Event{ - Message: fmt.Sprintf(("[%s] " + message), append([]interface{}{p.options.Id}, format...)...), - Time: time.Now(), - Code: code, - } - - if p.options.Debug { - log.Printf("%s", msg.Message) - } - - if p.needToSendEvents && !p.isDone() { - p.eventsChannel <- msg - } -} - -func (p *Process) openPipes() (stdout, stderr *bufio.Reader, stdin io.WriteCloser, err error) { - stdin, err = p.cmd.StdinPipe() - +func (p *Process) terminate() (err error) { + err = syscall.Kill(-p.Pid(), syscall.SIGTERM) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to get stdin pipe: %s", err) + return } - out, err := p.cmd.StdoutPipe() - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to get stdout pipe: %s", err) + time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end. + if p.IsAlive() { + err = errors.New("terminate signal failed") } - stdout = bufio.NewReader(out) + return +} - er, err := p.cmd.StderrPipe() - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to get stderr pipe: %s", err) +func (p *Process) CalcBackOff(attempt int, step time.Duration, maxBackOff time.Duration) time.Duration { + randBuffer := (step / 1000) * time.Duration(p.rand.Intn(1000)) + backOff := randBuffer + step*time.Duration(math.Exp2(float64(attempt))) + if backOff > maxBackOff { + return maxBackOff } - stderr = bufio.NewReader(er) - - return stdout, stderr, stdin, nil + return backOff } -func (p *Process) isKilled(killed ...bool) bool { - return isSomething(&p.killed, killed) +func NewProcess(opts ProcessOptions) *Process { + return &Process{ + phase: ready, + opts: initProcessOptions(opts), + doneNotifier: make(chan bool), + stopSleep: make(chan bool), + rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), + } } -func (p *Process) isDone(done ...bool) bool { - return isSomething(&p.isdone, done) +// newCommand creates a new exec.Cmd struct. +func newCommand(opts *ProcessOptions) *exec.Cmd { + cmd := exec.Command(opts.Name, opts.Args...) + cmd.Env = opts.Env + cmd.Dir = opts.Dir + cmd.ExtraFiles = opts.ExtraFiles + cmd.SysProcAttr = opts.SysProcAttr + return cmd } -func (p *Process) isStopped(stop ...bool) bool { - return isSomething(&p.stopped, stop) -} +// todo: test if panics on double-close +func ensureClosed(name string, isStopped chan bool, forceClose func() error) error { + t := time.NewTimer(EnsureClosedTimeout) + defer t.Stop() -func (p *Process) isStopping(stopping ...bool) bool { - return isSomething(&p.stopping, stopping) -} - -func isSomething(n *int32, o []bool) bool { - if len(o) > 0 { - if o[0] { - return atomic.CompareAndSwapInt32(n, 0, 1) - } else { - return atomic.CompareAndSwapInt32(n, 1, 0) + select { + case <-isStopped: + return nil + case <-t.C: + if forceClose == nil { + return fmt.Errorf("stopped waiting for %s after %s", name, EnsureClosedTimeout) + } + if err := forceClose(); err != nil { + return fmt.Errorf("%s - %s", name, err.Error()) } - } else { - return atomic.LoadInt32(n) == 1 - } + return ensureClosed(name, isStopped, nil) + } } diff --git a/v2/supervisor_test.go b/supervisor_test.go similarity index 100% rename from v2/supervisor_test.go rename to supervisor_test.go diff --git a/v2/testdata/echo.sh b/testdata/echo.sh similarity index 100% rename from v2/testdata/echo.sh rename to testdata/echo.sh diff --git a/v2/testdata/endless.sh b/testdata/endless.sh similarity index 100% rename from v2/testdata/endless.sh rename to testdata/endless.sh diff --git a/v2/testdata/endless_jsons.sh b/testdata/endless_jsons.sh similarity index 100% rename from v2/testdata/endless_jsons.sh rename to testdata/endless_jsons.sh diff --git a/v2/testdata/error.sh b/testdata/error.sh similarity index 100% rename from v2/testdata/error.sh rename to testdata/error.sh diff --git a/v2/testdata/greet_with_error.sh b/testdata/greet_with_error.sh similarity index 100% rename from v2/testdata/greet_with_error.sh rename to testdata/greet_with_error.sh diff --git a/v2/testdata/incrementer.sh b/testdata/incrementer.sh similarity index 100% rename from v2/testdata/incrementer.sh rename to testdata/incrementer.sh diff --git a/v2/testdata/ipsum.txt b/testdata/ipsum.txt similarity index 100% rename from v2/testdata/ipsum.txt rename to testdata/ipsum.txt diff --git a/v2/testdata/ipsum.zlib b/testdata/ipsum.zlib similarity index 100% rename from v2/testdata/ipsum.zlib rename to testdata/ipsum.zlib diff --git a/v2/testdata/parent.sh b/testdata/parent.sh similarity index 100% rename from v2/testdata/parent.sh rename to testdata/parent.sh diff --git a/v2/testdata/producer.sh b/testdata/producer.sh similarity index 100% rename from v2/testdata/producer.sh rename to testdata/producer.sh diff --git a/v2/testdata/trap.sh b/testdata/trap.sh similarity index 100% rename from v2/testdata/trap.sh rename to testdata/trap.sh diff --git a/v2/testdata/zlib.sh b/testdata/zlib.sh similarity index 100% rename from v2/testdata/zlib.sh rename to testdata/zlib.sh diff --git a/v2/LICENSE b/v2/LICENSE deleted file mode 100644 index fdaf7ea..0000000 --- a/v2/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2018 Kontera - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/v2/go.mod b/v2/go.mod deleted file mode 100644 index 370e527..0000000 --- a/v2/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module github.com/kontera-technologies/go-supervisor/v2 - -go 1.14 - -require github.com/fortytw2/leaktest v1.3.0 diff --git a/v2/supervisor.go b/v2/supervisor.go deleted file mode 100644 index d560dd0..0000000 --- a/v2/supervisor.go +++ /dev/null @@ -1,688 +0,0 @@ -package supervisor - -import ( - "errors" - "fmt" - "io" - "log" - "math" - "math/rand" - "os" - "os/exec" - "sync" - "sync/atomic" - "syscall" - "time" -) - -const ( - defaultMaxSpawns = 1 - defaultMaxSpawnAttempts = 10 - defaultMaxSpawnBackOff = time.Minute - defaultMaxRespawnBackOff = time.Second - defaultMaxInterruptAttempts = 5 - defaultMaxTerminateAttempts = 5 - defaultNotifyEventTimeout = time.Millisecond - defaultParserBufferSize = 4096 - defaultIdleTimeout = 10 * time.Second - defaultTerminationGraceTimeout = time.Second - defaultEventTimeFormat = time.RFC3339Nano -) - -var EnsureClosedTimeout = time.Second - -type Event struct { - Id string - Code string - Message string - Time time.Time - TimeFormat string -} - -func (ev Event) String() string { - if len(ev.Message) == 0 { - return fmt.Sprintf("[%30s][%s] %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code) - } - return fmt.Sprintf("[%s][%30s] %s - %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code, ev.Message) -} - -const ( - ready uint32 = 1 << iota - running - respawning - stopped - errored -) - -func phaseString(s uint32) string { - str := "unknown" - switch s { - case ready: - str = "ready" - case running: - str = "running" - case respawning: - str = "respawning" - case stopped: - str = "stopped" - case errored: - str = "errored" - } - return fmt.Sprintf("%s(%d)", str, s) -} - -type ProduceFn func() (*interface{}, bool) - -type Process struct { - cmd *exec.Cmd - pid int64 - spawnCount int64 - stopC chan bool - ensureAllClosed func() - - phase uint32 - phaseMu sync.Mutex - - lastError atomic.Value - lastProcessState atomic.Value - - opts *ProcessOptions - - eventTimer *time.Timer - eventNotifierMu sync.Mutex - - doneNotifier chan bool - rand *rand.Rand - stopSleep chan bool -} - -func (p *Process) Input() chan<- []byte { - return p.opts.In -} - -// EmptyInput empties all messages from the Input channel. -func (p *Process) EmptyInput() { - for { - select { - case _, ok := <-p.opts.In: - if !ok { - return - } - default: - return - } - } -} - -func (p *Process) Stdout() <-chan *interface{} { - return p.opts.Out -} - -func (p *Process) Stderr() <-chan *interface{} { - return p.opts.Err -} - -func (p *Process) LastProcessState() *os.ProcessState { - v := p.lastProcessState.Load() - if v == nil { - return nil - } - return v.(*os.ProcessState) -} - -func (p *Process) LastError() error { - v := p.lastError.Load() - if v == nil { - return nil - } - if x, ok := v.(error); ok { - return x - } - return nil -} - -func (p *Process) Pid() int { - return int(atomic.LoadInt64(&p.pid)) -} - -func (p *Process) Start() (err error) { - p.phaseMu.Lock() - defer p.phaseMu.Unlock() - if p.phase != ready && p.phase != respawning { - return fmt.Errorf(`process phase is "%s" and not "ready" or "respawning"`, phaseString(p.phase)) - } - - for attempt := 0; p.opts.MaxSpawnAttempts == -1 || attempt < p.opts.MaxSpawnAttempts; attempt++ { - err = p.unprotectedStart() - if err == nil { - p.phase = running - return - } - if !p.sleep(p.CalcBackOff(attempt, time.Second, p.opts.MaxSpawnBackOff)) { - break - } - } - - p.phase = errored - p.notifyDone() - return -} - -func (p *Process) unprotectedStart() error { - p.cmd = newCommand(p.opts) - - inPipe, err := p.cmd.StdinPipe() - if err != nil { - return fmt.Errorf("failed to fetch stdin pipe: %s", err) - } - - outPipe, err := p.cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("failed to fetch stdout pipe: %s", err) - } - - errPipe, err := p.cmd.StderrPipe() - if err != nil { - return fmt.Errorf("failed to fetch stderr pipe: %s", err) - } - - if p.opts.OutputParser == nil { - return errors.New("missing output streamer") - } - - if p.opts.ErrorParser == nil { - return errors.New("missing error streamer") - } - - if err = p.cmd.Start(); err != nil { - return err - } - - atomic.AddInt64(&p.spawnCount, 1) - atomic.StoreInt64(&p.pid, int64(p.cmd.Process.Pid)) - - p.stopC = make(chan bool) - heartbeat, isMonitorClosed, isInClosed, isOutClosed, isErrClosed := make(chan bool), make(chan bool), make(chan bool), make(chan bool), make(chan bool) - - go chanToWriter(p.opts.In, inPipe, p.notifyEvent, isInClosed, p.stopC, heartbeat) - go readerToChan(p.opts.OutputParser(outPipe, p.opts.ParserBufferSize), p.opts.Out, isOutClosed, p.stopC, heartbeat) - go readerToChan(p.opts.ErrorParser(errPipe, p.opts.ParserBufferSize), p.opts.Err, isErrClosed, p.stopC, nil) - - go monitorHeartBeat(p.opts.IdleTimeout, heartbeat, isMonitorClosed, p.stopC, p.Restart, p.notifyEvent) - - var ensureOnce sync.Once - p.ensureAllClosed = func() { - ensureOnce.Do(func() { - if cErr := ensureClosed("stdin", isInClosed, inPipe.Close); cErr != nil { - log.Printf("[%s] Possible memory leak, stdin go-routine not closed. Error: %s", p.opts.Id, cErr) - } - if cErr := ensureClosed("stdout", isOutClosed, outPipe.Close); cErr != nil { - log.Printf("[%s] Possible memory leak, stdout go-routine not closed. Error: %s", p.opts.Id, cErr) - } - if cErr := ensureClosed("stderr", isErrClosed, errPipe.Close); cErr != nil { - log.Printf("[%s] Possible memory leak, stderr go-routine not closed. Error: %s", p.opts.Id, cErr) - } - if cErr := ensureClosed("heartbeat monitor", isMonitorClosed, nil); cErr != nil { - log.Printf("[%s] Possible memory leak, monitoring go-routine not closed. Error: %s", p.opts.Id, cErr) - } - }) - } - - go p.waitAndNotify() - - p.notifyEvent("ProcessStart", fmt.Sprintf("pid: %d", p.Pid())) - return nil -} - -func chanToWriter(in <-chan []byte, out io.Writer, notifyEvent func(string, ...interface{}), closeWhenDone, stopC, heartbeat chan bool) { - defer close(closeWhenDone) - for { - select { - case <-stopC: - return - case raw, chanOpen := <-in: - if !chanOpen { - notifyEvent("Error", "Input channel closed unexpectedly.") - return - } - - _, err := out.Write(raw) - if err != nil { - notifyEvent("WriteError", err.Error()) - return - } - heartbeat <- true - } - } -} - -func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, stopC, heartbeat chan bool) { - defer close(closeWhenDone) - - cleanPipe := func() { - for { - if res, eof := producer(); res != nil { - out <- res - } else if eof { - return - } - } - } - - for { - if res,eof := producer(); res != nil { - select { - case out <- res: - select { - case heartbeat <- true: - default: - } - case <-stopC: - cleanPipe() - return - } - } else if eof { - return - } - - select { - case <-stopC: - cleanPipe() - return - default: - } - } -} - -// monitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a -// positive heartbeat, or if a negative heartbeat is passed. -// -// isMonitorClosed will be closed when this function exists. -// -// When stopC closes, this function will exit immediately. -func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) { - defer close(isMonitorClosed) - t := time.NewTimer(idleTimeout) - defer t.Stop() - - for alive := true; alive; { - select { - case <-stopC: - notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.") - return - - case alive = <-heartbeat: - if alive { - if !t.Stop() { - <-t.C - } - t.Reset(idleTimeout) - } else { - notifyEvent("NegativeHeartbeat", "Stopping process.") - } - - case <-t.C: - alive = false - notifyEvent("MissingHeartbeat", "Stopping process.") - } - } - - if err := stop(); err != nil { - notifyEvent("StopError", err.Error()) - } -} - -func (p *Process) waitAndNotify() { - state, waitErr := p.cmd.Process.Wait() - - p.phaseMu.Lock() - automaticUnlock := true - defer func() { - if automaticUnlock { - p.phaseMu.Unlock() - } - }() - - p.lastProcessState.Store(state) - - if p.phase == stopped { - return - } else if p.phase != running && p.phase != respawning { - p.notifyEvent("RespawnError", fmt.Sprintf(`process phase is "%s" and not "running" or "respawning"`, phaseString(p.phase))) - } - - p.phase = stopped - - if waitErr != nil { - p.notifyEvent("WaitError", fmt.Sprintf("os.Process.Wait returned an error - %s", waitErr.Error())) - p.phase = errored - return - } - - if state.Success() { - p.notifyEvent("ProcessDone", state.String()) - } else { - p.notifyEvent("ProcessCrashed", state.String()) - p.lastError.Store(errors.New(state.String())) - } - - // Cleanup resources - select { - case <-p.stopC: - default: - close(p.stopC) - } - p.ensureAllClosed() - - if state.Success() { - p.notifyEvent("ProcessStopped", "Process existed successfully.") - p.notifyDone() - return - } - - if !p.canRespawn() { - p.notifyEvent("RespawnError", "Max number of respawns reached.") - p.notifyDone() - return - } - - sleepFor := p.CalcBackOff(int(atomic.LoadInt64(&p.spawnCount))-1, time.Second, p.opts.MaxRespawnBackOff) - p.notifyEvent("Sleep", fmt.Sprintf("Sleeping for %s before respwaning instance.", sleepFor.String())) - if !p.sleep(sleepFor) { - return - } - - p.phase = respawning - p.notifyEvent("ProcessRespawn", "Trying to respawn instance.") - - automaticUnlock = false - p.phaseMu.Unlock() - err := p.Start() - - if err != nil { - p.notifyEvent("RespawnError", err.Error()) - } -} - -func (p *Process) sleep(d time.Duration) bool { - t := time.NewTimer(d) - select { - case <-t.C: - return true - case <-p.stopSleep: - t.Stop() - return false - } -} - -func (p *Process) canRespawn() bool { - return p.opts.MaxSpawns == -1 || atomic.LoadInt64(&p.spawnCount) < int64(p.opts.MaxSpawns) -} - -// Stop tries to stop the process. -// Entering this function will change the phase from "running" to "stopping" (any other initial phase will cause an error -// to be returned). -// -// This function will call notifyDone when it is done. -// -// If it fails to stop the process, the phase will change to errored and an error will be returned. -// Otherwise, the phase changes to stopped. -func (p *Process) Stop() error { - select { - case <-p.stopSleep: - default: - close(p.stopSleep) - } - p.phaseMu.Lock() - defer p.phaseMu.Unlock() - defer p.notifyDone() - err := p.unprotectedStop() - if err != nil { - p.phase = errored - return err - } - p.phase = stopped - return nil -} - -func (p *Process) unprotectedStop() (err error) { - p.notifyEvent("ProcessStop") - - select { - case <-p.stopC: - default: - close(p.stopC) - } - defer p.ensureAllClosed() - - if !p.IsAlive() { - return nil - } - - attempt := 0 - for ; attempt < p.opts.MaxInterruptAttempts; attempt++ { - p.notifyEvent("Interrupt", fmt.Sprintf("sending intterupt signal to %d - attempt #%d", -p.Pid(), attempt+1)) - err = p.interrupt() - if err == nil { - return nil - } - } - if p.opts.MaxInterruptAttempts > 0 { - p.notifyEvent("InterruptError", fmt.Sprintf("interrupt signal failed - %d attempts", attempt)) - } - - err = nil - for attempt = 0; attempt < p.opts.MaxTerminateAttempts; attempt++ { - p.notifyEvent("Terminate", fmt.Sprintf("sending terminate signal to %d - attempt #%d", -p.Pid(), attempt+1)) - err = p.terminate() - if err == nil { - return nil - } - } - if p.opts.MaxTerminateAttempts > 0 { - p.notifyEvent("TerminateError", fmt.Sprintf("terminate signal failed - %d attempts", attempt)) - } - - p.notifyEvent("Killing", fmt.Sprintf("sending kill signal to %d", p.Pid())) - err = syscall.Kill(-p.Pid(), syscall.SIGKILL) - - if err != nil { - p.notifyEvent("KillError", err.Error()) - return err - } - - return nil -} - -// Restart tries to stop and start the process. -// Entering this function will change the phase from running to respawning (any other initial phase will cause an error -// to be returned). -// -// If it fails to stop the process the phase will change to errored and notifyDone will be called. -// If there are no more allowed respawns the phase will change to stopped and notifyDone will be called. -// -// This function calls Process.Start to start the process which will change the phase to "running" (or "errored" if it -// fails) -// If Start fails, notifyDone will be called. -func (p *Process) Restart() error { - p.phaseMu.Lock() - defer p.phaseMu.Unlock() - if p.phase != running { - return fmt.Errorf(`process phase is "%s" and not "running"`, phaseString(p.phase)) - } - p.phase = respawning - err := p.unprotectedStop() - - if err != nil { - p.phase = errored - p.notifyDone() - return err - } - - if !p.canRespawn() { - p.phase = stopped - p.notifyDone() - return errors.New("max number of respawns reached") - } - - return nil -} - -func (p *Process) IsAlive() bool { - err := syscall.Kill(-p.Pid(), syscall.Signal(0)) - if errno, ok := err.(syscall.Errno); ok { - return errno != syscall.ESRCH - } - return true -} - -func (p *Process) IsDone() bool { - select { - case <-p.doneNotifier: - return true - default: - return false - } -} - -func (p *Process) DoneNotifier() <-chan bool { - return p.doneNotifier -} - -// notifyDone closes the DoneNotifier channel (if it isn't already closed). -func (p *Process) notifyDone() { - select { - case <-p.doneNotifier: - default: - close(p.doneNotifier) - } -} - -// EventNotifier returns the eventNotifier channel (and creates one if none exists). -// -// It is protected by Process.eventNotifierMu. -func (p *Process) EventNotifier() chan Event { - p.eventNotifierMu.Lock() - defer p.eventNotifierMu.Unlock() - - if p.opts.EventNotifier == nil { - p.opts.EventNotifier = make(chan Event) - } - - return p.opts.EventNotifier -} - -// notifyEvent creates and passes an event struct from an event code string and an optional event message. -// fmt.Sprint will be called on the message slice. -// -// It is protected by Process.eventNotifierMu. -func (p *Process) notifyEvent(code string, message ...interface{}) { - // Create the event before calling Lock. - ev := Event{ - Id: p.opts.Id, - Code: code, - Message: fmt.Sprint(message...), - Time: time.Now(), - TimeFormat: p.opts.EventTimeFormat, - } - - // Log the event before calling Lock. - if p.opts.Debug { - fmt.Println(ev) - } - - p.eventNotifierMu.Lock() - defer p.eventNotifierMu.Unlock() - - if notifier := p.opts.EventNotifier; notifier != nil { - if p.eventTimer == nil { - p.eventTimer = time.NewTimer(p.opts.NotifyEventTimeout) - } else { - p.eventTimer.Reset(p.opts.NotifyEventTimeout) - } - - select { - case notifier <- ev: - if !p.eventTimer.Stop() { - <-p.eventTimer.C - } - case <-p.eventTimer.C: - log.Printf("Failed to sent %#v. EventNotifier is set, but isn't accepting any events.", ev) - } - } -} - -func (p *Process) interrupt() (err error) { - err = syscall.Kill(-p.Pid(), syscall.SIGINT) - if err != nil { - return - } - - time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end. - if p.IsAlive() { - err = errors.New("interrupt signal failed") - } - return -} - -func (p *Process) terminate() (err error) { - err = syscall.Kill(-p.Pid(), syscall.SIGTERM) - if err != nil { - return - } - - time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end. - if p.IsAlive() { - err = errors.New("terminate signal failed") - } - return -} - -func (p *Process) CalcBackOff(attempt int, step time.Duration, maxBackOff time.Duration) time.Duration { - randBuffer := (step / 1000) * time.Duration(p.rand.Intn(1000)) - backOff := randBuffer + step*time.Duration(math.Exp2(float64(attempt))) - if backOff > maxBackOff { - return maxBackOff - } - return backOff -} - -func NewProcess(opts ProcessOptions) *Process { - return &Process{ - phase: ready, - opts: initProcessOptions(opts), - doneNotifier: make(chan bool), - stopSleep: make(chan bool), - rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), - } -} - -// newCommand creates a new exec.Cmd struct. -func newCommand(opts *ProcessOptions) *exec.Cmd { - cmd := exec.Command(opts.Name, opts.Args...) - cmd.Env = opts.Env - cmd.Dir = opts.Dir - cmd.ExtraFiles = opts.ExtraFiles - cmd.SysProcAttr = opts.SysProcAttr - return cmd -} - -// todo: test if panics on double-close -func ensureClosed(name string, isStopped chan bool, forceClose func() error) error { - t := time.NewTimer(EnsureClosedTimeout) - defer t.Stop() - - select { - case <-isStopped: - return nil - case <-t.C: - if forceClose == nil { - return fmt.Errorf("stopped waiting for %s after %s", name, EnsureClosedTimeout) - } - if err := forceClose(); err != nil { - return fmt.Errorf("%s - %s", name, err.Error()) - } - - return ensureClosed(name, isStopped, nil) - } -}