mirror of
https://github.com/kontera-technologies/go-supervisor
synced 2025-09-27 03:25:53 +08:00
738 lines
17 KiB
Go
738 lines
17 KiB
Go
package supervisor_test
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"reflect"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/fortytw2/leaktest"
|
|
|
|
su "github.com/kontera-technologies/go-supervisor/v2"
|
|
)
|
|
|
|
func TestMain(m *testing.M) {
|
|
su.EnsureClosedTimeout = time.Millisecond * 10
|
|
os.Exit(m.Run())
|
|
}
|
|
|
|
func safeStop(t *time.Timer) {
|
|
if !t.Stop() {
|
|
<-t.C
|
|
}
|
|
}
|
|
|
|
func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) {
|
|
t.Helper()
|
|
for i := from; i < to; i++ {
|
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
|
t.Helper()
|
|
f(t, i)
|
|
})
|
|
}
|
|
}
|
|
|
|
func fatalIfErr(tb testing.TB, err error) {
|
|
tb.Helper()
|
|
if err != nil {
|
|
tb.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func assertExpectedEqualsActual(t *testing.T, expected, actual interface{}) {
|
|
t.Helper()
|
|
if !reflect.DeepEqual(expected, actual) {
|
|
t.Errorf("\n\tExpected: %q\n\tActual: %q", expected, actual)
|
|
}
|
|
}
|
|
|
|
func testDir(tb testing.TB) string {
|
|
testDir, err := filepath.Abs("testdata")
|
|
fatalIfErr(tb, err)
|
|
return testDir
|
|
}
|
|
|
|
func funcName() string {
|
|
pc, _, _, ok := runtime.Caller(1)
|
|
if !ok {
|
|
return "?"
|
|
}
|
|
|
|
fn := runtime.FuncForPC(pc)
|
|
return strings.TrimPrefix(fn.Name(), "github.com/kontera-technologies/go-supervisor/v2_test.")
|
|
}
|
|
|
|
// logProcessEvents is a helper function that registers an event notifier that
|
|
// will pass all events to the logger.
|
|
func logProcessEvents(tb testing.TB, p *su.Process) {
|
|
tb.Helper()
|
|
closeC := make(chan interface{})
|
|
notifier := p.EventNotifier()
|
|
go func() {
|
|
tb.Helper()
|
|
for stop := false; !stop; {
|
|
select {
|
|
case x := <-notifier:
|
|
tb.Logf("%+v", x)
|
|
case <-closeC:
|
|
stop = true
|
|
}
|
|
}
|
|
}()
|
|
tb.Cleanup(func() { close(closeC) })
|
|
}
|
|
|
|
func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn {
|
|
p := su.MakeLineParser(fromR, parserSize)
|
|
return func() (*interface{}, error) {
|
|
raw, err := p()
|
|
if raw != nil {
|
|
var res interface{}
|
|
res = errors.New((*raw).(string))
|
|
return &res, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// ensureProcessKilled logs a fatal error if the process isn't dead, and kills the process.
|
|
func ensureProcessKilled(tb testing.TB, pid int) {
|
|
tb.Helper()
|
|
signalErr := syscall.Kill(pid, syscall.Signal(0))
|
|
if signalErr != syscall.Errno(3) {
|
|
tb.Errorf("child process (%d) is still running, killing it.", pid)
|
|
fatalIfErr(tb, syscall.Kill(pid, syscall.SIGKILL))
|
|
}
|
|
}
|
|
|
|
func TestStderrMemoryLeak(t *testing.T) {
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./endless_errors.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeBytesParser,
|
|
ErrorParser: su.MakeBytesParser,
|
|
MaxSpawns: 1,
|
|
MaxSpawnAttempts: 1,
|
|
})
|
|
|
|
origLogOut := log.Writer()
|
|
defer log.SetOutput(origLogOut)
|
|
logOut := bytes.NewBuffer([]byte{})
|
|
log.SetOutput(logOut)
|
|
|
|
fatalIfErr(t, p.Start())
|
|
|
|
time.Sleep(time.Millisecond * 20)
|
|
|
|
fatalIfErr(t, p.Stop())
|
|
|
|
select {
|
|
case <-p.DoneNotifier():
|
|
case <-time.After(time.Second):
|
|
t.Errorf("Timeout")
|
|
}
|
|
|
|
logOutStr := logOut.String()
|
|
if len(logOutStr) > 0 {
|
|
t.Errorf("Global logger was used - probably to print errors: \n%s", logOutStr)
|
|
}
|
|
}
|
|
|
|
func TestJsonParser(t *testing.T) {
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./endless_jsons.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeJsonLineParser,
|
|
ErrorParser: makeErrorParser,
|
|
MaxSpawns: 1,
|
|
Out: make(chan *interface{}, 5),
|
|
})
|
|
|
|
expected := map[string]interface{}{
|
|
"foo": "bar",
|
|
"quo": []interface{}{"quz", float64(1), false},
|
|
}
|
|
|
|
fatalIfErr(t, p.Start())
|
|
defer p.Stop()
|
|
|
|
time.AfterFunc(time.Millisecond*30, func() {
|
|
fatalIfErr(t, p.Stop())
|
|
})
|
|
|
|
runFor(t, 0, 3, func(t *testing.T, i int) {
|
|
select {
|
|
case v := <-p.Stdout():
|
|
assertExpectedEqualsActual(t, expected, *v)
|
|
case <-time.After(time.Millisecond * 30):
|
|
t.Error("Expected output.")
|
|
}
|
|
})
|
|
|
|
select {
|
|
case v := <-p.Stdout():
|
|
t.Errorf("Unexpected output - %#v", *v)
|
|
case <-time.After(time.Millisecond * 20):
|
|
}
|
|
}
|
|
|
|
func TestBadJsonOutput(t *testing.T) {
|
|
out := bytes.NewReader([]byte(`{"a":"b"}
|
|
2019/08/21
|
|
13:43:24
|
|
invalid character '}'
|
|
{"c":"d"}{"c":"d"}
|
|
{"c":"d"}`))
|
|
tmp := su.MakeJsonLineParser(out, 4096)
|
|
p := func() *interface{} {
|
|
a, _ := tmp()
|
|
return a
|
|
}
|
|
|
|
assertExpectedEqualsActual(t, map[string]interface{}{"a": "b"}, *p())
|
|
assertExpectedEqualsActual(t, float64(2019), *p())
|
|
assertExpectedEqualsActual(t, (*interface{})(nil), p())
|
|
assertExpectedEqualsActual(t, float64(13), *p())
|
|
assertExpectedEqualsActual(t, (*interface{})(nil), p())
|
|
assertExpectedEqualsActual(t, (*interface{})(nil), p())
|
|
assertExpectedEqualsActual(t, map[string]interface{}{"c": "d"}, *p())
|
|
assertExpectedEqualsActual(t, map[string]interface{}{"c": "d"}, *p())
|
|
assertExpectedEqualsActual(t, map[string]interface{}{"c": "d"}, *p())
|
|
assertExpectedEqualsActual(t, (*interface{})(nil), p())
|
|
}
|
|
|
|
func BenchmarkBadJsonOutput(b *testing.B) {
|
|
|
|
}
|
|
|
|
func TestProcess_Start(t *testing.T) {
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./greet_with_error.sh",
|
|
Args: []string{"Hello"},
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: makeErrorParser,
|
|
MaxSpawns: 1,
|
|
Out: make(chan *interface{}, 1),
|
|
Err: make(chan *interface{}, 1),
|
|
})
|
|
|
|
fatalIfErr(t, p.Start())
|
|
defer p.Stop()
|
|
|
|
x := []byte("world\n")
|
|
select {
|
|
case p.Input() <- x:
|
|
case <-time.After(time.Millisecond):
|
|
t.Error("Input wasn't consumed in 1 millisecond")
|
|
}
|
|
|
|
select {
|
|
case out := <-p.Stdout():
|
|
assertExpectedEqualsActual(t, "Hello world", *out)
|
|
case <-time.After(time.Millisecond * 200):
|
|
t.Error("No output in 200ms")
|
|
}
|
|
|
|
select {
|
|
case v := <-p.Stderr():
|
|
assertExpectedEqualsActual(t, "Bye world", (*v).(error).Error())
|
|
case <-time.After(time.Millisecond * 200):
|
|
t.Error("No error in 200ms")
|
|
}
|
|
}
|
|
|
|
func TestMakeLineParser(t *testing.T) {
|
|
cmd := exec.Command("./endless.sh")
|
|
cmd.Dir = testDir(t)
|
|
out, _ := cmd.StdoutPipe()
|
|
_ = cmd.Start()
|
|
c := make(chan *interface{})
|
|
go func() {
|
|
x := su.MakeLineParser(out, 0)
|
|
for a, _ := x(); a != nil; a, _ = x() {
|
|
c <- a
|
|
}
|
|
close(c)
|
|
}()
|
|
time.AfterFunc(time.Second, func() {
|
|
_ = cmd.Process.Kill()
|
|
})
|
|
|
|
runFor(t, 0, 10, func(t *testing.T, i int) {
|
|
select {
|
|
case x := <-c:
|
|
if x == nil {
|
|
t.Error("unexpected nil")
|
|
return
|
|
}
|
|
assertExpectedEqualsActual(t, "foo", *x)
|
|
case <-time.After(time.Millisecond * 20):
|
|
t.Error("Expected output before 20ms pass.")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestProcess_Signal(t *testing.T) {
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./endless.sh",
|
|
Dir: testDir(t),
|
|
Out: make(chan *interface{}, 10),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: makeErrorParser,
|
|
})
|
|
|
|
fatalIfErr(t, p.Start())
|
|
defer p.Stop()
|
|
pid := p.Pid()
|
|
|
|
c := make(chan bool)
|
|
time.AfterFunc(time.Millisecond*70, func() {
|
|
fatalIfErr(t, syscall.Kill(-p.Pid(), syscall.SIGINT))
|
|
c <- true
|
|
})
|
|
|
|
runFor(t, 0, 5, func(t *testing.T, i int) {
|
|
select {
|
|
case out := <-p.Stdout():
|
|
if *out != "foo" {
|
|
t.Errorf(`Expected: "foo", received: "%s"`, *out)
|
|
}
|
|
case err := <-p.Stderr():
|
|
t.Error("Unexpected error:", err)
|
|
case <-time.After(time.Millisecond * 30):
|
|
t.Error("Expected output in channel")
|
|
}
|
|
})
|
|
|
|
<-c
|
|
time.Sleep(time.Millisecond * 10)
|
|
ensureProcessKilled(t, pid)
|
|
}
|
|
|
|
func TestProcess_Close(t *testing.T) {
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./trap.sh",
|
|
Args: []string{"endless.sh"},
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: makeErrorParser,
|
|
EventNotifier: make(chan su.Event, 10),
|
|
MaxInterruptAttempts: 1,
|
|
MaxTerminateAttempts: 2,
|
|
TerminationGraceTimeout: time.Millisecond,
|
|
})
|
|
|
|
procClosedC := make(chan error)
|
|
fatalIfErr(t, p.Start())
|
|
time.AfterFunc(time.Millisecond*20, func() {
|
|
procClosedC <- p.Stop()
|
|
})
|
|
|
|
var err error
|
|
var childPid int
|
|
|
|
select {
|
|
case v := <-p.Stderr():
|
|
childPid, err = strconv.Atoi((*v).(error).Error())
|
|
if err != nil {
|
|
t.Fatal("Expected child process id in error channel. Instead received:", (*v).(error).Error())
|
|
}
|
|
case <-time.After(time.Millisecond * 10):
|
|
t.Fatal("Expected child process id in error channel in 100 milliseconds")
|
|
}
|
|
|
|
t.Run("<-procClosedC", func(t *testing.T) {
|
|
fatalIfErr(t, <-procClosedC)
|
|
})
|
|
|
|
t.Run("trapped signals", func(t *testing.T) {
|
|
errs := map[string]string{
|
|
"InterruptError": "interrupt signal failed - 1 attempts",
|
|
"TerminateError": "terminate signal failed - 2 attempts",
|
|
}
|
|
|
|
for i := 0; i < 10 && len(errs) > 0; i++ {
|
|
select {
|
|
case ev := <-p.EventNotifier():
|
|
if !strings.HasSuffix(ev.Code, "Error") {
|
|
continue
|
|
}
|
|
assertExpectedEqualsActual(t, errs[ev.Code], ev.Message)
|
|
delete(errs, ev.Code)
|
|
default:
|
|
}
|
|
}
|
|
for code, err := range errs {
|
|
t.Errorf(`expected a %s event - "%s"`, code, err)
|
|
}
|
|
})
|
|
|
|
time.Sleep(time.Millisecond * 15)
|
|
ensureProcessKilled(t, childPid)
|
|
}
|
|
|
|
func TestProcess_RespawnOnFailedExit(t *testing.T) {
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./error.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: su.MakeLineParser,
|
|
Err: make(chan *interface{}, 3),
|
|
MaxSpawns: 3,
|
|
MaxRespawnBackOff: time.Millisecond,
|
|
})
|
|
|
|
fatalIfErr(t, p.Start())
|
|
defer p.Stop()
|
|
|
|
runFor(t, 0, 3, func(t *testing.T, i int) {
|
|
select {
|
|
case out := <-p.Stdout():
|
|
t.Errorf("Unexpected output: %#v", out)
|
|
case v := <-p.Stderr():
|
|
assertExpectedEqualsActual(t, "Bye world", *v)
|
|
case <-time.After(time.Millisecond * 3000):
|
|
t.Error("Expected error within 3000ms")
|
|
return
|
|
}
|
|
})
|
|
|
|
select {
|
|
case out := <-p.Stdout():
|
|
t.Errorf("Unexpected output: %#v", out)
|
|
case v := <-p.Stderr():
|
|
t.Errorf("Unexpected error: %#v", *v)
|
|
case <-time.After(time.Millisecond * 500):
|
|
}
|
|
}
|
|
|
|
func TestProcess_NoRespawnOnSuccessExit(t *testing.T) {
|
|
runtime.Caller(0)
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./echo.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: makeErrorParser,
|
|
})
|
|
|
|
fatalIfErr(t, p.Start())
|
|
defer p.Stop()
|
|
|
|
select {
|
|
case out := <-p.Stdout():
|
|
assertExpectedEqualsActual(t, "Hello world", *out)
|
|
case <-time.After(time.Millisecond * 150):
|
|
t.Error("No output in 150 milliseconds")
|
|
}
|
|
|
|
select {
|
|
case out := <-p.Stdout():
|
|
t.Errorf("Unexpected output: %s", *out)
|
|
case <-time.After(time.Millisecond * 10):
|
|
}
|
|
}
|
|
|
|
func TestCalcBackOff(t *testing.T) {
|
|
p1 := su.NewProcess(su.ProcessOptions{Id: funcName() + "-1"})
|
|
p2 := su.NewProcess(su.ProcessOptions{Id: funcName() + "-2"})
|
|
|
|
for i := 0; i < 5; i++ {
|
|
a, b := p1.CalcBackOff(i, time.Second, time.Minute), p2.CalcBackOff(i, time.Second, time.Minute)
|
|
if a == b {
|
|
t.Errorf("2 identical results for CalcBackOff(%d, time.Minute): %v", i, a)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestProcess_Restart(t *testing.T) {
|
|
defer leaktest.Check(t)()
|
|
timer := time.NewTimer(0)
|
|
safeStop(timer)
|
|
|
|
// initialGoroutines := runtime.NumGoroutine()
|
|
p := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./endless.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: makeErrorParser,
|
|
Out: make(chan *interface{}, 5),
|
|
IdleTimeout: time.Millisecond * 30,
|
|
MaxSpawns: 2,
|
|
MaxRespawnBackOff: time.Microsecond * 100,
|
|
TerminationGraceTimeout: time.Millisecond,
|
|
})
|
|
|
|
fatalIfErr(t, p.Start())
|
|
defer p.Stop()
|
|
|
|
numGoroutines := -1
|
|
|
|
runFor(t, 0, 3, func(t *testing.T, i int) {
|
|
timer.Reset(time.Millisecond * 20)
|
|
if numGoroutines == -1 {
|
|
numGoroutines = runtime.NumGoroutine()
|
|
}
|
|
select {
|
|
case out := <-p.Stdout():
|
|
if *out != "foo" {
|
|
t.Errorf(`Expected: "foo", received: "%s"`, *out)
|
|
}
|
|
case err := <-p.Stderr():
|
|
t.Error("Unexpected error:", err)
|
|
case <-timer.C:
|
|
t.Error("Expected output in channel")
|
|
return
|
|
}
|
|
safeStop(timer)
|
|
})
|
|
|
|
fatalIfErr(t, p.Restart())
|
|
|
|
t.Run("SIGINT received", func(t *testing.T) {
|
|
if state := p.LastProcessState(); state != nil {
|
|
raw := state.Sys()
|
|
waitStatus, ok := raw.(syscall.WaitStatus)
|
|
if !ok {
|
|
t.Fatalf("Process.LastError().Sys() should be of type syscall.WaitStatus, %q received", raw)
|
|
} else if waitStatus.Signal() != syscall.SIGINT {
|
|
t.Errorf("Expected %#v, %#v signal received", syscall.SIGINT.String(), waitStatus.Signal().String())
|
|
}
|
|
}
|
|
})
|
|
|
|
runFor(t, 3, 6, func(t *testing.T, i int) {
|
|
timer.Reset(time.Millisecond * 20)
|
|
select {
|
|
case out := <-p.Stdout():
|
|
if *out != "foo" {
|
|
t.Errorf(`Expected: "foo", received: "%s"`, *out)
|
|
}
|
|
case err := <-p.Stderr():
|
|
t.Error("Unexpected error:", err)
|
|
case <-timer.C:
|
|
t.Error("Expected output in channel within 120ms")
|
|
return
|
|
}
|
|
safeStop(timer)
|
|
})
|
|
|
|
_ = p.Restart()
|
|
|
|
t.Run("MaxSpawns reached", func(t *testing.T) {
|
|
timer.Reset(time.Millisecond * 24)
|
|
select {
|
|
case out := <-p.Stdout():
|
|
t.Error("Unexpected output:", *out)
|
|
case err := <-p.Stderr():
|
|
t.Error("Unexpected error:", err)
|
|
case <-timer.C:
|
|
return
|
|
}
|
|
safeStop(timer)
|
|
})
|
|
time.Sleep(time.Second)
|
|
}
|
|
|
|
// test_timings_compressed_data can be used to test the performance of this library.
|
|
func test_timings_compressed_data(t *testing.T) {
|
|
runtime.GOMAXPROCS(runtime.NumCPU())
|
|
|
|
f, err := os.Open("testdata/ipsum.zlib")
|
|
fatalIfErr(t, err)
|
|
content, err := ioutil.ReadAll(f)
|
|
fatalIfErr(t, err)
|
|
|
|
producer := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./zlib.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeLineParser,
|
|
ErrorParser: su.MakeLineParser,
|
|
MaxSpawnAttempts: 1,
|
|
ParserBufferSize: 170000,
|
|
})
|
|
|
|
fatalIfErr(t, producer.Start())
|
|
|
|
stop := make(chan bool)
|
|
pDone := make(chan bool)
|
|
|
|
prodInNum := int64(0)
|
|
prodOutNum := int64(0)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
log.Println("prodInNum", prodInNum)
|
|
return
|
|
case <-time.After(time.Microsecond):
|
|
producer.Input() <- content
|
|
prodInNum++
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
log.Println("prodOutNum", prodOutNum)
|
|
return
|
|
case <-producer.Stdout():
|
|
prodOutNum++
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
<-stop
|
|
_ = producer.Stop()
|
|
close(pDone)
|
|
}()
|
|
|
|
time.AfterFunc(time.Second*10, func() {
|
|
close(stop)
|
|
})
|
|
|
|
<-pDone
|
|
|
|
log.Println(prodInNum, prodOutNum)
|
|
}
|
|
|
|
// test_timings can be used to test the performance of this library.
|
|
func test_timings(t *testing.T) {
|
|
runtime.GOMAXPROCS(runtime.NumCPU())
|
|
|
|
f, err := os.Open("testdata/ipsum.txt")
|
|
fatalIfErr(t, err)
|
|
|
|
ipsum, err := ioutil.ReadAll(f)
|
|
fatalIfErr(t, err)
|
|
ipsum = append(ipsum, '\n')
|
|
|
|
producer := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./producer.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeBytesParser,
|
|
ErrorParser: su.MakeLineParser,
|
|
ParserBufferSize: 170000,
|
|
})
|
|
incrementer := su.NewProcess(su.ProcessOptions{
|
|
Id: funcName(),
|
|
Name: "./incrementer.sh",
|
|
Dir: testDir(t),
|
|
OutputParser: su.MakeBytesParser,
|
|
ErrorParser: su.MakeLineParser,
|
|
ParserBufferSize: 170000,
|
|
})
|
|
|
|
fatalIfErr(t, producer.Start())
|
|
fatalIfErr(t, incrementer.Start())
|
|
|
|
stop := make(chan bool)
|
|
pDone := make(chan bool)
|
|
iDone := make(chan bool)
|
|
|
|
prodInNum := int64(0)
|
|
prodOutNum := int64(0)
|
|
incOutNum := int64(0)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
log.Println("prodInNum", prodInNum)
|
|
return
|
|
case <-time.After(time.Microsecond * 50):
|
|
producer.Input() <- ipsum
|
|
prodInNum++
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
log.Println("prodOutNum", prodOutNum)
|
|
return
|
|
case msg := <-producer.Stdout():
|
|
incrementer.Input() <- (*msg).([]byte)
|
|
prodOutNum++
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
log.Println("incOutNum", incOutNum)
|
|
return
|
|
case <-incrementer.Stdout():
|
|
incOutNum++
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
<-stop
|
|
_ = producer.Stop()
|
|
close(pDone)
|
|
}()
|
|
go func() {
|
|
<-stop
|
|
_ = incrementer.Stop()
|
|
close(iDone)
|
|
}()
|
|
|
|
time.AfterFunc(time.Second*10, func() {
|
|
close(stop)
|
|
})
|
|
|
|
<-iDone
|
|
<-pDone
|
|
|
|
log.Println(prodInNum, prodOutNum, incOutNum)
|
|
}
|
|
|
|
func TestMonitorRunTimeout(t *testing.T) {
|
|
heartbeat, isMonitorClosed, stopC := make(chan bool), make(chan bool), make(chan bool)
|
|
result := make(chan string)
|
|
resEvent := make(chan string)
|
|
|
|
stopF := func() error {
|
|
result <- "Stopped"
|
|
return nil
|
|
}
|
|
eventNotify := func(event string, message ...interface{}) {
|
|
resEvent <- event
|
|
}
|
|
go su.MonitorHeartBeat(20*time.Millisecond, 10*time.Millisecond, heartbeat, isMonitorClosed, stopC, stopF, eventNotify)
|
|
time.Sleep(20 * time.Millisecond)
|
|
assertExpectedEqualsActual(t, <-resEvent, "RunTimePassed")
|
|
assertExpectedEqualsActual(t, <-result, "Stopped")
|
|
}
|