support run timeout (#7)

* support run timeout

* fix tests
This commit is contained in:
motiisr
2022-01-23 11:25:06 +02:00
committed by GitHub
parent da3f8a8ba8
commit 47fd9c75b7
3 changed files with 85 additions and 48 deletions

View File

@@ -110,6 +110,11 @@ type ProcessOptions struct {
// EventTimeFormat is the time format used when events are marshaled to string.
// Will use defaultEventTimeFormat when set to "".
EventTimeFormat string
// RunTimeout is the duration that the process can run before we terminate the process.
// Set to <= 0, for an unlimited run timeout
// Will use defaultRunTimeout when set to 0.
RunTimeout time.Duration
}
// init initializes the opts structure with default and required options.
@@ -154,12 +159,18 @@ func initProcessOptions(opts ProcessOptions) *ProcessOptions {
if opts.IdleTimeout == 0 {
opts.IdleTimeout = defaultIdleTimeout
}
if opts.IdleTimeout < 0 {
opts.IdleTimeout = time.Duration(maxDuration)
}
if opts.TerminationGraceTimeout == 0 {
opts.TerminationGraceTimeout = defaultTerminationGraceTimeout
}
if opts.EventTimeFormat == "" {
opts.EventTimeFormat = defaultEventTimeFormat
}
if opts.RunTimeout <= 0 {
opts.RunTimeout = defaultRunTimeout
}
if opts.In == nil {
opts.In = make(chan []byte)
}

View File

@@ -15,6 +15,7 @@ import (
"time"
)
const maxDuration = 1<<63 - 1
const (
defaultMaxSpawns = 1
defaultMaxSpawnAttempts = 10
@@ -25,6 +26,7 @@ const (
defaultNotifyEventTimeout = time.Millisecond
defaultParserBufferSize = 4096
defaultIdleTimeout = 10 * time.Second
defaultRunTimeout = time.Duration(maxDuration)
defaultTerminationGraceTimeout = time.Second
defaultEventTimeFormat = time.RFC3339Nano
)
@@ -208,7 +210,7 @@ func (p *Process) unprotectedStart() error {
go readerToChan(p.opts.OutputParser(outPipe, p.opts.ParserBufferSize), p.opts.Out, isOutClosed, p.stopC, heartbeat)
go readerToChan(p.opts.ErrorParser(errPipe, p.opts.ParserBufferSize), p.opts.Err, isErrClosed, p.stopC, nil)
go monitorHeartBeat(p.opts.IdleTimeout, heartbeat, isMonitorClosed, p.stopC, p.Stop, p.notifyEvent)
go MonitorHeartBeat(p.opts.IdleTimeout, p.opts.RunTimeout, heartbeat, isMonitorClosed, p.stopC, p.Stop, p.notifyEvent)
var ensureOnce sync.Once
p.ensureAllClosed = func() {
@@ -218,7 +220,9 @@ func (p *Process) unprotectedStart() error {
default:
log.Printf("[%s] ensureAllClosed was called before stopC channel was closed.", p.opts.Id)
}
if p.opts.Debug { log.Printf("[%s] Starting to ensure all pipes have closed.", p.opts.Id) }
if p.opts.Debug {
log.Printf("[%s] Starting to ensure all pipes have closed.", p.opts.Id)
}
if cErr := ensureClosed("stdin", isInClosed, inPipe.Close); cErr != nil {
log.Printf("[%s] Possible memory leak, stdin go-routine not closed. Error: %s", p.opts.Id, cErr)
}
@@ -304,16 +308,17 @@ func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, st
}
}
// monitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a
// positive heartbeat, or if a negative heartbeat is passed.
// MonitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a
// positive heartbeat, or if a negative heartbeat is passed, or if the run timeout passed.
//
// isMonitorClosed will be closed when this function exists.
//
// When stopC closes, this function will exit immediately.
func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) {
func MonitorHeartBeat(idleTimeout time.Duration, runTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) {
t := time.NewTimer(idleTimeout)
r := time.NewTimer(runTimeout)
defer t.Stop()
defer r.Stop()
for alive := true; alive; {
select {
case <-stopC:
@@ -334,6 +339,9 @@ func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, sto
case <-t.C:
alive = false
notifyEvent("MissingHeartbeat", "Stopping process.")
case <-r.C:
alive = false
notifyEvent("RunTimePassed", "Stopping process.")
}
}

View File

@@ -717,3 +717,21 @@ func test_timings(t *testing.T) {
log.Println(prodInNum, prodOutNum, incOutNum)
}
func TestMonitorRunTimeout(t *testing.T) {
heartbeat, isMonitorClosed, stopC := make(chan bool), make(chan bool), make(chan bool)
result := make(chan string)
resEvent := make(chan string)
stopF := func() error {
result <- "Stopped"
return nil
}
eventNotify := func(event string, message ...interface{}) {
resEvent <- event
}
go su.MonitorHeartBeat(20*time.Millisecond, 10*time.Millisecond, heartbeat, isMonitorClosed, stopC, stopF, eventNotify)
time.Sleep(20 * time.Millisecond)
assertExpectedEqualsActual(t, <-resEvent, "RunTimePassed")
assertExpectedEqualsActual(t, <-result, "Stopped")
}