mirror of
https://github.com/kontera-technologies/go-supervisor
synced 2025-10-27 23:31:29 +08:00
moved to branch-based versioning
This commit is contained in:
5
LICENSE
5
LICENSE
@@ -1,6 +1,6 @@
|
|||||||
The MIT License (MIT)
|
MIT License
|
||||||
|
|
||||||
Copyright (c) 2015 Kontera Technologies
|
Copyright (c) 2018 Kontera
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
@@ -19,4 +19,3 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|||||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
SOFTWARE.
|
SOFTWARE.
|
||||||
|
|
||||||
|
|||||||
138
README.md
138
README.md
@@ -1,138 +0,0 @@
|
|||||||
# go-supervisor (V1)
|
|
||||||
|
|
||||||
Small library for supervising child processes in `Go`, it exposes `Stdout`,`Stderr` and `Stdin` in the "Go way" using channels...
|
|
||||||
|
|
||||||
## Example
|
|
||||||
`example.bash` print stuff to stdout and stderr and quit after 5 seconds...
|
|
||||||
```bash
|
|
||||||
#!/bin/bash
|
|
||||||
echo "STDOUT MESSAGE"
|
|
||||||
echo "STDERR MESSAGE" 1>&2
|
|
||||||
sleep 5
|
|
||||||
```
|
|
||||||
|
|
||||||
`supervisor-exapmle.go` spawn and supervise the bash program...
|
|
||||||
```go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/kontera-technologies/go-supervisor"
|
|
||||||
"log"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
p, err := supervisor.Supervise("example.bash", supervisor.Options{
|
|
||||||
Args: []string{}, // argumets to pass ( default is none )
|
|
||||||
SpawnAttempts: 4, // attempts before giving up ( default 10 )
|
|
||||||
AttemptsBeforeTerminate: 10, // on Stop() terminate process after X interrupt attempts (default is 10)
|
|
||||||
Debug: true, // print events to stdout ( default false )
|
|
||||||
Dir: "/tmp", // run dir ( default is current dir )
|
|
||||||
Id: "example", // will be added to every log print ( default is "NOID")
|
|
||||||
MaxSpawns: 10, // Max spawn limit ( default is 1 )
|
|
||||||
StdoutIdleTime: 10, // stop worker if we didn't recived stdout message in X seconds ( default is 0 - disbaled )
|
|
||||||
StderrIdleTime: 10, // stop worker if we didn't recived stderr message in X seconds ( default is 0 - disbaled )
|
|
||||||
|
|
||||||
// function that calculate sleep time based in the current sleep time
|
|
||||||
// useful for exponential backoff ( default is this function )
|
|
||||||
DelayBetweenSpawns: func(currentSleep int) (sleepTime int) {
|
|
||||||
return currentSleep * 2
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("failed to start process: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
exit := make(chan bool)
|
|
||||||
done := p.NotifyDone(make(chan bool)) // process is done...
|
|
||||||
events := p.NotifyEvents(make(chan *supervisor.Event,1000))
|
|
||||||
|
|
||||||
// read stuff
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg := <-p.Stdout:
|
|
||||||
log.Printf("Recived STDOUT message %s", msg)
|
|
||||||
case msg := <-p.Stderr:
|
|
||||||
log.Printf("Recived STDERR message %s", msg)
|
|
||||||
case event := <- events:
|
|
||||||
switch event.Code {
|
|
||||||
case 22:
|
|
||||||
log.Printf("instance respawned, new pid is: %v...", p.Pid())
|
|
||||||
default:
|
|
||||||
log.Printf("[%d] %s", event.Code, event.Message)
|
|
||||||
}
|
|
||||||
case <-done: // process quit
|
|
||||||
log.Printf("Closing loop we are done....")
|
|
||||||
exit <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-exit
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
running the program should produce this output
|
|
||||||
```
|
|
||||||
2015/02/12 13:02:14 Recived STDERR message &STDERR MESSAGE
|
|
||||||
2015/02/12 13:02:14 Recived STDOUT message &STDOUT MESSAGE
|
|
||||||
2015/02/12 13:02:20 [1] [example] can't read from stderr: EOF
|
|
||||||
2015/02/12 13:02:20 [1] [example] can't read from stdout: EOF
|
|
||||||
2015/02/12 13:02:20 [7] [example] instance crashed...
|
|
||||||
2015/02/12 13:02:20 [10] [example] going to sleep for 2 seconds...
|
|
||||||
2015/02/12 13:02:20 [20] [example] going to kill process..
|
|
||||||
2015/02/12 13:02:20 [5] [example] stderr goroutine is done...
|
|
||||||
2015/02/12 13:02:20 [5] [example] stdout goroutine is done...
|
|
||||||
2015/02/12 13:02:20 [19] [example] closing stdin handler...
|
|
||||||
2015/02/12 13:02:20 [5] [example] stdin goroutine is done...
|
|
||||||
2015/02/12 13:02:20 [29] [example] entering sleep stage...
|
|
||||||
2015/02/12 13:02:22 [8] [example] starting instance...
|
|
||||||
2015/02/12 13:02:22 [0] [example] opening stdin handler...
|
|
||||||
2015/02/12 13:02:22 [0] [example] opening stdout handler...
|
|
||||||
2015/02/12 13:02:22 [0] [example] opening stderr handler...
|
|
||||||
2015/02/12 13:02:22 instance respawned, new pid is: 58312...
|
|
||||||
2015/02/12 13:02:22 Recived STDOUT message &STDOUT MESSAGE
|
|
||||||
2015/02/12 13:02:22 Recived STDERR message &STDERR MESSAGE
|
|
||||||
2015/02/12 13:02:27 [1] [example] can't read from stdout: EOF
|
|
||||||
2015/02/12 13:02:27 [1] [example] can't read from stderr: EOF
|
|
||||||
2015/02/12 13:02:27 [7] [example] instance crashed...
|
|
||||||
2015/02/12 13:02:27 [10] [example] going to sleep for 4 seconds...
|
|
||||||
2015/02/12 13:02:27 [20] [example] going to kill process..
|
|
||||||
2015/02/12 13:02:27 [5] [example] stderr goroutine is done...
|
|
||||||
2015/02/12 13:02:27 [5] [example] stdout goroutine is done...
|
|
||||||
2015/02/12 13:02:27 [19] [example] closing stdin handler...
|
|
||||||
2015/02/12 13:02:27 [5] [example] stdin goroutine is done...
|
|
||||||
2015/02/12 13:02:27 [29] [example] entering sleep stage...
|
|
||||||
2015/02/12 13:02:31 [8] [example] starting instance...
|
|
||||||
2015/02/12 13:02:31 [0] [example] opening stdin handler...
|
|
||||||
2015/02/12 13:02:31 [0] [example] opening stdout handler...
|
|
||||||
2015/02/12 13:02:31 [0] [example] opening stderr handler...
|
|
||||||
2015/02/12 13:02:31 instance respawned, new pid is: 58323...
|
|
||||||
2015/02/12 13:02:31 Recived STDERR message &STDERR MESSAGE
|
|
||||||
2015/02/12 13:02:31 Recived STDOUT message &STDOUT MESSAGE
|
|
||||||
2015/02/12 13:02:36 [1] [example] can't read from stderr: EOF
|
|
||||||
2015/02/12 13:02:36 [1] [example] can't read from stdout: EOF
|
|
||||||
2015/02/12 13:02:36 [7] [example] instance crashed...
|
|
||||||
2015/02/12 13:02:36 [10] [example] going to sleep for 8 seconds...
|
|
||||||
2015/02/12 13:02:36 [20] [example] going to kill process..
|
|
||||||
2015/02/12 13:02:36 [5] [example] stderr goroutine is done...
|
|
||||||
2015/02/12 13:02:36 [5] [example] stdout goroutine is done...
|
|
||||||
2015/02/12 13:02:36 [19] [example] closing stdin handler...
|
|
||||||
2015/02/12 13:02:36 [5] [example] stdin goroutine is done...
|
|
||||||
2015/02/12 13:02:36 [29] [example] entering sleep stage...
|
|
||||||
2015/02/12 13:02:45 [8] [example] starting instance...
|
|
||||||
2015/02/12 13:02:45 [0] [example] opening stdin handler...
|
|
||||||
2015/02/12 13:02:45 [0] [example] opening stdout handler...
|
|
||||||
2015/02/12 13:02:45 [0] [example] opening stderr handler...
|
|
||||||
2015/02/12 13:02:45 instance respawned, new pid is: 58338...
|
|
||||||
2015/02/12 13:02:45 Recived STDOUT message &STDOUT MESSAGE
|
|
||||||
2015/02/12 13:02:45 Recived STDERR message &STDERR MESSAGE
|
|
||||||
2015/02/12 13:02:50 [1] [example] can't read from stderr: EOF
|
|
||||||
2015/02/12 13:02:50 [1] [example] can't read from stdout: EOF
|
|
||||||
2015/02/12 13:02:50 [7] [example] instance crashed...
|
|
||||||
2015/02/12 13:02:50 [9] [example] giving up, instance failed to start...
|
|
||||||
2015/02/12 13:02:50 Closing loop we are done....
|
|
||||||
```
|
|
||||||
4
go.mod
4
go.mod
@@ -1,3 +1,5 @@
|
|||||||
module github.com/kontera-technologies/go-supervisor
|
module github.com/kontera-technologies/go-supervisor/v2
|
||||||
|
|
||||||
go 1.14
|
go 1.14
|
||||||
|
|
||||||
|
require github.com/fortytw2/leaktest v1.3.0
|
||||||
|
|||||||
1052
supervisor.go
1052
supervisor.go
File diff suppressed because it is too large
Load Diff
0
v2/testdata/echo.sh → testdata/echo.sh
vendored
0
v2/testdata/echo.sh → testdata/echo.sh
vendored
0
v2/testdata/trap.sh → testdata/trap.sh
vendored
0
v2/testdata/trap.sh → testdata/trap.sh
vendored
0
v2/testdata/zlib.sh → testdata/zlib.sh
vendored
0
v2/testdata/zlib.sh → testdata/zlib.sh
vendored
21
v2/LICENSE
21
v2/LICENSE
@@ -1,21 +0,0 @@
|
|||||||
MIT License
|
|
||||||
|
|
||||||
Copyright (c) 2018 Kontera
|
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
|
||||||
in the Software without restriction, including without limitation the rights
|
|
||||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
copies of the Software, and to permit persons to whom the Software is
|
|
||||||
furnished to do so, subject to the following conditions:
|
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall be included in all
|
|
||||||
copies or substantial portions of the Software.
|
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
SOFTWARE.
|
|
||||||
@@ -1,5 +0,0 @@
|
|||||||
module github.com/kontera-technologies/go-supervisor/v2
|
|
||||||
|
|
||||||
go 1.14
|
|
||||||
|
|
||||||
require github.com/fortytw2/leaktest v1.3.0
|
|
||||||
688
v2/supervisor.go
688
v2/supervisor.go
@@ -1,688 +0,0 @@
|
|||||||
package supervisor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"math/rand"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultMaxSpawns = 1
|
|
||||||
defaultMaxSpawnAttempts = 10
|
|
||||||
defaultMaxSpawnBackOff = time.Minute
|
|
||||||
defaultMaxRespawnBackOff = time.Second
|
|
||||||
defaultMaxInterruptAttempts = 5
|
|
||||||
defaultMaxTerminateAttempts = 5
|
|
||||||
defaultNotifyEventTimeout = time.Millisecond
|
|
||||||
defaultParserBufferSize = 4096
|
|
||||||
defaultIdleTimeout = 10 * time.Second
|
|
||||||
defaultTerminationGraceTimeout = time.Second
|
|
||||||
defaultEventTimeFormat = time.RFC3339Nano
|
|
||||||
)
|
|
||||||
|
|
||||||
var EnsureClosedTimeout = time.Second
|
|
||||||
|
|
||||||
type Event struct {
|
|
||||||
Id string
|
|
||||||
Code string
|
|
||||||
Message string
|
|
||||||
Time time.Time
|
|
||||||
TimeFormat string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ev Event) String() string {
|
|
||||||
if len(ev.Message) == 0 {
|
|
||||||
return fmt.Sprintf("[%30s][%s] %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("[%s][%30s] %s - %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code, ev.Message)
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
ready uint32 = 1 << iota
|
|
||||||
running
|
|
||||||
respawning
|
|
||||||
stopped
|
|
||||||
errored
|
|
||||||
)
|
|
||||||
|
|
||||||
func phaseString(s uint32) string {
|
|
||||||
str := "unknown"
|
|
||||||
switch s {
|
|
||||||
case ready:
|
|
||||||
str = "ready"
|
|
||||||
case running:
|
|
||||||
str = "running"
|
|
||||||
case respawning:
|
|
||||||
str = "respawning"
|
|
||||||
case stopped:
|
|
||||||
str = "stopped"
|
|
||||||
case errored:
|
|
||||||
str = "errored"
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%s(%d)", str, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
type ProduceFn func() (*interface{}, bool)
|
|
||||||
|
|
||||||
type Process struct {
|
|
||||||
cmd *exec.Cmd
|
|
||||||
pid int64
|
|
||||||
spawnCount int64
|
|
||||||
stopC chan bool
|
|
||||||
ensureAllClosed func()
|
|
||||||
|
|
||||||
phase uint32
|
|
||||||
phaseMu sync.Mutex
|
|
||||||
|
|
||||||
lastError atomic.Value
|
|
||||||
lastProcessState atomic.Value
|
|
||||||
|
|
||||||
opts *ProcessOptions
|
|
||||||
|
|
||||||
eventTimer *time.Timer
|
|
||||||
eventNotifierMu sync.Mutex
|
|
||||||
|
|
||||||
doneNotifier chan bool
|
|
||||||
rand *rand.Rand
|
|
||||||
stopSleep chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) Input() chan<- []byte {
|
|
||||||
return p.opts.In
|
|
||||||
}
|
|
||||||
|
|
||||||
// EmptyInput empties all messages from the Input channel.
|
|
||||||
func (p *Process) EmptyInput() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case _, ok := <-p.opts.In:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) Stdout() <-chan *interface{} {
|
|
||||||
return p.opts.Out
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) Stderr() <-chan *interface{} {
|
|
||||||
return p.opts.Err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) LastProcessState() *os.ProcessState {
|
|
||||||
v := p.lastProcessState.Load()
|
|
||||||
if v == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return v.(*os.ProcessState)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) LastError() error {
|
|
||||||
v := p.lastError.Load()
|
|
||||||
if v == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if x, ok := v.(error); ok {
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) Pid() int {
|
|
||||||
return int(atomic.LoadInt64(&p.pid))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) Start() (err error) {
|
|
||||||
p.phaseMu.Lock()
|
|
||||||
defer p.phaseMu.Unlock()
|
|
||||||
if p.phase != ready && p.phase != respawning {
|
|
||||||
return fmt.Errorf(`process phase is "%s" and not "ready" or "respawning"`, phaseString(p.phase))
|
|
||||||
}
|
|
||||||
|
|
||||||
for attempt := 0; p.opts.MaxSpawnAttempts == -1 || attempt < p.opts.MaxSpawnAttempts; attempt++ {
|
|
||||||
err = p.unprotectedStart()
|
|
||||||
if err == nil {
|
|
||||||
p.phase = running
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !p.sleep(p.CalcBackOff(attempt, time.Second, p.opts.MaxSpawnBackOff)) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.phase = errored
|
|
||||||
p.notifyDone()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) unprotectedStart() error {
|
|
||||||
p.cmd = newCommand(p.opts)
|
|
||||||
|
|
||||||
inPipe, err := p.cmd.StdinPipe()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to fetch stdin pipe: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
outPipe, err := p.cmd.StdoutPipe()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to fetch stdout pipe: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
errPipe, err := p.cmd.StderrPipe()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to fetch stderr pipe: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.opts.OutputParser == nil {
|
|
||||||
return errors.New("missing output streamer")
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.opts.ErrorParser == nil {
|
|
||||||
return errors.New("missing error streamer")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = p.cmd.Start(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.AddInt64(&p.spawnCount, 1)
|
|
||||||
atomic.StoreInt64(&p.pid, int64(p.cmd.Process.Pid))
|
|
||||||
|
|
||||||
p.stopC = make(chan bool)
|
|
||||||
heartbeat, isMonitorClosed, isInClosed, isOutClosed, isErrClosed := make(chan bool), make(chan bool), make(chan bool), make(chan bool), make(chan bool)
|
|
||||||
|
|
||||||
go chanToWriter(p.opts.In, inPipe, p.notifyEvent, isInClosed, p.stopC, heartbeat)
|
|
||||||
go readerToChan(p.opts.OutputParser(outPipe, p.opts.ParserBufferSize), p.opts.Out, isOutClosed, p.stopC, heartbeat)
|
|
||||||
go readerToChan(p.opts.ErrorParser(errPipe, p.opts.ParserBufferSize), p.opts.Err, isErrClosed, p.stopC, nil)
|
|
||||||
|
|
||||||
go monitorHeartBeat(p.opts.IdleTimeout, heartbeat, isMonitorClosed, p.stopC, p.Restart, p.notifyEvent)
|
|
||||||
|
|
||||||
var ensureOnce sync.Once
|
|
||||||
p.ensureAllClosed = func() {
|
|
||||||
ensureOnce.Do(func() {
|
|
||||||
if cErr := ensureClosed("stdin", isInClosed, inPipe.Close); cErr != nil {
|
|
||||||
log.Printf("[%s] Possible memory leak, stdin go-routine not closed. Error: %s", p.opts.Id, cErr)
|
|
||||||
}
|
|
||||||
if cErr := ensureClosed("stdout", isOutClosed, outPipe.Close); cErr != nil {
|
|
||||||
log.Printf("[%s] Possible memory leak, stdout go-routine not closed. Error: %s", p.opts.Id, cErr)
|
|
||||||
}
|
|
||||||
if cErr := ensureClosed("stderr", isErrClosed, errPipe.Close); cErr != nil {
|
|
||||||
log.Printf("[%s] Possible memory leak, stderr go-routine not closed. Error: %s", p.opts.Id, cErr)
|
|
||||||
}
|
|
||||||
if cErr := ensureClosed("heartbeat monitor", isMonitorClosed, nil); cErr != nil {
|
|
||||||
log.Printf("[%s] Possible memory leak, monitoring go-routine not closed. Error: %s", p.opts.Id, cErr)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
go p.waitAndNotify()
|
|
||||||
|
|
||||||
p.notifyEvent("ProcessStart", fmt.Sprintf("pid: %d", p.Pid()))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func chanToWriter(in <-chan []byte, out io.Writer, notifyEvent func(string, ...interface{}), closeWhenDone, stopC, heartbeat chan bool) {
|
|
||||||
defer close(closeWhenDone)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-stopC:
|
|
||||||
return
|
|
||||||
case raw, chanOpen := <-in:
|
|
||||||
if !chanOpen {
|
|
||||||
notifyEvent("Error", "Input channel closed unexpectedly.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := out.Write(raw)
|
|
||||||
if err != nil {
|
|
||||||
notifyEvent("WriteError", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
heartbeat <- true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, stopC, heartbeat chan bool) {
|
|
||||||
defer close(closeWhenDone)
|
|
||||||
|
|
||||||
cleanPipe := func() {
|
|
||||||
for {
|
|
||||||
if res, eof := producer(); res != nil {
|
|
||||||
out <- res
|
|
||||||
} else if eof {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
if res,eof := producer(); res != nil {
|
|
||||||
select {
|
|
||||||
case out <- res:
|
|
||||||
select {
|
|
||||||
case heartbeat <- true:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
case <-stopC:
|
|
||||||
cleanPipe()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else if eof {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-stopC:
|
|
||||||
cleanPipe()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// monitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a
|
|
||||||
// positive heartbeat, or if a negative heartbeat is passed.
|
|
||||||
//
|
|
||||||
// isMonitorClosed will be closed when this function exists.
|
|
||||||
//
|
|
||||||
// When stopC closes, this function will exit immediately.
|
|
||||||
func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) {
|
|
||||||
defer close(isMonitorClosed)
|
|
||||||
t := time.NewTimer(idleTimeout)
|
|
||||||
defer t.Stop()
|
|
||||||
|
|
||||||
for alive := true; alive; {
|
|
||||||
select {
|
|
||||||
case <-stopC:
|
|
||||||
notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.")
|
|
||||||
return
|
|
||||||
|
|
||||||
case alive = <-heartbeat:
|
|
||||||
if alive {
|
|
||||||
if !t.Stop() {
|
|
||||||
<-t.C
|
|
||||||
}
|
|
||||||
t.Reset(idleTimeout)
|
|
||||||
} else {
|
|
||||||
notifyEvent("NegativeHeartbeat", "Stopping process.")
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-t.C:
|
|
||||||
alive = false
|
|
||||||
notifyEvent("MissingHeartbeat", "Stopping process.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := stop(); err != nil {
|
|
||||||
notifyEvent("StopError", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) waitAndNotify() {
|
|
||||||
state, waitErr := p.cmd.Process.Wait()
|
|
||||||
|
|
||||||
p.phaseMu.Lock()
|
|
||||||
automaticUnlock := true
|
|
||||||
defer func() {
|
|
||||||
if automaticUnlock {
|
|
||||||
p.phaseMu.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
p.lastProcessState.Store(state)
|
|
||||||
|
|
||||||
if p.phase == stopped {
|
|
||||||
return
|
|
||||||
} else if p.phase != running && p.phase != respawning {
|
|
||||||
p.notifyEvent("RespawnError", fmt.Sprintf(`process phase is "%s" and not "running" or "respawning"`, phaseString(p.phase)))
|
|
||||||
}
|
|
||||||
|
|
||||||
p.phase = stopped
|
|
||||||
|
|
||||||
if waitErr != nil {
|
|
||||||
p.notifyEvent("WaitError", fmt.Sprintf("os.Process.Wait returned an error - %s", waitErr.Error()))
|
|
||||||
p.phase = errored
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if state.Success() {
|
|
||||||
p.notifyEvent("ProcessDone", state.String())
|
|
||||||
} else {
|
|
||||||
p.notifyEvent("ProcessCrashed", state.String())
|
|
||||||
p.lastError.Store(errors.New(state.String()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cleanup resources
|
|
||||||
select {
|
|
||||||
case <-p.stopC:
|
|
||||||
default:
|
|
||||||
close(p.stopC)
|
|
||||||
}
|
|
||||||
p.ensureAllClosed()
|
|
||||||
|
|
||||||
if state.Success() {
|
|
||||||
p.notifyEvent("ProcessStopped", "Process existed successfully.")
|
|
||||||
p.notifyDone()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !p.canRespawn() {
|
|
||||||
p.notifyEvent("RespawnError", "Max number of respawns reached.")
|
|
||||||
p.notifyDone()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sleepFor := p.CalcBackOff(int(atomic.LoadInt64(&p.spawnCount))-1, time.Second, p.opts.MaxRespawnBackOff)
|
|
||||||
p.notifyEvent("Sleep", fmt.Sprintf("Sleeping for %s before respwaning instance.", sleepFor.String()))
|
|
||||||
if !p.sleep(sleepFor) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p.phase = respawning
|
|
||||||
p.notifyEvent("ProcessRespawn", "Trying to respawn instance.")
|
|
||||||
|
|
||||||
automaticUnlock = false
|
|
||||||
p.phaseMu.Unlock()
|
|
||||||
err := p.Start()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
p.notifyEvent("RespawnError", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) sleep(d time.Duration) bool {
|
|
||||||
t := time.NewTimer(d)
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
return true
|
|
||||||
case <-p.stopSleep:
|
|
||||||
t.Stop()
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) canRespawn() bool {
|
|
||||||
return p.opts.MaxSpawns == -1 || atomic.LoadInt64(&p.spawnCount) < int64(p.opts.MaxSpawns)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop tries to stop the process.
|
|
||||||
// Entering this function will change the phase from "running" to "stopping" (any other initial phase will cause an error
|
|
||||||
// to be returned).
|
|
||||||
//
|
|
||||||
// This function will call notifyDone when it is done.
|
|
||||||
//
|
|
||||||
// If it fails to stop the process, the phase will change to errored and an error will be returned.
|
|
||||||
// Otherwise, the phase changes to stopped.
|
|
||||||
func (p *Process) Stop() error {
|
|
||||||
select {
|
|
||||||
case <-p.stopSleep:
|
|
||||||
default:
|
|
||||||
close(p.stopSleep)
|
|
||||||
}
|
|
||||||
p.phaseMu.Lock()
|
|
||||||
defer p.phaseMu.Unlock()
|
|
||||||
defer p.notifyDone()
|
|
||||||
err := p.unprotectedStop()
|
|
||||||
if err != nil {
|
|
||||||
p.phase = errored
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
p.phase = stopped
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) unprotectedStop() (err error) {
|
|
||||||
p.notifyEvent("ProcessStop")
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-p.stopC:
|
|
||||||
default:
|
|
||||||
close(p.stopC)
|
|
||||||
}
|
|
||||||
defer p.ensureAllClosed()
|
|
||||||
|
|
||||||
if !p.IsAlive() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
attempt := 0
|
|
||||||
for ; attempt < p.opts.MaxInterruptAttempts; attempt++ {
|
|
||||||
p.notifyEvent("Interrupt", fmt.Sprintf("sending intterupt signal to %d - attempt #%d", -p.Pid(), attempt+1))
|
|
||||||
err = p.interrupt()
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if p.opts.MaxInterruptAttempts > 0 {
|
|
||||||
p.notifyEvent("InterruptError", fmt.Sprintf("interrupt signal failed - %d attempts", attempt))
|
|
||||||
}
|
|
||||||
|
|
||||||
err = nil
|
|
||||||
for attempt = 0; attempt < p.opts.MaxTerminateAttempts; attempt++ {
|
|
||||||
p.notifyEvent("Terminate", fmt.Sprintf("sending terminate signal to %d - attempt #%d", -p.Pid(), attempt+1))
|
|
||||||
err = p.terminate()
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if p.opts.MaxTerminateAttempts > 0 {
|
|
||||||
p.notifyEvent("TerminateError", fmt.Sprintf("terminate signal failed - %d attempts", attempt))
|
|
||||||
}
|
|
||||||
|
|
||||||
p.notifyEvent("Killing", fmt.Sprintf("sending kill signal to %d", p.Pid()))
|
|
||||||
err = syscall.Kill(-p.Pid(), syscall.SIGKILL)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
p.notifyEvent("KillError", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restart tries to stop and start the process.
|
|
||||||
// Entering this function will change the phase from running to respawning (any other initial phase will cause an error
|
|
||||||
// to be returned).
|
|
||||||
//
|
|
||||||
// If it fails to stop the process the phase will change to errored and notifyDone will be called.
|
|
||||||
// If there are no more allowed respawns the phase will change to stopped and notifyDone will be called.
|
|
||||||
//
|
|
||||||
// This function calls Process.Start to start the process which will change the phase to "running" (or "errored" if it
|
|
||||||
// fails)
|
|
||||||
// If Start fails, notifyDone will be called.
|
|
||||||
func (p *Process) Restart() error {
|
|
||||||
p.phaseMu.Lock()
|
|
||||||
defer p.phaseMu.Unlock()
|
|
||||||
if p.phase != running {
|
|
||||||
return fmt.Errorf(`process phase is "%s" and not "running"`, phaseString(p.phase))
|
|
||||||
}
|
|
||||||
p.phase = respawning
|
|
||||||
err := p.unprotectedStop()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
p.phase = errored
|
|
||||||
p.notifyDone()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !p.canRespawn() {
|
|
||||||
p.phase = stopped
|
|
||||||
p.notifyDone()
|
|
||||||
return errors.New("max number of respawns reached")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) IsAlive() bool {
|
|
||||||
err := syscall.Kill(-p.Pid(), syscall.Signal(0))
|
|
||||||
if errno, ok := err.(syscall.Errno); ok {
|
|
||||||
return errno != syscall.ESRCH
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) IsDone() bool {
|
|
||||||
select {
|
|
||||||
case <-p.doneNotifier:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) DoneNotifier() <-chan bool {
|
|
||||||
return p.doneNotifier
|
|
||||||
}
|
|
||||||
|
|
||||||
// notifyDone closes the DoneNotifier channel (if it isn't already closed).
|
|
||||||
func (p *Process) notifyDone() {
|
|
||||||
select {
|
|
||||||
case <-p.doneNotifier:
|
|
||||||
default:
|
|
||||||
close(p.doneNotifier)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// EventNotifier returns the eventNotifier channel (and creates one if none exists).
|
|
||||||
//
|
|
||||||
// It is protected by Process.eventNotifierMu.
|
|
||||||
func (p *Process) EventNotifier() chan Event {
|
|
||||||
p.eventNotifierMu.Lock()
|
|
||||||
defer p.eventNotifierMu.Unlock()
|
|
||||||
|
|
||||||
if p.opts.EventNotifier == nil {
|
|
||||||
p.opts.EventNotifier = make(chan Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
return p.opts.EventNotifier
|
|
||||||
}
|
|
||||||
|
|
||||||
// notifyEvent creates and passes an event struct from an event code string and an optional event message.
|
|
||||||
// fmt.Sprint will be called on the message slice.
|
|
||||||
//
|
|
||||||
// It is protected by Process.eventNotifierMu.
|
|
||||||
func (p *Process) notifyEvent(code string, message ...interface{}) {
|
|
||||||
// Create the event before calling Lock.
|
|
||||||
ev := Event{
|
|
||||||
Id: p.opts.Id,
|
|
||||||
Code: code,
|
|
||||||
Message: fmt.Sprint(message...),
|
|
||||||
Time: time.Now(),
|
|
||||||
TimeFormat: p.opts.EventTimeFormat,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log the event before calling Lock.
|
|
||||||
if p.opts.Debug {
|
|
||||||
fmt.Println(ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.eventNotifierMu.Lock()
|
|
||||||
defer p.eventNotifierMu.Unlock()
|
|
||||||
|
|
||||||
if notifier := p.opts.EventNotifier; notifier != nil {
|
|
||||||
if p.eventTimer == nil {
|
|
||||||
p.eventTimer = time.NewTimer(p.opts.NotifyEventTimeout)
|
|
||||||
} else {
|
|
||||||
p.eventTimer.Reset(p.opts.NotifyEventTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case notifier <- ev:
|
|
||||||
if !p.eventTimer.Stop() {
|
|
||||||
<-p.eventTimer.C
|
|
||||||
}
|
|
||||||
case <-p.eventTimer.C:
|
|
||||||
log.Printf("Failed to sent %#v. EventNotifier is set, but isn't accepting any events.", ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) interrupt() (err error) {
|
|
||||||
err = syscall.Kill(-p.Pid(), syscall.SIGINT)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end.
|
|
||||||
if p.IsAlive() {
|
|
||||||
err = errors.New("interrupt signal failed")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) terminate() (err error) {
|
|
||||||
err = syscall.Kill(-p.Pid(), syscall.SIGTERM)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end.
|
|
||||||
if p.IsAlive() {
|
|
||||||
err = errors.New("terminate signal failed")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Process) CalcBackOff(attempt int, step time.Duration, maxBackOff time.Duration) time.Duration {
|
|
||||||
randBuffer := (step / 1000) * time.Duration(p.rand.Intn(1000))
|
|
||||||
backOff := randBuffer + step*time.Duration(math.Exp2(float64(attempt)))
|
|
||||||
if backOff > maxBackOff {
|
|
||||||
return maxBackOff
|
|
||||||
}
|
|
||||||
return backOff
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewProcess(opts ProcessOptions) *Process {
|
|
||||||
return &Process{
|
|
||||||
phase: ready,
|
|
||||||
opts: initProcessOptions(opts),
|
|
||||||
doneNotifier: make(chan bool),
|
|
||||||
stopSleep: make(chan bool),
|
|
||||||
rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// newCommand creates a new exec.Cmd struct.
|
|
||||||
func newCommand(opts *ProcessOptions) *exec.Cmd {
|
|
||||||
cmd := exec.Command(opts.Name, opts.Args...)
|
|
||||||
cmd.Env = opts.Env
|
|
||||||
cmd.Dir = opts.Dir
|
|
||||||
cmd.ExtraFiles = opts.ExtraFiles
|
|
||||||
cmd.SysProcAttr = opts.SysProcAttr
|
|
||||||
return cmd
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo: test if panics on double-close
|
|
||||||
func ensureClosed(name string, isStopped chan bool, forceClose func() error) error {
|
|
||||||
t := time.NewTimer(EnsureClosedTimeout)
|
|
||||||
defer t.Stop()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-isStopped:
|
|
||||||
return nil
|
|
||||||
case <-t.C:
|
|
||||||
if forceClose == nil {
|
|
||||||
return fmt.Errorf("stopped waiting for %s after %s", name, EnsureClosedTimeout)
|
|
||||||
}
|
|
||||||
if err := forceClose(); err != nil {
|
|
||||||
return fmt.Errorf("%s - %s", name, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
return ensureClosed(name, isStopped, nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user