feat: add profile and alarm notification

This commit is contained in:
zhuyasen
2022-12-10 22:26:32 +08:00
parent b99a84fef1
commit 64af176419
30 changed files with 754 additions and 54 deletions

View File

@@ -53,7 +53,10 @@ func Config() {
// initializing the print system and process resources
if cfg.App.EnableStat {
stat.Init(stat.WithLog(logger.Get()))
stat.Init(
stat.WithLog(logger.Get()),
stat.WithAlarm(), // invalid if it is windows, the default threshold for cpu and memory is 0.8, you can modify them
)
}
}

View File

@@ -54,7 +54,10 @@ func Config() {
// initializing the print system and process resources
if cfg.App.EnableStat {
stat.Init(stat.WithLog(logger.Get()))
stat.Init(
stat.WithLog(logger.Get()),
stat.WithAlarm(), // invalid if it is windows, the default threshold for cpu and memory is 0.8, you can modify them
)
}
}

View File

@@ -54,7 +54,10 @@ func Config() {
// initializing the print system and process resources
if cfg.App.EnableStat {
stat.Init(stat.WithLog(logger.Get()))
stat.Init(
stat.WithLog(logger.Get()),
stat.WithAlarm(), // invalid if it is windows, the default threshold for cpu and memory is 0.8, you can modify them
)
}
}

View File

@@ -53,7 +53,10 @@ func Config() {
// initializing the print system and process resources
if cfg.App.EnableStat {
stat.Init(stat.WithLog(logger.Get()))
stat.Init(
stat.WithLog(logger.Get()),
stat.WithAlarm(), // invalid if it is windows, the default threshold for cpu and memory is 0.8, you can modify them
)
}
}

View File

@@ -54,7 +54,10 @@ func Config() {
// initializing the print system and process resources
if cfg.App.EnableStat {
stat.Init(stat.WithLog(logger.Get()))
stat.Init(
stat.WithLog(logger.Get()),
stat.WithAlarm(), // invalid if it is windows, the default threshold for cpu and memory is 0.8, you can modify them
)
}
}

View File

@@ -53,7 +53,10 @@ func Config() {
// initializing the print system and process resources
if cfg.App.EnableStat {
stat.Init(stat.WithLog(logger.Get()))
stat.Init(
stat.WithLog(logger.Get()),
stat.WithAlarm(), // invalid if it is windows, the default threshold for cpu and memory is 0.8, you can modify them
)
}
}

View File

@@ -6,7 +6,7 @@ app:
env: "dev" # runtime environment, dev: development environment, prod: production environment, test: test environment
version: "v0.0.0"
host: "192.168.3.27" # host name or ip
enablePprof: true # whether to enable performance analysis, true:enable, false:disable
enableHTTPProfile: false # whether to enable performance analysis, true:enable, false:disable
enableStat: true # whether to enable printing statistics, true:enable, false:disable
enableMetrics: true # whether to enable indicator collection, true:enable, false:disable
enableLimit: false # whether to turn on rate limiting (adaptive), true:on, false:off
@@ -19,15 +19,15 @@ app:
# http server settings
http:
port: 8080 # listening port
port: 8080 # listen port
readTimeout: 3 # read timeout, unit(second)
writeTimeout: 60 # write timeout, unit(second), if enablePprof is true, it needs to be greater than 60s, the default value for pprof to do profiling is 60s
writeTimeout: 60 # write timeout, unit(second), if enableHTTPProfile is true, it needs to be greater than 60s, the default value for pprof to do profiling is 60s
# grpc server settings
grpc:
port: 8282 # listening port
httpPort: 8283 # get pprof and monitor indicator ports
port: 8282 # listen port
httpPort: 8283 # profile and metrics ports
readTimeout: 3 # read timeout, unit(second)
writeTimeout: 3 # write timeout, unit(second)
@@ -37,7 +37,7 @@ grpcClient:
- name: "serverNameExample" # rpc service name, used for service discovery
host: "192.168.3.27" # rpc service address, used for direct connection
port: 8282 # rpc service port
registryDiscoveryType: "" # registration and discovery types: consul, etcd, nacos, if empty, registration and discovery are not used
registryDiscoveryType: "" # registration and discovery types: consul, etcd, nacos, if empty, connecting to server using host and port
# logger settings

View File

@@ -61,7 +61,7 @@ type App struct {
EnableCircuitBreaker bool `yaml:"enableCircuitBreaker" json:"enableCircuitBreaker"`
EnableLimit bool `yaml:"enableLimit" json:"enableLimit"`
EnableMetrics bool `yaml:"enableMetrics" json:"enableMetrics"`
EnablePprof bool `yaml:"enablePprof" json:"enablePprof"`
EnableHTTPProfile bool `yaml:"enableHTTPProfile" json:"enableHTTPProfile"`
EnableStat bool `yaml:"enableStat" json:"enableStat"`
EnableTrace bool `yaml:"enableTrace" json:"enableTrace"`
Env string `yaml:"env" json:"env"`

View File

@@ -63,8 +63,8 @@ func NewRouter() *gin.Engine {
r.Use(middleware.Tracing(config.Get().App.Name))
}
// pprof performance analysis
if config.Get().App.EnablePprof {
// profile performance analysis
if config.Get().App.EnableHTTPProfile {
prof.Register(r, prof.WithIOWaitTime())
}

View File

@@ -63,7 +63,7 @@ func NewRouter_pbExample() *gin.Engine { //nolint
}
// pprof performance analysis
if config.Get().App.EnablePprof {
if config.Get().App.EnableHTTPProfile {
prof.Register(r, prof.WithIOWaitTime())
}

View File

@@ -23,7 +23,7 @@ func TestNewRouter_pbExample(t *testing.T) {
config.Get().App.EnableMetrics = true
config.Get().App.EnableTrace = true
config.Get().App.EnablePprof = true
config.Get().App.EnableHTTPProfile = true
config.Get().App.EnableLimit = true
config.Get().App.EnableCircuitBreaker = true

View File

@@ -22,7 +22,7 @@ func TestNewRouter(t *testing.T) {
config.Get().App.EnableMetrics = false
config.Get().App.EnableTrace = true
config.Get().App.EnablePprof = true
config.Get().App.EnableHTTPProfile = true
config.Get().App.EnableLimit = true
config.Get().App.EnableCircuitBreaker = true

View File

@@ -174,7 +174,7 @@ func NewGRPCServer(addr string, opts ...GrpcOption) app.IServer {
iRegistry: o.iRegistry,
instance: o.instance,
}
if config.Get().App.EnablePprof {
if config.Get().App.EnableHTTPProfile {
s.registerProfMux()
}

View File

@@ -25,7 +25,7 @@ func TestGRPCServer(t *testing.T) {
config.Get().App.EnableMetrics = true
config.Get().App.EnableTrace = true
config.Get().App.EnablePprof = true
config.Get().App.EnableHTTPProfile = true
config.Get().App.EnableLimit = true
config.Get().App.EnableCircuitBreaker = true
@@ -51,7 +51,7 @@ func TestGRPCServerMock(t *testing.T) {
}
config.Get().App.EnableMetrics = true
config.Get().App.EnableTrace = true
config.Get().App.EnablePprof = true
config.Get().App.EnableHTTPProfile = true
config.Get().App.EnableLimit = true
config.Get().App.EnableCircuitBreaker = true

View File

@@ -25,7 +25,7 @@ func TestHTTPServer(t *testing.T) {
}
config.Get().App.EnableMetrics = true
config.Get().App.EnableTrace = true
config.Get().App.EnablePprof = true
config.Get().App.EnableHTTPProfile = true
config.Get().App.EnableLimit = true
config.Get().App.EnableCircuitBreaker = true
@@ -63,7 +63,7 @@ func TestHTTPServerMock(t *testing.T) {
}
config.Get().App.EnableMetrics = true
config.Get().App.EnableTrace = true
config.Get().App.EnablePprof = true
config.Get().App.EnableHTTPProfile = true
config.Get().App.EnableLimit = true
config.Get().App.EnableCircuitBreaker = true

View File

@@ -1,6 +1,6 @@
## app
Elegantly start and stop services, using [errgroup](golang.org/x/sync/errgroup) to ensure that multiple services are started properly at the same time.
Start and stop services gracefully, using [errgroup](golang.org/x/sync/errgroup) to ensure that multiple services are started properly at the same time.
<br>
@@ -8,32 +8,22 @@ Elegantly start and stop services, using [errgroup](golang.org/x/sync/errgroup)
```go
func main() {
inits := registerInits()
initApp()
servers := registerServers()
closes := registerCloses(servers)
s := app.New(inits, servers, closes)
s.Run()
a := app.New(servers, closes)
a.Run()
}
func registerInits() []app.Init {
func initApp() {
// get configuration
var inits []app.Init
// initializing log
inits = append(inits, func() {
})
// initializing database
inits = append(inits, func() {
})
// ......
return inits
}
func registerServers() []app.IServer {

View File

@@ -7,6 +7,8 @@ import (
"os/signal"
"syscall"
"github.com/zhufuyi/sponge/pkg/prof"
"golang.org/x/sync/errgroup"
)
@@ -63,16 +65,22 @@ func (a *App) Run() {
// watch the os signal and the ctx signal from the errgroup, and stop the service if either signal is triggered
func (a *App) watch(ctx context.Context) error {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGTRAP)
profile := prof.NewProfile()
for {
select {
case <-ctx.Done(): // service error signals
case <-ctx.Done(): // service error
_ = a.stop()
return ctx.Err()
case s := <-quit: // system notification signal
fmt.Printf("quit signal: %s\n", s.String())
case sigType := <-sig: // system notification signal
fmt.Printf("received system notification signal: %s\n", sigType.String())
switch sigType {
case syscall.SIGTRAP:
profile.StartOrStop() // start or stop sampling profile
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP:
if err := a.stop(); err != nil {
return err
}
@@ -81,6 +89,7 @@ func (a *App) watch(ctx context.Context) error {
}
}
}
}
// stopping services and releasing resources
func (a *App) stop() error {

View File

@@ -33,6 +33,24 @@ func GetFilename(filePath string) string {
return name
}
// GetFilenameWithoutSuffix get file name without suffix
func GetFilenameWithoutSuffix(filePath string) string {
_, name := filepath.Split(filePath)
return strings.TrimSuffix(name, path.Ext(name))
}
// Join joins any number of path elements into a single path
func Join(elem ...string) string {
path := strings.Join(elem, "/")
if IsWindows() {
return strings.ReplaceAll(path, "/", "\\")
}
return path
}
// IsWindows determining whether a window environment
func IsWindows() bool {
return runtime.GOOS == "windows"

View File

@@ -71,6 +71,9 @@ func TestListDirsAndFiles(t *testing.T) {
func TestGetFilename(t *testing.T) {
name := GetFilename("./README.md")
assert.Equal(t, "README.md", name)
name = GetFilenameWithoutSuffix("./README.md")
assert.Equal(t, "README", name)
}
func TestGetPathDelimiter(t *testing.T) {
@@ -119,3 +122,13 @@ func TestFuzzyMatchFiles(t *testing.T) {
files = FuzzyMatchFiles("./*_test.go")
assert.Equal(t, 2, len(files))
}
func TestJoin(t *testing.T) {
elements := []string{"a/b", "c"}
path := Join(elements...)
t.Log(path)
elements = []string{"a\\b", "c"}
path = Join(elements...)
t.Log(path)
}

View File

@@ -6,9 +6,11 @@ Wrap the official `net/http/pprof` route and add the profile io wait time route.
### Example of use
#### sampling profile by http
```go
mux := http.NewServeMux()
prof.Register(r, WithPrefix("/myServer"), WithIOWaitTime())
prof.Register(mux, WithPrefix("/myServer"), WithIOWaitTime())
httpServer := &http.Server{
Addr: ":8080",
@@ -19,3 +21,35 @@ Wrap the official `net/http/pprof` route and add the profile io wait time route.
panic("listen and serve error: " + err.Error())
}
```
<br>
#### sampling profile by system notification signal
```go
func WaitSign() {
p := NewProfile()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGTRAP)
for {
v := <-signals
switch v {
case syscall.SIGTRAP:
p.StartOrStop()
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP:
os.Exit(0)
}
}
}
```
```bash
# view the program's pid
ps -aux | grep serverName
# notification of sampling profile, default 60s, in less than 60s, if the second execution will actively stop sampling profile
kill -trap pid
```

346
pkg/prof/profile.go Normal file
View File

@@ -0,0 +1,346 @@
package prof
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"runtime/trace"
"strings"
"sync/atomic"
"syscall"
"time"
)
var (
durationSecond uint32 = 60
isSamplingTrace = false
serverName = getServerName()
pid = syscall.Getpid()
timeFormat = "20060102T150405"
status uint32
statusStart uint32 = 1
statusStop uint32 = 0
)
// WaitSign wait system notification signals
//func WaitSign() {
// p := NewProfile()
//
// signals := make(chan os.Signal, 1)
// signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGTRAP)
//
// for {
// v := <-signals
// switch v {
// case syscall.SIGTRAP:
// p.StartOrStop()
//
// case syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP:
// gracefully exit
// os.Exit(0)
// }
// }
//}
type profile struct {
files []string
closeFns []func()
ctx context.Context
stopCh chan struct{}
}
// NewProfile create a new profile
func NewProfile() *profile {
p := new(profile)
p.stopCh = make(chan struct{})
return p
}
// StartOrStop start and stop sampling profile, the first call to start sampling data, the default maximum is 60 seconds,
// in less than 60s, if the second execution will actively stop sampling profile
func (p *profile) StartOrStop() {
if isStart() {
p.startProfile()
} else if isStop() {
p.stopProfile()
}
}
func (p *profile) startProfile() {
fmt.Printf("[profile] start sampling profile, status=%d\n", status)
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
err := p.cpu()
if err != nil {
fmt.Println(err)
}
err = p.mem()
if err != nil {
fmt.Println(err)
}
err = p.goroutine()
if err != nil {
fmt.Println(err)
}
err = p.block()
if err != nil {
fmt.Println(err)
}
err = p.mutex()
if err != nil {
fmt.Println(err)
}
err = p.threadCreate()
if err != nil {
fmt.Println(err)
}
if isSamplingTrace {
err = p.trace()
if err != nil {
fmt.Println(err)
}
}
go p.checkTimeout()
}
func (p *profile) stopProfile() {
fmt.Printf("[profile] stop sampling profile, status=%d\n", status)
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
if p == nil || len(p.closeFns) == 0 {
return
}
for _, fn := range p.closeFns {
fn()
}
select {
case p.stopCh <- struct{}{}:
default:
}
p = NewProfile() // reset profile
}
func (p *profile) checkTimeout() {
if p == nil {
return
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*time.Duration(durationSecond)) //nolint
select {
case <-p.stopCh:
fmt.Println("[profile] reason for stopping: manual")
return
case <-ctx.Done():
if isStop() {
p.stopProfile()
}
fmt.Println("[profile] reason for stopping: timeout")
}
}
func (p *profile) cpu() error {
profileName := "cpu"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
_ = pprof.StartCPUProfile(f)
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
pprof.StopCPUProfile()
_ = f.Close()
})
return nil
}
func (p *profile) mem() error {
profileName := "mem"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
old := runtime.MemProfileRate
runtime.MemProfileRate = 4096
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
_ = pprof.Lookup("heap").WriteTo(f, 0)
_ = f.Close()
runtime.MemProfileRate = old
})
return nil
}
func (p *profile) goroutine() error {
profileName := "goroutine"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
_ = pprof.Lookup(profileName).WriteTo(f, 2)
_ = f.Close()
})
return nil
}
func (p *profile) block() error {
profileName := "block"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
runtime.SetBlockProfileRate(1)
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
_ = pprof.Lookup(profileName).WriteTo(f, 0)
_ = f.Close()
runtime.SetBlockProfileRate(0)
})
return nil
}
func (p *profile) mutex() error {
profileName := "mutex"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
runtime.SetMutexProfileFraction(1)
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
if mp := pprof.Lookup(profileName); mp != nil {
_ = mp.WriteTo(f, 0)
}
_ = f.Close()
runtime.SetMutexProfileFraction(0)
})
return nil
}
func (p *profile) threadCreate() error {
profileName := "threadcreate"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
if mp := pprof.Lookup(profileName); mp != nil {
_ = mp.WriteTo(f, 0)
}
_ = f.Close()
})
return nil
}
func (p *profile) trace() error {
profileName := "trace"
file := getFilePath(profileName)
f, err := os.Create(file)
if err != nil {
return err
}
err = trace.Start(f)
if err != nil {
_ = f.Close()
return err
}
p.files = append(p.files, file)
p.closeFns = append(p.closeFns, func() {
trace.Stop()
_ = f.Close()
})
return nil
}
// SetDurationSecond set sampling profile duration
func SetDurationSecond(d uint32) {
atomic.StoreUint32(&durationSecond, d)
}
// EnableTrace enable sampling trace profile
func EnableTrace() {
isSamplingTrace = true
}
func isStart() bool {
return atomic.CompareAndSwapUint32(&status, statusStop, statusStart)
}
func isStop() bool {
return atomic.CompareAndSwapUint32(&status, statusStart, statusStop)
}
func getFilePath(profileName string) string {
dir := joinPath(os.TempDir(), serverName+"_profile")
_ = os.MkdirAll(dir, 0666)
return joinPath(dir, fmt.Sprintf("%s_%d_%s_%s.out",
time.Now().Format(timeFormat), pid, serverName, profileName))
}
func getServerName() string {
_, name := filepath.Split(os.Args[0])
return strings.TrimSuffix(name, path.Ext(name))
}
func joinPath(elem ...string) string {
path := strings.Join(elem, "/")
if runtime.GOOS == "windows" {
return strings.ReplaceAll(path, "/", "\\")
}
return path
}

21
pkg/prof/profile_test.go Normal file
View File

@@ -0,0 +1,21 @@
package prof
import (
"testing"
"time"
)
func TestProfile(t *testing.T) {
EnableTrace()
SetDurationSecond(2)
p := NewProfile()
p.StartOrStop()
time.Sleep(time.Second)
p.StartOrStop()
time.Sleep(time.Second)
p.StartOrStop()
time.Sleep(time.Millisecond * 2100)
}

View File

@@ -1,6 +1,6 @@
## stat
Statistics on system and process cpu and memory information.
Statistics on system and process cpu and memory information, alarm notification support.
<br>
@@ -11,5 +11,6 @@ Statistics on system and process cpu and memory information.
stat.Init(
WithLog(l),
WithPrintInterval(time.Minute),
WithEnableAlarm(WithCPUThreshold(0.9), WithMemoryThreshold(0.85)), // invalid if it is windows
)
```

111
pkg/stat/alarm.go Normal file
View File

@@ -0,0 +1,111 @@
package stat
import (
"fmt"
"time"
)
var (
cpuThreshold = 0.8
memoryThreshold = 0.8
triggerInterval float64 = 900 // unit(s)
)
// AlarmOption set the alarm options field.
type AlarmOption func(*alarmOptions)
type alarmOptions struct{}
func (o *alarmOptions) apply(opts ...AlarmOption) {
for _, opt := range opts {
opt(o)
}
}
// WithCPUThreshold set cpu threshold, range 0 to 1
func WithCPUThreshold(threshold float64) AlarmOption {
return func(o *alarmOptions) {
if threshold < 0 || threshold >= 1.0 {
return
}
cpuThreshold = threshold
}
}
// WithMemoryThreshold set memory threshold, range 0 to 1
func WithMemoryThreshold(threshold float64) AlarmOption {
return func(o *alarmOptions) {
if threshold < 0 || threshold >= 1.0 {
return
}
memoryThreshold = threshold
}
}
type statGroup struct {
data [3]*statData
alarmAt time.Time
}
func newStatGroup() *statGroup {
return &statGroup{data: [3]*statData{}}
}
func (g *statGroup) check(sd *statData) bool {
if g.data[0] == nil {
g.data[0] = sd
return false
} else if g.data[1] == nil {
g.data[1] = g.data[0]
g.data[0] = sd
return false
}
g.data[2] = g.data[1]
g.data[1] = g.data[0]
g.data[0] = sd
if g.checkCPU(cpuThreshold) || g.checkMemory(memoryThreshold) {
if g.alarmAt.IsZero() {
g.alarmAt = time.Now()
return true
}
if time.Since(g.alarmAt).Seconds() >= triggerInterval {
g.alarmAt = time.Now()
return true
}
}
return false
}
func (g *statGroup) checkCPU(threshold float64) bool {
if g.data[0].sys.CPUCores == 0 {
return false
}
// average cpu usage exceeds cpuCores*threshold
average := (g.data[0].proc.CPUUsage + g.data[1].proc.CPUUsage + g.data[2].proc.CPUUsage) / 3 / float64(g.data[0].sys.CPUCores)
threshold = threshold * 100
if average >= threshold {
fmt.Printf("[cpu] processes cpu usage(%.f%%) exceeds %.f%%\n", average, threshold)
return true
}
return false
}
func (g *statGroup) checkMemory(threshold float64) bool {
if g.data[0].sys.MemTotal == 0 {
return false
}
// processes occupying more than threshold of system memory
procAverage := (g.data[0].proc.RSS + g.data[1].proc.RSS + g.data[2].proc.RSS) / 3
procAverageUsage := float64(procAverage) / float64(g.data[0].sys.MemTotal)
if procAverageUsage >= threshold {
fmt.Printf("[memory] processes memory usage(%.f%%) exceeds %.f%%\n", procAverageUsage*100, threshold*100)
return true
}
return false
}

61
pkg/stat/alarm_test.go Normal file
View File

@@ -0,0 +1,61 @@
package stat
import (
"encoding/json"
"testing"
"time"
)
func Test_statGroup_check(t *testing.T) {
testData := `[
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":0.1,"rss":58,"vms":1475,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":0.1,"rss":4500,"vms":5000,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":0.1,"rss":5500,"vms":5000,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":0.1,"rss":5100,"vms":5000,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":0.1,"rss":58,"vms":1475,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":161,"rss":52,"vms":5000,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":158,"rss":53,"vms":5000,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":175,"rss":53,"vms":5000,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":2,"mem_total":0,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":17.5,"rss":53,"vms":50,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}},
{"system": {"cpu_usage":0,"cpu_cores":0,"mem_total":6000,"mem_free":555,"mem_usage":56.7}, "process": {"cpu_usage":17.5,"rss":53,"vms":50,"alloc":14,"total_alloc":27,"sys":39,"num_gc":6}}
]`
type stData struct {
System system `json:"system"`
Process process `json:"process"`
}
sd := []stData{}
err := json.Unmarshal([]byte(testData), &sd)
if err != nil {
t.Error(err)
return
}
sg := newStatGroup()
triggerInterval = 1
for _, data := range sd {
isAlarm := sg.check(&statData{
sys: data.System,
proc: data.Process,
})
t.Log(isAlarm)
time.Sleep(250 * time.Millisecond)
}
sg.checkCPU(0)
}
func Test_alarmOptions_apply(t *testing.T) {
t.Log(cpuThreshold, memoryThreshold)
ao := &alarmOptions{}
ao.apply(
WithCPUThreshold(-0.5), // invalid value
WithMemoryThreshold(1.5), // invalid value
WithCPUThreshold(0.9),
WithMemoryThreshold(0.85),
)
t.Log(cpuThreshold, memoryThreshold)
}

17
pkg/stat/notify.go Normal file
View File

@@ -0,0 +1,17 @@
//go:build linux || darwin
// +build linux darwin
package stat
import "syscall"
func init() {
go func() {
for {
select {
case <-notifyCh:
syscall.Kill(syscall.Getpid(), syscall.SIGTRAP)
}
}
}()
}

View File

@@ -1,6 +1,8 @@
package stat
import (
"math"
"runtime"
"time"
"github.com/zhufuyi/sponge/pkg/stat/cpu"
@@ -12,12 +14,16 @@ import (
var (
printInfoInterval = time.Minute // minimum 1 second
zapLog, _ = zap.NewProduction()
notifyCh = make(chan struct{})
)
// Option set the options field.
// Option set the stat options field.
type Option func(*options)
type options struct{}
type options struct {
enableAlarm bool
}
func (o *options) apply(opts ...Option) {
for _, opt := range opts {
@@ -45,6 +51,18 @@ func WithLog(l *zap.Logger) Option {
}
}
// WithAlarm enable alarm and notify, except windows
func WithAlarm(opts ...AlarmOption) Option {
return func(o *options) {
if runtime.GOOS == "windows" {
return
}
ao := &alarmOptions{}
ao.apply(opts...)
o.enableAlarm = true
}
}
// Init initialize statistical information
func Init(opts ...Option) {
o := &options{}
@@ -53,17 +71,31 @@ func Init(opts ...Option) {
go func() {
printTick := time.NewTicker(printInfoInterval)
defer printTick.Stop()
sg := newStatGroup()
for {
select {
case <-printTick.C:
printUsageInfo()
data := printUsageInfo()
if o.enableAlarm {
if sg.check(data) {
sendSystemSignForLinux()
}
}
}
}
}()
}
func printUsageInfo() {
func sendSystemSignForLinux() {
select {
case notifyCh <- struct{}{}:
default:
}
}
func printUsageInfo() *statData {
defer func() { recover() }()
mSys := mem.GetSystemMemory()
@@ -79,8 +111,9 @@ func printUsageInfo() {
sys := system{
CPUUsage: cSys.UsagePercent,
CPUCores: cors,
MemTotal: mSys.Total,
MemFree: mSys.Free,
MemUsage: mSys.UsagePercent,
MemUsage: float64(int(math.Round(mSys.UsagePercent))), // rounding
}
proc := process{
CPUUsage: cProc.UsagePercent,
@@ -96,11 +129,17 @@ func printUsageInfo() {
zap.Any("system", sys),
zap.Any("process", proc),
)
return &statData{
sys: sys,
proc: proc,
}
}
type system struct {
CPUUsage float64 `json:"cpu_usage"` // system cpu usage, unit(%)
CPUCores int32 `json:"cpu_cores"` // cpu cores, multiple cpu accumulation
MemTotal uint64 `json:"mem_total"` // system total physical memory, unit(M)
MemFree uint64 `json:"mem_free"` // system free physical memory, unit(M)
MemUsage float64 `json:"mem_usage"` // system memory usage, unit(%)
}
@@ -114,3 +153,8 @@ type process struct {
Sys uint64 `json:"sys"` // requesting memory capacity from the system, unit(M)
NumGc uint32 `json:"num_gc"` // number of GC cycles
}
type statData struct {
sys system
proc process
}

View File

@@ -16,7 +16,24 @@ func TestInit(t *testing.T) {
WithLog(l),
WithPrintInterval(time.Second),
WithAlarm(WithCPUThreshold(0.9), WithMemoryThreshold(0.85)),
)
time.Sleep(time.Second * 2)
}
func Test_sendSystemSignForLinux(t *testing.T) {
go func() {
select {
case <-notifyCh:
t.Log("received signal")
}
}()
time.Sleep(time.Millisecond * 200)
sendSystemSignForLinux()
time.Sleep(time.Millisecond * 200)
sendSystemSignForLinux()
time.Sleep(time.Millisecond * 200)
}