mirror of
https://github.com/datarhei/core.git
synced 2025-10-05 16:07:07 +08:00
Write stop/kill reason to process parser
This commit is contained in:
@@ -30,9 +30,21 @@ type LimitFunc func(cpu float64, memory uint64)
|
|||||||
|
|
||||||
type LimitMode int
|
type LimitMode int
|
||||||
|
|
||||||
|
func (m LimitMode) String() string {
|
||||||
|
if m == LimitModeHard {
|
||||||
|
return "hard"
|
||||||
|
}
|
||||||
|
|
||||||
|
if m == LimitModeSoft {
|
||||||
|
return "soft"
|
||||||
|
}
|
||||||
|
|
||||||
|
return "undefined"
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
LimitModeHard LimitMode = 0 // Killing the process if either CPU or memory is above the limit (for a certain time)
|
LimitModeHard LimitMode = 0 // Killing the process if either CPU or memory is above the limit for a certain time
|
||||||
LimitModeSoft LimitMode = 1 // Throttling the CPU if activated, killing the process if memory is above the limit
|
LimitModeSoft LimitMode = 1 // Throttling the CPU if activated, killing the process if memory is above the limit for a certain time
|
||||||
)
|
)
|
||||||
|
|
||||||
type LimiterConfig struct {
|
type LimiterConfig struct {
|
||||||
|
@@ -37,7 +37,7 @@ type Process interface {
|
|||||||
|
|
||||||
// Kill stops the process such that it will restart
|
// Kill stops the process such that it will restart
|
||||||
// automatically if it is defined to do so.
|
// automatically if it is defined to do so.
|
||||||
Kill(wait bool) error
|
Kill(wait bool, reason string) error
|
||||||
|
|
||||||
// IsRunning returns whether the process is currently
|
// IsRunning returns whether the process is currently
|
||||||
// running or not.
|
// running or not.
|
||||||
@@ -195,14 +195,16 @@ type process struct {
|
|||||||
timer *time.Timer
|
timer *time.Timer
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
stopTimer *time.Timer
|
stopTimer *time.Timer
|
||||||
stopTimerLock sync.Mutex
|
stopTimerLock sync.Mutex
|
||||||
killTimer *time.Timer
|
stopReason string
|
||||||
killTimerLock sync.Mutex
|
stopReasonLock sync.Mutex
|
||||||
logger log.Logger
|
killTimer *time.Timer
|
||||||
debuglogger log.Logger
|
killTimerLock sync.Mutex
|
||||||
callbacks struct {
|
logger log.Logger
|
||||||
|
debuglogger log.Logger
|
||||||
|
callbacks struct {
|
||||||
onArgs func(args []string) []string
|
onArgs func(args []string) []string
|
||||||
onBeforeStart func() error
|
onBeforeStart func() error
|
||||||
onStart func()
|
onStart func()
|
||||||
@@ -280,8 +282,8 @@ func New(config Config) (Process, error) {
|
|||||||
p.logger.WithFields(log.Fields{
|
p.logger.WithFields(log.Fields{
|
||||||
"cpu": cpu,
|
"cpu": cpu,
|
||||||
"memory": memory,
|
"memory": memory,
|
||||||
}).Warn().Log("Stopping because limits are exceeded")
|
}).Warn().Log("Killed because limits are exceeded")
|
||||||
p.Kill(false)
|
p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory))
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -608,7 +610,7 @@ func (p *process) start() error {
|
|||||||
if p.stopTimer == nil {
|
if p.stopTimer == nil {
|
||||||
// Only create a new timer if there isn't already one running
|
// Only create a new timer if there isn't already one running
|
||||||
p.stopTimer = time.AfterFunc(p.timeout, func() {
|
p.stopTimer = time.AfterFunc(p.timeout, func() {
|
||||||
p.Kill(false)
|
p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout))
|
||||||
|
|
||||||
p.stopTimerLock.Lock()
|
p.stopTimerLock.Lock()
|
||||||
p.stopTimer.Stop()
|
p.stopTimer.Stop()
|
||||||
@@ -657,7 +659,7 @@ func (p *process) Stop(wait bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.stop(wait)
|
err := p.stop(wait, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.debuglogger.WithFields(log.Fields{
|
p.debuglogger.WithFields(log.Fields{
|
||||||
"state": p.getStateString(),
|
"state": p.getStateString(),
|
||||||
@@ -671,20 +673,20 @@ func (p *process) Stop(wait bool) error {
|
|||||||
|
|
||||||
// Kill will stop the process without changing the order such that it
|
// Kill will stop the process without changing the order such that it
|
||||||
// will restart automatically if enabled.
|
// will restart automatically if enabled.
|
||||||
func (p *process) Kill(wait bool) error {
|
func (p *process) Kill(wait bool, reason string) error {
|
||||||
// If the process is currently not running, we don't need
|
// If the process is currently not running, we don't need
|
||||||
// to do anything.
|
// to do anything.
|
||||||
if !p.isRunning() {
|
if !p.isRunning() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.stop(wait)
|
err := p.stop(wait, reason)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop will stop a process considering the current order and state.
|
// stop will stop a process considering the current order and state.
|
||||||
func (p *process) stop(wait bool) error {
|
func (p *process) stop(wait bool, reason string) error {
|
||||||
// If the process is currently not running, stop the restart timer
|
// If the process is currently not running, stop the restart timer
|
||||||
if !p.isRunning() {
|
if !p.isRunning() {
|
||||||
p.unreconnect()
|
p.unreconnect()
|
||||||
@@ -696,6 +698,10 @@ func (p *process) stop(wait bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.stopReasonLock.Lock()
|
||||||
|
p.stopReason = reason
|
||||||
|
p.stopReasonLock.Unlock()
|
||||||
|
|
||||||
p.setState(stateFinishing)
|
p.setState(stateFinishing)
|
||||||
|
|
||||||
p.logger.Info().Log("Stopping")
|
p.logger.Info().Log("Stopping")
|
||||||
@@ -769,7 +775,7 @@ func (p *process) stop(wait bool) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// reconnect will setup a timer to restart the process
|
// reconnect will setup a timer to restart the process
|
||||||
func (p *process) reconnect(delay time.Duration) {
|
func (p *process) reconnect(delay time.Duration) {
|
||||||
if delay < time.Duration(0) {
|
if delay < time.Duration(0) {
|
||||||
return
|
return
|
||||||
@@ -828,8 +834,8 @@ func (p *process) staler(ctx context.Context) {
|
|||||||
|
|
||||||
d := t.Sub(last)
|
d := t.Sub(last)
|
||||||
if d.Seconds() > timeout.Seconds() {
|
if d.Seconds() > timeout.Seconds() {
|
||||||
p.logger.Info().Log("Stale timeout after %s (%.2f).", timeout, d.Seconds())
|
p.logger.Info().Log("Stale timeout after %s (%.2fs).", timeout, d.Seconds())
|
||||||
p.stop(false)
|
p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -872,8 +878,16 @@ func (p *process) reader() {
|
|||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
p.logger.Debug().WithError(err).Log("")
|
p.logger.Debug().WithError(err).Log("")
|
||||||
|
p.parser.Parse(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.stopReasonLock.Lock()
|
||||||
|
if len(p.stopReason) != 0 {
|
||||||
|
p.parser.Parse(p.stopReason)
|
||||||
|
p.stopReason = ""
|
||||||
|
}
|
||||||
|
p.stopReasonLock.Unlock()
|
||||||
|
|
||||||
// Wait for the process to finish
|
// Wait for the process to finish
|
||||||
p.waiter()
|
p.waiter()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user