Fix Stderr memory leak when cleaning pipe (#6)

This commit is contained in:
Eyal Shalev
2021-05-31 15:37:11 +03:00
committed by GitHub
parent c766df8bf1
commit da3f8a8ba8
7 changed files with 128 additions and 94 deletions

View File

@@ -1,8 +1,8 @@
language: go language: go
go: go:
- '1.12'
- '1.13'
- '1.14' - '1.14'
- '1.15'
- '1.16'
env: env:
global: global:
- GO111MODULE=on - GO111MODULE=on

View File

@@ -50,15 +50,15 @@ func main() {
for { for {
select { select {
case msg := <-p.Stdout(): case msg := <-p.Stdout():
fmt.Printf("Recived STDOUT message: %s\n", *msg) fmt.Printf("Received STDOUT message: %s\n", *msg)
case msg := <-p.Stderr(): case msg := <-p.Stderr():
fmt.Printf("Recived STDERR message: %s\n", *msg) fmt.Printf("Received STDERR message: %s\n", *msg)
case event := <-events: case event := <-events:
switch event.Code { switch event.Code {
case "ProcessStart": case "ProcessStart":
fmt.Printf("Recived event: %s\n", event.Code) fmt.Printf("Received event: %s\n", event.Code)
default: default:
fmt.Printf("Recived event: %s - %s\n", event.Code, event.Message) fmt.Printf("Received event: %s - %s\n", event.Code, event.Message)
} }
case <-p.DoneNotifier(): case <-p.DoneNotifier():
fmt.Println("Closing loop we are done...") fmt.Println("Closing loop we are done...")
@@ -78,32 +78,32 @@ func main() {
running the program should produce this output running the program should produce this output
``` ```
Recived event: ProcessStart Received event: ProcessStart
Recived STDOUT message: STDOUT MESSAGE Received STDOUT message: STDOUT MESSAGE
Recived STDERR message: STDERR MESSAGE Received STDERR message: STDERR MESSAGE
Recived event: ProcessDone - exit status 0 Received event: ProcessDone - exit status 0
Recived event: StoppingHeartbeatMonitoring - Stop signal received. Received event: StoppingHeartbeatMonitoring - Stop signal received.
Recived event: Sleep - Sleeping for 1s before respwaning instance. Received event: Sleep - Sleeping for 1s before respwaning instance.
Recived event: ProcessRespawn - Trying to respawn instance. Received event: ProcessRespawn - Trying to respawn instance.
Recived event: ProcessStart Received event: ProcessStart
Recived STDOUT message: STDOUT MESSAGE Received STDOUT message: STDOUT MESSAGE
Recived STDERR message: STDERR MESSAGE Received STDERR message: STDERR MESSAGE
Recived event: ProcessDone - exit status 0 Received event: ProcessDone - exit status 0
Recived event: StoppingHeartbeatMonitoring - Stop signal received. Received event: StoppingHeartbeatMonitoring - Stop signal received.
Recived event: Sleep - Sleeping for 1s before respwaning instance. Received event: Sleep - Sleeping for 1s before respwaning instance.
Recived event: ProcessRespawn - Trying to respawn instance. Received event: ProcessRespawn - Trying to respawn instance.
Recived event: ProcessStart Received event: ProcessStart
Recived STDOUT message: STDOUT MESSAGE Received STDOUT message: STDOUT MESSAGE
Recived STDERR message: STDERR MESSAGE Received STDERR message: STDERR MESSAGE
Recived event: ProcessDone - exit status 0 Received event: ProcessDone - exit status 0
Recived event: StoppingHeartbeatMonitoring - Stop signal received. Received event: StoppingHeartbeatMonitoring - Stop signal received.
Recived event: Sleep - Sleeping for 1s before respwaning instance. Received event: Sleep - Sleeping for 1s before respwaning instance.
Recived event: ProcessRespawn - Trying to respawn instance. Received event: ProcessRespawn - Trying to respawn instance.
Recived event: ProcessStart Received event: ProcessStart
Recived STDOUT message: STDOUT MESSAGE Received STDOUT message: STDOUT MESSAGE
Recived STDERR message: STDERR MESSAGE Received STDERR message: STDERR MESSAGE
Recived event: ProcessDone - exit status 0 Received event: ProcessDone - exit status 0
Recived event: StoppingHeartbeatMonitoring - Stop signal received. Received event: StoppingHeartbeatMonitoring - Stop signal received.
Recived event: RespawnError - Max number of respawns reached. Received event: RespawnError - Max number of respawns reached.
Closing loop we are done... Closing loop we are done...
``` ```

View File

@@ -33,15 +33,14 @@ func Example() {
for { for {
select { select {
case msg := <-p.Stdout(): case msg := <-p.Stdout():
fmt.Printf("Recived STDOUT message: %s\n", *msg) fmt.Printf("Received STDOUT message: %s\n", *msg)
case msg := <-p.Stderr(): case msg := <-p.Stderr():
fmt.Printf("Recived STDERR message: %s\n", *msg) fmt.Printf("Received STDERR message: %s\n", *msg)
case event := <-events: case event := <-events:
switch event.Code { if event.Code == "ProcessStart" || event.Message == "" {
case "ProcessStart": fmt.Printf("Received event: %s\n", event.Code)
fmt.Printf("Recived event: %s\n", event.Code) } else {
default: fmt.Printf("Received event: %s - %s\n", event.Code, event.Message)
fmt.Printf("Recived event: %s - %s\n", event.Code, event.Message)
} }
case <-p.DoneNotifier(): case <-p.DoneNotifier():
fmt.Println("Closing loop we are done...") fmt.Println("Closing loop we are done...")
@@ -58,32 +57,32 @@ func Example() {
<-exit <-exit
// Output: // Output:
// Recived event: ProcessStart // Received event: ProcessStart
// Recived STDOUT message: STDOUT MESSAGE // Received STDOUT message: STDOUT MESSAGE
// Recived STDERR message: STDERR MESSAGE // Received STDERR message: STDERR MESSAGE
// Recived event: ProcessDone - exit status 0 // Received event: ProcessDone - exit status 0
// Recived event: StoppingHeartbeatMonitoring - Stop signal received. // Received event: StoppingHeartbeatMonitoring - Stop signal received.
// Recived event: Sleep - Sleeping for 1s before respwaning instance. // Received event: Sleep - Sleeping for 1s before respwaning instance.
// Recived event: ProcessRespawn - Trying to respawn instance. // Received event: ProcessRespawn - Trying to respawn instance.
// Recived event: ProcessStart // Received event: ProcessStart
// Recived STDOUT message: STDOUT MESSAGE // Received STDOUT message: STDOUT MESSAGE
// Recived STDERR message: STDERR MESSAGE // Received STDERR message: STDERR MESSAGE
// Recived event: ProcessDone - exit status 0 // Received event: ProcessDone - exit status 0
// Recived event: StoppingHeartbeatMonitoring - Stop signal received. // Received event: StoppingHeartbeatMonitoring - Stop signal received.
// Recived event: Sleep - Sleeping for 1s before respwaning instance. // Received event: Sleep - Sleeping for 1s before respwaning instance.
// Recived event: ProcessRespawn - Trying to respawn instance. // Received event: ProcessRespawn - Trying to respawn instance.
// Recived event: ProcessStart // Received event: ProcessStart
// Recived STDOUT message: STDOUT MESSAGE // Received STDOUT message: STDOUT MESSAGE
// Recived STDERR message: STDERR MESSAGE // Received STDERR message: STDERR MESSAGE
// Recived event: ProcessDone - exit status 0 // Received event: ProcessDone - exit status 0
// Recived event: StoppingHeartbeatMonitoring - Stop signal received. // Received event: StoppingHeartbeatMonitoring - Stop signal received.
// Recived event: Sleep - Sleeping for 1s before respwaning instance. // Received event: Sleep - Sleeping for 1s before respwaning instance.
// Recived event: ProcessRespawn - Trying to respawn instance. // Received event: ProcessRespawn - Trying to respawn instance.
// Recived event: ProcessStart // Received event: ProcessStart
// Recived STDOUT message: STDOUT MESSAGE // Received STDOUT message: STDOUT MESSAGE
// Recived STDERR message: STDERR MESSAGE // Received STDERR message: STDERR MESSAGE
// Recived event: ProcessDone - exit status 0 // Received event: ProcessDone - exit status 0
// Recived event: StoppingHeartbeatMonitoring - Stop signal received. // Received event: StoppingHeartbeatMonitoring - Stop signal received.
// Recived event: RespawnError - Max number of respawns reached. // Received event: RespawnError - Max number of respawns reached.
// Closing loop we are done... // Closing loop we are done...
} }

View File

@@ -49,7 +49,7 @@ func MakeLineParser(fromR io.Reader, bufferSize int) ProduceFn {
} }
} }
// MakeLineParser is called with an io.Reader, and returns a function, that when called will output references to // MakeBytesParser is called with an io.Reader, and returns a function, that when called will output references to
// byte slices that contain the bytes read from the io.Reader. // byte slices that contain the bytes read from the io.Reader.
func MakeBytesParser(fromR io.Reader, bufferSize int) ProduceFn { func MakeBytesParser(fromR io.Reader, bufferSize int) ProduceFn {
br := bufio.NewReaderSize(fromR, bufferSize) br := bufio.NewReaderSize(fromR, bufferSize)

View File

@@ -268,7 +268,11 @@ func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, st
cleanPipe := func() { cleanPipe := func() {
for { for {
if res, err := producer(); res != nil { if res, err := producer(); res != nil {
out <- res select {
case out <- res:
default:
// During cleaning, throw out messages if they are not collect right away.
}
} else if err != nil { } else if err != nil {
return return
} }
@@ -314,7 +318,8 @@ func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, sto
select { select {
case <-stopC: case <-stopC:
notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.") notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.")
return close(isMonitorClosed)
return // Return early to avoid calling stop()
case alive = <-heartbeat: case alive = <-heartbeat:
if alive { if alive {

View File

@@ -33,14 +33,6 @@ func safeStop(t *time.Timer) {
} }
} }
type testCommon interface {
Helper()
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
}
func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) { func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) {
t.Helper() t.Helper()
for i := from; i < to; i++ { for i := from; i < to; i++ {
@@ -51,10 +43,10 @@ func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) {
} }
} }
func fatalIfErr(t testCommon, err error) { func fatalIfErr(tb testing.TB, err error) {
t.Helper() tb.Helper()
if err != nil { if err != nil {
t.Fatal(err) tb.Fatal(err)
} }
} }
@@ -65,9 +57,9 @@ func assertExpectedEqualsActual(t *testing.T, expected, actual interface{}) {
} }
} }
func testDir(t testCommon) string { func testDir(tb testing.TB) string {
testDir, err := filepath.Abs("testdata") testDir, err := filepath.Abs("testdata")
fatalIfErr(t, err) fatalIfErr(tb, err)
return testDir return testDir
} }
@@ -83,24 +75,22 @@ func funcName() string {
// logProcessEvents is a helper function that registers an event notifier that // logProcessEvents is a helper function that registers an event notifier that
// will pass all events to the logger. // will pass all events to the logger.
func logProcessEvents(t testCommon, p *su.Process) (teardown func()) { func logProcessEvents(tb testing.TB, p *su.Process) {
t.Helper() tb.Helper()
closeC := make(chan interface{}) closeC := make(chan interface{})
notifier := p.EventNotifier() notifier := p.EventNotifier()
go func() { go func() {
tb.Helper()
for stop := false; !stop; { for stop := false; !stop; {
select { select {
case x := <-notifier: case x := <-notifier:
log.Printf("%+v", x) tb.Logf("%+v", x)
// t.Logf("%+v", x)
case <-closeC: case <-closeC:
stop = true stop = true
} }
} }
}() }()
return func() { tb.Cleanup(func() { close(closeC) })
close(closeC)
}
} }
func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn { func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn {
@@ -117,12 +107,46 @@ func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn {
} }
// ensureProcessKilled logs a fatal error if the process isn't dead, and kills the process. // ensureProcessKilled logs a fatal error if the process isn't dead, and kills the process.
func ensureProcessKilled(t testCommon, pid int) { func ensureProcessKilled(tb testing.TB, pid int) {
t.Helper() tb.Helper()
signalErr := syscall.Kill(pid, syscall.Signal(0)) signalErr := syscall.Kill(pid, syscall.Signal(0))
if signalErr != syscall.Errno(3) { if signalErr != syscall.Errno(3) {
t.Errorf("child process (%d) is still running, killing it.", pid) tb.Errorf("child process (%d) is still running, killing it.", pid)
fatalIfErr(t, syscall.Kill(pid, syscall.SIGKILL)) fatalIfErr(tb, syscall.Kill(pid, syscall.SIGKILL))
}
}
func TestStderrMemoryLeak(t *testing.T) {
p := su.NewProcess(su.ProcessOptions{
Id: funcName(),
Name: "./endless_errors.sh",
Dir: testDir(t),
OutputParser: su.MakeBytesParser,
ErrorParser: su.MakeBytesParser,
MaxSpawns: 1,
MaxSpawnAttempts: 1,
})
origLogOut := log.Writer()
defer log.SetOutput(origLogOut)
logOut := bytes.NewBuffer([]byte{})
log.SetOutput(logOut)
fatalIfErr(t, p.Start())
time.Sleep(time.Millisecond * 20)
fatalIfErr(t, p.Stop())
select {
case <-p.DoneNotifier():
case <-time.After(time.Second):
t.Errorf("Timeout")
}
logOutStr := logOut.String()
if len(logOutStr) > 0 {
t.Errorf("Global logger was used - probably to print errors: \n%s", logOutStr)
} }
} }

6
testdata/endless_errors.sh vendored Executable file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
while :; do
>&2 echo "foo"
sleep 0.01
done