Update On Fri Sep 6 20:35:08 CEST 2024

This commit is contained in:
github-action[bot]
2024-09-06 20:35:09 +02:00
parent d6a419be50
commit 1a9fdafeaa
118 changed files with 7507 additions and 4309 deletions


@@ -39,7 +39,8 @@ type Cmgr interface {
Start(ctx context.Context, errCH chan error)
// Metrics related
QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq) (*ms.QueryNodeMetricsResp, error)
QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq, refresh bool) (*ms.QueryNodeMetricsResp, error)
QueryRuleMetrics(ctx context.Context, req *ms.QueryRuleMetricsReq, refresh bool) (*ms.QueryRuleMetricsResp, error)
}
type cmgrImpl struct {
@@ -201,20 +202,30 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
}
}
func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq) (*ms.QueryNodeMetricsResp, error) {
num := -1 // default to return all metrics
if req.Latest {
m, err := cm.mr.ReadOnce(ctx)
func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq, refresh bool) (*ms.QueryNodeMetricsResp, error) {
if refresh {
nm, _, err := cm.mr.ReadOnce(ctx)
if err != nil {
return nil, err
}
if err := cm.ms.AddNodeMetric(m); err != nil {
if err := cm.ms.AddNodeMetric(ctx, nm); err != nil {
return nil, err
}
num = 1
}
startTime := time.Unix(req.StartTimestamp, 0)
endTime := time.Unix(req.EndTimestamp, 0)
return cm.ms.QueryNodeMetric(startTime, endTime, num)
return cm.ms.QueryNodeMetric(ctx, req)
}
func (cm *cmgrImpl) QueryRuleMetrics(ctx context.Context, req *ms.QueryRuleMetricsReq, refresh bool) (*ms.QueryRuleMetricsResp, error) {
if refresh {
_, rm, err := cm.mr.ReadOnce(ctx)
if err != nil {
return nil, err
}
for _, m := range rm {
if err := cm.ms.AddRuleMetric(ctx, m); err != nil {
return nil, err
}
}
}
return cm.ms.QueryRuleMetric(ctx, req)
}
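
For context, a caller of the widened interface might look like the following sketch; the function name and wiring are illustrative, not part of this commit (imports elided):

// Sketch: exercising the new two-method query surface.
func dumpMetrics(ctx context.Context, mgr cmgr.Cmgr) error {
	now := time.Now().Unix()
	nodeReq := &ms.QueryNodeMetricsReq{StartTimestamp: now - 60, EndTimestamp: now, Num: -1}
	nodeResp, err := mgr.QueryNodeMetrics(ctx, nodeReq, true) // refresh=true reads once before querying
	if err != nil {
		return err
	}
	ruleReq := &ms.QueryRuleMetricsReq{StartTimestamp: now - 60, EndTimestamp: now, Num: -1}
	ruleResp, err := mgr.QueryRuleMetrics(ctx, ruleReq, false) // serve from the store only
	if err != nil {
		return err
	}
	fmt.Println(nodeResp.TOTAL, ruleResp.TOTAL)
	return nil
}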


@@ -0,0 +1,163 @@
package ms
import (
"context"
"github.com/Ehco1996/ehco/pkg/metric_reader"
)
type NodeMetrics struct {
Timestamp int64 `json:"timestamp"`
CPUUsage float64 `json:"cpu_usage"`
MemoryUsage float64 `json:"memory_usage"`
DiskUsage float64 `json:"disk_usage"`
NetworkIn float64 `json:"network_in"` // bytes per second
NetworkOut float64 `json:"network_out"` // bytes per second
}
type QueryNodeMetricsReq struct {
StartTimestamp int64
EndTimestamp int64
Num int64
}
type QueryNodeMetricsResp struct {
TOTAL int `json:"total"`
Data []NodeMetrics `json:"data"`
}
type RuleMetricsData struct {
Timestamp int64 `json:"timestamp"`
Label string `json:"label"`
Remote string `json:"remote"`
PingLatency int64 `json:"ping_latency"`
TCPConnectionCount int64 `json:"tcp_connection_count"`
TCPHandshakeDuration int64 `json:"tcp_handshake_duration"`
TCPNetworkTransmitBytes int64 `json:"tcp_network_transmit_bytes"`
UDPConnectionCount int64 `json:"udp_connection_count"`
UDPHandshakeDuration int64 `json:"udp_handshake_duration"`
UDPNetworkTransmitBytes int64 `json:"udp_network_transmit_bytes"`
}
type QueryRuleMetricsReq struct {
RuleLabel string
Remote string
StartTimestamp int64
EndTimestamp int64
Num int64
}
type QueryRuleMetricsResp struct {
TOTAL int `json:"total"`
Data []RuleMetricsData `json:"data"`
}
func (ms *MetricsStore) AddNodeMetric(ctx context.Context, m *metric_reader.NodeMetrics) error {
_, err := ms.db.ExecContext(ctx, `
INSERT OR REPLACE INTO node_metrics (timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out)
VALUES (?, ?, ?, ?, ?, ?)
`, m.SyncTime.Unix(), m.CpuUsagePercent, m.MemoryUsagePercent, m.DiskUsagePercent, m.NetworkReceiveBytesRate, m.NetworkTransmitBytesRate)
return err
}
func (ms *MetricsStore) AddRuleMetric(ctx context.Context, rm *metric_reader.RuleMetrics) error {
tx, err := ms.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck
stmt, err := tx.PrepareContext(ctx, `
INSERT OR REPLACE INTO rule_metrics
(timestamp, label, remote, ping_latency,
tcp_connection_count, tcp_handshake_duration, tcp_network_transmit_bytes,
udp_connection_count, udp_handshake_duration, udp_network_transmit_bytes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
defer stmt.Close() //nolint:errcheck
for remote, pingMetric := range rm.PingMetrics {
_, err := stmt.ExecContext(ctx, rm.SyncTime.Unix(), rm.Label, remote, pingMetric.Latency,
rm.TCPConnectionCount[remote], rm.TCPHandShakeDuration[remote], rm.TCPNetworkTransmitBytes[remote],
rm.UDPConnectionCount[remote], rm.UDPHandShakeDuration[remote], rm.UDPNetworkTransmitBytes[remote])
if err != nil {
return err
}
}
return tx.Commit()
}
func (ms *MetricsStore) QueryNodeMetric(ctx context.Context, req *QueryNodeMetricsReq) (*QueryNodeMetricsResp, error) {
rows, err := ms.db.QueryContext(ctx, `
SELECT timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out
FROM node_metrics
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp DESC
LIMIT ?
`, req.StartTimestamp, req.EndTimestamp, req.Num)
if err != nil {
return nil, err
}
defer rows.Close() //nolint:errcheck
var resp QueryNodeMetricsResp
for rows.Next() {
var m NodeMetrics
if err := rows.Scan(&m.Timestamp, &m.CPUUsage, &m.MemoryUsage, &m.DiskUsage, &m.NetworkIn, &m.NetworkOut); err != nil {
return nil, err
}
resp.Data = append(resp.Data, m)
}
resp.TOTAL = len(resp.Data)
return &resp, nil
}
func (ms *MetricsStore) QueryRuleMetric(ctx context.Context, req *QueryRuleMetricsReq) (*QueryRuleMetricsResp, error) {
query := `
SELECT timestamp, label, remote, ping_latency,
tcp_connection_count, tcp_handshake_duration, tcp_network_transmit_bytes,
udp_connection_count, udp_handshake_duration, udp_network_transmit_bytes
FROM rule_metrics
WHERE timestamp >= ? AND timestamp <= ?
`
args := []interface{}{req.StartTimestamp, req.EndTimestamp}
if req.RuleLabel != "" {
query += " AND label = ?"
args = append(args, req.RuleLabel)
}
if req.Remote != "" {
query += " AND remote = ?"
args = append(args, req.Remote)
}
query += `
ORDER BY timestamp DESC
LIMIT ?
`
args = append(args, req.Num)
rows, err := ms.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close() //nolint:errcheck
var resp QueryRuleMetricsResp
for rows.Next() {
var m RuleMetricsData
if err := rows.Scan(&m.Timestamp, &m.Label, &m.Remote, &m.PingLatency,
&m.TCPConnectionCount, &m.TCPHandshakeDuration, &m.TCPNetworkTransmitBytes,
&m.UDPConnectionCount, &m.UDPHandshakeDuration, &m.UDPNetworkTransmitBytes); err != nil {
return nil, err
}
resp.Data = append(resp.Data, m)
}
resp.TOTAL = len(resp.Data)
return &resp, nil
}
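
As a usage sketch (assuming the exported NewMetricsStore constructor shown in the next file; names, path, and values are illustrative; imports elided):

// Sketch: filter stored rule metrics by label over the last hour.
func lastHourRuleMetrics() (*ms.QueryRuleMetricsResp, error) {
	store, err := ms.NewMetricsStore("metrics.db") // path is illustrative
	if err != nil {
		return nil, err
	}
	end := time.Now().Unix()
	req := &ms.QueryRuleMetricsReq{
		RuleLabel:      "relay-1", // empty string matches all labels
		StartTimestamp: end - 3600,
		EndTimestamp:   end,
		Num:            -1, // SQLite treats a negative LIMIT as "no limit"
	}
	return store.QueryRuleMetric(context.Background(), req)
}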


@@ -8,31 +8,8 @@ import (
"go.uber.org/zap"
_ "modernc.org/sqlite"
"github.com/Ehco1996/ehco/pkg/metric_reader"
)
type NodeMetrics struct {
Timestamp int64 `json:"timestamp"`
CPUUsage float64 `json:"cpu_usage"`
MemoryUsage float64 `json:"memory_usage"`
DiskUsage float64 `json:"disk_usage"`
NetworkIn float64 `json:"network_in"`
NetworkOut float64 `json:"network_out"`
}
type QueryNodeMetricsReq struct {
StartTimestamp int64 `json:"start_ts"`
EndTimestamp int64 `json:"end_ts"`
Latest bool `json:"latest"` // whether to refresh the cache and get the latest data
}
type QueryNodeMetricsResp struct {
TOTAL int `json:"total"`
Data []NodeMetrics `json:"data"`
}
type MetricsStore struct {
db *sql.DB
dbPath string
@@ -65,12 +42,34 @@ func NewMetricsStore(dbPath string) (*MetricsStore, error) {
if err := ms.initDB(); err != nil {
return nil, err
}
if err := ms.cleanOldData(); err != nil {
return nil, err
}
return ms, nil
}
func (ms *MetricsStore) cleanOldData() error {
thirtyDaysAgo := time.Now().AddDate(0, 0, -30).Unix()
// clean up the node_metrics table
_, err := ms.db.Exec("DELETE FROM node_metrics WHERE timestamp < ?", thirtyDaysAgo)
if err != nil {
return err
}
// clean up the rule_metrics table
_, err = ms.db.Exec("DELETE FROM rule_metrics WHERE timestamp < ?", thirtyDaysAgo)
if err != nil {
return err
}
ms.l.Infof("Cleaned data older than 30 days")
return nil
}
func (ms *MetricsStore) initDB() error {
// init NodeMetrics table
_, err := ms.db.Exec(`
if _, err := ms.db.Exec(`
CREATE TABLE IF NOT EXISTS node_metrics (
timestamp INTEGER,
cpu_usage REAL,
@@ -80,39 +79,27 @@ func (ms *MetricsStore) initDB() error {
network_out REAL,
PRIMARY KEY (timestamp)
)
`)
return err
}
func (ms *MetricsStore) AddNodeMetric(m *metric_reader.NodeMetrics) error {
_, err := ms.db.Exec(`
INSERT OR REPLACE INTO node_metrics (timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out)
VALUES (?, ?, ?, ?, ?, ?)
`, m.SyncTime.Unix(), m.CpuUsagePercent, m.MemoryUsagePercent, m.DiskUsagePercent, m.NetworkReceiveBytesRate, m.NetworkTransmitBytesRate)
return err
}
func (ms *MetricsStore) QueryNodeMetric(startTime, endTime time.Time, num int) (*QueryNodeMetricsResp, error) {
rows, err := ms.db.Query(`
SELECT timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out
FROM node_metrics
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp DESC
LIMIT ?
`, startTime.Unix(), endTime.Unix(), num)
if err != nil {
return nil, err
`); err != nil {
return err
}
defer rows.Close() //nolint:errcheck
var resp QueryNodeMetricsResp
for rows.Next() {
var m NodeMetrics
if err := rows.Scan(&m.Timestamp, &m.CPUUsage, &m.MemoryUsage, &m.DiskUsage, &m.NetworkIn, &m.NetworkOut); err != nil {
return nil, err
}
resp.Data = append(resp.Data, m)
// init rule_metrics
if _, err := ms.db.Exec(`
CREATE TABLE IF NOT EXISTS rule_metrics (
timestamp INTEGER,
label TEXT,
remote TEXT,
ping_latency INTEGER,
tcp_connection_count INTEGER,
tcp_handshake_duration INTEGER,
tcp_network_transmit_bytes INTEGER,
udp_connection_count INTEGER,
udp_handshake_duration INTEGER,
udp_network_transmit_bytes INTEGER,
PRIMARY KEY (timestamp, label, remote)
)
`); err != nil {
return err
}
resp.TOTAL = len(resp.Data)
return &resp, nil
return nil
}


@@ -45,14 +45,19 @@ func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
}
if cm.cfg.NeedMetrics() {
metrics, err := cm.mr.ReadOnce(ctx)
nm, rmm, err := cm.mr.ReadOnce(ctx)
if err != nil {
cm.l.Errorf("read metrics failed: %v", err)
} else {
req.Node = *metrics
if err := cm.ms.AddNodeMetric(metrics); err != nil {
req.Node = *nm
if err := cm.ms.AddNodeMetric(ctx, nm); err != nil {
cm.l.Errorf("add metrics to store failed: %v", err)
}
for _, rm := range rmm {
if err := cm.ms.AddRuleMetric(ctx, rm); err != nil {
cm.l.Errorf("add rule metrics to store failed: %v", err)
}
}
}
}


@@ -209,14 +209,12 @@ func (c *innerConn) recordStats(n int, isRead bool) {
return
}
if isRead {
metrics.NetWorkTransmitBytes.WithLabelValues(
c.rc.remote.Label, metrics.METRIC_CONN_TYPE_TCP, metrics.METRIC_CONN_FLOW_READ,
).Add(float64(n))
labels := []string{c.rc.RelayLabel, c.rc.ConnType, metrics.METRIC_FLOW_READ, c.rc.remote.Address}
metrics.NetWorkTransmitBytes.WithLabelValues(labels...).Add(float64(n))
c.rc.Stats.Record(0, int64(n))
} else {
metrics.NetWorkTransmitBytes.WithLabelValues(
c.rc.remote.Label, metrics.METRIC_CONN_TYPE_TCP, metrics.METRIC_CONN_FLOW_WRITE,
).Add(float64(n))
labels := []string{c.rc.RelayLabel, c.rc.ConnType, metrics.METRIC_FLOW_WRITE, c.rc.remote.Address}
metrics.NetWorkTransmitBytes.WithLabelValues(labels...).Add(float64(n))
c.rc.Stats.Record(int64(n), 0)
}
}
@@ -236,7 +234,7 @@ func (c *innerConn) Read(p []byte) (n int, err error) {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
since := time.Since(c.lastActive)
if since > c.rc.Options.IdleTimeout {
c.l.Debugf("Read idle, close remote: %s", c.rc.remote.Label)
c.l.Debugf("Read idle, close remote: %s", c.rc.remote.Address)
return 0, ErrIdleTimeout
}
continue


@@ -24,7 +24,7 @@ func TestInnerConn_ReadWrite(t *testing.T) {
serverConn.SetDeadline(time.Now().Add(1 * time.Second))
defer clientConn.Close()
defer serverConn.Close()
rc := relayConnImpl{Stats: &Stats{}, remote: &lb.Node{Label: "client"}, Options: &testOptions}
rc := relayConnImpl{Stats: &Stats{}, remote: &lb.Node{}, Options: &testOptions}
innerC := newInnerConn(clientConn, &rc)
errChan := make(chan error, 1)
go func() {
@@ -100,7 +100,7 @@ func TestCopyTCPConn(t *testing.T) {
assert.NoError(t, err)
defer remoteConn.Close()
testOptions := conf.Options{IdleTimeout: time.Second, ReadTimeout: time.Second}
rc := relayConnImpl{Stats: &Stats{}, remote: &lb.Node{Label: "client"}, Options: &testOptions}
rc := relayConnImpl{Stats: &Stats{}, remote: &lb.Node{}, Options: &testOptions}
c1 := newInnerConn(clientConn, &rc)
c2 := newInnerConn(remoteConn, &rc)
@@ -161,7 +161,7 @@ func TestCopyUDPConn(t *testing.T) {
defer remoteConn.Close()
testOptions := conf.Options{IdleTimeout: time.Second, ReadTimeout: time.Second}
rc := relayConnImpl{Stats: &Stats{}, remote: &lb.Node{Label: "client"}, Options: &testOptions}
rc := relayConnImpl{Stats: &Stats{}, remote: &lb.Node{}, Options: &testOptions}
c1 := newInnerConn(clientConn, &rc)
c2 := newInnerConn(remoteConn, &rc)


@@ -1,6 +1,8 @@
package lb
import (
"net/url"
"strings"
"time"
"go.uber.org/atomic"
@@ -8,21 +10,38 @@ import (
type Node struct {
Address string
Label string
HandShakeDuration time.Duration
}
func (n *Node) Clone() *Node {
return &Node{
Address: n.Address,
Label: n.Label,
HandShakeDuration: n.HandShakeDuration,
}
}
func extractHost(input string) (string, error) {
// Check if the input string has a scheme, if not, add "http://"
if !strings.Contains(input, "://") {
input = "http://" + input
}
// Parse the URL
u, err := url.Parse(input)
if err != nil {
return "", err
}
return u.Hostname(), nil
}
// NOTE for (https/ws/wss)://xxx.com -> xxx.com
func (n *Node) GetAddrHost() (string, error) {
return extractHost(n.Address)
}
// RoundRobin is an interface for representing round-robin balancing.
type RoundRobin interface {
Next() *Node
GetAll() []*Node
}
type roundrobin struct {
@@ -42,3 +61,7 @@ func (r *roundrobin) Next() *Node {
next := r.nodeList[(int(n)-1)%r.len]
return next
}
func (r *roundrobin) GetAll() []*Node {
return r.nodeList
}
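
A quick sketch of how the moved extractHost helper behaves through GetAddrHost (illustrative, not part of this commit; imports elided):

// extractHost adds a scheme when one is missing, then returns url.Hostname():
//   "wss://example.com:443/tunnel" -> "example.com"
//   "1.2.3.4:9001"                 -> "1.2.3.4"
func hostOf(addr string) string {
	n := &lb.Node{Address: addr}
	host, err := n.GetAddrHost()
	if err != nil {
		return ""
	}
	return host
}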


@@ -13,53 +13,43 @@ const (
METRIC_SUBSYSTEM_TRAFFIC = "traffic"
METRIC_SUBSYSTEM_PING = "ping"
METRIC_LABEL_REMOTE = "remote"
METRIC_LABEL_CONN_FLOW = "flow"
METRIC_CONN_FLOW_WRITE = "write"
METRIC_CONN_FLOW_READ = "read"
METRIC_LABEL_CONN_TYPE = "type"
METRIC_CONN_TYPE_TCP = "tcp"
METRIC_CONN_TYPE_UDP = "udp"
METRIC_CONN_TYPE_TCP = "tcp"
METRIC_CONN_TYPE_UDP = "udp"
METRIC_FLOW_READ = "read"
METRIC_FLOW_WRITE = "write"
EhcoAliveStateInit = 0
EhcoAliveStateRunning = 1
)
var (
Hostname, _ = os.Hostname()
ConstLabels = map[string]string{
"ehco_runner_hostname": Hostname,
}
// 16 exponential buckets from 1ms to ~437ms (factor 1.5)
msBuckets = prometheus.ExponentialBuckets(1, 1.5, 16)
)
// ping metrics
var (
pingLabelNames = []string{"ip", "host", "label"}
pingBuckets = prometheus.ExponentialBuckets(0.001, 2, 12) // 1ms ~ 4s
pingInterval = time.Second * 30
PingResponseDurationSeconds = prometheus.NewHistogramVec(
pingInterval = time.Second * 30
PingResponseDurationMilliseconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: METRIC_NS,
Subsystem: METRIC_SUBSYSTEM_PING,
Name: "response_duration_seconds",
Name: "response_duration_milliseconds",
Help: "A histogram of latencies for ping responses.",
Buckets: pingBuckets,
Buckets: msBuckets,
ConstLabels: ConstLabels,
},
pingLabelNames,
)
PingRequestTotal = prometheus.NewDesc(
prometheus.BuildFQName(METRIC_NS, METRIC_SUBSYSTEM_PING, "requests_total"),
"Number of ping requests sent",
pingLabelNames,
ConstLabels,
[]string{"label", "remote", "ip"},
)
)
// traffic metrics
var (
Hostname, _ = os.Hostname()
ConstLabels = map[string]string{
"ehco_runner_hostname": Hostname,
}
EhcoAlive = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: METRIC_NS,
Subsystem: "",
@@ -74,7 +64,15 @@ var (
Name: "current_connection_count",
Help: "当前链接数",
ConstLabels: ConstLabels,
}, []string{METRIC_LABEL_REMOTE, METRIC_LABEL_CONN_TYPE})
}, []string{"label", "conn_type", "remote"})
HandShakeDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: METRIC_SUBSYSTEM_TRAFFIC,
Namespace: METRIC_NS,
Name: "handshake_duration_milliseconds",
Help: "握手时间ms",
ConstLabels: ConstLabels,
}, []string{"label", "conn_type", "remote"})
NetWorkTransmitBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: METRIC_NS,
@@ -82,15 +80,7 @@ var (
Name: "network_transmit_bytes",
Help: "传输流量总量bytes",
ConstLabels: ConstLabels,
}, []string{METRIC_LABEL_REMOTE, METRIC_LABEL_CONN_TYPE, METRIC_LABEL_CONN_FLOW})
HandShakeDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: METRIC_SUBSYSTEM_TRAFFIC,
Namespace: METRIC_NS,
Name: "handshake_duration",
Help: "握手时间ms",
ConstLabels: ConstLabels,
}, []string{METRIC_LABEL_REMOTE})
}, []string{"label", "conn_type", "flow", "remote"})
)
func RegisterEhcoMetrics(cfg *config.Config) error {
@@ -98,15 +88,14 @@ func RegisterEhcoMetrics(cfg *config.Config) error {
prometheus.MustRegister(EhcoAlive)
prometheus.MustRegister(CurConnectionCount)
prometheus.MustRegister(NetWorkTransmitBytes)
prometheus.MustRegister(HandShakeDuration)
prometheus.MustRegister(HandShakeDurationMilliseconds)
EhcoAlive.Set(EhcoAliveStateInit)
// ping
if cfg.EnablePing {
pg := NewPingGroup(cfg)
prometheus.MustRegister(PingResponseDurationSeconds)
prometheus.MustRegister(pg)
prometheus.MustRegister(PingResponseDurationMilliseconds)
go pg.Run()
}
return nil
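
With the widened label sets, instrumentation sites now pass label, conn_type, flow, and remote in order; a hedged sketch with illustrative values (imports elided):

// Sketch: recording traffic and handshake metrics with the new label order.
func record(n int64, latency time.Duration) {
	metrics.NetWorkTransmitBytes.WithLabelValues(
		"relay-1",                    // label
		metrics.METRIC_CONN_TYPE_TCP, // conn_type
		metrics.METRIC_FLOW_READ,     // flow
		"1.2.3.4:443",                // remote
	).Add(float64(n))
	metrics.HandShakeDurationMilliseconds.WithLabelValues(
		"relay-1", metrics.METRIC_CONN_TYPE_TCP, "1.2.3.4:443",
	).Observe(float64(latency.Milliseconds()))
}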


@@ -1,20 +1,16 @@
package metrics
import (
"fmt"
"math"
"net/url"
"runtime"
"strings"
"time"
"github.com/Ehco1996/ehco/internal/config"
"github.com/go-ping/ping"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
func (pg *PingGroup) newPinger(addr string) (*ping.Pinger, error) {
func (pg *PingGroup) newPinger(ruleLabel string, remote string, addr string) (*ping.Pinger, error) {
pinger := ping.New(addr)
if err := pinger.Resolve(); err != nil {
pg.logger.Error("failed to resolve pinger", zap.String("addr", addr), zap.Error(err))
@@ -26,6 +22,13 @@ func (pg *PingGroup) newPinger(addr string) (*ping.Pinger, error) {
if runtime.GOOS != "darwin" {
pinger.SetPrivileged(true)
}
pinger.OnRecv = func(pkt *ping.Packet) {
ip := pkt.IPAddr.String()
PingResponseDurationMilliseconds.WithLabelValues(
ruleLabel, remote, ip).Observe(float64(pkt.Rtt.Milliseconds()))
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v",
pkt.Nbytes, pkt.Addr, pkt.Seq, pkt.Rtt, pkt.Ttl)
}
return pinger, nil
}
@@ -34,89 +37,29 @@ type PingGroup struct {
// k: addr
Pingers map[string]*ping.Pinger
// k: addr v:relay rule label joined by ","
PingerLabels map[string]string
}
func extractHost(input string) (string, error) {
// Check if the input string has a scheme, if not, add "http://"
if !strings.Contains(input, "://") {
input = "http://" + input
}
// Parse the URL
u, err := url.Parse(input)
if err != nil {
return "", err
}
return u.Hostname(), nil
}
func NewPingGroup(cfg *config.Config) *PingGroup {
logger := zap.L().Named("pinger")
pg := &PingGroup{
logger: logger,
Pingers: make(map[string]*ping.Pinger),
PingerLabels: map[string]string{},
logger: zap.L().Named("pinger"),
Pingers: make(map[string]*ping.Pinger),
}
// parse addr from rule
for _, relayCfg := range cfg.RelayConfigs {
// NOTE for (https/ws/wss)://xxx.com -> xxx.com
for _, remote := range relayCfg.Remotes {
addr, err := extractHost(remote)
for _, remote := range relayCfg.GetAllRemotes() {
addr, err := remote.GetAddrHost()
if err != nil {
pg.logger.Error("try parse host error", zap.Error(err))
}
if _, ok := pg.Pingers[addr]; ok {
// append rule label when remote host is same
pg.PingerLabels[addr] += fmt.Sprintf(",%s", relayCfg.Label)
continue
}
if pinger, err := pg.newPinger(addr); err != nil {
if pinger, err := pg.newPinger(relayCfg.Label, remote.Address, addr); err != nil {
pg.logger.Error("new pinger meet error", zap.Error(err))
} else {
pg.Pingers[pinger.Addr()] = pinger
pg.PingerLabels[addr] = relayCfg.Label
pg.Pingers[addr] = pinger
}
}
}
// update metrics
for addr, pinger := range pg.Pingers {
pinger.OnRecv = func(pkt *ping.Packet) {
PingResponseDurationSeconds.WithLabelValues(
pkt.IPAddr.String(), pkt.Addr, pg.PingerLabels[addr]).Observe(pkt.Rtt.Seconds())
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v",
pkt.Nbytes, pkt.Addr, pkt.Seq, pkt.Rtt, pkt.Ttl)
}
pinger.OnDuplicateRecv = func(pkt *ping.Packet) {
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v (DUP!)",
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
}
}
return pg
}
func (pg *PingGroup) Describe(ch chan<- *prometheus.Desc) {
ch <- PingRequestTotal
}
func (pg *PingGroup) Collect(ch chan<- prometheus.Metric) {
for addr, pinger := range pg.Pingers {
stats := pinger.Statistics()
ch <- prometheus.MustNewConstMetric(
PingRequestTotal,
prometheus.CounterValue,
float64(stats.PacketsSent),
stats.IPAddr.String(),
stats.Addr,
pg.PingerLabels[addr],
)
}
}
func (pg *PingGroup) Run() {
if len(pg.Pingers) <= 0 {
return


@@ -179,14 +179,16 @@ func (r *Config) DefaultLabel() string {
func (r *Config) ToRemotesLB() lb.RoundRobin {
tcpNodeList := make([]*lb.Node, len(r.Remotes))
for idx, addr := range r.Remotes {
tcpNodeList[idx] = &lb.Node{
Address: addr,
Label: fmt.Sprintf("%s-%s", r.Label, addr),
}
tcpNodeList[idx] = &lb.Node{Address: addr}
}
return lb.NewRoundRobin(tcpNodeList)
}
func (r *Config) GetAllRemotes() []*lb.Node {
lb := r.ToRemotesLB()
return lb.GetAll()
}
func (r *Config) GetLoggerName() string {
return fmt.Sprintf("%s(%s<->%s)", r.Label, r.ListenType, r.TransportType)
}


@@ -44,8 +44,9 @@ func newBaseRelayServer(cfg *conf.Config, cmgr cmgr.Cmgr) (*BaseRelayServer, err
}
func (b *BaseRelayServer) RelayTCPConn(ctx context.Context, c net.Conn, remote *lb.Node) error {
metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_TCP).Inc()
defer metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_TCP).Dec()
labels := []string{b.cfg.Label, metrics.METRIC_CONN_TYPE_TCP, remote.Address}
metrics.CurConnectionCount.WithLabelValues(labels...).Inc()
defer metrics.CurConnectionCount.WithLabelValues(labels...).Dec()
if err := b.checkConnectionLimit(); err != nil {
return err
@@ -68,8 +69,9 @@ func (b *BaseRelayServer) RelayTCPConn(ctx context.Context, c net.Conn, remote *
}
func (b *BaseRelayServer) RelayUDPConn(ctx context.Context, c net.Conn, remote *lb.Node) error {
metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_UDP).Inc()
defer metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_UDP).Dec()
labels := []string{b.cfg.Label, metrics.METRIC_CONN_TYPE_UDP, remote.Address}
metrics.CurConnectionCount.WithLabelValues(labels...).Inc()
defer metrics.CurConnectionCount.WithLabelValues(labels...).Dec()
rc, err := b.relayer.HandShake(ctx, remote, false)
if err != nil {


@@ -47,7 +47,12 @@ func (raw *RawClient) HandShake(ctx context.Context, remote *lb.Node, isTCP bool
return nil, err
}
latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
connType := metrics.METRIC_CONN_TYPE_TCP
if !isTCP {
connType = metrics.METRIC_CONN_TYPE_UDP
}
labels := []string{raw.cfg.Label, connType, remote.Address}
metrics.HandShakeDurationMilliseconds.WithLabelValues(labels...).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
return rc, nil
}


@@ -67,7 +67,12 @@ func (s *WsClient) HandShake(ctx context.Context, remote *lb.Node, isTCP bool) (
return nil, err
}
latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
connType := metrics.METRIC_CONN_TYPE_TCP
if !isTCP {
connType = metrics.METRIC_CONN_TYPE_UDP
}
labels := []string{s.cfg.Label, connType, remote.Address}
metrics.HandShakeDurationMilliseconds.WithLabelValues(labels...).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
c := conn.NewWSConn(wsc, false)
return c, nil
@@ -97,7 +102,7 @@ func (s *WsServer) handleRequest(w http.ResponseWriter, req *http.Request) {
var remote *lb.Node
if addr := req.URL.Query().Get(conf.WS_QUERY_REMOTE_ADDR); addr != "" {
remote = &lb.Node{Address: addr, Label: addr}
remote = &lb.Node{Address: addr}
} else {
remote = s.remotes.Next()
}


@@ -0,0 +1,134 @@
package web
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/Ehco1996/ehco/internal/cmgr/ms"
"github.com/labstack/echo/v4"
)
const (
defaultTimeRange = 60 // seconds
errInvalidParam = "invalid parameter: %s"
)
type queryParams struct {
startTS int64
endTS int64
refresh bool
}
func parseQueryParams(c echo.Context) (*queryParams, error) {
now := time.Now().Unix()
params := &queryParams{
startTS: now - defaultTimeRange,
endTS: now,
refresh: false,
}
if start, err := parseTimestamp(c.QueryParam("start_ts")); err == nil {
params.startTS = start
}
if end, err := parseTimestamp(c.QueryParam("end_ts")); err == nil {
params.endTS = end
}
if refresh, err := strconv.ParseBool(c.QueryParam("latest")); err == nil {
params.refresh = refresh
}
if params.startTS >= params.endTS {
return nil, fmt.Errorf(errInvalidParam, "time range")
}
return params, nil
}
func parseTimestamp(s string) (int64, error) {
if s == "" {
return 0, fmt.Errorf("empty timestamp")
}
return strconv.ParseInt(s, 10, 64)
}
func (s *Server) GetNodeMetrics(c echo.Context) error {
params, err := parseQueryParams(c)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
req := &ms.QueryNodeMetricsReq{StartTimestamp: params.startTS, EndTimestamp: params.endTS, Num: -1}
if params.refresh {
req.Num = 1
}
metrics, err := s.connMgr.QueryNodeMetrics(c.Request().Context(), req, params.refresh)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
return c.JSON(http.StatusOK, metrics)
}
func (s *Server) GetRuleMetrics(c echo.Context) error {
params, err := parseQueryParams(c)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
req := &ms.QueryRuleMetricsReq{
StartTimestamp: params.startTS,
EndTimestamp: params.endTS,
Num: -1,
RuleLabel: c.QueryParam("label"),
Remote: c.QueryParam("remote"),
}
if params.refresh {
req.Num = 1
}
metrics, err := s.connMgr.QueryRuleMetrics(c.Request().Context(), req, params.refresh)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}
return c.JSON(http.StatusOK, metrics)
}
func (s *Server) CurrentConfig(c echo.Context) error {
ret, err := json.Marshal(s.cfg)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return c.JSONBlob(http.StatusOK, ret)
}
func (s *Server) HandleReload(c echo.Context) error {
if s.Reloader == nil {
return echo.NewHTTPError(http.StatusBadRequest, "reload not support")
}
err := s.Reloader.Reload(true)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if _, err := c.Response().Write([]byte("reload success")); err != nil {
s.l.Errorf("write response meet err=%v", err)
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return nil
}
func (s *Server) HandleHealthCheck(c echo.Context) error {
relayLabel := c.QueryParam("relay_label")
if relayLabel == "" {
return echo.NewHTTPError(http.StatusBadRequest, "relay_label is required")
}
latency, err := s.HealthCheck(c.Request().Context(), relayLabel)
if err != nil {
res := HealthCheckResp{Message: err.Error(), ErrorCode: -1}
return c.JSON(http.StatusBadRequest, res)
}
return c.JSON(http.StatusOK, HealthCheckResp{Message: "connect success", Latency: latency})
}
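
These handlers back the /api/v1 routes that the frontend below calls; a minimal client sketch, assuming the paths from the JS Config and an illustrative listen address (imports elided):

// Sketch: fetch the latest node metric point over HTTP.
func fetchLatestNodeMetric() (*ms.QueryNodeMetricsResp, error) {
	resp, err := http.Get("http://127.0.0.1:9000/api/v1/node_metrics/?latest=true")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close() //nolint:errcheck
	var out ms.QueryNodeMetricsResp
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}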


@@ -1,13 +1,10 @@
package web
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/Ehco1996/ehco/internal/cmgr/ms"
"github.com/Ehco1996/ehco/internal/config"
"github.com/Ehco1996/ehco/internal/constant"
"github.com/labstack/echo/v4"
@@ -42,44 +39,6 @@ func (s *Server) index(c echo.Context) error {
return c.Render(http.StatusOK, "index.html", data)
}
func (s *Server) HandleReload(c echo.Context) error {
if s.Reloader == nil {
return echo.NewHTTPError(http.StatusBadRequest, "reload not support")
}
err := s.Reloader.Reload(true)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
if _, err := c.Response().Write([]byte("reload success")); err != nil {
s.l.Errorf("write response meet err=%v", err)
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return nil
}
func (s *Server) HandleHealthCheck(c echo.Context) error {
relayLabel := c.QueryParam("relay_label")
if relayLabel == "" {
return echo.NewHTTPError(http.StatusBadRequest, "relay_label is required")
}
latency, err := s.HealthCheck(c.Request().Context(), relayLabel)
if err != nil {
res := HealthCheckResp{Message: err.Error(), ErrorCode: -1}
return c.JSON(http.StatusBadRequest, res)
}
return c.JSON(http.StatusOK, HealthCheckResp{Message: "connect success", Latency: latency})
}
func (s *Server) CurrentConfig(c echo.Context) error {
ret, err := json.Marshal(s.cfg)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return c.JSONBlob(http.StatusOK, ret)
}
func (s *Server) ListConnections(c echo.Context) error {
pageStr := c.QueryParam("page")
page, err := strconv.Atoi(pageStr)
@@ -126,36 +85,12 @@ func (s *Server) ListRules(c echo.Context) error {
})
}
func (s *Server) GetNodeMetrics(c echo.Context) error {
startTS := time.Now().Unix() - 60
if c.QueryParam("start_ts") != "" {
star, err := strconv.ParseInt(c.QueryParam("start_ts"), 10, 64)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
startTS = star
}
endTS := time.Now().Unix()
if c.QueryParam("end_ts") != "" {
end, err := strconv.ParseInt(c.QueryParam("end_ts"), 10, 64)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
endTS = end
}
req := &ms.QueryNodeMetricsReq{StartTimestamp: startTS, EndTimestamp: endTS}
latest := c.QueryParam("latest")
if latest != "" {
r, err := strconv.ParseBool(latest)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
req.Latest = r
}
metrics, err := s.connMgr.QueryNodeMetrics(c.Request().Context(), req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return c.JSON(http.StatusOK, metrics)
func (s *Server) RuleMetrics(c echo.Context) error {
return c.Render(http.StatusOK, "rule_metrics.html", map[string]interface{}{
"Configs": s.cfg.RelayConfigs,
})
}
func (s *Server) LogsPage(c echo.Context) error {
return c.Render(http.StatusOK, "logs.html", nil)
}


@@ -0,0 +1,33 @@
package web
import (
"net"
"github.com/Ehco1996/ehco/pkg/log"
"github.com/gobwas/ws"
"github.com/labstack/echo/v4"
)
func (s *Server) handleWebSocketLogs(c echo.Context) error {
conn, _, _, err := ws.UpgradeHTTP(c.Request(), c.Response())
if err != nil {
return err
}
defer conn.Close()
log.SetWebSocketConn(conn)
// keep the connection open and handle any inbound messages
for {
_, err := ws.ReadFrame(conn)
if err != nil {
if _, ok := err.(net.Error); ok {
// handle network errors
s.l.Errorf("WebSocket read error: %v", err)
}
break
}
}
log.SetWebSocketConn(nil)
return nil
}
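
A matching client could dial this handler with the same gobwas/ws package; a sketch under the assumption that the handler is mounted at the path below (imports elided):

// Sketch: stream log frames from the WebSocket endpoint.
func tailLogs(ctx context.Context) error {
	conn, _, _, err := ws.Dial(ctx, "ws://127.0.0.1:9000/ws/logs") // path is illustrative
	if err != nil {
		return err
	}
	defer conn.Close() //nolint:errcheck
	for {
		frame, err := ws.ReadFrame(conn)
		if err != nil {
			return err
		}
		fmt.Printf("%s", frame.Payload)
	}
}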


@@ -1,393 +0,0 @@
const MetricsModule = (function () {
// Constants
const API_BASE_URL = '/api/v1';
const NODE_METRICS_PATH = '/node_metrics/';
const BYTE_TO_MB = 1024 * 1024;
const handleError = (error) => {
console.error('Error:', error);
};
// API functions
const fetchData = async (path, params = {}) => {
const url = new URL(API_BASE_URL + path, window.location.origin);
Object.entries(params).forEach(([key, value]) => url.searchParams.append(key, value));
try {
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return await response.json();
} catch (error) {
handleError(error);
return null;
}
};
const fetchLatestMetric = () => fetchData(NODE_METRICS_PATH, { latest: true }).then((data) => data?.data[0]);
const fetchMetrics = (startTs, endTs) => fetchData(NODE_METRICS_PATH, { start_ts: startTs, end_ts: endTs }).then((data) => data?.data);
// Chart functions
const initChart = (canvasId, type, datasets, legendPosition = '', yDisplayText = '', title = '', unit = '') => {
const ctx = $(`#${canvasId}`)[0].getContext('2d');
const colors = {
cpu: 'rgba(255, 99, 132, 1)',
memory: 'rgba(54, 162, 235, 1)',
disk: 'rgba(255, 206, 86, 1)',
receive: 'rgba(0, 150, 255, 1)',
transmit: 'rgba(255, 140, 0, 1)',
};
const getDatasetConfig = (label) => {
const color = colors[label.toLowerCase()] || 'rgba(0, 0, 0, 1)';
return {
label,
borderColor: color,
backgroundColor: color.replace('1)', '0.2)'),
borderWidth: 2,
pointRadius: 2,
pointHoverRadius: 2,
fill: true,
data: [],
};
};
const data = {
labels: [],
datasets: $.isArray(datasets) ? datasets.map((dataset) => getDatasetConfig(dataset.label)) : [getDatasetConfig(datasets.label)],
};
return new Chart(ctx, {
type,
data,
options: {
line: {
spanGaps: false, // false: do not connect the line across null values
},
responsive: true,
plugins: {
legend: { position: legendPosition },
title: {
display: !!title,
text: title,
position: 'bottom',
font: { size: 14, weight: 'bold' },
},
tooltip: {
callbacks: {
title: function (tooltipItems) {
return new Date(tooltipItems[0].label).toLocaleString();
},
label: function (context) {
let label = context.dataset.label || '';
if (label) {
label += ': ';
}
if (context.parsed.y !== null) {
label += context.parsed.y.toFixed(2) + ' ' + unit;
}
return label;
},
},
},
},
scales: {
x: {
type: 'time',
time: {
unit: 'minute',
displayFormats: {
minute: 'HH:mm',
},
},
ticks: {
maxRotation: 0,
autoSkip: true,
maxTicksLimit: 10,
},
adapters: {
date: {
locale: 'en',
},
},
},
y: {
beginAtZero: true,
title: { display: true, text: yDisplayText, font: { weight: 'bold' } },
},
},
elements: { line: { tension: 0.4 } },
downsample: {
enabled: true,
threshold: 200,
},
},
});
};
const updateChart = (chart, newData, labels) => {
if (!newData || !labels) {
console.error('Invalid data or labels provided');
return;
}
if ($.isArray(newData) && $.isArray(newData[0])) {
$.each(chart.data.datasets, (index, dataset) => {
if (newData[index]) {
dataset.data = newData[index].map((value, i) => ({ x: moment(labels[i]), y: value }));
}
});
} else {
chart.data.datasets[0].data = newData.map((value, i) => ({ x: moment(labels[i]), y: value }));
}
chart.options.scales.x.min = moment(labels[0]);
chart.options.scales.x.max = moment(labels[labels.length - 1]);
chart.update();
};
const updateCharts = (charts, metrics, startTs, endTs) => {
console.log('Raw metrics data:', metrics);
const generateTimestamps = (start, end) => {
const timestamps = [];
let current = moment.unix(start);
const endMoment = moment.unix(end);
while (current.isSameOrBefore(endMoment)) {
timestamps.push(current.toISOString());
current.add(1, 'minute');
}
return timestamps;
};
const timestamps = generateTimestamps(startTs, endTs);
const processData = (dataKey) => {
const data = new Array(timestamps.length).fill(null);
metrics.forEach((metric) => {
const index = Math.floor((metric.timestamp - startTs) / 60);
if (index >= 0 && index < data.length) {
data[index] = metric[dataKey];
}
});
return data;
};
updateChart(charts.cpu, processData('cpu_usage'), timestamps);
updateChart(charts.memory, processData('memory_usage'), timestamps);
updateChart(charts.disk, processData('disk_usage'), timestamps);
updateChart(
charts.network,
[
processData('network_in').map((v) => (v === null ? null : v / BYTE_TO_MB)),
processData('network_out').map((v) => (v === null ? null : v / BYTE_TO_MB)),
],
timestamps
);
};
const addLatestDataToCharts = (charts, latestMetric) => {
console.log('Raw latestMetric data:', latestMetric);
const timestamp = moment.unix(latestMetric.timestamp);
$.each(charts, (key, chart) => {
// check whether a data point for this timestamp already exists
const existingDataIndex = chart.data.labels.findIndex((label) => label.isSame(timestamp));
if (existingDataIndex === -1) {
// new data point: append it to the end
chart.data.labels.push(timestamp);
if (key === 'network') {
chart.data.datasets[0].data.push({ x: timestamp, y: latestMetric.network_in / BYTE_TO_MB });
chart.data.datasets[1].data.push({ x: timestamp, y: latestMetric.network_out / BYTE_TO_MB });
} else {
chart.data.datasets[0].data.push({ x: timestamp, y: latestMetric[`${key}_usage`] });
}
// update the x-axis range while keeping a fixed time window
const timeWindow = moment.duration(30, 'minutes'); // visible time window, e.g. 30 minutes
const oldestAllowedTime = moment(timestamp).subtract(timeWindow);
chart.options.scales.x.min = oldestAllowedTime;
chart.options.scales.x.max = timestamp;
// enable pan and zoom on the chart
chart.options.plugins.zoom = {
pan: {
enabled: true,
mode: 'x',
},
zoom: {
wheel: {
enabled: true,
},
pinch: {
enabled: true,
},
mode: 'x',
},
};
chart.update();
}
// if a point for this timestamp already exists, do nothing and keep the data
});
};
// Chart initialization
const initializeCharts = async () => {
const metric = await fetchLatestMetric();
if (!metric) return null;
return {
cpu: initChart('cpuChart', 'line', { label: 'CPU' }, 'top', 'Usage (%)', `CPU`, '%'),
memory: initChart('memoryChart', 'line', { label: 'Memory' }, 'top', 'Usage (%)', `Memory`, '%'),
disk: initChart('diskChart', 'line', { label: 'Disk' }, 'top', 'Usage (%)', `Disk`, '%'),
network: initChart(
'networkChart',
'line',
[{ label: 'Receive' }, { label: 'Transmit' }],
'top',
'Rate (MB/s)',
'Network Rate',
'MB/s'
),
};
};
// Date range functions
const setupDateRangeDropdown = (charts) => {
const $dateRangeDropdown = $('#dateRangeDropdown');
const $dateRangeButton = $('#dateRangeButton');
const $dateRangeText = $('#dateRangeText');
const $dateRangeInput = $('#dateRangeInput');
$dateRangeDropdown.find('.dropdown-item[data-range]').on('click', function (e) {
e.preventDefault();
const range = $(this).data('range');
const now = new Date();
let start, end;
switch (range) {
case '30m':
start = new Date(now - 30 * 60 * 1000);
break;
case '1h':
start = new Date(now - 60 * 60 * 1000);
break;
case '3h':
start = new Date(now - 3 * 60 * 60 * 1000);
break;
case '6h':
start = new Date(now - 6 * 60 * 60 * 1000);
break;
case '12h':
start = new Date(now - 12 * 60 * 60 * 1000);
break;
case '24h':
start = new Date(now - 24 * 60 * 60 * 1000);
break;
case '7d':
start = new Date(now - 7 * 24 * 60 * 60 * 1000);
break;
}
end = now;
const startTs = Math.floor(start.getTime() / 1000);
const endTs = Math.floor(end.getTime() / 1000);
fetchDataForRange(charts, startTs, endTs);
$dateRangeText.text($(this).text());
$dateRangeDropdown.removeClass('is-active');
});
$dateRangeButton.on('click', (event) => {
event.stopPropagation();
$dateRangeDropdown.toggleClass('is-active');
});
$(document).on('click', (event) => {
if (!$dateRangeDropdown.has(event.target).length) {
$dateRangeDropdown.removeClass('is-active');
}
});
const picker = flatpickr($dateRangeInput[0], {
mode: 'range',
enableTime: true,
dateFormat: 'Y-m-d H:i',
onChange: function (selectedDates) {
if (selectedDates.length === 2) {
const startTs = Math.floor(selectedDates[0].getTime() / 1000);
const endTs = Math.floor(selectedDates[1].getTime() / 1000);
fetchDataForRange(charts, startTs, endTs);
const formattedStart = selectedDates[0].toLocaleString();
const formattedEnd = selectedDates[1].toLocaleString();
$dateRangeText.text(`${formattedStart} - ${formattedEnd}`);
// close the dropdown
$dateRangeDropdown.removeClass('is-active');
}
},
onClose: function () {
// make sure the dropdown also closes when the date picker closes
$dateRangeDropdown.removeClass('is-active');
},
});
// keep clicks inside the date picker from closing the dropdown
$dateRangeInput.on('click', (event) => {
event.stopPropagation();
});
};
const fetchDataForRange = async (charts, startTs, endTs) => {
const metrics = await fetchMetrics(startTs, endTs);
if (metrics) {
console.log('Raw metrics data:', metrics);
updateCharts(charts, metrics, startTs, endTs);
}
};
// Auto refresh functions
const setupAutoRefresh = (charts) => {
let autoRefreshInterval;
let isAutoRefreshing = false;
$('#refreshButton').click(function () {
if (isAutoRefreshing) {
clearInterval(autoRefreshInterval);
$(this).removeClass('is-info');
$(this).find('span:last').text('Auto Refresh');
isAutoRefreshing = false;
} else {
$(this).addClass('is-info');
$(this).find('span:last').text('Stop Refresh');
isAutoRefreshing = true;
refreshData(charts);
autoRefreshInterval = setInterval(() => refreshData(charts), 5000);
}
});
};
const refreshData = async (charts) => {
const latestMetric = await fetchLatestMetric();
if (latestMetric) {
addLatestDataToCharts(charts, latestMetric);
}
};
// Main initialization function
const init = async () => {
const charts = await initializeCharts();
if (charts) {
setupDateRangeDropdown(charts);
setupAutoRefresh(charts);
}
};
// Public API
return {
init: init,
};
})();
// Initialize when the DOM is ready
document.addEventListener('DOMContentLoaded', MetricsModule.init);


@@ -0,0 +1,404 @@
const Config = {
API_BASE_URL: '/api/v1',
NODE_METRICS_PATH: '/node_metrics/',
BYTE_TO_MB: 1024 * 1024,
CHART_COLORS: {
cpu: 'rgba(255, 99, 132, 1)',
memory: 'rgba(54, 162, 235, 1)',
disk: 'rgba(255, 206, 86, 1)',
receive: 'rgba(0, 150, 255, 1)',
transmit: 'rgba(255, 140, 0, 1)',
},
TIME_WINDOW: 30, // minutes
AUTO_REFRESH_INTERVAL: 5000, // milliseconds
};
class ApiService {
static async fetchData(path, params = {}) {
const url = new URL(Config.API_BASE_URL + path, window.location.origin);
Object.entries(params).forEach(([key, value]) => url.searchParams.append(key, value));
try {
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return await response.json();
} catch (error) {
console.error('Error:', error);
return null;
}
}
static async fetchLatestMetric() {
const data = await this.fetchData(Config.NODE_METRICS_PATH, { latest: true });
return data?.data[0];
}
static async fetchMetrics(startTs, endTs) {
const data = await this.fetchData(Config.NODE_METRICS_PATH, { start_ts: startTs, end_ts: endTs });
return data?.data;
}
}
class ChartManager {
constructor() {
this.charts = {};
}
initializeCharts() {
this.charts = {
cpu: this.initChart('cpuChart', 'line', { label: 'CPU' }, 'top', 'Usage (%)', 'CPU', '%'),
memory: this.initChart('memoryChart', 'line', { label: 'Memory' }, 'top', 'Usage (%)', 'Memory', '%'),
disk: this.initChart('diskChart', 'line', { label: 'Disk' }, 'top', 'Usage (%)', 'Disk', '%'),
network: this.initChart(
'networkChart',
'line',
[{ label: 'Receive' }, { label: 'Transmit' }],
'top',
'Rate (MB/s)',
'Network Rate',
'MB/s'
),
};
}
initChart(canvasId, type, datasets, legendPosition, yDisplayText, title, unit) {
const ctx = $(`#${canvasId}`)[0].getContext('2d');
const data = {
labels: [],
datasets: Array.isArray(datasets)
? datasets.map((dataset) => this.getDatasetConfig(dataset.label))
: [this.getDatasetConfig(datasets.label)],
};
return new Chart(ctx, {
type,
data,
options: this.getChartOptions(legendPosition, yDisplayText, title, unit),
});
}
getDatasetConfig(label) {
const color = Config.CHART_COLORS[label.toLowerCase()] || 'rgba(0, 0, 0, 1)';
return {
label,
borderColor: color,
backgroundColor: color.replace('1)', '0.2)'),
borderWidth: 2,
pointRadius: 2,
pointHoverRadius: 2,
fill: true,
data: [],
};
}
getChartOptions(legendPosition, yDisplayText, title, unit) {
return {
line: { spanGaps: false },
responsive: true,
plugins: {
legend: { position: legendPosition },
title: {
display: !!title,
text: title,
position: 'bottom',
font: { size: 14, weight: 'bold' },
},
tooltip: {
callbacks: {
title: (tooltipItems) => new Date(tooltipItems[0].label).toLocaleString(),
label: (context) => {
let label = context.dataset.label || '';
if (label) {
label += ': ';
}
if (context.parsed.y !== null) {
label += context.parsed.y.toFixed(2) + ' ' + unit;
}
return label;
},
},
},
zoom: {
pan: { enabled: true, mode: 'x' },
zoom: {
wheel: { enabled: true },
pinch: { enabled: true },
mode: 'x',
},
},
},
scales: {
x: {
type: 'time',
time: {
unit: 'minute',
displayFormats: { minute: 'HH:mm' },
},
ticks: {
maxRotation: 0,
autoSkip: true,
maxTicksLimit: 10,
},
adapters: {
date: { locale: 'en' },
},
},
y: {
beginAtZero: true,
title: { display: true, text: yDisplayText, font: { weight: 'bold' } },
},
},
elements: { line: { tension: 0.4 } },
downsample: {
enabled: true,
threshold: 200,
},
};
}
updateCharts(metrics, startTs, endTs) {
const timestamps = this.generateTimestamps(startTs, endTs);
const processData = (dataKey) => {
const data = new Array(timestamps.length).fill(null);
metrics.forEach((metric) => {
const index = Math.floor((metric.timestamp - startTs) / 60);
if (index >= 0 && index < data.length) {
data[index] = metric[dataKey];
}
});
return data;
};
this.updateChart(this.charts.cpu, processData('cpu_usage'), timestamps);
this.updateChart(this.charts.memory, processData('memory_usage'), timestamps);
this.updateChart(this.charts.disk, processData('disk_usage'), timestamps);
this.updateChart(
this.charts.network,
[
processData('network_in').map((v) => (v === null ? null : v / Config.BYTE_TO_MB)),
processData('network_out').map((v) => (v === null ? null : v / Config.BYTE_TO_MB)),
],
timestamps
);
}
updateChart(chart, newData, labels) {
if (!newData || !labels) {
console.error('Invalid data or labels provided');
return;
}
if (Array.isArray(newData) && Array.isArray(newData[0])) {
chart.data.datasets.forEach((dataset, index) => {
if (newData[index]) {
dataset.data = newData[index].map((value, i) => ({ x: moment(labels[i]), y: value }));
}
});
} else {
chart.data.datasets[0].data = newData.map((value, i) => ({ x: moment(labels[i]), y: value }));
}
chart.options.scales.x.min = moment(labels[0]);
chart.options.scales.x.max = moment(labels[labels.length - 1]);
chart.update();
}
addLatestDataToCharts(latestMetric) {
const timestamp = moment.unix(latestMetric.timestamp);
Object.entries(this.charts).forEach(([key, chart]) => {
const existingDataIndex = chart.data.labels.findIndex((label) => label.isSame(timestamp));
if (existingDataIndex === -1) {
chart.data.labels.push(timestamp);
if (key === 'network') {
chart.data.datasets[0].data.push({ x: timestamp, y: latestMetric.network_in / Config.BYTE_TO_MB });
chart.data.datasets[1].data.push({ x: timestamp, y: latestMetric.network_out / Config.BYTE_TO_MB });
} else {
chart.data.datasets[0].data.push({ x: timestamp, y: latestMetric[`${key}_usage`] });
}
const timeWindow = moment.duration(Config.TIME_WINDOW, 'minutes');
const oldestAllowedTime = moment(timestamp).subtract(timeWindow);
chart.options.scales.x.min = oldestAllowedTime;
chart.options.scales.x.max = timestamp;
chart.update();
}
});
}
generateTimestamps(start, end) {
const timestamps = [];
let current = moment.unix(start);
const endMoment = moment.unix(end);
while (current.isSameOrBefore(endMoment)) {
timestamps.push(current.toISOString());
current.add(1, 'minute');
}
return timestamps;
}
}
class DateRangeManager {
constructor(chartManager) {
this.chartManager = chartManager;
this.$dateRangeDropdown = $('#dateRangeDropdown');
this.$dateRangeButton = $('#dateRangeButton');
this.$dateRangeText = $('#dateRangeText');
this.$dateRangeInput = $('#dateRangeInput');
this.setupEventListeners();
}
setupEventListeners() {
this.$dateRangeDropdown.find('.dropdown-item[data-range]').on('click', (e) => this.handlePresetDateRange(e));
this.$dateRangeButton.on('click', (event) => this.toggleDropdown(event));
$(document).on('click', (event) => this.closeDropdownOnOutsideClick(event));
this.initializeDatePicker();
}
handlePresetDateRange(e) {
e.preventDefault();
const range = $(e.currentTarget).data('range');
const [start, end] = this.calculateDateRange(range);
this.fetchAndUpdateCharts(start, end);
this.$dateRangeText.text($(e.currentTarget).text());
this.$dateRangeDropdown.removeClass('is-active');
}
calculateDateRange(range) {
const now = new Date();
let start;
switch (range) {
case '30m':
start = new Date(now - 30 * 60 * 1000);
break;
case '1h':
start = new Date(now - 60 * 60 * 1000);
break;
case '3h':
start = new Date(now - 3 * 60 * 60 * 1000);
break;
case '6h':
start = new Date(now - 6 * 60 * 60 * 1000);
break;
case '12h':
start = new Date(now - 12 * 60 * 60 * 1000);
break;
case '24h':
start = new Date(now - 24 * 60 * 60 * 1000);
break;
case '7d':
start = new Date(now - 7 * 24 * 60 * 60 * 1000);
break;
}
return [start, now];
}
toggleDropdown(event) {
event.stopPropagation();
this.$dateRangeDropdown.toggleClass('is-active');
}
closeDropdownOnOutsideClick(event) {
if (!this.$dateRangeDropdown.has(event.target).length) {
this.$dateRangeDropdown.removeClass('is-active');
}
}
initializeDatePicker() {
flatpickr(this.$dateRangeInput[0], {
mode: 'range',
enableTime: true,
dateFormat: 'Y-m-d H:i',
onChange: (selectedDates) => this.handleDatePickerChange(selectedDates),
onClose: () => this.$dateRangeDropdown.removeClass('is-active'),
});
this.$dateRangeInput.on('click', (event) => event.stopPropagation());
}
handleDatePickerChange(selectedDates) {
if (selectedDates.length === 2) {
const [start, end] = selectedDates;
this.fetchAndUpdateCharts(start, end);
const formattedStart = start.toLocaleString();
const formattedEnd = end.toLocaleString();
this.$dateRangeText.text(`${formattedStart} - ${formattedEnd}`);
this.$dateRangeDropdown.removeClass('is-active');
}
}
async fetchAndUpdateCharts(start, end) {
const startTs = Math.floor(start.getTime() / 1000);
const endTs = Math.floor(end.getTime() / 1000);
const metrics = await ApiService.fetchMetrics(startTs, endTs);
if (metrics) {
this.chartManager.updateCharts(metrics, startTs, endTs);
}
}
}
class AutoRefreshManager {
constructor(chartManager) {
this.chartManager = chartManager;
this.autoRefreshInterval = null;
this.isAutoRefreshing = false;
this.$refreshButton = $('#refreshButton');
this.setupEventListeners();
}
setupEventListeners() {
this.$refreshButton.click(() => this.toggleAutoRefresh());
}
toggleAutoRefresh() {
if (this.isAutoRefreshing) {
this.stopAutoRefresh();
} else {
this.startAutoRefresh();
}
}
startAutoRefresh() {
this.isAutoRefreshing = true;
this.$refreshButton.addClass('is-info');
this.$refreshButton.find('span:last').text('Stop Refresh');
this.refreshData();
this.autoRefreshInterval = setInterval(() => this.refreshData(), Config.AUTO_REFRESH_INTERVAL);
}
stopAutoRefresh() {
this.isAutoRefreshing = false;
clearInterval(this.autoRefreshInterval);
this.$refreshButton.removeClass('is-info');
this.$refreshButton.find('span:last').text('Auto Refresh');
}
async refreshData() {
const latestMetric = await ApiService.fetchLatestMetric();
if (latestMetric) {
this.chartManager.addLatestDataToCharts(latestMetric);
}
}
}
class MetricsModule {
constructor() {
this.chartManager = new ChartManager();
this.dateRangeManager = new DateRangeManager(this.chartManager);
this.autoRefreshManager = new AutoRefreshManager(this.chartManager);
}
async init() {
this.chartManager.initializeCharts();
}
}
// Initialize when the DOM is ready
document.addEventListener('DOMContentLoaded', () => {
const metricsModule = new MetricsModule();
metricsModule.init();
});


@@ -0,0 +1,402 @@
const Config = {
API_BASE_URL: '/api/v1',
RULE_METRICS_PATH: '/rule_metrics/',
BYTE_TO_MB: 1024 * 1024,
CHART_COLORS: {
connectionCount: 'rgba(255, 99, 132, 1)',
handshakeDuration: 'rgba(54, 162, 235, 1)',
pingLatency: 'rgba(255, 206, 86, 1)',
networkTransmitBytes: 'rgba(75, 192, 192, 1)',
},
TIME_WINDOW: 30, // minutes
AUTO_REFRESH_INTERVAL: 5000, // milliseconds
};
class ApiService {
static async fetchData(path, params = {}) {
const url = new URL(Config.API_BASE_URL + path, window.location.origin);
Object.entries(params).forEach(([key, value]) => url.searchParams.append(key, value));
try {
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return await response.json();
} catch (error) {
console.error('Error:', error);
return null;
}
}
static async fetchRuleMetrics(startTs, endTs, label = '', remote = '') {
const params = { start_ts: startTs, end_ts: endTs };
if (label) params.label = label;
if (remote) params.remote = remote;
return await this.fetchData(Config.RULE_METRICS_PATH, params);
}
static async fetchConfig() {
return await this.fetchData('/config/');
}
static async fetchLabelsAndRemotes() {
const config = await this.fetchConfig();
if (!config || !config.relay_configs) {
return { labels: [], remotes: [] };
}
const labels = new Set();
const remotes = new Set();
config.relay_configs.forEach((relayConfig) => {
if (relayConfig.label) labels.add(relayConfig.label);
if (relayConfig.remotes) {
relayConfig.remotes.forEach((remote) => remotes.add(remote));
}
});
return {
labels: Array.from(labels),
remotes: Array.from(remotes),
};
}
}
class ChartManager {
constructor() {
this.charts = {};
}
initializeCharts() {
this.charts = {
connectionCount: this.initChart('connectionCountChart', 'line', 'Connection Count', 'Count'),
handshakeDuration: this.initChart('handshakeDurationChart', 'line', 'Handshake Duration', 'ms'),
pingLatency: this.initChart('pingLatencyChart', 'line', 'Ping Latency', 'ms'),
networkTransmitBytes: this.initChart('networkTransmitBytesChart', 'line', 'Network Transmit', 'MB'),
};
}
initChart(canvasId, type, title, unit) {
const ctx = $(`#${canvasId}`)[0].getContext('2d');
const color = Config.CHART_COLORS[canvasId.replace('Chart', '')];
return new Chart(ctx, {
type: type,
data: {
labels: [],
datasets: [
{
label: title,
borderColor: color,
backgroundColor: color.replace('1)', '0.2)'),
borderWidth: 2,
data: [],
},
],
},
options: this.getChartOptions(title, unit),
});
}
getChartOptions(title, unit) {
return {
responsive: true,
plugins: {
title: {
display: true,
text: title,
font: { size: 16, weight: 'bold' },
},
tooltip: {
callbacks: {
label: (context) => `${context.dataset.label}: ${context.parsed.y.toFixed(2)} ${unit}`,
},
},
},
scales: {
x: {
type: 'time',
time: { unit: 'minute', displayFormats: { minute: 'HH:mm' } },
title: { display: true, text: 'Time' },
},
y: {
beginAtZero: true,
title: { display: true, text: unit },
},
},
};
}
fillMissingDataPoints(data, startTime, endTime) {
const filledData = [];
let currentTime = new Date(startTime);
const endTimeDate = new Date(endTime);
while (currentTime <= endTimeDate) {
const existingPoint = data.find((point) => Math.abs(point.x.getTime() - currentTime.getTime()) < 60000);
if (existingPoint) {
filledData.push(existingPoint);
} else {
filledData.push({ x: new Date(currentTime), y: null });
}
currentTime.setMinutes(currentTime.getMinutes() + 1);
}
return filledData;
}
updateCharts(metrics, startTime, endTime) {
// check whether metrics is null or undefined
if (!metrics) {
// if so, reset every chart to empty
Object.values(this.charts).forEach((chart) => {
chart.data.datasets = [
{
label: 'No Data',
data: [],
},
];
chart.update();
});
return;
}
// first, sort the data in ascending time order
metrics.sort((a, b) => a.timestamp - b.timestamp);
// group by label-remote
const groupedMetrics = this.groupMetricsByLabelRemote(metrics);
console.log('groupedMetrics', groupedMetrics);
// preprocess the data for every metric type
const processedData = {};
Object.keys(this.charts).forEach((key) => {
processedData[key] = groupedMetrics.map((group, index) => {
const data = group.metrics.map((m) => ({
x: new Date(m.timestamp * 1000),
y: this.getMetricValue(key, m),
}));
const filledData = this.fillMissingDataPoints(data, startTime, endTime);
return {
label: `${group.label} - ${group.remote}`,
borderColor: this.getColor(index),
backgroundColor: this.getColor(index, 0.2),
borderWidth: 2,
data: filledData,
};
});
});
// update each chart
Object.entries(this.charts).forEach(([key, chart]) => {
chart.data.datasets = processedData[key];
chart.update();
});
}
groupMetricsByLabelRemote(metrics) {
const groups = {};
metrics.forEach((metric) => {
const key = `${metric.label}-${metric.remote}`;
if (!groups[key]) {
groups[key] = { label: metric.label, remote: metric.remote, metrics: [] };
}
groups[key].metrics.push(metric);
});
return Object.values(groups);
}
getMetricValue(metricType, metric) {
switch (metricType) {
case 'connectionCount':
return metric.tcp_connection_count + metric.udp_connection_count;
case 'handshakeDuration':
return Math.max(metric.tcp_handshake_duration, metric.udp_handshake_duration);
case 'pingLatency':
return metric.ping_latency;
case 'networkTransmitBytes':
return (metric.tcp_network_transmit_bytes + metric.udp_network_transmit_bytes) / Config.BYTE_TO_MB;
default:
return 0;
}
}
getColor(index, alpha = 1) {
const colors = [
`rgba(255, 99, 132, ${alpha})`,
`rgba(54, 162, 235, ${alpha})`,
`rgba(255, 206, 86, ${alpha})`,
`rgba(75, 192, 192, ${alpha})`,
`rgba(153, 102, 255, ${alpha})`,
`rgba(255, 159, 64, ${alpha})`,
];
return colors[index % colors.length];
}
}
class FilterManager {
constructor(chartManager, dateRangeManager) {
this.chartManager = chartManager;
this.dateRangeManager = dateRangeManager;
this.$labelFilter = $('#labelFilter');
this.$remoteFilter = $('#remoteFilter');
this.relayConfigs = [];
this.currentStartDate = null;
this.currentEndDate = null;
this.setupEventListeners();
this.loadFilters();
}
setupEventListeners() {
this.$labelFilter.on('change', () => this.onLabelChange());
this.$remoteFilter.on('change', () => this.applyFilters());
}
async loadFilters() {
const config = await ApiService.fetchConfig();
if (config && config.relay_configs) {
this.relayConfigs = config.relay_configs;
this.populateLabelFilter();
this.onLabelChange(); // Initialize remotes for the first label
}
}
populateLabelFilter() {
const labels = [...new Set(this.relayConfigs.map((config) => config.label))];
this.populateFilter(this.$labelFilter, labels);
}
onLabelChange() {
const selectedLabel = this.$labelFilter.val();
const remotes = this.getRemotesForLabel(selectedLabel);
this.populateFilter(this.$remoteFilter, remotes);
this.applyFilters();
}
getRemotesForLabel(label) {
const config = this.relayConfigs.find((c) => c.label === label);
return config ? config.remotes : [];
}
populateFilter($select, options) {
$select.empty().append($('<option>', { value: '', text: 'All' }));
options.forEach((option) => {
$select.append($('<option>', { value: option, text: option }));
});
}
async applyFilters() {
const label = this.$labelFilter.val();
const remote = this.$remoteFilter.val();
// Use the saved date range if present; otherwise fall back to the default 30-minute window
const endDate = this.currentEndDate || new Date();
const startDate = this.currentStartDate || new Date(endDate - Config.TIME_WINDOW * 60 * 1000);
const metrics = await ApiService.fetchRuleMetrics(
Math.floor(startDate.getTime() / 1000),
Math.floor(endDate.getTime() / 1000),
label,
remote
);
this.chartManager.updateCharts(metrics ? metrics.data : null, startDate, endDate);
}
setDateRange(start, end) {
this.currentStartDate = start;
this.currentEndDate = end;
}
}
class DateRangeManager {
constructor(chartManager, filterManager) {
this.chartManager = chartManager;
this.filterManager = filterManager;
this.$dateRangeDropdown = $('#dateRangeDropdown');
this.$dateRangeButton = $('#dateRangeButton');
this.$dateRangeText = $('#dateRangeText');
this.$dateRangeInput = $('#dateRangeInput');
this.setupEventListeners();
}
setupEventListeners() {
this.$dateRangeDropdown.find('.dropdown-item[data-range]').on('click', (e) => this.handlePresetDateRange(e));
this.$dateRangeButton.on('click', () => this.$dateRangeDropdown.toggleClass('is-active'));
$(document).on('click', (e) => {
if (!this.$dateRangeDropdown.has(e.target).length) {
this.$dateRangeDropdown.removeClass('is-active');
}
});
this.initializeDatePicker();
}
handlePresetDateRange(e) {
e.preventDefault();
const range = $(e.currentTarget).data('range');
const [start, end] = this.calculateDateRange(range);
this.fetchAndUpdateCharts(start, end);
this.$dateRangeText.text($(e.currentTarget).text());
this.$dateRangeDropdown.removeClass('is-active');
}
calculateDateRange(range) {
const now = new Date();
const start = new Date(now - this.getMillisecondsFromRange(range));
return [start, now];
}
getMillisecondsFromRange(range) {
const rangeMap = {
'30m': 30 * 60 * 1000,
'1h': 60 * 60 * 1000,
'3h': 3 * 60 * 60 * 1000,
'6h': 6 * 60 * 60 * 1000,
'12h': 12 * 60 * 60 * 1000,
'24h': 24 * 60 * 60 * 1000,
'7d': 7 * 24 * 60 * 60 * 1000,
};
return rangeMap[range] || 30 * 60 * 1000; // Default to 30 minutes
}
initializeDatePicker() {
flatpickr(this.$dateRangeInput[0], {
mode: 'range',
enableTime: true,
dateFormat: 'Y-m-d H:i',
onChange: (selectedDates) => this.handleDatePickerChange(selectedDates),
});
}
handleDatePickerChange(selectedDates) {
if (selectedDates.length === 2) {
const [start, end] = selectedDates;
this.fetchAndUpdateCharts(start, end);
this.$dateRangeText.text(`${start.toLocaleString()} - ${end.toLocaleString()}`);
this.$dateRangeDropdown.removeClass('is-active');
}
}
async fetchAndUpdateCharts(start, end) {
this.filterManager.setDateRange(start, end);
await this.filterManager.applyFilters();
}
}
class RuleMetricsModule {
constructor() {
this.chartManager = new ChartManager();
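// FilterManager and DateRangeManager reference each other, so build
// FilterManager first and patch in dateRangeManager after construction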
this.filterManager = new FilterManager(this.chartManager);
this.dateRangeManager = new DateRangeManager(this.chartManager, this.filterManager);
this.filterManager.dateRangeManager = this.dateRangeManager;
}
init() {
this.chartManager.initializeCharts();
this.filterManager.applyFilters();
}
}
// Initialize when the DOM is ready
$(document).ready(() => {
const ruleMetricsModule = new RuleMetricsModule();
ruleMetricsModule.init();
});

View File

@@ -165,15 +165,22 @@ func setupRoutes(s *Server) {
e.GET(metricsPath, echo.WrapHandler(promhttp.Handler()))
e.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))
// web pages
e.GET(indexPath, s.index)
e.GET(connectionsPath, s.ListConnections)
e.GET(rulesPath, s.ListRules)
e.GET("/rule_metrics/", s.RuleMetrics)
e.GET("/logs/", s.LogsPage)
api := e.Group(apiPrefix)
api.GET("/config/", s.CurrentConfig)
api.POST("/config/reload/", s.HandleReload)
api.GET("/health_check/", s.HandleHealthCheck)
api.GET("/node_metrics/", s.GetNodeMetrics)
api.GET("/rule_metrics/", s.GetRuleMetrics)
// ws
e.GET("/ws/logs", s.handleWebSocketLogs)
}
func (s *Server) Start() error {

View File

@@ -16,18 +16,20 @@
<div class="navbar-start">
<a href="/rules/" class="navbar-item">
<span class="icon"><i class="fas fa-list"></i></span>
<span>Rule List</span>
<span>Rules</span>
</a>
<a href="/rule_metrics/" class="navbar-item">
<span class="icon"><i class="fas fa-chart-line"></i></span>
<span>Metrics</span>
</a>
<a href="/logs/" class="navbar-item">
<span class="icon"><i class="fas fa-file-alt"></i></span>
<span>Logs</span>
</a>
<a href="/connections/?conn_type=active/" class="navbar-item">
<span class="icon"><i class="fas fa-link"></i></span>
<span>Connections</span>
</a>
<div class="navbar-item has-dropdown is-hoverable">
<a class="navbar-link">
<span class="icon"><i class="fas fa-link"></i></span>
<span>Connections</span>
</a>
<div class="navbar-dropdown">
<a href="/connections/?conn_type=active" class="navbar-item">Active</a>
<a href="/connections/?conn_type=closed" class="navbar-item">Closed</a>
</div>
</div>
</div>
<div class="navbar-end">

View File

@@ -52,6 +52,7 @@
</div>
</div>
</div>
<!-- </div> -->
<script src="js/metrics.js"></script>
</div>
<script src="/js/node_metrics.js"></script>

View File

@@ -0,0 +1,82 @@
<div class="card" id="rule-metrics-card">
<header class="card-header is-flex is-flex-wrap-wrap">
<p class="card-header-title has-text-centered">Rule Metrics</p>
<div class="card-header-icon is-flex-grow-1 is-flex is-justify-content-space-between">
<div class="field is-horizontal mr-2">
<div class="field-label is-small mr-2">
<label class="label" for="labelFilter">Label:</label>
</div>
<div class="field-body">
<div class="field">
<div class="control">
<div class="select is-small">
<select id="labelFilter" aria-label="Filter by label">
<option value="">All Labels</option>
</select>
</div>
</div>
</div>
</div>
</div>
<div class="field is-horizontal mr-2">
<div class="field-label is-small mr-2">
<label class="label" for="remoteFilter">Remote:</label>
</div>
<div class="field-body">
<div class="field">
<div class="control">
<div class="select is-small">
<select id="remoteFilter" aria-label="Filter by remote">
<option value="">All Remotes</option>
</select>
</div>
</div>
</div>
</div>
</div>
<div class="dropdown" id="dateRangeDropdown">
<div class="dropdown-trigger">
<button class="button is-small" aria-haspopup="true" aria-controls="dropdown-menu" id="dateRangeButton">
<span id="dateRangeText">Select date range</span>
<span class="icon is-small">
<i class="fas fa-angle-down" aria-hidden="true"></i>
</span>
</button>
</div>
<div class="dropdown-menu" id="dropdown-menu" role="menu">
<div class="dropdown-content">
<a href="#" class="dropdown-item" data-range="30m">Last 30 minutes</a>
<a href="#" class="dropdown-item" data-range="1h">Last 1 hour</a>
<a href="#" class="dropdown-item" data-range="3h">Last 3 hours</a>
<a href="#" class="dropdown-item" data-range="6h">Last 6 hours</a>
<a href="#" class="dropdown-item" data-range="12h">Last 12 hours</a>
<a href="#" class="dropdown-item" data-range="24h">Last 24 hours</a>
<a href="#" class="dropdown-item" data-range="7d">Last 7 days</a>
<hr class="dropdown-divider" />
<a href="#" class="dropdown-item" id="dateRangeInput">Custom range</a>
</div>
</div>
</div>
</div>
</header>
<div class="card-content">
<div class="content">
<div class="columns is-multiline">
<div class="column is-6">
<canvas id="connectionCountChart"></canvas>
</div>
<div class="column is-6">
<canvas id="handshakeDurationChart"></canvas>
</div>
<div class="column is-12">
<canvas id="pingLatencyChart"></canvas>
</div>
<div class="column is-12">
<canvas id="networkTransmitBytesChart"></canvas>
</div>
</div>
</div>
</div>
</div>
<script src="/js/rule_metrics.js"></script>

View File

@@ -1,4 +1,4 @@
<!doctype html>
<!DOCTYPE html>
<html lang="en">
{{template "_head.html" .}}
<body>
@@ -32,7 +32,7 @@
</div>
</div>
<!-- metrics -->
{{template "_metrics.html" .}}
{{template "_node_metrics_dash.html" .}}
</div>
<!-- Footer -->

View File

@@ -0,0 +1,176 @@
<!DOCTYPE html>
<html lang="en">
{{template "_head.html" .}}
<style>
.logs-container {
height: 700px;
overflow-y: auto;
font-family: 'Fira Code', monospace;
border-radius: 6px;
background-color: #f5f5f5;
padding: 1rem;
transition: background-color 0.3s ease;
}
.log-entry {
margin-bottom: 0.5rem;
padding: 0.5rem;
border-radius: 4px;
transition: background-color 0.3s ease;
}
</style>
<body>
{{ template "_navbar.html" . }}
<section class="section">
<div class="container-fluid">
<h1 class="title is-2 mb-6">Real-time Logs</h1>
<div class="card">
<div class="card-content">
<div class="field has-addons mb-4">
<div class="control is-expanded">
<input class="input is-medium" type="text" id="filterInput" placeholder="Filter logs..." />
</div>
<div class="control">
<button class="button is-info is-medium" id="filterButton">
<span class="icon">
<i class="fas fa-filter"></i>
</span>
</button>
</div>
</div>
<div class="buttons mb-4">
<button class="button is-warning is-medium" id="pauseButton">
<span class="icon">
<i class="fas fa-pause"></i>
</span>
<span>Pause</span>
</button>
<button class="button is-danger is-medium" id="clearButton">
<span class="icon">
<i class="fas fa-trash"></i>
</span>
<span>Clear</span>
</button>
</div>
<div id="logsContainer" class="logs-container"></div>
</div>
</div>
</div>
</section>
<script>
const logsContainer = document.getElementById('logsContainer');
const filterInput = document.getElementById('filterInput');
const filterButton = document.getElementById('filterButton');
const pauseButton = document.getElementById('pauseButton');
const clearButton = document.getElementById('clearButton');
let isPaused = false;
let ws;
let filterTerm = '';
function connectWebSocket() {
const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
ws = new WebSocket(wsScheme + window.location.host + '/ws/logs');
ws.onopen = function (event) {
console.log('WebSocket connection established');
};
ws.onerror = function (event) {
console.error('WebSocket error observed:', event);
};
ws.onmessage = function (event) {
if (!isPaused) {
const logEntry = document.createElement('div');
logEntry.classList.add('log-entry'); // needed so applyFilter() can find past entries
logEntry.innerHTML = formatLogMessage(event.data);
if (shouldDisplayLog(logEntry.textContent)) {
logsContainer.appendChild(logEntry);
logsContainer.scrollTop = logsContainer.scrollHeight;
}
}
};
ws.onclose = function (event) {
console.log('WebSocket connection closed. Reconnecting...');
setTimeout(connectWebSocket, 1000);
};
}
function formatLogMessage(message) {
try {
const logEntry = JSON.parse(message);
const timestamp = logEntry.ts;
const level = logEntry.level;
const msg = logEntry.msg;
const logger = logEntry.logger;
const levelClass = getLevelClass(level);
return `
<div class="columns is-mobile">
<div class="column is-3"><span class="has-text-grey-light is-small">${timestamp}</span></div>
<div class="column is-1"><span class="tag ${levelClass} is-small">${level.toUpperCase()}</span></div>
<div class="column is-2"><span class="is-info is-light is-small">${logger}</span></div>
<div class="column"><span>${msg}</span></div>
</div>
`;
} catch (e) {
console.error('Error parsing log message:', e);
return `<div class="has-text-danger">${message}</div>`;
}
}
function getLevelClass(level) {
switch (level) {
case 'debug':
return 'is-light';
case 'info':
return 'is-info';
case 'warn':
return 'is-warning';
case 'error':
return 'is-danger';
case 'fatal':
return 'is-dark';
default:
return 'is-light';
}
}
function shouldDisplayLog(logText) {
return filterTerm === '' || logText.toLowerCase().includes(filterTerm.toLowerCase());
}
function applyFilter() {
filterTerm = filterInput.value;
const logEntries = logsContainer.getElementsByClassName('log-entry');
for (let entry of logEntries) {
if (shouldDisplayLog(entry.textContent)) {
entry.style.display = '';
} else {
entry.style.display = 'none';
}
}
}
connectWebSocket();
filterButton.addEventListener('click', applyFilter);
filterInput.addEventListener('keyup', function (event) {
if (event.key === 'Enter') {
applyFilter();
}
});
pauseButton.addEventListener('click', function () {
isPaused = !isPaused;
// Update only the inner text span so the icon markup is preserved
pauseButton.querySelector('span:not(.icon)').textContent = isPaused ? 'Resume' : 'Pause';
pauseButton.classList.toggle('is-warning');
pauseButton.classList.toggle('is-success');
});
clearButton.addEventListener('click', function () {
logsContainer.innerHTML = '';
});
</script>
</body>
</html>

View File

@@ -1,4 +1,4 @@
<!doctype html>
<!DOCTYPE html>
<html lang="en">
{{template "_head.html" .}}
<body>
@@ -53,7 +53,7 @@
response.msg + // Use 'msg' as per Go struct
' (Latency: ' +
response.latency + // Ensure this matches the Go struct field name
'ms)',
'ms)'
);
} else {
// If error code is not 0, show error message

View File

@@ -0,0 +1,14 @@
<!DOCTYPE html>
<html lang="en">
{{template "_head.html" .}}
<body>
{{ template "_navbar.html" . }}
<section class="section">
<div class="container">
<h1 class="title">Rule Metrics</h1>
{{template "_rule_metrics_dash.html" .}}
</div>
</section>
</body>
</html>

View File

@@ -11,6 +11,8 @@ import (
var (
doOnce sync.Once
globalInitd bool
globalWebSocketSyncher *WebSocketLogSyncher
)
func initLogger(logLevel string, replaceGlobal bool) (*zap.Logger, error) {
@@ -18,8 +20,8 @@ func initLogger(logLevel string, replaceGlobal bool) (*zap.Logger, error) {
if err := level.UnmarshalText([]byte(logLevel)); err != nil {
return nil, err
}
writers := []zapcore.WriteSyncer{zapcore.AddSync(os.Stdout)}
encoder := zapcore.EncoderConfig{
consoleEncoder := zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
MessageKey: "msg",
@@ -27,12 +29,29 @@ func initLogger(logLevel string, replaceGlobal bool) (*zap.Logger, error) {
EncodeLevel: zapcore.LowercaseColorLevelEncoder,
EncodeTime: zapcore.RFC3339TimeEncoder,
EncodeName: zapcore.FullNameEncoder,
}
core := zapcore.NewCore(
zapcore.NewConsoleEncoder(encoder),
zapcore.NewMultiWriteSyncer(writers...),
level,
)
})
stdoutCore := zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), level)
jsonEncoder := zapcore.NewJSONEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "msg",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
})
globalWebSocketSyncher = NewWebSocketLogSyncher()
wsCore := zapcore.NewCore(jsonEncoder, globalWebSocketSyncher, level)
// Tee the stdout core and the WebSocket core together
core := zapcore.NewTee(stdoutCore, wsCore)
l := zap.New(core)
if replaceGlobal {
zap.ReplaceGlobals(l)

echo/pkg/log/ws.go Normal file
View File

@@ -0,0 +1,52 @@
package log
import (
"encoding/json"
"net"
"sync"
"github.com/gobwas/ws"
)
type WebSocketLogSyncher struct {
conn net.Conn
mu sync.Mutex
}
func NewWebSocketLogSyncher() *WebSocketLogSyncher {
return &WebSocketLogSyncher{}
}
func (wsSync *WebSocketLogSyncher) Write(p []byte) (n int, err error) {
	wsSync.mu.Lock()
	defer wsSync.mu.Unlock()
	if wsSync.conn != nil {
		// Forward only well-formed JSON log lines; a failed frame write is
		// deliberately ignored so logging never breaks on a dead client.
		if json.Valid(p) {
			_ = ws.WriteFrame(wsSync.conn, ws.NewTextFrame(p))
		}
	}
	return len(p), nil
}
func (wsSync *WebSocketLogSyncher) Sync() error {
return nil
}
func (wsSync *WebSocketLogSyncher) SetWSConn(conn net.Conn) {
wsSync.mu.Lock()
defer wsSync.mu.Unlock()
wsSync.conn = conn
}
func SetWebSocketConn(conn net.Conn) {
if globalWebSocketSyncher != nil {
globalWebSocketSyncher.SetWSConn(conn)
}
}
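The syncer holds at most one client connection, and SetWSConn simply swaps in the latest one. Below is a minimal, hypothetical sketch of how an HTTP handler can hand an upgraded connection to it, using the same gobwas/ws library the syncer writes frames with; the import path, route, and port are assumptions (the commit's real handler is s.handleWebSocketLogs, wired through echo).
package main
import (
	"net/http"
	"github.com/Ehco1996/ehco/pkg/log" // assumed import path for the package above
	"github.com/gobwas/ws"
)
func main() {
	http.HandleFunc("/ws/logs", func(w http.ResponseWriter, r *http.Request) {
		// Upgrade the plain HTTP request to a WebSocket connection.
		conn, _, _, err := ws.UpgradeHTTP(r, w)
		if err != nil {
			return // upgrade failed; gobwas/ws writes the rejection response itself
		}
		// Hand the raw connection to the log package; every JSON line the
		// WebSocket core writes from now on is streamed to this client.
		log.SetWebSocketConn(conn)
	})
	_ = http.ListenAndServe(":8080", nil)
}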

View File

@@ -0,0 +1,165 @@
package metric_reader
import (
"fmt"
"math"
"strings"
"time"
dto "github.com/prometheus/client_model/go"
)
const (
metricCPUSecondsTotal = "node_cpu_seconds_total"
metricLoad1 = "node_load1"
metricLoad5 = "node_load5"
metricLoad15 = "node_load15"
metricMemoryTotalBytes = "node_memory_total_bytes"
metricMemoryActiveBytes = "node_memory_active_bytes"
metricMemoryWiredBytes = "node_memory_wired_bytes"
metricMemoryMemTotalBytes = "node_memory_MemTotal_bytes"
metricMemoryMemAvailableBytes = "node_memory_MemAvailable_bytes"
metricFilesystemSizeBytes = "node_filesystem_size_bytes"
metricFilesystemAvailBytes = "node_filesystem_avail_bytes"
metricNetworkReceiveBytesTotal = "node_network_receive_bytes_total"
metricNetworkTransmitBytesTotal = "node_network_transmit_bytes_total"
)
type NodeMetrics struct {
// cpu
CpuCoreCount int `json:"cpu_core_count"`
CpuLoadInfo string `json:"cpu_load_info"`
CpuUsagePercent float64 `json:"cpu_usage_percent"`
// memory
MemoryTotalBytes int64 `json:"memory_total_bytes"`
MemoryUsageBytes int64 `json:"memory_usage_bytes"`
MemoryUsagePercent float64 `json:"memory_usage_percent"`
// disk
DiskTotalBytes int64 `json:"disk_total_bytes"`
DiskUsageBytes int64 `json:"disk_usage_bytes"`
DiskUsagePercent float64 `json:"disk_usage_percent"`
// network
NetworkReceiveBytesTotal int64 `json:"network_receive_bytes_total"`
NetworkTransmitBytesTotal int64 `json:"network_transmit_bytes_total"`
NetworkReceiveBytesRate float64 `json:"network_receive_bytes_rate"`
NetworkTransmitBytesRate float64 `json:"network_transmit_bytes_rate"`
SyncTime time.Time
}
type cpuStats struct {
totalTime float64
idleTime float64
cores int
}
func (b *readerImpl) ParseNodeMetrics(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
isMac := metricMap[metricMemoryTotalBytes] != nil
cpu := &cpuStats{}
b.processCPUMetrics(metricMap, cpu)
b.processMemoryMetrics(metricMap, nm, isMac)
b.processDiskMetrics(metricMap, nm)
b.processNetworkMetrics(metricMap, nm)
b.processLoadMetrics(metricMap, nm)
b.calculateFinalMetrics(nm, cpu)
return nil
}
func (b *readerImpl) processCPUMetrics(metricMap map[string]*dto.MetricFamily, cpu *cpuStats) {
if cpuMetric, ok := metricMap[metricCPUSecondsTotal]; ok {
for _, metric := range cpuMetric.Metric {
value := getMetricValue(metric, cpuMetric.GetType())
cpu.totalTime += value
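// node_exporter exposes exactly one "idle" series per CPU, so counting
// idle series doubles as the core count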
if getLabel(metric, "mode") == "idle" {
cpu.idleTime += value
cpu.cores++
}
}
}
}
func (b *readerImpl) processMemoryMetrics(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics, isMac bool) {
if isMac {
nm.MemoryTotalBytes = sumInt64Metric(metricMap, metricMemoryTotalBytes)
nm.MemoryUsageBytes = sumInt64Metric(metricMap, metricMemoryActiveBytes) + sumInt64Metric(metricMap, metricMemoryWiredBytes)
} else {
nm.MemoryTotalBytes = sumInt64Metric(metricMap, metricMemoryMemTotalBytes)
availableMemory := sumInt64Metric(metricMap, metricMemoryMemAvailableBytes)
nm.MemoryUsageBytes = nm.MemoryTotalBytes - availableMemory
}
}
func (b *readerImpl) processDiskMetrics(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) {
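	// Note: this sums every filesystem series as-is; overlapping mounts
	// (e.g. APFS volumes sharing one physical disk) are counted per series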
nm.DiskTotalBytes = sumInt64Metric(metricMap, metricFilesystemSizeBytes)
availableDisk := sumInt64Metric(metricMap, metricFilesystemAvailBytes)
nm.DiskUsageBytes = nm.DiskTotalBytes - availableDisk
}
func (b *readerImpl) processNetworkMetrics(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) {
nm.NetworkReceiveBytesTotal = sumInt64Metric(metricMap, metricNetworkReceiveBytesTotal)
nm.NetworkTransmitBytesTotal = sumInt64Metric(metricMap, metricNetworkTransmitBytesTotal)
}
func (b *readerImpl) processLoadMetrics(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) {
loads := []string{metricLoad1, metricLoad5, metricLoad15}
for _, load := range loads {
value := sumFloat64Metric(metricMap, load)
nm.CpuLoadInfo += fmt.Sprintf("%.2f|", value)
}
nm.CpuLoadInfo = strings.TrimRight(nm.CpuLoadInfo, "|")
}
func (b *readerImpl) calculateFinalMetrics(nm *NodeMetrics, cpu *cpuStats) {
	nm.CpuCoreCount = cpu.cores
	// Guard against division by zero when a metric family is missing
	if cpu.totalTime > 0 {
		nm.CpuUsagePercent = 100 * (cpu.totalTime - cpu.idleTime) / cpu.totalTime
	}
	if nm.MemoryTotalBytes > 0 {
		nm.MemoryUsagePercent = 100 * float64(nm.MemoryUsageBytes) / float64(nm.MemoryTotalBytes)
	}
	if nm.DiskTotalBytes > 0 {
		nm.DiskUsagePercent = 100 * float64(nm.DiskUsageBytes) / float64(nm.DiskTotalBytes)
	}
	nm.CpuUsagePercent = math.Round(nm.CpuUsagePercent*100) / 100
	nm.MemoryUsagePercent = math.Round(nm.MemoryUsagePercent*100) / 100
	nm.DiskUsagePercent = math.Round(nm.DiskUsagePercent*100) / 100
if b.lastMetrics != nil {
duration := time.Since(b.lastMetrics.SyncTime).Seconds()
if duration > 0.1 {
nm.NetworkReceiveBytesRate = math.Max(0, float64(nm.NetworkReceiveBytesTotal-b.lastMetrics.NetworkReceiveBytesTotal)/duration)
nm.NetworkTransmitBytesRate = math.Max(0, float64(nm.NetworkTransmitBytesTotal-b.lastMetrics.NetworkTransmitBytesTotal)/duration)
nm.NetworkReceiveBytesRate = math.Round(nm.NetworkReceiveBytesRate)
nm.NetworkTransmitBytesRate = math.Round(nm.NetworkTransmitBytesRate)
}
}
}
func sumInt64Metric(metricMap map[string]*dto.MetricFamily, metricName string) int64 {
ret := int64(0)
if metric, ok := metricMap[metricName]; ok && len(metric.Metric) > 0 {
for _, m := range metric.Metric {
ret += int64(getMetricValue(m, metric.GetType()))
}
}
return ret
}
func sumFloat64Metric(metricMap map[string]*dto.MetricFamily, metricName string) float64 {
ret := float64(0)
if metric, ok := metricMap[metricName]; ok && len(metric.Metric) > 0 {
for _, m := range metric.Metric {
ret += getMetricValue(m, metric.GetType())
}
}
return ret
}
func getLabel(metric *dto.Metric, name string) string {
for _, label := range metric.Label {
if label.GetName() == name {
return label.GetValue()
}
}
return ""
}

View File

@@ -2,25 +2,28 @@ package metric_reader
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
)
type Reader interface {
ReadOnce(ctx context.Context) (*NodeMetrics, error)
ReadOnce(ctx context.Context) (*NodeMetrics, map[string]*RuleMetrics, error)
}
type readerImpl struct {
metricsURL string
httpClient *http.Client
lastMetrics *NodeMetrics
metricsURL string
httpClient *http.Client
lastMetrics *NodeMetrics
lastRuleMetrics map[string]*RuleMetrics // key: label value: RuleMetrics
l *zap.SugaredLogger
}
func NewReader(metricsURL string) *readerImpl {
@@ -28,267 +31,47 @@ func NewReader(metricsURL string) *readerImpl {
return &readerImpl{
httpClient: c,
metricsURL: metricsURL,
l: zap.S().Named("metric_reader"),
}
}
func (b *readerImpl) parsePingInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
metric, ok := metricMap["ehco_ping_response_duration_seconds"]
if !ok {
// this metric is optional when enable_ping = false
zap.S().Debug("ping metric not found")
return nil
}
for _, m := range metric.Metric {
g := m.GetHistogram()
ip := ""
val := float64(g.GetSampleSum()) / float64(g.GetSampleCount()) * 1000 // to ms
for _, label := range m.GetLabel() {
if label.GetName() == "ip" {
ip = label.GetValue()
}
}
nm.PingMetrics = append(nm.PingMetrics, PingMetric{Latency: val, Target: ip})
}
return nil
}
func (b *readerImpl) parseCpuInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
handleMetric := func(metricName string, handleValue func(float64, string)) error {
metric, ok := metricMap[metricName]
if !ok {
return fmt.Errorf("%s not found", metricName)
}
for _, m := range metric.Metric {
g := m.GetCounter()
mode := ""
for _, label := range m.GetLabel() {
if label.GetName() == "mode" {
mode = label.GetValue()
}
}
handleValue(g.GetValue(), mode)
}
return nil
}
var (
totalIdleTime float64
totalCpuTime float64
cpuCores int
)
err := handleMetric("node_cpu_seconds_total", func(val float64, mode string) {
totalCpuTime += val
if mode == "idle" {
totalIdleTime += val
cpuCores++
}
})
func (b *readerImpl) ReadOnce(ctx context.Context) (*NodeMetrics, map[string]*RuleMetrics, error) {
metricMap, err := b.fetchMetrics(ctx)
if err != nil {
return err
return nil, nil, errors.Wrap(err, "failed to fetch metrics")
}
nm := &NodeMetrics{SyncTime: time.Now()}
if err := b.ParseNodeMetrics(metricMap, nm); err != nil {
return nil, nil, err
}
nm.CpuCoreCount = cpuCores
nm.CpuUsagePercent = 100 * (totalCpuTime - totalIdleTime) / totalCpuTime
for _, load := range []string{"1", "5", "15"} {
loadMetricName := fmt.Sprintf("node_load%s", load)
loadMetric, ok := metricMap[loadMetricName]
if !ok {
return fmt.Errorf("%s not found", loadMetricName)
}
for _, m := range loadMetric.Metric {
g := m.GetGauge()
nm.CpuLoadInfo += fmt.Sprintf("%.2f|", g.GetValue())
}
}
nm.CpuLoadInfo = strings.TrimRight(nm.CpuLoadInfo, "|")
return nil
}
func (b *readerImpl) parseMemoryInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
handleMetric := func(metricName string, handleValue func(float64)) error {
metric, ok := metricMap[metricName]
if !ok {
return fmt.Errorf("%s not found", metricName)
}
for _, m := range metric.Metric {
g := m.GetGauge()
handleValue(g.GetValue())
}
return nil
}
isMac := false
if _, ok := metricMap["node_memory_total_bytes"]; ok {
isMac = true
}
if isMac {
err := handleMetric("node_memory_total_bytes", func(val float64) {
nm.MemoryTotalBytes = val
})
if err != nil {
return err
}
err = handleMetric("node_memory_active_bytes", func(val float64) {
nm.MemoryUsageBytes += val
})
if err != nil {
return err
}
err = handleMetric("node_memory_wired_bytes", func(val float64) {
nm.MemoryUsageBytes += val
})
if err != nil {
return err
}
} else {
err := handleMetric("node_memory_MemTotal_bytes", func(val float64) {
nm.MemoryTotalBytes = val
})
if err != nil {
return err
}
err = handleMetric("node_memory_MemAvailable_bytes", func(val float64) {
nm.MemoryUsageBytes = nm.MemoryTotalBytes - val
})
if err != nil {
return err
}
}
if nm.MemoryTotalBytes != 0 {
nm.MemoryUsagePercent = 100 * nm.MemoryUsageBytes / nm.MemoryTotalBytes
}
return nil
}
func (b *readerImpl) parseDiskInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
handleMetric := func(metricName string, handleValue func(float64)) error {
forMac := false
diskMap := make(map[string]float64)
metric, ok := metricMap[metricName]
if !ok {
return fmt.Errorf("%s not found", metricName)
}
for _, m := range metric.Metric {
g := m.GetGauge()
disk := ""
for _, label := range m.GetLabel() {
if label.GetName() == "device" {
disk = getDiskName(label.GetValue())
}
if label.GetName() == "fstype" && label.GetValue() == "apfs" {
forMac = true
}
}
diskMap[disk] = g.GetValue()
}
// On macOS APFS, several "disks" may report identical sizes because APFS volumes share the same physical disk, so count each size only once
seenVal := map[float64]bool{}
for _, val := range diskMap {
if seenVal[val] && forMac {
continue
}
handleValue(val)
seenVal[val] = true
}
return nil
}
err := handleMetric("node_filesystem_size_bytes", func(val float64) {
nm.DiskTotalBytes += val
})
if err != nil {
return err
}
var availBytes float64
err = handleMetric("node_filesystem_avail_bytes", func(val float64) {
availBytes += val
})
if err != nil {
return err
}
nm.DiskUsageBytes = nm.DiskTotalBytes - availBytes
if nm.DiskTotalBytes != 0 {
nm.DiskUsagePercent = 100 * nm.DiskUsageBytes / nm.DiskTotalBytes
}
return nil
}
func (b *readerImpl) parseNetworkInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
now := time.Now()
handleMetric := func(metricName string, handleValue func(float64)) error {
metric, ok := metricMap[metricName]
if !ok {
return fmt.Errorf("%s not found", metricName)
}
for _, m := range metric.Metric {
g := m.GetCounter()
handleValue(g.GetValue())
}
return nil
}
err := handleMetric("node_network_receive_bytes_total", func(val float64) {
nm.NetworkReceiveBytesTotal += val
})
if err != nil {
return err
}
err = handleMetric("node_network_transmit_bytes_total", func(val float64) {
nm.NetworkTransmitBytesTotal += val
})
if err != nil {
return err
}
if b.lastMetrics != nil {
passedTime := now.Sub(b.lastMetrics.SyncTime).Seconds()
nm.NetworkReceiveBytesRate = (nm.NetworkReceiveBytesTotal - b.lastMetrics.NetworkReceiveBytesTotal) / passedTime
nm.NetworkTransmitBytesRate = (nm.NetworkTransmitBytesTotal - b.lastMetrics.NetworkTransmitBytesTotal) / passedTime
}
return nil
}
func (b *readerImpl) ReadOnce(ctx context.Context) (*NodeMetrics, error) {
response, err := b.httpClient.Get(b.metricsURL)
if err != nil {
return nil, err
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
var parser expfmt.TextParser
parsed, err := parser.TextToMetricFamilies(strings.NewReader(string(body)))
if err != nil {
return nil, err
}
nm := &NodeMetrics{SyncTime: time.Now(), PingMetrics: []PingMetric{}}
if err := b.parseCpuInfo(parsed, nm); err != nil {
return nil, err
}
if err := b.parseMemoryInfo(parsed, nm); err != nil {
return nil, err
}
if err := b.parseDiskInfo(parsed, nm); err != nil {
return nil, err
}
if err := b.parseNetworkInfo(parsed, nm); err != nil {
return nil, err
}
if err := b.parsePingInfo(parsed, nm); err != nil {
return nil, err
rm := make(map[string]*RuleMetrics)
if err := b.ParseRuleMetrics(metricMap, rm); err != nil {
return nil, nil, err
}
b.lastMetrics = nm
return nm, nil
b.lastRuleMetrics = rm
return nm, rm, nil
}
func (r *readerImpl) fetchMetrics(ctx context.Context) (map[string]*dto.MetricFamily, error) {
req, err := http.NewRequestWithContext(ctx, "GET", r.metricsURL, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create request")
}
resp, err := r.httpClient.Do(req)
if err != nil {
return nil, errors.Wrap(err, "failed to send request")
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "failed to read response body")
}
var parser expfmt.TextParser
return parser.TextToMetricFamilies(strings.NewReader(string(body)))
}
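For reference, fetchMetrics hands back the Prometheus text exposition parsed into a map keyed by metric family name, which is exactly what ReadOnce consumes. A small self-contained sketch of that parsing step (the payload is illustrative):
package main
import (
	"fmt"
	"strings"
	"github.com/prometheus/common/expfmt"
)
func main() {
	payload := "# TYPE node_load1 gauge\nnode_load1 0.42\n"
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}
	// families is keyed by family name, e.g. "node_load1".
	fmt.Println(families["node_load1"].GetMetric()[0].GetGauge().GetValue()) // 0.42
}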

View File

@@ -0,0 +1,146 @@
package metric_reader
import (
"time"
dto "github.com/prometheus/client_model/go"
)
const (
metricConnectionCount = "ehco_traffic_current_connection_count"
metricNetworkTransmit = "ehco_traffic_network_transmit_bytes"
metricPingResponse = "ehco_ping_response_duration_milliseconds"
metricHandshakeDuration = "ehco_traffic_handshake_duration_milliseconds"
labelKey = "label"
remoteKey = "remote"
connTypeKey = "conn_type"
flowKey = "flow"
ipKey = "ip"
)
type PingMetric struct {
Latency int64 `json:"latency"` // in ms
Target string `json:"target"`
}
type RuleMetrics struct {
Label string // rule label
PingMetrics map[string]*PingMetric // key: remote
TCPConnectionCount map[string]int64 // key: remote
TCPHandShakeDuration map[string]int64 // key: remote in ms
TCPNetworkTransmitBytes map[string]int64 // key: remote
UDPConnectionCount map[string]int64 // key: remote
UDPHandShakeDuration map[string]int64 // key: remote in ms
UDPNetworkTransmitBytes map[string]int64 // key: remote
SyncTime time.Time
}
func (b *readerImpl) ParseRuleMetrics(metricMap map[string]*dto.MetricFamily, rm map[string]*RuleMetrics) error {
requiredMetrics := []string{
metricConnectionCount,
metricNetworkTransmit,
metricPingResponse,
metricHandshakeDuration,
}
for _, metricName := range requiredMetrics {
metricFamily, ok := metricMap[metricName]
if !ok {
continue
}
for _, metric := range metricFamily.Metric {
labels := getLabelMap(metric)
value := int64(getMetricValue(metric, metricFamily.GetType()))
label, ok := labels[labelKey]
if !ok || label == "" {
continue
}
ruleMetric := b.ensureRuleMetric(rm, label)
switch metricName {
case metricConnectionCount:
b.updateConnectionCount(ruleMetric, labels, value)
case metricNetworkTransmit:
b.updateNetworkTransmit(ruleMetric, labels, value)
case metricPingResponse:
b.updatePingMetrics(ruleMetric, labels, value)
case metricHandshakeDuration:
b.updateHandshakeDuration(ruleMetric, labels, value)
}
}
}
return nil
}
func (b *readerImpl) ensureRuleMetric(rm map[string]*RuleMetrics, label string) *RuleMetrics {
if _, ok := rm[label]; !ok {
rm[label] = &RuleMetrics{
Label: label,
PingMetrics: make(map[string]*PingMetric),
TCPConnectionCount: make(map[string]int64),
TCPHandShakeDuration: make(map[string]int64),
TCPNetworkTransmitBytes: make(map[string]int64),
UDPConnectionCount: make(map[string]int64),
UDPHandShakeDuration: make(map[string]int64),
UDPNetworkTransmitBytes: make(map[string]int64),
SyncTime: time.Now(),
}
}
return rm[label]
}
func (b *readerImpl) updateConnectionCount(rm *RuleMetrics, labels map[string]string, value int64) {
key := labels[remoteKey]
switch labels[connTypeKey] {
case "tcp":
rm.TCPConnectionCount[key] = value
default:
rm.UDPConnectionCount[key] = value
}
}
func (b *readerImpl) updateNetworkTransmit(rm *RuleMetrics, labels map[string]string, value int64) {
if labels[flowKey] == "read" {
key := labels[remoteKey]
switch labels[connTypeKey] {
case "tcp":
rm.TCPNetworkTransmitBytes[key] += value
default:
rm.UDPNetworkTransmitBytes[key] += value
}
}
}
func (b *readerImpl) updatePingMetrics(rm *RuleMetrics, labels map[string]string, value int64) {
remote := labels[remoteKey]
rm.PingMetrics[remote] = &PingMetric{
Latency: value,
Target: labels[ipKey],
}
}
func (b *readerImpl) updateHandshakeDuration(rm *RuleMetrics, labels map[string]string, value int64) {
key := labels[remoteKey]
switch labels[connTypeKey] {
case "tcp":
rm.TCPHandShakeDuration[key] = value
default:
rm.UDPHandShakeDuration[key] = value
}
}
func getLabelMap(metric *dto.Metric) map[string]string {
labels := make(map[string]string)
for _, label := range metric.Label {
labels[label.GetName()] = label.GetValue()
}
return labels
}
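Putting the two parsers together, here is a hypothetical end-to-end use of the reader; the URL is illustrative. ReadOnce fetches the exposition once and returns both the node metrics and the per-rule metrics keyed by rule label:
package main
import (
	"context"
	"fmt"
	"time"
	"github.com/Ehco1996/ehco/pkg/metric_reader"
)
func main() {
	// Point this at the ehco metrics endpoint; the address is illustrative.
	reader := metric_reader.NewReader("http://127.0.0.1:9091/metrics/")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nm, rm, err := reader.ReadOnce(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("cpu=%.2f%% mem=%.2f%%\n", nm.CpuUsagePercent, nm.MemoryUsagePercent)
	for label, m := range rm {
		// Per-remote maps, e.g. TCPConnectionCount keyed by remote address.
		for remote, count := range m.TCPConnectionCount {
			fmt.Printf("rule=%s remote=%s tcp_conns=%d\n", label, remote, count)
		}
	}
}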

View File

@@ -1,38 +0,0 @@
package metric_reader
import (
"time"
)
type NodeMetrics struct {
// cpu
CpuCoreCount int `json:"cpu_core_count"`
CpuLoadInfo string `json:"cpu_load_info"`
CpuUsagePercent float64 `json:"cpu_usage_percent"`
// memory
MemoryTotalBytes float64 `json:"memory_total_bytes"`
MemoryUsageBytes float64 `json:"memory_usage_bytes"`
MemoryUsagePercent float64 `json:"memory_usage_percent"`
// disk
DiskTotalBytes float64 `json:"disk_total_bytes"`
DiskUsageBytes float64 `json:"disk_usage_bytes"`
DiskUsagePercent float64 `json:"disk_usage_percent"`
// network
NetworkReceiveBytesTotal float64 `json:"network_receive_bytes_total"`
NetworkTransmitBytesTotal float64 `json:"network_transmit_bytes_total"`
NetworkReceiveBytesRate float64 `json:"network_receive_bytes_rate"`
NetworkTransmitBytesRate float64 `json:"network_transmit_bytes_rate"`
// ping
PingMetrics []PingMetric `json:"ping_metrics"`
SyncTime time.Time
}
type PingMetric struct {
Latency float64 `json:"latency"` // in ms
Target string `json:"target"`
}

View File

@@ -1,22 +1,46 @@
package metric_reader
import "regexp"
import (
"math"
// parse disk name from device path, such as:
// e.g. /dev/disk1s1 -> disk1
// e.g. /dev/disk1s2 -> disk1
// e.g. ntfs://disk1s1 -> disk1
// e.g. ntfs://disk1s2 -> disk1
// e.g. /dev/sda1 -> sda
// e.g. /dev/sda2 -> sda
var diskNameRegex = regexp.MustCompile(`/dev/disk(\d+)|ntfs://disk(\d+)|/dev/sd[a-zA-Z]`)
dto "github.com/prometheus/client_model/go"
)
func getDiskName(devicePath string) string {
matches := diskNameRegex.FindStringSubmatch(devicePath)
for _, match := range matches {
if match != "" {
return match
func calculatePercentile(histogram *dto.Histogram, percentile float64) float64 {
	if histogram == nil || histogram.GetSampleCount() == 0 {
		return 0
	}
	targetSample := percentile * float64(histogram.GetSampleCount())
	var prevCumulative uint64
	var lastBucketBound float64
	for _, bucket := range histogram.Bucket {
		// Prometheus bucket counts are already cumulative, so read them
		// directly instead of summing them up
		cumulative := bucket.GetCumulativeCount()
		if float64(cumulative) >= targetSample {
			inBucket := cumulative - prevCumulative
			// Linear interpolation between bucket boundaries
			if inBucket > 0 && lastBucketBound != bucket.GetUpperBound() {
				return lastBucketBound + (targetSample-float64(prevCumulative))/float64(inBucket)*(bucket.GetUpperBound()-lastBucketBound)
			}
			return bucket.GetUpperBound()
		}
		prevCumulative = cumulative
		lastBucketBound = bucket.GetUpperBound()
	}
	return math.NaN()
}
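A quick sanity check of the interpolation: take a hypothetical histogram of 100 samples with cumulative bucket counts 50 (<= 0.1), 90 (<= 0.5), and 100 (<= 1.0). The p90 target is 0.9 * 100 = 90 samples; the 0.5 bucket reaches it, and interpolating over its 40 in-bucket samples gives 0.1 + (90-50)/40 * (0.5-0.1) = 0.5. In code, as a sketch assumed to sit in the same package as calculatePercentile:
package metric_reader
import (
	dto "github.com/prometheus/client_model/go"
	"google.golang.org/protobuf/proto"
)
func exampleP90() float64 {
	h := &dto.Histogram{
		SampleCount: proto.Uint64(100),
		Bucket: []*dto.Bucket{
			{UpperBound: proto.Float64(0.1), CumulativeCount: proto.Uint64(50)},
			{UpperBound: proto.Float64(0.5), CumulativeCount: proto.Uint64(90)},
			{UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(100)},
		},
	}
	return calculatePercentile(h, 0.9) // == 0.5
}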
func getMetricValue(metric *dto.Metric, metricType dto.MetricType) float64 {
switch metricType {
case dto.MetricType_COUNTER:
return metric.Counter.GetValue()
case dto.MetricType_GAUGE:
return metric.Gauge.GetValue()
case dto.MetricType_HISTOGRAM:
histogram := metric.Histogram
if histogram != nil {
return calculatePercentile(histogram, 0.9)
}
}
return ""
return 0
}

View File

@@ -2,6 +2,7 @@
package echo
import (
"errors"
"fmt"
"io"
"log"
@@ -118,6 +119,10 @@ func (s *EchoServer) handleTCPConn(conn net.Conn) {
}
}
func isClosedConnError(err error) bool {
return errors.Is(err, net.ErrClosed)
}
func (s *EchoServer) serveUDP() {
defer s.wg.Done()
buf := make([]byte, 1024)
@@ -128,6 +133,9 @@ func (s *EchoServer) serveUDP() {
default:
n, remoteAddr, err := s.udpConn.ReadFromUDP(buf)
if err != nil {
if isClosedConnError(err) {
	// a bare "break" would only exit the select, not the read loop
	return
}
s.logger.Errorf("Error reading UDP: %v", err)
continue
}