diff --git a/.travis.yml b/.travis.yml index ccd1f86..28b85f0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,8 @@ language: go go: - - '1.12' - - '1.13' - '1.14' + - '1.15' + - '1.16' env: global: - GO111MODULE=on diff --git a/README.md b/README.md index 48c1f0a..409fac7 100644 --- a/README.md +++ b/README.md @@ -50,15 +50,15 @@ func main() { for { select { case msg := <-p.Stdout(): - fmt.Printf("Recived STDOUT message: %s\n", *msg) + fmt.Printf("Received STDOUT message: %s\n", *msg) case msg := <-p.Stderr(): - fmt.Printf("Recived STDERR message: %s\n", *msg) + fmt.Printf("Received STDERR message: %s\n", *msg) case event := <-events: switch event.Code { case "ProcessStart": - fmt.Printf("Recived event: %s\n", event.Code) + fmt.Printf("Received event: %s\n", event.Code) default: - fmt.Printf("Recived event: %s - %s\n", event.Code, event.Message) + fmt.Printf("Received event: %s - %s\n", event.Code, event.Message) } case <-p.DoneNotifier(): fmt.Println("Closing loop we are done...") @@ -78,32 +78,32 @@ func main() { running the program should produce this output ``` -Recived event: ProcessStart -Recived STDOUT message: STDOUT MESSAGE -Recived STDERR message: STDERR MESSAGE -Recived event: ProcessDone - exit status 0 -Recived event: StoppingHeartbeatMonitoring - Stop signal received. -Recived event: Sleep - Sleeping for 1s before respwaning instance. -Recived event: ProcessRespawn - Trying to respawn instance. -Recived event: ProcessStart -Recived STDOUT message: STDOUT MESSAGE -Recived STDERR message: STDERR MESSAGE -Recived event: ProcessDone - exit status 0 -Recived event: StoppingHeartbeatMonitoring - Stop signal received. -Recived event: Sleep - Sleeping for 1s before respwaning instance. -Recived event: ProcessRespawn - Trying to respawn instance. -Recived event: ProcessStart -Recived STDOUT message: STDOUT MESSAGE -Recived STDERR message: STDERR MESSAGE -Recived event: ProcessDone - exit status 0 -Recived event: StoppingHeartbeatMonitoring - Stop signal received. -Recived event: Sleep - Sleeping for 1s before respwaning instance. -Recived event: ProcessRespawn - Trying to respawn instance. -Recived event: ProcessStart -Recived STDOUT message: STDOUT MESSAGE -Recived STDERR message: STDERR MESSAGE -Recived event: ProcessDone - exit status 0 -Recived event: StoppingHeartbeatMonitoring - Stop signal received. -Recived event: RespawnError - Max number of respawns reached. +Received event: ProcessStart +Received STDOUT message: STDOUT MESSAGE +Received STDERR message: STDERR MESSAGE +Received event: ProcessDone - exit status 0 +Received event: StoppingHeartbeatMonitoring - Stop signal received. +Received event: Sleep - Sleeping for 1s before respwaning instance. +Received event: ProcessRespawn - Trying to respawn instance. +Received event: ProcessStart +Received STDOUT message: STDOUT MESSAGE +Received STDERR message: STDERR MESSAGE +Received event: ProcessDone - exit status 0 +Received event: StoppingHeartbeatMonitoring - Stop signal received. +Received event: Sleep - Sleeping for 1s before respwaning instance. +Received event: ProcessRespawn - Trying to respawn instance. +Received event: ProcessStart +Received STDOUT message: STDOUT MESSAGE +Received STDERR message: STDERR MESSAGE +Received event: ProcessDone - exit status 0 +Received event: StoppingHeartbeatMonitoring - Stop signal received. +Received event: Sleep - Sleeping for 1s before respwaning instance. +Received event: ProcessRespawn - Trying to respawn instance. +Received event: ProcessStart +Received STDOUT message: STDOUT MESSAGE +Received STDERR message: STDERR MESSAGE +Received event: ProcessDone - exit status 0 +Received event: StoppingHeartbeatMonitoring - Stop signal received. +Received event: RespawnError - Max number of respawns reached. Closing loop we are done... ``` diff --git a/example_test.go b/example_test.go index 2ccfedc..1b0cb3c 100644 --- a/example_test.go +++ b/example_test.go @@ -33,15 +33,14 @@ func Example() { for { select { case msg := <-p.Stdout(): - fmt.Printf("Recived STDOUT message: %s\n", *msg) + fmt.Printf("Received STDOUT message: %s\n", *msg) case msg := <-p.Stderr(): - fmt.Printf("Recived STDERR message: %s\n", *msg) + fmt.Printf("Received STDERR message: %s\n", *msg) case event := <-events: - switch event.Code { - case "ProcessStart": - fmt.Printf("Recived event: %s\n", event.Code) - default: - fmt.Printf("Recived event: %s - %s\n", event.Code, event.Message) + if event.Code == "ProcessStart" || event.Message == "" { + fmt.Printf("Received event: %s\n", event.Code) + } else { + fmt.Printf("Received event: %s - %s\n", event.Code, event.Message) } case <-p.DoneNotifier(): fmt.Println("Closing loop we are done...") @@ -58,32 +57,32 @@ func Example() { <-exit // Output: - // Recived event: ProcessStart - // Recived STDOUT message: STDOUT MESSAGE - // Recived STDERR message: STDERR MESSAGE - // Recived event: ProcessDone - exit status 0 - // Recived event: StoppingHeartbeatMonitoring - Stop signal received. - // Recived event: Sleep - Sleeping for 1s before respwaning instance. - // Recived event: ProcessRespawn - Trying to respawn instance. - // Recived event: ProcessStart - // Recived STDOUT message: STDOUT MESSAGE - // Recived STDERR message: STDERR MESSAGE - // Recived event: ProcessDone - exit status 0 - // Recived event: StoppingHeartbeatMonitoring - Stop signal received. - // Recived event: Sleep - Sleeping for 1s before respwaning instance. - // Recived event: ProcessRespawn - Trying to respawn instance. - // Recived event: ProcessStart - // Recived STDOUT message: STDOUT MESSAGE - // Recived STDERR message: STDERR MESSAGE - // Recived event: ProcessDone - exit status 0 - // Recived event: StoppingHeartbeatMonitoring - Stop signal received. - // Recived event: Sleep - Sleeping for 1s before respwaning instance. - // Recived event: ProcessRespawn - Trying to respawn instance. - // Recived event: ProcessStart - // Recived STDOUT message: STDOUT MESSAGE - // Recived STDERR message: STDERR MESSAGE - // Recived event: ProcessDone - exit status 0 - // Recived event: StoppingHeartbeatMonitoring - Stop signal received. - // Recived event: RespawnError - Max number of respawns reached. + // Received event: ProcessStart + // Received STDOUT message: STDOUT MESSAGE + // Received STDERR message: STDERR MESSAGE + // Received event: ProcessDone - exit status 0 + // Received event: StoppingHeartbeatMonitoring - Stop signal received. + // Received event: Sleep - Sleeping for 1s before respwaning instance. + // Received event: ProcessRespawn - Trying to respawn instance. + // Received event: ProcessStart + // Received STDOUT message: STDOUT MESSAGE + // Received STDERR message: STDERR MESSAGE + // Received event: ProcessDone - exit status 0 + // Received event: StoppingHeartbeatMonitoring - Stop signal received. + // Received event: Sleep - Sleeping for 1s before respwaning instance. + // Received event: ProcessRespawn - Trying to respawn instance. + // Received event: ProcessStart + // Received STDOUT message: STDOUT MESSAGE + // Received STDERR message: STDERR MESSAGE + // Received event: ProcessDone - exit status 0 + // Received event: StoppingHeartbeatMonitoring - Stop signal received. + // Received event: Sleep - Sleeping for 1s before respwaning instance. + // Received event: ProcessRespawn - Trying to respawn instance. + // Received event: ProcessStart + // Received STDOUT message: STDOUT MESSAGE + // Received STDERR message: STDERR MESSAGE + // Received event: ProcessDone - exit status 0 + // Received event: StoppingHeartbeatMonitoring - Stop signal received. + // Received event: RespawnError - Max number of respawns reached. // Closing loop we are done... } diff --git a/parsers.go b/parsers.go index 817d371..af7f6d3 100644 --- a/parsers.go +++ b/parsers.go @@ -49,7 +49,7 @@ func MakeLineParser(fromR io.Reader, bufferSize int) ProduceFn { } } -// MakeLineParser is called with an io.Reader, and returns a function, that when called will output references to +// MakeBytesParser is called with an io.Reader, and returns a function, that when called will output references to // byte slices that contain the bytes read from the io.Reader. func MakeBytesParser(fromR io.Reader, bufferSize int) ProduceFn { br := bufio.NewReaderSize(fromR, bufferSize) diff --git a/supervisor.go b/supervisor.go index e76c8fd..a11b65f 100644 --- a/supervisor.go +++ b/supervisor.go @@ -268,7 +268,11 @@ func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, st cleanPipe := func() { for { if res, err := producer(); res != nil { - out <- res + select { + case out <- res: + default: + // During cleaning, throw out messages if they are not collect right away. + } } else if err != nil { return } @@ -314,7 +318,8 @@ func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, sto select { case <-stopC: notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.") - return + close(isMonitorClosed) + return // Return early to avoid calling stop() case alive = <-heartbeat: if alive { diff --git a/supervisor_test.go b/supervisor_test.go index b4424a5..8902c62 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -33,14 +33,6 @@ func safeStop(t *time.Timer) { } } -type testCommon interface { - Helper() - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) -} - func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) { t.Helper() for i := from; i < to; i++ { @@ -51,10 +43,10 @@ func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) { } } -func fatalIfErr(t testCommon, err error) { - t.Helper() +func fatalIfErr(tb testing.TB, err error) { + tb.Helper() if err != nil { - t.Fatal(err) + tb.Fatal(err) } } @@ -65,9 +57,9 @@ func assertExpectedEqualsActual(t *testing.T, expected, actual interface{}) { } } -func testDir(t testCommon) string { +func testDir(tb testing.TB) string { testDir, err := filepath.Abs("testdata") - fatalIfErr(t, err) + fatalIfErr(tb, err) return testDir } @@ -83,24 +75,22 @@ func funcName() string { // logProcessEvents is a helper function that registers an event notifier that // will pass all events to the logger. -func logProcessEvents(t testCommon, p *su.Process) (teardown func()) { - t.Helper() +func logProcessEvents(tb testing.TB, p *su.Process) { + tb.Helper() closeC := make(chan interface{}) notifier := p.EventNotifier() go func() { + tb.Helper() for stop := false; !stop; { select { case x := <-notifier: - log.Printf("%+v", x) - // t.Logf("%+v", x) + tb.Logf("%+v", x) case <-closeC: stop = true } } }() - return func() { - close(closeC) - } + tb.Cleanup(func() { close(closeC) }) } func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn { @@ -117,12 +107,46 @@ func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn { } // ensureProcessKilled logs a fatal error if the process isn't dead, and kills the process. -func ensureProcessKilled(t testCommon, pid int) { - t.Helper() +func ensureProcessKilled(tb testing.TB, pid int) { + tb.Helper() signalErr := syscall.Kill(pid, syscall.Signal(0)) if signalErr != syscall.Errno(3) { - t.Errorf("child process (%d) is still running, killing it.", pid) - fatalIfErr(t, syscall.Kill(pid, syscall.SIGKILL)) + tb.Errorf("child process (%d) is still running, killing it.", pid) + fatalIfErr(tb, syscall.Kill(pid, syscall.SIGKILL)) + } +} + +func TestStderrMemoryLeak(t *testing.T) { + p := su.NewProcess(su.ProcessOptions{ + Id: funcName(), + Name: "./endless_errors.sh", + Dir: testDir(t), + OutputParser: su.MakeBytesParser, + ErrorParser: su.MakeBytesParser, + MaxSpawns: 1, + MaxSpawnAttempts: 1, + }) + + origLogOut := log.Writer() + defer log.SetOutput(origLogOut) + logOut := bytes.NewBuffer([]byte{}) + log.SetOutput(logOut) + + fatalIfErr(t, p.Start()) + + time.Sleep(time.Millisecond * 20) + + fatalIfErr(t, p.Stop()) + + select { + case <-p.DoneNotifier(): + case <-time.After(time.Second): + t.Errorf("Timeout") + } + + logOutStr := logOut.String() + if len(logOutStr) > 0 { + t.Errorf("Global logger was used - probably to print errors: \n%s", logOutStr) } } diff --git a/testdata/endless_errors.sh b/testdata/endless_errors.sh new file mode 100755 index 0000000..fab187b --- /dev/null +++ b/testdata/endless_errors.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +while :; do + >&2 echo "foo" + sleep 0.01 +done \ No newline at end of file