mirror of
				https://github.com/kontera-technologies/go-supervisor
				synced 2025-11-01 01:02:32 +08:00 
			
		
		
		
	Added v2 and go-mod support
This commit is contained in:
		
							
								
								
									
										3
									
								
								go.mod
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								go.mod
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,3 @@ | ||||
| module github.com/kontera-technologies/go-supervisor | ||||
|  | ||||
| go 1.14 | ||||
							
								
								
									
										21
									
								
								v2/LICENSE
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								v2/LICENSE
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | ||||
| MIT License | ||||
|  | ||||
| Copyright (c) 2018 Kontera | ||||
|  | ||||
| Permission is hereby granted, free of charge, to any person obtaining a copy | ||||
| of this software and associated documentation files (the "Software"), to deal | ||||
| in the Software without restriction, including without limitation the rights | ||||
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||||
| copies of the Software, and to permit persons to whom the Software is | ||||
| furnished to do so, subject to the following conditions: | ||||
|  | ||||
| The above copyright notice and this permission notice shall be included in all | ||||
| copies or substantial portions of the Software. | ||||
|  | ||||
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||||
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||||
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||||
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||||
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||||
| SOFTWARE. | ||||
							
								
								
									
										5
									
								
								v2/go.mod
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								v2/go.mod
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,5 @@ | ||||
| module github.com/kontera-technologies/go-supervisor/v2 | ||||
|  | ||||
| go 1.14 | ||||
|  | ||||
| require github.com/fortytw2/leaktest v1.3.0 | ||||
							
								
								
									
										2
									
								
								v2/go.sum
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								v2/go.sum
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,2 @@ | ||||
| github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= | ||||
| github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= | ||||
							
								
								
									
										62
									
								
								v2/parsers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										62
									
								
								v2/parsers.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,62 @@ | ||||
| package supervisor | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| // MakeJsonLineParser is called with an io.Reader, and returns a function, that when called will output references to | ||||
| // map[string]interface{} objects that contain the parsed json data. | ||||
| // If an invalid json is encountered, all the characters up until a new-line will be dropped. | ||||
| func MakeJsonLineParser(fromR io.Reader, bufferSize int) ProduceFn { | ||||
| 	br := bufio.NewReaderSize(fromR, bufferSize) | ||||
| 	dec := json.NewDecoder(br) | ||||
| 	return func() (*interface{}, bool) { | ||||
| 		var v interface{} | ||||
| 		if err := dec.Decode(&v); err == nil { | ||||
| 			return &v, false | ||||
| 		} else if err != io.EOF { | ||||
| 			rest, _ := ioutil.ReadAll(dec.Buffered()) | ||||
| 			restLines := bytes.SplitAfterN(rest, []byte{'\n'}, 2) | ||||
| 			if len(restLines) > 1 { | ||||
| 				// todo: test memory consumption on many mistakes (which will happen) | ||||
| 				dec = json.NewDecoder(io.MultiReader(bytes.NewReader(restLines[1]), br)) | ||||
| 			} else { | ||||
| 				dec = json.NewDecoder(br) | ||||
| 			} | ||||
| 		} | ||||
| 		return nil, true | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // MakeLineParser is called with an io.Reader, and returns a function, that when called will output references to | ||||
| // strings that contain the bytes read from the io.Reader (without the new-line suffix). | ||||
| func MakeLineParser(fromR io.Reader, bufferSize int) ProduceFn { | ||||
| 	br := bufio.NewReaderSize(fromR, bufferSize) | ||||
| 	return func() (*interface{}, bool) { | ||||
| 		str, err := br.ReadString('\n') | ||||
| 		if err == nil { | ||||
| 			res := (interface{})(strings.TrimSuffix(str, string('\n'))) | ||||
| 			return &res, false | ||||
| 		} | ||||
| 		return nil, err == io.EOF | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // MakeLineParser is called with an io.Reader, and returns a function, that when called will output references to | ||||
| // byte slices that contain the bytes read from the io.Reader. | ||||
| func MakeBytesParser(fromR io.Reader, bufferSize int) ProduceFn { | ||||
| 	br := bufio.NewReaderSize(fromR, bufferSize) | ||||
| 	return func() (*interface{}, bool) { | ||||
| 		v, err := br.ReadBytes('\n') | ||||
| 		if err == nil { | ||||
| 			res := (interface{})(bytes.TrimSuffix(v, []byte{'\n'})) | ||||
| 			return &res, false | ||||
| 		} | ||||
| 		return nil, err == io.EOF | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										184
									
								
								v2/process-options.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										184
									
								
								v2/process-options.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,184 @@ | ||||
| package supervisor | ||||
|  | ||||
| import ( | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"syscall" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type ProcessOptions struct { | ||||
| 	// If Name contains no path separators, Command uses LookPath to | ||||
| 	// resolve Name to a complete path if possible. Otherwise it uses Name | ||||
| 	// directly as Path. | ||||
| 	Name string | ||||
|  | ||||
| 	// The returned Cmd's Args field is constructed from the command name | ||||
| 	// followed by the elements of arg, so arg should not include the | ||||
| 	// command name itself. For example, Command("echo", "hello"). | ||||
| 	// Args[0] is always name, not the possibly resolved Path. | ||||
| 	Args []string | ||||
|  | ||||
| 	// Env specifies the environment of the process. | ||||
| 	// Each entry is of the form "key=value". | ||||
| 	// If Env is nil, the new process uses the current process's | ||||
| 	// environment. | ||||
| 	// If Env contains duplicate environment keys, only the last | ||||
| 	// value in the slice for each duplicate key is used. | ||||
| 	Env []string | ||||
|  | ||||
| 	// When InheritEnv is true, os.Environ() will be prepended to Env. | ||||
| 	InheritEnv bool | ||||
|  | ||||
| 	// Dir specifies the working directory of the command. | ||||
| 	// If Dir is the empty string, Run runs the command in the | ||||
| 	// calling process's current directory. | ||||
| 	Dir string | ||||
|  | ||||
| 	// ExtraFiles specifies additional open files to be inherited by the | ||||
| 	// new process. It does not include standard input, standard output, or | ||||
| 	// standard error. If non-nil, entry i becomes file descriptor 3+i. | ||||
| 	// | ||||
| 	// ExtraFiles is not supported on Windows. | ||||
| 	ExtraFiles []*os.File | ||||
|  | ||||
| 	// SysProcAttr holds optional, operating system-specific attributes. | ||||
| 	// Run passes it to os.StartProcess as the os.ProcAttr's Sys field. | ||||
| 	SysProcAttr *syscall.SysProcAttr | ||||
|  | ||||
| 	In  chan []byte | ||||
| 	Out chan *interface{} | ||||
| 	Err chan *interface{} | ||||
|  | ||||
| 	EventNotifier chan Event | ||||
|  | ||||
| 	Id string | ||||
|  | ||||
| 	// Debug - when this flag is set to true, events will be logged to the default go logger. | ||||
| 	Debug bool | ||||
|  | ||||
| 	OutputParser func(fromR io.Reader, bufferSize int) ProduceFn | ||||
| 	ErrorParser func(fromR io.Reader, bufferSize int) ProduceFn | ||||
|  | ||||
| 	// MaxSpawns is the maximum number of times that a process can be spawned | ||||
| 	// Set to -1, for an unlimited amount of times. | ||||
| 	// Will use defaultMaxSpawns when set to 0. | ||||
| 	MaxSpawns int | ||||
|  | ||||
| 	// MaxSpawnAttempts is the maximum number of spawns attempts for a process. | ||||
| 	// Set to -1, for an unlimited amount of attempts. | ||||
| 	// Will use defaultMaxSpawnAttempts when set to 0. | ||||
| 	MaxSpawnAttempts int | ||||
|  | ||||
| 	// MaxSpawnBackOff is the maximum duration that we will wait between spawn attempts. | ||||
| 	// Will use defaultMaxSpawnBackOff when set to 0. | ||||
| 	MaxSpawnBackOff time.Duration | ||||
|  | ||||
| 	// MaxRespawnBackOff is the maximum duration that we will wait between respawn attempts. | ||||
| 	// Will use defaultMaxRespawnBackOff when set to 0. | ||||
| 	MaxRespawnBackOff time.Duration | ||||
|  | ||||
| 	// MaxInterruptAttempts is the maximum number of times that we will try to interrupt the process when closed, before | ||||
| 	// terminating and/or killing it. | ||||
| 	// Set to -1, to never send the interrupt signal. | ||||
| 	// Will use defaultMaxInterruptAttempts when set to 0. | ||||
| 	MaxInterruptAttempts int | ||||
|  | ||||
| 	// MaxTerminateAttempts is the maximum number of times that we will try to terminate the process when closed, before | ||||
| 	// killing it. | ||||
| 	// Set to -1, to never send the terminate signal. | ||||
| 	// Will use defaultMaxTerminateAttempts when set to 0. | ||||
| 	MaxTerminateAttempts int | ||||
|  | ||||
| 	// NotifyEventTimeout is the amount of time that the process will BLOCK while trying to send an event. | ||||
| 	NotifyEventTimeout time.Duration | ||||
|  | ||||
| 	// ParserBufferSize is the size of the buffer to be used by the OutputParser and ErrorParser. | ||||
| 	// Will use defaultParserBufferSize when set to 0. | ||||
| 	ParserBufferSize int | ||||
|  | ||||
| 	// IdleTimeout is the duration that the process can remain idle (no output) before we terminate the process. | ||||
| 	// Set to -1, for an unlimited idle timeout (not recommended) | ||||
| 	// Will use defaultIdleTimeout when set to 0. | ||||
| 	IdleTimeout  time.Duration | ||||
|  | ||||
| 	// TerminationGraceTimeout is the duration of time that the supervisor will wait after sending interrupt/terminate | ||||
| 	// signals, before checking if the process is still alive. | ||||
| 	// Will use defaultTerminationGraceTimeout when set to 0. | ||||
| 	TerminationGraceTimeout time.Duration | ||||
|  | ||||
| 	// EventTimeFormat is the time format used when events are marshaled to string. | ||||
| 	// Will use defaultEventTimeFormat when set to "". | ||||
| 	EventTimeFormat string | ||||
| } | ||||
|  | ||||
| // init initializes the opts structure with default and required options. | ||||
| func initProcessOptions(opts ProcessOptions) *ProcessOptions { | ||||
| 	if opts.SysProcAttr == nil { | ||||
| 		opts.SysProcAttr = &syscall.SysProcAttr{} | ||||
| 	} else { | ||||
| 		opts.SysProcAttr = deepCloneSysProcAttr(*opts.SysProcAttr) | ||||
| 	} | ||||
|  | ||||
| 	// Start a new process group for the spawned process. | ||||
| 	opts.SysProcAttr.Setpgid = true | ||||
|  | ||||
| 	if opts.InheritEnv { | ||||
| 		opts.Env = append(os.Environ(), opts.Env...) | ||||
| 	} | ||||
|  | ||||
| 	if opts.MaxSpawns == 0 { | ||||
| 		opts.MaxSpawns = defaultMaxSpawns | ||||
| 	} | ||||
| 	if opts.MaxSpawnAttempts == 0 { | ||||
| 		opts.MaxSpawnAttempts = defaultMaxSpawnAttempts | ||||
| 	} | ||||
| 	if opts.MaxSpawnBackOff == 0 { | ||||
| 		opts.MaxSpawnBackOff = defaultMaxSpawnBackOff | ||||
| 	} | ||||
| 	if opts.MaxRespawnBackOff == 0 { | ||||
| 		opts.MaxRespawnBackOff = defaultMaxRespawnBackOff | ||||
| 	} | ||||
| 	if opts.MaxInterruptAttempts == 0 { | ||||
| 		opts.MaxInterruptAttempts = defaultMaxInterruptAttempts | ||||
| 	} | ||||
| 	if opts.MaxTerminateAttempts == 0 { | ||||
| 		opts.MaxTerminateAttempts = defaultMaxTerminateAttempts | ||||
| 	} | ||||
| 	if opts.NotifyEventTimeout == 0 { | ||||
| 		opts.NotifyEventTimeout = defaultNotifyEventTimeout | ||||
| 	} | ||||
| 	if opts.ParserBufferSize == 0 { | ||||
| 		opts.ParserBufferSize = defaultParserBufferSize | ||||
| 	} | ||||
| 	if opts.IdleTimeout == 0 { | ||||
| 		opts.IdleTimeout = defaultIdleTimeout | ||||
| 	} | ||||
| 	if opts.TerminationGraceTimeout == 0 { | ||||
| 		opts.TerminationGraceTimeout = defaultTerminationGraceTimeout | ||||
| 	} | ||||
| 	if opts.EventTimeFormat == "" { | ||||
| 		opts.EventTimeFormat = defaultEventTimeFormat | ||||
| 	} | ||||
| 	if opts.In == nil { | ||||
| 		opts.In = make(chan []byte) | ||||
| 	} | ||||
| 	if opts.Out == nil { | ||||
| 		opts.Out = make(chan *interface{}) | ||||
| 	} | ||||
| 	if opts.Err == nil { | ||||
| 		opts.Err = make(chan *interface{}) | ||||
| 	} | ||||
|  | ||||
| 	return &opts | ||||
| } | ||||
|  | ||||
| // deepCloneSysProcAttr is a helper function that deep-copies the syscall.SysProcAttr struct and returns a reference to the | ||||
| // new struct. | ||||
| func deepCloneSysProcAttr(x syscall.SysProcAttr) *syscall.SysProcAttr { | ||||
| 	if x.Credential != nil { | ||||
| 		y := *x.Credential | ||||
| 		x.Credential = &y | ||||
| 	} | ||||
| 	return &x | ||||
| } | ||||
							
								
								
									
										688
									
								
								v2/supervisor.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										688
									
								
								v2/supervisor.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,688 @@ | ||||
| package supervisor | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"log" | ||||
| 	"math" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"os/exec" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"syscall" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	defaultMaxSpawns               = 1 | ||||
| 	defaultMaxSpawnAttempts        = 10 | ||||
| 	defaultMaxSpawnBackOff         = time.Minute | ||||
| 	defaultMaxRespawnBackOff       = time.Second | ||||
| 	defaultMaxInterruptAttempts    = 5 | ||||
| 	defaultMaxTerminateAttempts    = 5 | ||||
| 	defaultNotifyEventTimeout      = time.Millisecond | ||||
| 	defaultParserBufferSize        = 4096 | ||||
| 	defaultIdleTimeout             = 10 * time.Second | ||||
| 	defaultTerminationGraceTimeout = time.Second | ||||
| 	defaultEventTimeFormat = time.RFC3339Nano | ||||
| ) | ||||
|  | ||||
| var EnsureClosedTimeout = time.Second | ||||
|  | ||||
| type Event struct { | ||||
| 	Id      string | ||||
| 	Code    string | ||||
| 	Message string | ||||
| 	Time    time.Time | ||||
| 	TimeFormat string | ||||
| } | ||||
|  | ||||
| func (ev Event) String() string { | ||||
| 	if len(ev.Message) == 0 { | ||||
| 		return fmt.Sprintf("[%30s][%s] %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code) | ||||
| 	} | ||||
| 	return fmt.Sprintf("[%s][%30s] %s - %s", ev.Time.Format(ev.TimeFormat), ev.Id, ev.Code, ev.Message) | ||||
| } | ||||
|  | ||||
| const ( | ||||
| 	ready uint32 = 1 << iota | ||||
| 	running | ||||
| 	respawning | ||||
| 	stopped | ||||
| 	errored | ||||
| ) | ||||
|  | ||||
| func phaseString(s uint32) string { | ||||
| 	str := "unknown" | ||||
| 	switch s { | ||||
| 	case ready: | ||||
| 		str = "ready" | ||||
| 	case running: | ||||
| 		str = "running" | ||||
| 	case respawning: | ||||
| 		str = "respawning" | ||||
| 	case stopped: | ||||
| 		str = "stopped" | ||||
| 	case errored: | ||||
| 		str = "errored" | ||||
| 	} | ||||
| 	return fmt.Sprintf("%s(%d)", str, s) | ||||
| } | ||||
|  | ||||
| type ProduceFn func() (*interface{}, bool) | ||||
|  | ||||
| type Process struct { | ||||
| 	cmd             *exec.Cmd | ||||
| 	pid             int64 | ||||
| 	spawnCount      int64 | ||||
| 	stopC           chan bool | ||||
| 	ensureAllClosed func() | ||||
|  | ||||
| 	phase   uint32 | ||||
| 	phaseMu sync.Mutex | ||||
|  | ||||
| 	lastError        atomic.Value | ||||
| 	lastProcessState atomic.Value | ||||
|  | ||||
| 	opts *ProcessOptions | ||||
|  | ||||
| 	eventTimer      *time.Timer | ||||
| 	eventNotifierMu sync.Mutex | ||||
|  | ||||
| 	doneNotifier chan bool | ||||
| 	rand         *rand.Rand | ||||
| 	stopSleep    chan bool | ||||
| } | ||||
|  | ||||
| func (p *Process) Input() chan<- []byte { | ||||
| 	return p.opts.In | ||||
| } | ||||
|  | ||||
| // EmptyInput empties all messages from the Input channel. | ||||
| func (p *Process) EmptyInput() { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case _, ok := <-p.opts.In: | ||||
| 			if !ok { | ||||
| 				return | ||||
| 			} | ||||
| 		default: | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *Process) Stdout() <-chan *interface{} { | ||||
| 	return p.opts.Out | ||||
| } | ||||
|  | ||||
| func (p *Process) Stderr() <-chan *interface{} { | ||||
| 	return p.opts.Err | ||||
| } | ||||
|  | ||||
| func (p *Process) LastProcessState() *os.ProcessState { | ||||
| 	v := p.lastProcessState.Load() | ||||
| 	if v == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return v.(*os.ProcessState) | ||||
| } | ||||
|  | ||||
| func (p *Process) LastError() error { | ||||
| 	v := p.lastError.Load() | ||||
| 	if v == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	if x, ok := v.(error); ok { | ||||
| 		return x | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (p *Process) Pid() int { | ||||
| 	return int(atomic.LoadInt64(&p.pid)) | ||||
| } | ||||
|  | ||||
| func (p *Process) Start() (err error) { | ||||
| 	p.phaseMu.Lock() | ||||
| 	defer p.phaseMu.Unlock() | ||||
| 	if p.phase != ready && p.phase != respawning { | ||||
| 		return fmt.Errorf(`process phase is "%s" and not "ready" or "respawning"`, phaseString(p.phase)) | ||||
| 	} | ||||
|  | ||||
| 	for attempt := 0; p.opts.MaxSpawnAttempts == -1 || attempt < p.opts.MaxSpawnAttempts; attempt++ { | ||||
| 		err = p.unprotectedStart() | ||||
| 		if err == nil { | ||||
| 			p.phase = running | ||||
| 			return | ||||
| 		} | ||||
| 		if !p.sleep(p.CalcBackOff(attempt, time.Second, p.opts.MaxSpawnBackOff)) { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	p.phase = errored | ||||
| 	p.notifyDone() | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (p *Process) unprotectedStart() error { | ||||
| 	p.cmd = newCommand(p.opts) | ||||
|  | ||||
| 	inPipe, err := p.cmd.StdinPipe() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to fetch stdin pipe: %s", err) | ||||
| 	} | ||||
|  | ||||
| 	outPipe, err := p.cmd.StdoutPipe() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to fetch stdout pipe: %s", err) | ||||
| 	} | ||||
|  | ||||
| 	errPipe, err := p.cmd.StderrPipe() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to fetch stderr pipe: %s", err) | ||||
| 	} | ||||
|  | ||||
| 	if p.opts.OutputParser == nil { | ||||
| 		return errors.New("missing output streamer") | ||||
| 	} | ||||
|  | ||||
| 	if p.opts.ErrorParser == nil { | ||||
| 		return errors.New("missing error streamer") | ||||
| 	} | ||||
|  | ||||
| 	if err = p.cmd.Start(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	atomic.AddInt64(&p.spawnCount, 1) | ||||
| 	atomic.StoreInt64(&p.pid, int64(p.cmd.Process.Pid)) | ||||
|  | ||||
| 	p.stopC = make(chan bool) | ||||
| 	heartbeat, isMonitorClosed, isInClosed, isOutClosed, isErrClosed := make(chan bool), make(chan bool), make(chan bool), make(chan bool), make(chan bool) | ||||
|  | ||||
| 	go chanToWriter(p.opts.In, inPipe, p.notifyEvent, isInClosed, p.stopC, heartbeat) | ||||
| 	go readerToChan(p.opts.OutputParser(outPipe, p.opts.ParserBufferSize), p.opts.Out, isOutClosed, p.stopC, heartbeat) | ||||
| 	go readerToChan(p.opts.ErrorParser(errPipe, p.opts.ParserBufferSize), p.opts.Err, isErrClosed, p.stopC, nil) | ||||
|  | ||||
| 	go monitorHeartBeat(p.opts.IdleTimeout, heartbeat, isMonitorClosed, p.stopC, p.Restart, p.notifyEvent) | ||||
|  | ||||
| 	var ensureOnce sync.Once | ||||
| 	p.ensureAllClosed = func() { | ||||
| 		ensureOnce.Do(func() { | ||||
| 			if cErr := ensureClosed("stdin", isInClosed, inPipe.Close); cErr != nil { | ||||
| 				log.Printf("[%s] Possible memory leak, stdin go-routine not closed. Error: %s", p.opts.Id, cErr) | ||||
| 			} | ||||
| 			if cErr := ensureClosed("stdout", isOutClosed, outPipe.Close); cErr != nil { | ||||
| 				log.Printf("[%s] Possible memory leak, stdout go-routine not closed. Error: %s", p.opts.Id, cErr) | ||||
| 			} | ||||
| 			if cErr := ensureClosed("stderr", isErrClosed, errPipe.Close); cErr != nil { | ||||
| 				log.Printf("[%s] Possible memory leak, stderr go-routine not closed. Error: %s", p.opts.Id, cErr) | ||||
| 			} | ||||
| 			if cErr := ensureClosed("heartbeat monitor", isMonitorClosed, nil); cErr != nil { | ||||
| 				log.Printf("[%s] Possible memory leak, monitoring go-routine not closed. Error: %s", p.opts.Id, cErr) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	go p.waitAndNotify() | ||||
|  | ||||
| 	p.notifyEvent("ProcessStart", fmt.Sprintf("pid: %d", p.Pid())) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func chanToWriter(in <-chan []byte, out io.Writer, notifyEvent func(string, ...interface{}), closeWhenDone, stopC, heartbeat chan bool) { | ||||
| 	defer close(closeWhenDone) | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-stopC: | ||||
| 			return | ||||
| 		case raw, chanOpen := <-in: | ||||
| 			if !chanOpen { | ||||
| 				notifyEvent("Error", "Input channel closed unexpectedly.") | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			_, err := out.Write(raw) | ||||
| 			if err != nil { | ||||
| 				notifyEvent("WriteError", err.Error()) | ||||
| 				return | ||||
| 			} | ||||
| 			heartbeat <- true | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, stopC, heartbeat chan bool) { | ||||
| 	defer close(closeWhenDone) | ||||
|  | ||||
| 	cleanPipe := func() { | ||||
| 		for { | ||||
| 			if res, eof := producer(); res != nil { | ||||
| 				out <- res | ||||
| 			} else if eof { | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for { | ||||
| 		if res,eof := producer(); res != nil { | ||||
| 			select { | ||||
| 			case out <- res: | ||||
| 				select { | ||||
| 				case heartbeat <- true: | ||||
| 				default: | ||||
| 				} | ||||
| 			case <-stopC: | ||||
| 				cleanPipe() | ||||
| 				return | ||||
| 			} | ||||
| 		} else if eof { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		select { | ||||
| 		case <-stopC: | ||||
| 			cleanPipe() | ||||
| 			return | ||||
| 		default: | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // monitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a | ||||
| // positive heartbeat, or if a negative heartbeat is passed. | ||||
| // | ||||
| // isMonitorClosed will be closed when this function exists. | ||||
| // | ||||
| // When stopC closes, this function will exit immediately. | ||||
| func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) { | ||||
| 	defer close(isMonitorClosed) | ||||
| 	t := time.NewTimer(idleTimeout) | ||||
| 	defer t.Stop() | ||||
|  | ||||
| 	for alive := true; alive; { | ||||
| 		select { | ||||
| 		case <-stopC: | ||||
| 			notifyEvent("StoppingHeartbeatMonitoring", "Stop signal received.") | ||||
| 			return | ||||
|  | ||||
| 		case alive = <-heartbeat: | ||||
| 			if alive { | ||||
| 				if !t.Stop() { | ||||
| 					<-t.C | ||||
| 				} | ||||
| 				t.Reset(idleTimeout) | ||||
| 			} else { | ||||
| 				notifyEvent("NegativeHeartbeat", "Stopping process.") | ||||
| 			} | ||||
|  | ||||
| 		case <-t.C: | ||||
| 			alive = false | ||||
| 			notifyEvent("MissingHeartbeat", "Stopping process.") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := stop(); err != nil { | ||||
| 		notifyEvent("StopError", err.Error()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *Process) waitAndNotify() { | ||||
| 	state, waitErr := p.cmd.Process.Wait() | ||||
|  | ||||
| 	p.phaseMu.Lock() | ||||
| 	automaticUnlock := true | ||||
| 	defer func() { | ||||
| 		if automaticUnlock { | ||||
| 			p.phaseMu.Unlock() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	p.lastProcessState.Store(state) | ||||
|  | ||||
| 	if p.phase == stopped { | ||||
| 		return | ||||
| 	} else if p.phase != running && p.phase != respawning { | ||||
| 		p.notifyEvent("RespawnError", fmt.Sprintf(`process phase is "%s" and not "running" or "respawning"`, phaseString(p.phase))) | ||||
| 	} | ||||
|  | ||||
| 	p.phase = stopped | ||||
|  | ||||
| 	if waitErr != nil { | ||||
| 		p.notifyEvent("WaitError", fmt.Sprintf("os.Process.Wait returned an error - %s", waitErr.Error())) | ||||
| 		p.phase = errored | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if state.Success() { | ||||
| 		p.notifyEvent("ProcessDone", state.String()) | ||||
| 	} else { | ||||
| 		p.notifyEvent("ProcessCrashed", state.String()) | ||||
| 		p.lastError.Store(errors.New(state.String())) | ||||
| 	} | ||||
|  | ||||
| 	// Cleanup resources | ||||
| 	select { | ||||
| 	case <-p.stopC: | ||||
| 	default: | ||||
| 		close(p.stopC) | ||||
| 	} | ||||
| 	p.ensureAllClosed() | ||||
|  | ||||
| 	if state.Success() { | ||||
| 		p.notifyEvent("ProcessStopped", "Process existed successfully.") | ||||
| 		p.notifyDone() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if !p.canRespawn() { | ||||
| 		p.notifyEvent("RespawnError", "Max number of respawns reached.") | ||||
| 		p.notifyDone() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	sleepFor := p.CalcBackOff(int(atomic.LoadInt64(&p.spawnCount))-1, time.Second, p.opts.MaxRespawnBackOff) | ||||
| 	p.notifyEvent("Sleep", fmt.Sprintf("Sleeping for %s before respwaning instance.", sleepFor.String())) | ||||
| 	if !p.sleep(sleepFor) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	p.phase = respawning | ||||
| 	p.notifyEvent("ProcessRespawn", "Trying to respawn instance.") | ||||
|  | ||||
| 	automaticUnlock = false | ||||
| 	p.phaseMu.Unlock() | ||||
| 	err := p.Start() | ||||
|  | ||||
| 	if err != nil { | ||||
| 		p.notifyEvent("RespawnError", err.Error()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *Process) sleep(d time.Duration) bool { | ||||
| 	t := time.NewTimer(d) | ||||
| 	select { | ||||
| 	case <-t.C: | ||||
| 		return true | ||||
| 	case <-p.stopSleep: | ||||
| 		t.Stop() | ||||
| 		return false | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *Process) canRespawn() bool { | ||||
| 	return p.opts.MaxSpawns == -1 || atomic.LoadInt64(&p.spawnCount) < int64(p.opts.MaxSpawns) | ||||
| } | ||||
|  | ||||
| // Stop tries to stop the process. | ||||
| // Entering this function will change the phase from "running" to "stopping" (any other initial phase will cause an error | ||||
| // to be returned). | ||||
| // | ||||
| // This function will call notifyDone when it is done. | ||||
| // | ||||
| // If it fails to stop the process, the phase will change to errored and an error will be returned. | ||||
| // Otherwise, the phase changes to stopped. | ||||
| func (p *Process) Stop() error { | ||||
| 	select { | ||||
| 	case <-p.stopSleep: | ||||
| 	default: | ||||
| 		close(p.stopSleep) | ||||
| 	} | ||||
| 	p.phaseMu.Lock() | ||||
| 	defer p.phaseMu.Unlock() | ||||
| 	defer p.notifyDone() | ||||
| 	err := p.unprotectedStop() | ||||
| 	if err != nil { | ||||
| 		p.phase = errored | ||||
| 		return err | ||||
| 	} | ||||
| 	p.phase = stopped | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (p *Process) unprotectedStop() (err error) { | ||||
| 	p.notifyEvent("ProcessStop") | ||||
|  | ||||
| 	select { | ||||
| 	case <-p.stopC: | ||||
| 	default: | ||||
| 		close(p.stopC) | ||||
| 	} | ||||
| 	defer p.ensureAllClosed() | ||||
|  | ||||
| 	if !p.IsAlive() { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	attempt := 0 | ||||
| 	for ; attempt < p.opts.MaxInterruptAttempts; attempt++ { | ||||
| 		p.notifyEvent("Interrupt", fmt.Sprintf("sending intterupt signal to %d - attempt #%d", -p.Pid(), attempt+1)) | ||||
| 		err = p.interrupt() | ||||
| 		if err == nil { | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
| 	if p.opts.MaxInterruptAttempts > 0 { | ||||
| 		p.notifyEvent("InterruptError", fmt.Sprintf("interrupt signal failed - %d attempts", attempt)) | ||||
| 	} | ||||
|  | ||||
| 	err = nil | ||||
| 	for attempt = 0; attempt < p.opts.MaxTerminateAttempts; attempt++ { | ||||
| 		p.notifyEvent("Terminate", fmt.Sprintf("sending terminate signal to %d - attempt #%d", -p.Pid(), attempt+1)) | ||||
| 		err = p.terminate() | ||||
| 		if err == nil { | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
| 	if p.opts.MaxTerminateAttempts > 0 { | ||||
| 		p.notifyEvent("TerminateError", fmt.Sprintf("terminate signal failed - %d attempts", attempt)) | ||||
| 	} | ||||
|  | ||||
| 	p.notifyEvent("Killing", fmt.Sprintf("sending kill signal to %d", p.Pid())) | ||||
| 	err = syscall.Kill(-p.Pid(), syscall.SIGKILL) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		p.notifyEvent("KillError", err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Restart tries to stop and start the process. | ||||
| // Entering this function will change the phase from running to respawning (any other initial phase will cause an error | ||||
| // to be returned). | ||||
| // | ||||
| // If it fails to stop the process the phase will change to errored and notifyDone will be called. | ||||
| // If there are no more allowed respawns the phase will change to stopped and notifyDone will be called. | ||||
| // | ||||
| // This function calls Process.Start to start the process which will change the phase to "running" (or "errored" if it | ||||
| // fails) | ||||
| // If Start fails, notifyDone will be called. | ||||
| func (p *Process) Restart() error { | ||||
| 	p.phaseMu.Lock() | ||||
| 	defer p.phaseMu.Unlock() | ||||
| 	if p.phase != running { | ||||
| 		return fmt.Errorf(`process phase is "%s" and not "running"`, phaseString(p.phase)) | ||||
| 	} | ||||
| 	p.phase = respawning | ||||
| 	err := p.unprotectedStop() | ||||
|  | ||||
| 	if err != nil { | ||||
| 		p.phase = errored | ||||
| 		p.notifyDone() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if !p.canRespawn() { | ||||
| 		p.phase = stopped | ||||
| 		p.notifyDone() | ||||
| 		return errors.New("max number of respawns reached") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (p *Process) IsAlive() bool { | ||||
| 	err := syscall.Kill(-p.Pid(), syscall.Signal(0)) | ||||
| 	if errno, ok := err.(syscall.Errno); ok { | ||||
| 		return errno != syscall.ESRCH | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func (p *Process) IsDone() bool { | ||||
| 	select { | ||||
| 	case <-p.doneNotifier: | ||||
| 		return true | ||||
| 	default: | ||||
| 		return false | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *Process) DoneNotifier() <-chan bool { | ||||
| 	return p.doneNotifier | ||||
| } | ||||
|  | ||||
| // notifyDone closes the DoneNotifier channel (if it isn't already closed). | ||||
| func (p *Process) notifyDone() { | ||||
| 	select { | ||||
| 	case <-p.doneNotifier: | ||||
| 	default: | ||||
| 		close(p.doneNotifier) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // EventNotifier returns the eventNotifier channel (and creates one if none exists). | ||||
| // | ||||
| // It is protected by Process.eventNotifierMu. | ||||
| func (p *Process) EventNotifier() chan Event { | ||||
| 	p.eventNotifierMu.Lock() | ||||
| 	defer p.eventNotifierMu.Unlock() | ||||
|  | ||||
| 	if p.opts.EventNotifier == nil { | ||||
| 		p.opts.EventNotifier = make(chan Event) | ||||
| 	} | ||||
|  | ||||
| 	return p.opts.EventNotifier | ||||
| } | ||||
|  | ||||
| // notifyEvent creates and passes an event struct from an event code string and an optional event message. | ||||
| // fmt.Sprint will be called on the message slice. | ||||
| // | ||||
| // It is protected by Process.eventNotifierMu. | ||||
| func (p *Process) notifyEvent(code string, message ...interface{}) { | ||||
| 	// Create the event before calling Lock. | ||||
| 	ev := Event{ | ||||
| 		Id:      p.opts.Id, | ||||
| 		Code:    code, | ||||
| 		Message: fmt.Sprint(message...), | ||||
| 		Time:    time.Now(), | ||||
| 		TimeFormat: p.opts.EventTimeFormat, | ||||
| 	} | ||||
|  | ||||
| 	// Log the event before calling Lock. | ||||
| 	if p.opts.Debug { | ||||
| 		fmt.Println(ev) | ||||
| 	} | ||||
|  | ||||
| 	p.eventNotifierMu.Lock() | ||||
| 	defer p.eventNotifierMu.Unlock() | ||||
|  | ||||
| 	if notifier := p.opts.EventNotifier; notifier != nil { | ||||
| 		if p.eventTimer == nil { | ||||
| 			p.eventTimer = time.NewTimer(p.opts.NotifyEventTimeout) | ||||
| 		} else { | ||||
| 			p.eventTimer.Reset(p.opts.NotifyEventTimeout) | ||||
| 		} | ||||
|  | ||||
| 		select { | ||||
| 		case notifier <- ev: | ||||
| 			if !p.eventTimer.Stop() { | ||||
| 				<-p.eventTimer.C | ||||
| 			} | ||||
| 		case <-p.eventTimer.C: | ||||
| 			log.Printf("Failed to sent %#v. EventNotifier is set, but isn't accepting any events.", ev) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *Process) interrupt() (err error) { | ||||
| 	err = syscall.Kill(-p.Pid(), syscall.SIGINT) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end. | ||||
| 	if p.IsAlive() { | ||||
| 		err = errors.New("interrupt signal failed") | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (p *Process) terminate() (err error) { | ||||
| 	err = syscall.Kill(-p.Pid(), syscall.SIGTERM) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	time.Sleep(p.opts.TerminationGraceTimeout) // Sleep for a second to allow the process to end. | ||||
| 	if p.IsAlive() { | ||||
| 		err = errors.New("terminate signal failed") | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (p *Process) CalcBackOff(attempt int, step time.Duration, maxBackOff time.Duration) time.Duration { | ||||
| 	randBuffer := (step / 1000) * time.Duration(p.rand.Intn(1000)) | ||||
| 	backOff := randBuffer + step*time.Duration(math.Exp2(float64(attempt))) | ||||
| 	if backOff > maxBackOff { | ||||
| 		return maxBackOff | ||||
| 	} | ||||
| 	return backOff | ||||
| } | ||||
|  | ||||
| func NewProcess(opts ProcessOptions) *Process { | ||||
| 	return &Process{ | ||||
| 		phase:        ready, | ||||
| 		opts:         initProcessOptions(opts), | ||||
| 		doneNotifier: make(chan bool), | ||||
| 		stopSleep:    make(chan bool), | ||||
| 		rand:         rand.New(rand.NewSource(time.Now().UTC().UnixNano())), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // newCommand creates a new exec.Cmd struct. | ||||
| func newCommand(opts *ProcessOptions) *exec.Cmd { | ||||
| 	cmd := exec.Command(opts.Name, opts.Args...) | ||||
| 	cmd.Env = opts.Env | ||||
| 	cmd.Dir = opts.Dir | ||||
| 	cmd.ExtraFiles = opts.ExtraFiles | ||||
| 	cmd.SysProcAttr = opts.SysProcAttr | ||||
| 	return cmd | ||||
| } | ||||
|  | ||||
| // todo: test if panics on double-close | ||||
| func ensureClosed(name string, isStopped chan bool, forceClose func() error) error { | ||||
| 	t := time.NewTimer(EnsureClosedTimeout) | ||||
| 	defer t.Stop() | ||||
|  | ||||
| 	select { | ||||
| 	case <-isStopped: | ||||
| 		return nil | ||||
| 	case <-t.C: | ||||
| 		if forceClose == nil { | ||||
| 			return fmt.Errorf("stopped waiting for %s after %s", name, EnsureClosedTimeout) | ||||
| 		} | ||||
| 		if err := forceClose(); err != nil { | ||||
| 			return fmt.Errorf("%s - %s", name, err.Error()) | ||||
| 		} | ||||
|  | ||||
| 		return ensureClosed(name, isStopped, nil) | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										695
									
								
								v2/supervisor_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										695
									
								
								v2/supervisor_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,695 @@ | ||||
| package supervisor_test | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"log" | ||||
| 	"os" | ||||
| 	"os/exec" | ||||
| 	"path/filepath" | ||||
| 	"reflect" | ||||
| 	"runtime" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"syscall" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/fortytw2/leaktest" | ||||
|  | ||||
| 	su "github.com/kontera-technologies/go-supervisor/v2" | ||||
| ) | ||||
|  | ||||
| func TestMain(m *testing.M) { | ||||
| 	su.EnsureClosedTimeout = time.Millisecond * 10 | ||||
| 	os.Exit(m.Run()) | ||||
| } | ||||
|  | ||||
| func safeStop(t *time.Timer) { | ||||
| 	if !t.Stop() { | ||||
| 		<-t.C | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type testCommon interface { | ||||
| 	Helper() | ||||
| 	Error(args ...interface{}) | ||||
| 	Errorf(format string, args ...interface{}) | ||||
| 	Fatal(args ...interface{}) | ||||
| 	Fatalf(format string, args ...interface{}) | ||||
| } | ||||
|  | ||||
| func runFor(t *testing.T, from, to int, f func(t *testing.T, i int)) { | ||||
| 	t.Helper() | ||||
| 	for i := from; i < to; i++ { | ||||
| 		t.Run(strconv.Itoa(i), func(t *testing.T) { | ||||
| 			t.Helper() | ||||
| 			f(t, i) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func fatalIfErr(t testCommon, err error) { | ||||
| 	t.Helper() | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func assertExpectedEqualsActual(t *testing.T, expected, actual interface{}) { | ||||
| 	t.Helper() | ||||
| 	if !reflect.DeepEqual(expected, actual) { | ||||
| 		t.Errorf("\n\tExpected: %q\n\tActual:   %q", expected, actual) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func testDir(t testCommon) string { | ||||
| 	testDir, err := filepath.Abs("testdata") | ||||
| 	fatalIfErr(t, err) | ||||
| 	return testDir | ||||
| } | ||||
|  | ||||
| func funcName() string { | ||||
| 	pc, _, _, ok := runtime.Caller(1) | ||||
| 	if !ok { | ||||
| 		return "?" | ||||
| 	} | ||||
|  | ||||
| 	fn := runtime.FuncForPC(pc) | ||||
| 	return strings.TrimPrefix(fn.Name(), "github.com/kontera-technologies/go-supervisor/v2_test.") | ||||
| } | ||||
|  | ||||
| // logProcessEvents is a helper function that registers an event notifier that | ||||
| // will pass all events to the logger. | ||||
| func logProcessEvents(t testCommon, p *su.Process) (teardown func()) { | ||||
| 	t.Helper() | ||||
| 	closeC := make(chan interface{}) | ||||
| 	notifier := p.EventNotifier() | ||||
| 	go func() { | ||||
| 		for stop := false; !stop; { | ||||
| 			select { | ||||
| 			case x := <-notifier: | ||||
| 				log.Printf("%+v", x) | ||||
| 				// t.Logf("%+v", x) | ||||
| 			case <-closeC: | ||||
| 				stop = true | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	return func() { | ||||
| 		close(closeC) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func makeErrorParser(fromR io.Reader, parserSize int) su.ProduceFn { | ||||
| 	p := su.MakeLineParser(fromR, parserSize) | ||||
| 	return func() (*interface{}, bool) { | ||||
| 		raw, isEof := p() | ||||
| 		if raw != nil { | ||||
| 			var res interface{} | ||||
| 			res = errors.New((*raw).(string)) | ||||
| 			return &res, false | ||||
| 		} | ||||
| 		return nil, isEof | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ensureProcessKilled logs a fatal error if the process isn't dead, and kills the process. | ||||
| func ensureProcessKilled(t testCommon, pid int) { | ||||
| 	t.Helper() | ||||
| 	signalErr := syscall.Kill(pid, syscall.Signal(0)) | ||||
| 	if signalErr != syscall.Errno(3) { | ||||
| 		t.Errorf("child process (%d) is still running, killing it.", pid) | ||||
| 		fatalIfErr(t, syscall.Kill(pid, syscall.SIGKILL)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestJsonParser(t *testing.T) { | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:           funcName(), | ||||
| 		Name:         "./endless_jsons.sh", | ||||
| 		Dir:          testDir(t), | ||||
| 		OutputParser: su.MakeJsonLineParser, | ||||
| 		ErrorParser:  makeErrorParser, | ||||
| 		MaxSpawns:    1, | ||||
| 		Out:          make(chan *interface{}, 5), | ||||
| 	}) | ||||
|  | ||||
| 	expected := map[string]interface{}{ | ||||
| 		"foo": "bar", | ||||
| 		"quo": []interface{}{"quz", float64(1), false}, | ||||
| 	} | ||||
|  | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	defer p.Stop() | ||||
|  | ||||
| 	time.AfterFunc(time.Millisecond * 30, func() { | ||||
| 		fatalIfErr(t, p.Stop()) | ||||
| 	}) | ||||
|  | ||||
| 	runFor(t, 0, 3, func(t *testing.T, i int) { | ||||
| 		select { | ||||
| 		case v := <-p.Stdout(): | ||||
| 			assertExpectedEqualsActual(t, expected, *v) | ||||
| 		case <-time.After(time.Millisecond * 30): | ||||
| 			t.Error("Expected output.") | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	select { | ||||
| 	case v := <-p.Stdout(): | ||||
| 		t.Errorf("Unexpected output - %#v", *v) | ||||
| 	case <-time.After(time.Millisecond * 20): | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestBadJsonOutput(t *testing.T) { | ||||
| 	out := bytes.NewReader([]byte(`{"a":"b"} | ||||
| 2019/08/21 | ||||
| 13:43:24 | ||||
| invalid character '}' | ||||
| {"c":"d"}{"c":"d"} | ||||
| {"c":"d"}`)) | ||||
| 	tmp := su.MakeJsonLineParser(out, 4096) | ||||
| 	p := func() *interface{} { | ||||
| 		a,_ := tmp() | ||||
| 		return a | ||||
| 	} | ||||
|  | ||||
| 	assertExpectedEqualsActual(t, map[string]interface{}{"a": "b"}, *p()) | ||||
| 	assertExpectedEqualsActual(t, float64(2019), *p()) | ||||
| 	assertExpectedEqualsActual(t, (*interface{})(nil), p()) | ||||
| 	assertExpectedEqualsActual(t, float64(13), *p()) | ||||
| 	assertExpectedEqualsActual(t, (*interface{})(nil), p()) | ||||
| 	assertExpectedEqualsActual(t, (*interface{})(nil), p()) | ||||
| 	assertExpectedEqualsActual(t, map[string]interface{}{"c": "d"}, *p()) | ||||
| 	assertExpectedEqualsActual(t, map[string]interface{}{"c": "d"}, *p()) | ||||
| 	assertExpectedEqualsActual(t, map[string]interface{}{"c": "d"}, *p()) | ||||
| 	assertExpectedEqualsActual(t, (*interface{})(nil), p()) | ||||
| } | ||||
|  | ||||
| func BenchmarkBadJsonOutput(b *testing.B) { | ||||
|  | ||||
| } | ||||
|  | ||||
| func TestProcess_Start(t *testing.T) { | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:           funcName(), | ||||
| 		Name:         "./greet_with_error.sh", | ||||
| 		Args:         []string{"Hello"}, | ||||
| 		Dir:          testDir(t), | ||||
| 		OutputParser: su.MakeLineParser, | ||||
| 		ErrorParser:  makeErrorParser, | ||||
| 		MaxSpawns:    1, | ||||
| 		Out:          make(chan *interface{}, 1), | ||||
| 		Err:          make(chan *interface{}, 1), | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	defer p.Stop() | ||||
|  | ||||
| 	x := []byte("world\n") | ||||
| 	select { | ||||
| 	case p.Input() <- x: | ||||
| 	case <-time.After(time.Millisecond): | ||||
| 		t.Error("Input wasn't consumed in 1 millisecond") | ||||
| 	} | ||||
|  | ||||
| 	select { | ||||
| 	case out := <-p.Stdout(): | ||||
| 		assertExpectedEqualsActual(t, "Hello world", *out) | ||||
| 	case <-time.After(time.Millisecond * 200): | ||||
| 		t.Error("No output in 200ms") | ||||
| 	} | ||||
|  | ||||
| 	select { | ||||
| 	case v := <-p.Stderr(): | ||||
| 		assertExpectedEqualsActual(t, "Bye world", (*v).(error).Error()) | ||||
| 	case <-time.After(time.Millisecond * 200): | ||||
| 		t.Error("No error in 200ms") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestMakeLineParser(t *testing.T) { | ||||
| 	cmd := exec.Command("./endless.sh") | ||||
| 	cmd.Dir = testDir(t) | ||||
| 	out, _ := cmd.StdoutPipe() | ||||
| 	_ = cmd.Start() | ||||
| 	c := make(chan *interface{}) | ||||
| 	go func() { | ||||
| 		x := su.MakeLineParser(out, 0) | ||||
| 		for a,_ := x(); a != nil; a,_ = x() { | ||||
| 			c <- a | ||||
| 		} | ||||
| 		close(c) | ||||
| 	}() | ||||
| 	time.AfterFunc(time.Second, func() { | ||||
| 		_ = cmd.Process.Kill() | ||||
| 	}) | ||||
|  | ||||
| 	runFor(t, 0, 10, func(t *testing.T, i int) { | ||||
| 		select { | ||||
| 		case x := <-c: | ||||
| 			if x == nil { | ||||
| 				t.Error("unexpected nil") | ||||
| 				return | ||||
| 			} | ||||
| 			assertExpectedEqualsActual(t, "foo", *x) | ||||
| 		case <-time.After(time.Millisecond * 20): | ||||
| 			t.Error("Expected output before 20ms pass.") | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func TestProcess_Signal(t *testing.T) { | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:           funcName(), | ||||
| 		Name:         "./endless.sh", | ||||
| 		Dir:          testDir(t), | ||||
| 		Out:          make(chan *interface{}, 10), | ||||
| 		OutputParser: su.MakeLineParser, | ||||
| 		ErrorParser:  makeErrorParser, | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	defer p.Stop() | ||||
| 	pid := p.Pid() | ||||
|  | ||||
| 	c := make(chan bool) | ||||
| 	time.AfterFunc(time.Millisecond * 70, func() { | ||||
| 		fatalIfErr(t, syscall.Kill(-p.Pid(), syscall.SIGINT)) | ||||
| 		c <- true | ||||
| 	}) | ||||
|  | ||||
| 	runFor(t, 0, 5, func(t *testing.T, i int) { | ||||
| 		select { | ||||
| 		case out := <-p.Stdout(): | ||||
| 			if *out != "foo" { | ||||
| 				t.Errorf(`Expected: "foo", received: "%s"`, *out) | ||||
| 			} | ||||
| 		case err := <-p.Stderr(): | ||||
| 			t.Error("Unexpected error:", err) | ||||
| 		case <-time.After(time.Millisecond * 30): | ||||
| 			t.Error("Expected output in channel") | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	<-c | ||||
| 	time.Sleep(time.Millisecond * 10) | ||||
| 	ensureProcessKilled(t, pid) | ||||
| } | ||||
|  | ||||
| func TestProcess_Close(t *testing.T) { | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:            funcName(), | ||||
| 		Name:          "./trap.sh", | ||||
| 		Args:          []string{"endless.sh"}, | ||||
| 		Dir:           testDir(t), | ||||
| 		OutputParser:  su.MakeLineParser, | ||||
| 		ErrorParser:   makeErrorParser, | ||||
| 		EventNotifier: make(chan su.Event, 10), | ||||
| 		MaxInterruptAttempts: 1, | ||||
| 		MaxTerminateAttempts: 2, | ||||
| 		TerminationGraceTimeout: time.Millisecond, | ||||
| 	}) | ||||
|  | ||||
| 	procClosedC := make(chan error) | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	time.AfterFunc(time.Millisecond*20, func() { | ||||
| 		procClosedC <- p.Stop() | ||||
| 	}) | ||||
|  | ||||
| 	var err error | ||||
| 	var childPid int | ||||
|  | ||||
| 	select { | ||||
| 	case v := <-p.Stderr(): | ||||
| 		childPid, err = strconv.Atoi((*v).(error).Error()) | ||||
| 		if err != nil { | ||||
| 			t.Fatal("Expected child process id in error channel. Instead received:", (*v).(error).Error()) | ||||
| 		} | ||||
| 	case <-time.After(time.Millisecond * 10): | ||||
| 		t.Fatal("Expected child process id in error channel in 100 milliseconds") | ||||
| 	} | ||||
|  | ||||
| 	t.Run("<-procClosedC", func(t *testing.T) { | ||||
| 		fatalIfErr(t, <-procClosedC) | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("trapped signals", func(t *testing.T) { | ||||
| 		errs := map[string]string{ | ||||
| 			"InterruptError": "interrupt signal failed - 1 attempts", | ||||
| 			"TerminateError": "terminate signal failed - 2 attempts", | ||||
| 		} | ||||
|  | ||||
| 		for i := 0; i < 10 && len(errs) > 0; i++ { | ||||
| 			select { | ||||
| 			case ev := <-p.EventNotifier(): | ||||
| 				if !strings.HasSuffix(ev.Code, "Error") { | ||||
| 					continue | ||||
| 				} | ||||
| 				assertExpectedEqualsActual(t, errs[ev.Code], ev.Message) | ||||
| 				delete(errs, ev.Code) | ||||
| 			default: | ||||
| 			} | ||||
| 		} | ||||
| 		for code,err := range errs { | ||||
| 			t.Errorf(`expected a %s event - "%s"`, code, err) | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	time.Sleep(time.Millisecond * 15) | ||||
| 	ensureProcessKilled(t, childPid) | ||||
| } | ||||
|  | ||||
| func TestProcess_RespawnOnFailedExit(t *testing.T) { | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:                funcName(), | ||||
| 		Name:              "./error.sh", | ||||
| 		Dir:               testDir(t), | ||||
| 		OutputParser:      su.MakeLineParser, | ||||
| 		ErrorParser:       su.MakeLineParser, | ||||
| 		Err:               make(chan *interface{}, 3), | ||||
| 		MaxSpawns:         3, | ||||
| 		MaxRespawnBackOff: time.Millisecond, | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	defer p.Stop() | ||||
|  | ||||
| 	runFor(t, 0, 3, func(t *testing.T, i int) { | ||||
| 		select { | ||||
| 		case out := <-p.Stdout(): | ||||
| 			t.Errorf("Unexpected output: %#v", out) | ||||
| 		case v := <-p.Stderr(): | ||||
| 			assertExpectedEqualsActual(t, "Bye world", *v) | ||||
| 		case <-time.After(time.Millisecond * 3000): | ||||
| 			t.Error("Expected error within 3000ms") | ||||
| 			return | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	select { | ||||
| 	case out := <-p.Stdout(): | ||||
| 		t.Errorf("Unexpected output: %#v", out) | ||||
| 	case v := <-p.Stderr(): | ||||
| 		t.Errorf("Unexpected error: %#v", *v) | ||||
| 	case <-time.After(time.Millisecond * 500): | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestProcess_NoRespawnOnSuccessExit(t *testing.T) { | ||||
| 	runtime.Caller(0) | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:           funcName(), | ||||
| 		Name:         "./echo.sh", | ||||
| 		Dir:          testDir(t), | ||||
| 		OutputParser: su.MakeLineParser, | ||||
| 		ErrorParser:  makeErrorParser, | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	defer p.Stop() | ||||
|  | ||||
| 	select { | ||||
| 	case out := <-p.Stdout(): | ||||
| 		assertExpectedEqualsActual(t, "Hello world", *out) | ||||
| 	case <-time.After(time.Millisecond * 150): | ||||
| 		t.Error("No output in 150 milliseconds") | ||||
| 	} | ||||
|  | ||||
| 	select { | ||||
| 	case out := <-p.Stdout(): | ||||
| 		t.Errorf("Unexpected output: %s", *out) | ||||
| 	case <-time.After(time.Millisecond * 10): | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestCalcBackOff(t *testing.T) { | ||||
| 	p1 := su.NewProcess(su.ProcessOptions{Id: funcName() + "-1"}) | ||||
| 	p2 := su.NewProcess(su.ProcessOptions{Id: funcName() + "-2"}) | ||||
|  | ||||
| 	for i := 0; i < 5; i++ { | ||||
| 		a, b := p1.CalcBackOff(i, time.Second, time.Minute), p2.CalcBackOff(i, time.Second, time.Minute) | ||||
| 		if a == b { | ||||
| 			t.Errorf("2 identical results for CalcBackOff(%d, time.Minute): %v", i, a) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestProcess_Restart(t *testing.T) { | ||||
| 	defer leaktest.Check(t)() | ||||
| 	timer := time.NewTimer(0) | ||||
| 	safeStop(timer) | ||||
|  | ||||
| 	// initialGoroutines := runtime.NumGoroutine() | ||||
| 	p := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:                funcName(), | ||||
| 		Name:              "./endless.sh", | ||||
| 		Dir:               testDir(t), | ||||
| 		OutputParser:      su.MakeLineParser, | ||||
| 		ErrorParser:       makeErrorParser, | ||||
| 		Out:               make(chan *interface{}, 5), | ||||
| 		IdleTimeout:       time.Millisecond * 30, | ||||
| 		MaxSpawns:         2, | ||||
| 		MaxRespawnBackOff: time.Microsecond * 100, | ||||
| 		TerminationGraceTimeout: time.Millisecond, | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, p.Start()) | ||||
| 	defer p.Stop() | ||||
|  | ||||
| 	numGoroutines := -1 | ||||
|  | ||||
| 	runFor(t, 0, 3, func(t *testing.T, i int) { | ||||
| 		timer.Reset(time.Millisecond * 20) | ||||
| 		if numGoroutines == -1 { | ||||
| 			numGoroutines = runtime.NumGoroutine() | ||||
| 		} | ||||
| 		select { | ||||
| 		case out := <-p.Stdout(): | ||||
| 			if *out != "foo" { | ||||
| 				t.Errorf(`Expected: "foo", received: "%s"`, *out) | ||||
| 			} | ||||
| 		case err := <-p.Stderr(): | ||||
| 			t.Error("Unexpected error:", err) | ||||
| 		case <-timer.C: | ||||
| 			t.Error("Expected output in channel") | ||||
| 			return | ||||
| 		} | ||||
| 		safeStop(timer) | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, p.Restart()) | ||||
|  | ||||
| 	t.Run("SIGINT received", func(t *testing.T) { | ||||
| 		if state := p.LastProcessState(); state != nil { | ||||
| 			raw := state.Sys() | ||||
| 			waitStatus, ok := raw.(syscall.WaitStatus) | ||||
| 			if !ok { | ||||
| 				t.Fatalf("Process.LastError().Sys() should be of type syscall.WaitStatus, %q received", raw) | ||||
| 			} else if waitStatus.Signal() != syscall.SIGINT { | ||||
| 				t.Errorf("Expected %#v, %#v signal received", syscall.SIGINT.String(), waitStatus.Signal().String()) | ||||
| 			} | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	runFor(t, 3, 6, func(t *testing.T, i int) { | ||||
| 		timer.Reset(time.Millisecond * 20) | ||||
| 		select { | ||||
| 		case out := <-p.Stdout(): | ||||
| 			if *out != "foo" { | ||||
| 				t.Errorf(`Expected: "foo", received: "%s"`, *out) | ||||
| 			} | ||||
| 		case err := <-p.Stderr(): | ||||
| 			t.Error("Unexpected error:", err) | ||||
| 		case <-timer.C: | ||||
| 			t.Error("Expected output in channel within 120ms") | ||||
| 			return | ||||
| 		} | ||||
| 		safeStop(timer) | ||||
| 	}) | ||||
|  | ||||
| 	_ = p.Restart() | ||||
|  | ||||
| 	t.Run("MaxSpawns reached", func(t *testing.T) { | ||||
| 		timer.Reset(time.Millisecond * 24) | ||||
| 		select { | ||||
| 		case out := <-p.Stdout(): | ||||
| 			t.Error("Unexpected output:", *out) | ||||
| 		case err := <-p.Stderr(): | ||||
| 			t.Error("Unexpected error:", err) | ||||
| 		case <-timer.C: | ||||
| 			return | ||||
| 		} | ||||
| 		safeStop(timer) | ||||
| 	}) | ||||
| 	time.Sleep(time.Second) | ||||
| } | ||||
|  | ||||
| // test_timings_compressed_data can be used to test the performance of this library. | ||||
| func test_timings_compressed_data(t *testing.T) { | ||||
| 	runtime.GOMAXPROCS(runtime.NumCPU()) | ||||
|  | ||||
| 	f, err := os.Open("testdata/ipsum.zlib") | ||||
| 	fatalIfErr(t, err) | ||||
| 	content, err := ioutil.ReadAll(f) | ||||
| 	fatalIfErr(t, err) | ||||
|  | ||||
| 	producer := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:               funcName(), | ||||
| 		Name:             "./zlib.sh", | ||||
| 		Dir:              testDir(t), | ||||
| 		OutputParser:     su.MakeLineParser, | ||||
| 		ErrorParser:      su.MakeLineParser, | ||||
| 		MaxSpawnAttempts: 1, | ||||
| 		ParserBufferSize: 170000, | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, producer.Start()) | ||||
|  | ||||
| 	stop := make(chan bool) | ||||
| 	pDone := make(chan bool) | ||||
|  | ||||
| 	prodInNum := int64(0) | ||||
| 	prodOutNum := int64(0) | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-stop: | ||||
| 				log.Println("prodInNum", prodInNum) | ||||
| 				return | ||||
| 			case <-time.After(time.Microsecond): | ||||
| 				producer.Input() <- content | ||||
| 				prodInNum++ | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-stop: | ||||
| 				log.Println("prodOutNum", prodOutNum) | ||||
| 				return | ||||
| 			case <-producer.Stdout(): | ||||
| 				prodOutNum++ | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		<-stop | ||||
| 		_ = producer.Stop() | ||||
| 		close(pDone) | ||||
| 	}() | ||||
|  | ||||
| 	time.AfterFunc(time.Second*10, func() { | ||||
| 		close(stop) | ||||
| 	}) | ||||
|  | ||||
| 	<-pDone | ||||
|  | ||||
| 	log.Println(prodInNum, prodOutNum) | ||||
| } | ||||
|  | ||||
| // test_timings can be used to test the performance of this library. | ||||
| func test_timings(t *testing.T) { | ||||
| 	runtime.GOMAXPROCS(runtime.NumCPU()) | ||||
|  | ||||
| 	f, err := os.Open("testdata/ipsum.txt") | ||||
| 	fatalIfErr(t, err) | ||||
|  | ||||
| 	ipsum, err := ioutil.ReadAll(f) | ||||
| 	fatalIfErr(t, err) | ||||
| 	ipsum = append(ipsum, '\n') | ||||
|  | ||||
| 	producer := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:               funcName(), | ||||
| 		Name:             "./producer.sh", | ||||
| 		Dir:              testDir(t), | ||||
| 		OutputParser:     su.MakeBytesParser, | ||||
| 		ErrorParser:      su.MakeLineParser, | ||||
| 		ParserBufferSize: 170000, | ||||
| 	}) | ||||
| 	incrementer := su.NewProcess(su.ProcessOptions{ | ||||
| 		Id:               funcName(), | ||||
| 		Name:             "./incrementer.sh", | ||||
| 		Dir:              testDir(t), | ||||
| 		OutputParser:     su.MakeBytesParser, | ||||
| 		ErrorParser:      su.MakeLineParser, | ||||
| 		ParserBufferSize: 170000, | ||||
| 	}) | ||||
|  | ||||
| 	fatalIfErr(t, producer.Start()) | ||||
| 	fatalIfErr(t, incrementer.Start()) | ||||
|  | ||||
| 	stop := make(chan bool) | ||||
| 	pDone := make(chan bool) | ||||
| 	iDone := make(chan bool) | ||||
|  | ||||
| 	prodInNum := int64(0) | ||||
| 	prodOutNum := int64(0) | ||||
| 	incOutNum := int64(0) | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-stop: | ||||
| 				log.Println("prodInNum", prodInNum) | ||||
| 				return | ||||
| 			case <-time.After(time.Microsecond * 50): | ||||
| 				producer.Input() <- ipsum | ||||
| 				prodInNum++ | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-stop: | ||||
| 				log.Println("prodOutNum", prodOutNum) | ||||
| 				return | ||||
| 			case msg := <-producer.Stdout(): | ||||
| 				incrementer.Input() <- (*msg).([]byte) | ||||
| 				prodOutNum++ | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-stop: | ||||
| 				log.Println("incOutNum", incOutNum) | ||||
| 				return | ||||
| 			case <-incrementer.Stdout(): | ||||
| 				incOutNum++ | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		<-stop | ||||
| 		_ = producer.Stop() | ||||
| 		close(pDone) | ||||
| 	}() | ||||
| 	go func() { | ||||
| 		<-stop | ||||
| 		_ = incrementer.Stop() | ||||
| 		close(iDone) | ||||
| 	}() | ||||
|  | ||||
| 	time.AfterFunc(time.Second*10, func() { | ||||
| 		close(stop) | ||||
| 	}) | ||||
|  | ||||
| 	<-iDone | ||||
| 	<-pDone | ||||
|  | ||||
| 	log.Println(prodInNum, prodOutNum, incOutNum) | ||||
| } | ||||
							
								
								
									
										5
									
								
								v2/testdata/echo.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										5
									
								
								v2/testdata/echo.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,5 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| echo "Hello world" | ||||
|  | ||||
| exit | ||||
							
								
								
									
										6
									
								
								v2/testdata/endless.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										6
									
								
								v2/testdata/endless.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,6 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| while :; do | ||||
|     echo "foo" | ||||
|     sleep 0.01 | ||||
| done | ||||
							
								
								
									
										6
									
								
								v2/testdata/endless_jsons.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										6
									
								
								v2/testdata/endless_jsons.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,6 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| while :; do | ||||
|     echo '{"foo": "bar","quo":["quz", 1, false]}' | ||||
|     sleep 0.01 | ||||
| done | ||||
							
								
								
									
										5
									
								
								v2/testdata/error.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										5
									
								
								v2/testdata/error.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,5 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| >&2 echo "Bye world" | ||||
|  | ||||
| exit 5 | ||||
							
								
								
									
										9
									
								
								v2/testdata/greet_with_error.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										9
									
								
								v2/testdata/greet_with_error.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,9 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| read greet | ||||
|  | ||||
| echo "$1 $greet" | ||||
|  | ||||
| >&2 echo "Bye $greet" | ||||
|  | ||||
| exit 5 | ||||
							
								
								
									
										13
									
								
								v2/testdata/incrementer.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										13
									
								
								v2/testdata/incrementer.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,13 @@ | ||||
| #!/usr/bin/env ruby | ||||
|  | ||||
| require 'json' | ||||
| STDOUT.sync = true | ||||
|  | ||||
| STDIN.each_line do |l| | ||||
|   begin | ||||
|     l = JSON(l.chomp) | ||||
|     puts({greetings: "from incrementer", msg: l["msg"], prev: l["num"], num: l["num"]+1}.to_json) | ||||
|   rescue StandardError => e | ||||
|     STDERR.puts e | ||||
|   end | ||||
| end | ||||
							
								
								
									
										1
									
								
								v2/testdata/ipsum.txt
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								v2/testdata/ipsum.txt
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							
							
								
								
									
										1
									
								
								v2/testdata/ipsum.zlib
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								v2/testdata/ipsum.zlib
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							
							
								
								
									
										9
									
								
								v2/testdata/parent.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										9
									
								
								v2/testdata/parent.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,9 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| ./$1 & child_pid=$! | ||||
|  | ||||
| >&2 echo $child_pid | ||||
|  | ||||
| while :; do | ||||
|     sleep 1 | ||||
| done | ||||
							
								
								
									
										12
									
								
								v2/testdata/producer.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										12
									
								
								v2/testdata/producer.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,12 @@ | ||||
| #!/usr/bin/env ruby | ||||
|  | ||||
| require 'json' | ||||
| STDOUT.sync = true | ||||
|  | ||||
| STDIN.each_line do |l| | ||||
|   begin | ||||
|     puts({hello: "from producer", msg: l.chomp, num: rand(1000)}.to_json) | ||||
|   rescue StandardError => e | ||||
|     STDERR.puts e | ||||
|   end | ||||
| end | ||||
							
								
								
									
										11
									
								
								v2/testdata/trap.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										11
									
								
								v2/testdata/trap.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,11 @@ | ||||
| #!/usr/bin/env bash | ||||
|  | ||||
| trap '' TERM INT | ||||
|  | ||||
| ./$1 & child_pid=$! | ||||
|  | ||||
| >&2 echo $child_pid | ||||
|  | ||||
| while :; do | ||||
|     sleep 0.01 | ||||
| done | ||||
							
								
								
									
										16
									
								
								v2/testdata/zlib.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							
							
						
						
									
										16
									
								
								v2/testdata/zlib.sh
									
									
									
									
										vendored
									
									
										Executable file
									
								
							| @@ -0,0 +1,16 @@ | ||||
| #!/usr/bin/env ruby | ||||
|  | ||||
| require 'zlib' | ||||
| require 'json' | ||||
| require 'base64' | ||||
|  | ||||
| STDOUT.sync = true | ||||
|  | ||||
| STDIN.each_line do |l| | ||||
|   begin | ||||
|     buf = Zlib::Inflate.inflate Base64::strict_decode64 l.chomp | ||||
|     puts Base64::strict_encode64 Zlib::Deflate.deflate({hello: "from producer", msg: buf, num: rand(1000)}.to_json + "\n") | ||||
|   rescue StandardError => e | ||||
|     STDERR.puts "#{e.to_s}: #{e.backtrace[0].to_s}" | ||||
|   end | ||||
| end | ||||
		Reference in New Issue
	
	Block a user
	 Eyal Shalev
					Eyal Shalev