1.补充注释

2.增加监控上报逻辑
3.增加OnConnect和OnDisconnect回调
This commit is contained in:
lwch
2022-08-05 17:03:18 +08:00
parent 350244cadf
commit 3bd8afeed3
6 changed files with 101 additions and 22 deletions

View File

@@ -4,49 +4,60 @@ import (
"context"
"github.com/jkstack/agent/conf"
"github.com/jkstack/agent/internal/utils"
"github.com/jkstack/anet"
)
// App app interface
// App app 接口每一个agent必须实现以下接口
type App interface {
// get agent name
// 获取当前agent编号
ID() string
// 获取当前agent名称
AgentName() string
// get agent version
// 获取当前agent版本号
Version() string
// get configure file path
// 获取配置文件路径
ConfDir() string
// get agent configure
// 获取libagent所需配置
// 该对象必须是一个相对全局作用域的变量,在后续执行过程中该变量将会被更新
Configure() *conf.Configure
// rewrite configure file
RewriteConfigure() error
// connect callback
// 重置配置文件时的回调函数,在以下情况下会回调
// - 连接成功后服务端分配了新的agent id
OnRewriteConfigure() error
// 连接成功后的回调函数
OnConnect()
// disconnect callback
// 断开连接时的回调函数
OnDisconnect()
// report callback
// 触发上报监控信息时的回调函数,该回调一般被用来上报一些自定义监控数据
OnReportMonitor()
// message received
// 收到数据包时的回调函数
OnMessage(*anet.Msg) error
// loop write
// 返回数据包时的回调函数,该函数必须是一个循环,
// 且在有数据需要返回时将其放入第二个参数中的队列内
LoopWrite(context.Context, chan *anet.Msg) error
}
// RegisterService register system service
func deferCallback(name string, fn func()) {
defer utils.Recover(name)
fn()
}
// RegisterService 注册系统服务
func RegisterService(app App) error {
svc := newService(app)
return svc.Install()
}
// UnregisterService unregister system service
// UnregisterService 卸载系统服务
func UnregisterService(app App) error {
svc := newService(app)
svc.Stop()
return svc.Uninstall()
}
// Run run agent
// Run 运行agent
func Run(app App) error {
svc := newService(app)
return svc.Run()

19
app.go
View File

@@ -10,11 +10,14 @@ import (
"github.com/jkstack/agent/internal/utils"
"github.com/jkstack/anet"
"github.com/jkstack/jkframe/logging"
"github.com/shirou/gopsutil/v3/process"
)
type app struct {
a App
shortExit int
startup int64
process *process.Process
chRead chan *anet.Msg
chWrite chan *anet.Msg
@@ -25,12 +28,16 @@ type app struct {
// monitor
inPackets, inBytes uint64
outPackets, outBytes uint64
reconnectCount int
}
func newApp(a App) *app {
ctx, cancel := context.WithCancel(context.Background())
p, _ := process.NewProcess(int32(os.Getpid()))
return &app{
a: a,
process: p,
startup: time.Now().Unix(),
chRead: make(chan *anet.Msg, 1024*1024),
chWrite: make(chan *anet.Msg, 1024*1024),
ctx: ctx,
@@ -67,19 +74,31 @@ func (app *app) start() {
logging.Flush()
os.Exit(255)
}
begin := time.Now()
conn, err := app.connect()
if err != nil {
app.shortExit++
continue
}
deferCallback("on_connect", app.a.OnConnect)
ctx, cancel := context.WithCancel(app.ctx)
go app.read(ctx, cancel, conn)
go app.write(ctx, cancel, conn)
go app.keepalive(ctx, conn)
<-ctx.Done()
if time.Since(begin).Seconds() < 1 {
app.shortExit++
}
conn.Close()
deferCallback("dis_connect", app.a.OnDisconnect)
app.reconnectCount++
}
}

View File

@@ -100,7 +100,7 @@ func (app *app) waitHandshake(conn *websocket.Conn, timeout time.Duration) error
}
if len(msg.Handshake.ID) > 0 {
app.a.Configure().SetAgentID(msg.Handshake.ID)
app.a.RewriteConfigure()
app.a.OnRewriteConfigure()
}
return nil
}

View File

@@ -1,16 +1,65 @@
package agent
import (
"fmt"
rt "runtime"
"runtime/debug"
"sync/atomic"
"time"
"github.com/jkstack/anet"
)
func (app *app) report() {
for {
time.Sleep(app.a.Configure().Monitor.Interval.Duration())
app.a.OnReportMonitor()
app.chWrite <- app.buildReport()
}
}
func (app *app) buildReport() *anet.Msg {
var msg anet.Msg
msg.Type = anet.TypeAgentInfo
var info anet.AgentInfo
info.Version = app.a.Version()
info.GoVersion = rt.Version()
cpu, _ := app.process.CPUPercent()
info.CpuUsage = float32(cpu)
mem, _ := app.process.MemoryPercent()
info.MemoryUsage = mem
n, _ := rt.ThreadCreateProfile(nil)
info.Threads = n
info.Routines = rt.NumGoroutine()
info.Startup = app.startup
var stats rt.MemStats
rt.ReadMemStats(&stats)
info.HeapInuse = stats.HeapInuse
var gc debug.GCStats
gc.PauseQuantiles = make([]time.Duration, 5)
debug.ReadGCStats(&gc)
quantiles := make(map[string]float64)
for i := 0; i < 5; i++ {
quantiles[fmt.Sprintf("%d", i*25)] = gc.PauseQuantiles[i].Seconds()
}
info.GC = quantiles
info.InPackets = app.inPackets
info.InBytes = app.inBytes
info.OutPackets = app.outPackets
info.OutBytes = app.outBytes
info.ReconnectCount = app.reconnectCount
msg.AgentInfo = &info
return &msg
}
func (app *app) incInPackets() {
atomic.AddUint64(&app.inPackets, 1)
}

4
go.mod
View File

@@ -7,9 +7,9 @@ require (
github.com/containerd/cgroups v1.0.4
github.com/dustin/go-humanize v1.0.0
github.com/gorilla/websocket v1.5.0
github.com/jkstack/anet v0.0.0-20220701093727-7f4683edc043
github.com/jkstack/anet v0.0.0-20220805084406-b2b99e224f87
github.com/jkstack/jkframe v1.0.5
github.com/kardianos/service v1.2.2-0.20220428125717-29f8c79c511b
github.com/kardianos/service v1.2.1
github.com/opencontainers/runtime-spec v1.0.2
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/shirou/gopsutil/v3 v3.22.7

8
go.sum
View File

@@ -152,8 +152,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jkstack/anet v0.0.0-20220701093727-7f4683edc043 h1:qChLQnZWoVfTNwlDYfmi65YFDqmjKHXJ43Om0FwvozQ=
github.com/jkstack/anet v0.0.0-20220701093727-7f4683edc043/go.mod h1:zpkHPpKpEdaNNLLLabhWdyjRcczZYwPLocie5ys/RTc=
github.com/jkstack/anet v0.0.0-20220805084406-b2b99e224f87 h1:cvdLKwhsL8PfEtAF0gwPa2gJ3QsFyXwsxg2gyHR8mnY=
github.com/jkstack/anet v0.0.0-20220805084406-b2b99e224f87/go.mod h1:zpkHPpKpEdaNNLLLabhWdyjRcczZYwPLocie5ys/RTc=
github.com/jkstack/jkframe v1.0.5 h1:SUUkgnSHQ6IVnVgJlP6d0bbaANvtbiTwPsuTECq6t2E=
github.com/jkstack/jkframe v1.0.5/go.mod h1:s3Eq2ScKxRLGjHU8qfjapfO5fcuXtDwsZSJiq+naIfY=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -165,8 +165,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kardianos/service v1.2.2-0.20220428125717-29f8c79c511b h1:b5xYQ10mzbfvw0tJnqclfR5sObkGBdghZ+6wYLe7LCQ=
github.com/kardianos/service v1.2.2-0.20220428125717-29f8c79c511b/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/kardianos/service v1.2.1 h1:AYndMsehS+ywIS6RB9KOlcXzteWUzxgMgBymJD7+BYk=
github.com/kardianos/service v1.2.1/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=