Update On Sun Sep 1 20:31:00 CEST 2024

This commit is contained in:
github-action[bot]
2024-09-01 20:31:00 +02:00
parent 82a0991106
commit e2e637c916
95 changed files with 2318 additions and 6346 deletions

View File

@@ -2,10 +2,13 @@ package cmgr
import (
"context"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/Ehco1996/ehco/internal/cmgr/ms"
"github.com/Ehco1996/ehco/internal/conn"
"github.com/Ehco1996/ehco/pkg/metric_reader"
"go.uber.org/zap"
@@ -35,7 +38,8 @@ type Cmgr interface {
// Start starts the connection manager.
Start(ctx context.Context, errCH chan error)
QueryNodeMetrics(ctx context.Context, req *QueryNodeMetricsReq) ([]metric_reader.NodeMetrics, error)
// Metrics related
QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq) (*ms.QueryNodeMetricsResp, error)
}
type cmgrImpl struct {
@@ -47,11 +51,11 @@ type cmgrImpl struct {
activeConnectionsMap map[string][]conn.RelayConn
closedConnectionsMap map[string][]conn.RelayConn
ms *ms.MetricsStore
mr metric_reader.Reader
ms *MetricsStore
}
func NewCmgr(cfg *Config) Cmgr {
func NewCmgr(cfg *Config) (Cmgr, error) {
cmgr := &cmgrImpl{
cfg: cfg,
l: zap.S().Named("cmgr"),
@@ -60,12 +64,16 @@ func NewCmgr(cfg *Config) Cmgr {
}
if cfg.NeedMetrics() {
cmgr.mr = metric_reader.NewReader(cfg.MetricsURL)
// 当前只能存储 24h 的 metrics之后再优化
bufSize := 60 * 60 * 24 / cfg.SyncInterval
cmgr.l.Infof("metrics buffer size: %d", bufSize)
cmgr.ms = NewMetricsStore(bufSize, time.Hour*24)
homeDir, _ := os.UserHomeDir()
dbPath := filepath.Join(homeDir, ".ehco", "metrics.db")
ms, err := ms.NewMetricsStore(dbPath)
if err != nil {
return nil, err
}
cmgr.ms = ms
}
return cmgr
return cmgr, nil
}
func (cm *cmgrImpl) ListConnections(connType string, page, pageSize int) []conn.RelayConn {
@@ -192,3 +200,21 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
}
}
}
// QueryNodeMetrics returns node metrics from the metrics store for the
// window [req.StartTimestamp, req.EndTimestamp] (unix seconds).
//
// When req.Latest is set, it first reads a fresh sample from the metric
// reader, persists it, and caps the query at a single (newest) row;
// otherwise every row in the window is returned (limit -1 = unlimited).
func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq) (*ms.QueryNodeMetricsResp, error) {
	limit := -1 // negative limit: return all metrics in the window
	if req.Latest {
		sample, err := cm.mr.ReadOnce(ctx)
		if err != nil {
			return nil, err
		}
		if err := cm.ms.AddNodeMetric(sample); err != nil {
			return nil, err
		}
		limit = 1
	}
	start := time.Unix(req.StartTimestamp, 0)
	end := time.Unix(req.EndTimestamp, 0)
	return cm.ms.QueryNodeMetric(start, end, limit)
}

View File

@@ -1,7 +1,5 @@
package cmgr
var DummyConfig = &Config{}
type Config struct {
SyncURL string
MetricsURL string

View File

@@ -1,180 +0,0 @@
package cmgr
import (
"context"
"sync"
"time"
"github.com/Ehco1996/ehco/internal/conn"
"github.com/Ehco1996/ehco/internal/constant"
myhttp "github.com/Ehco1996/ehco/pkg/http"
"github.com/Ehco1996/ehco/pkg/metric_reader"
"go.uber.org/zap"
)
// StatsPerRule aggregates traffic and latency counters for all closed
// connections sharing one relay label within a single sync cycle.
type StatsPerRule struct {
	RelayLabel       string `json:"relay_label"`
	Up               int64  `json:"up_bytes"`
	Down             int64  `json:"down_bytes"`
	ConnectionCnt    int    `json:"connection_count"`
	HandShakeLatency int64  `json:"latency_in_ms"` // average over the rule's closed connections
}
// VersionInfo identifies the running build in sync payloads.
type VersionInfo struct {
	Version     string `json:"version"`
	ShortCommit string `json:"short_commit"` // at most the first 7 chars of the git revision
}
// syncReq is the JSON payload POSTed to the sync server on each cycle.
type syncReq struct {
	Version VersionInfo               `json:"version"`
	Node    metric_reader.NodeMetrics `json:"node"` // zero value when metrics are disabled
	Stats   []StatsPerRule            `json:"stats"`
}
// MetricsStore is an in-memory, mutex-guarded buffer of node metric
// samples, kept in chronological order and trimmed to clearDuration
// on each Add.
type MetricsStore struct {
	mutex   sync.RWMutex
	metrics []metric_reader.NodeMetrics
	// bufSize is the intended sample count for the retention window.
	// NOTE(review): only used at construction time; not enforced on Add.
	bufSize       int
	clearDuration time.Duration // retention window for stored samples
}
// NewMetricsStore returns an in-memory store that retains samples for
// clearDuration and pre-allocates room for bufSize samples.
//
// Bug fix: the original used make([]T, bufSize), which creates bufSize
// zero-valued entries (length bufSize) instead of an empty slice with
// capacity bufSize — Query would then iterate bufSize zero-time samples.
func NewMetricsStore(bufSize int, clearDuration time.Duration) *MetricsStore {
	return &MetricsStore{
		// empty slice with pre-sized capacity, not bufSize zero entries
		metrics:       make([]metric_reader.NodeMetrics, 0, bufSize),
		clearDuration: clearDuration,
		bufSize:       bufSize,
	}
}
// Add appends m as the newest sample and drops samples that fall outside
// the retention window (clearDuration).
func (ms *MetricsStore) Add(m *metric_reader.NodeMetrics) {
	ms.mutex.Lock()
	defer ms.mutex.Unlock()

	// Append directly, assuming samples arrive in chronological order.
	ms.metrics = append(ms.metrics, *m)

	// Trim everything recorded before the cutoff: locate the first sample
	// still inside the window and keep the tail starting there.
	cutoff := time.Now().Add(-ms.clearDuration)
	for idx := range ms.metrics {
		if ms.metrics[idx].SyncTime.After(cutoff) {
			ms.metrics = ms.metrics[idx:]
			break
		}
	}
}
// Query returns the stored samples whose SyncTime lies within
// [startTime, endTime], ordered oldest-first. It assumes the underlying
// slice is sorted by SyncTime ascending.
func (ms *MetricsStore) Query(startTime, endTime time.Time) []metric_reader.NodeMetrics {
	ms.mutex.RLock()
	defer ms.mutex.RUnlock()

	// Walk backwards from the newest sample; stop once we pass startTime.
	var picked []metric_reader.NodeMetrics
	for idx := len(ms.metrics) - 1; idx >= 0; idx-- {
		ts := ms.metrics[idx].SyncTime
		if ts.Before(startTime) {
			break
		}
		if !ts.After(endTime) {
			picked = append(picked, ms.metrics[idx])
		}
	}

	// Collected newest-first; flip in place so callers get ascending order.
	for lo, hi := 0, len(picked)-1; lo < hi; lo, hi = lo+1, hi-1 {
		picked[lo], picked[hi] = picked[hi], picked[lo]
	}
	return picked
}
// QueryNodeMetricsReq selects which node metrics to return.
type QueryNodeMetricsReq struct {
	TimeRange string `json:"time_range"` // 15min/30min/1h/6h/12h/24h
	Latest    bool   `json:"latest"`     // whether to refresh the cache and get the latest data
}
// syncOnce aggregates stats for all closed connections (grouped by relay
// label), optionally samples node metrics, clears the closed-connection
// map, and — when a sync URL is configured — POSTs the payload to the
// server. The manager lock is held while the snapshot is built and
// released before any network call.
func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
	cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
	// todo: opt lock
	cm.lock.Lock()
	// trim the git revision to the conventional 7-char short hash
	shortCommit := constant.GitRevision
	if len(constant.GitRevision) > 7 {
		shortCommit = constant.GitRevision[:7]
	}
	req := syncReq{
		Stats:   []StatsPerRule{},
		Version: VersionInfo{Version: constant.Version, ShortCommit: shortCommit},
	}
	if cm.cfg.NeedMetrics() {
		metrics, err := cm.mr.ReadOnce(ctx)
		if err != nil {
			// best-effort: a failed metrics read is logged, not fatal
			cm.l.Errorf("read metrics failed: %v", err)
		} else {
			req.Node = *metrics
			cm.ms.Add(metrics)
		}
	}
	for label, conns := range cm.closedConnectionsMap {
		s := StatsPerRule{
			RelayLabel: label,
		}
		var totalLatency int64
		for _, c := range conns {
			s.ConnectionCnt++
			s.Up += c.GetStats().Up
			s.Down += c.GetStats().Down
			totalLatency += c.GetStats().HandShakeLatency.Milliseconds()
		}
		if s.ConnectionCnt > 0 {
			// average handshake latency across this rule's closed connections
			s.HandShakeLatency = totalLatency / int64(s.ConnectionCnt)
		}
		req.Stats = append(req.Stats, s)
	}
	// reset so the next cycle reports only newly closed connections
	cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
	cm.lock.Unlock()
	if cm.cfg.NeedSync() {
		cm.l.Debug("syncing data to server", zap.Any("data", req))
		return myhttp.PostJSONWithRetry(cm.cfg.SyncURL, &req)
	} else {
		cm.l.Debugf("remove %d closed connections", len(req.Stats))
	}
	return nil
}
// getTimeRangeDuration maps a human-readable time-range label to its
// time.Duration. Unknown labels fall back to 15 minutes.
func getTimeRangeDuration(timeRange string) time.Duration {
	ranges := map[string]time.Duration{
		"15min": 15 * time.Minute,
		"30min": 30 * time.Minute,
		"1h":    1 * time.Hour,
		"6h":    6 * time.Hour,
		"12h":   12 * time.Hour,
		"24h":   24 * time.Hour,
	}
	if d, ok := ranges[timeRange]; ok {
		return d
	}
	return 15 * time.Minute
}
// QueryNodeMetrics serves node-level metrics. With req.Latest it reads one
// fresh sample, caches it in the store, and returns just that sample;
// otherwise it returns cached samples covering the requested range ending now.
func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *QueryNodeMetricsReq) ([]metric_reader.NodeMetrics, error) {
	if req.Latest {
		latest, err := cm.mr.ReadOnce(ctx)
		if err != nil {
			return nil, err
		}
		cm.ms.Add(latest)
		return []metric_reader.NodeMetrics{*latest}, nil
	}
	end := time.Now()
	start := end.Add(-getTimeRangeDuration(req.TimeRange))
	return cm.ms.Query(start, end), nil
}

118
echo/internal/cmgr/ms/ms.go Normal file
View File

@@ -0,0 +1,118 @@
package ms
import (
"database/sql"
"os"
"path/filepath"
"time"
"go.uber.org/zap"
_ "modernc.org/sqlite"
"github.com/Ehco1996/ehco/pkg/metric_reader"
)
// NodeMetrics is one persisted node sample as stored in / read from the
// node_metrics sqlite table. CPU/memory/disk values are percentages and
// the network values are byte rates, mapped from metric_reader fields.
type NodeMetrics struct {
	Timestamp   int64   `json:"timestamp"` // unix seconds; primary key in the table
	CPUUsage    float64 `json:"cpu_usage"`
	MemoryUsage float64 `json:"memory_usage"`
	DiskUsage   float64 `json:"disk_usage"`
	NetworkIn   float64 `json:"network_in"`
	NetworkOut  float64 `json:"network_out"`
}
// QueryNodeMetricsReq bounds a node-metrics query by unix-second timestamps.
type QueryNodeMetricsReq struct {
	StartTimestamp int64 `json:"start_ts"` // inclusive, unix seconds
	EndTimestamp   int64 `json:"end_ts"`   // inclusive, unix seconds
	Latest         bool  `json:"latest"`   // whether to refresh the cache and get the latest data
}
// QueryNodeMetricsResp carries the queried samples plus their count.
// NOTE(review): the field is named TOTAL rather than the conventional
// Total; kept as-is for caller compatibility (the JSON tag is unaffected).
type QueryNodeMetricsResp struct {
	TOTAL int           `json:"total"` // == len(Data)
	Data  []NodeMetrics `json:"data"`  // ordered newest-first by timestamp
}
// MetricsStore persists node metrics in a sqlite database on disk.
type MetricsStore struct {
	db     *sql.DB
	dbPath string // filesystem location of the sqlite file
	l      *zap.SugaredLogger
}
// NewMetricsStore opens (creating if necessary) the sqlite database at
// dbPath and ensures the schema exists. The parent directory is created
// with mode 0755 when missing. The returned store owns the *sql.DB handle.
//
// Fix: the db handle is now closed when schema initialization fails,
// instead of being leaked along with the returned error.
func NewMetricsStore(dbPath string) (*MetricsStore, error) {
	// ensure the directory exists
	dirPath := filepath.Dir(dbPath)
	if err := os.MkdirAll(dirPath, 0o755); err != nil {
		return nil, err
	}
	// create db file if not exists
	if _, err := os.Stat(dbPath); os.IsNotExist(err) {
		f, err := os.Create(dbPath)
		if err != nil {
			return nil, err
		}
		if err := f.Close(); err != nil {
			return nil, err
		}
	}
	db, err := sql.Open("sqlite", dbPath)
	if err != nil {
		return nil, err
	}
	ms := &MetricsStore{dbPath: dbPath, db: db, l: zap.S().Named("ms")}
	if err := ms.initDB(); err != nil {
		// don't leak the open handle when schema setup fails
		db.Close() //nolint:errcheck
		return nil, err
	}
	return ms, nil
}
// initDB creates the node_metrics table when it does not already exist.
// timestamp doubles as the primary key, so at most one row per second.
func (ms *MetricsStore) initDB() error {
	const createNodeMetricsTable = `
	CREATE TABLE IF NOT EXISTS node_metrics (
		timestamp INTEGER,
		cpu_usage REAL,
		memory_usage REAL,
		disk_usage REAL,
		network_in REAL,
		network_out REAL,
		PRIMARY KEY (timestamp)
	)
	`
	if _, err := ms.db.Exec(createNodeMetricsTable); err != nil {
		return err
	}
	return nil
}
// AddNodeMetric upserts one node sample keyed by its sync timestamp (unix
// seconds); a later sample within the same second replaces the earlier one.
func (ms *MetricsStore) AddNodeMetric(m *metric_reader.NodeMetrics) error {
	const upsertStmt = `
	INSERT OR REPLACE INTO node_metrics (timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out)
	VALUES (?, ?, ?, ?, ?, ?)
	`
	_, err := ms.db.Exec(upsertStmt,
		m.SyncTime.Unix(),
		m.CpuUsagePercent,
		m.MemoryUsagePercent,
		m.DiskUsagePercent,
		m.NetworkReceiveBytesRate,
		m.NetworkTransmitBytesRate,
	)
	return err
}
// QueryNodeMetric returns up to num samples whose timestamp lies within
// [startTime, endTime], ordered newest-first. A negative num (callers use
// -1) disables the cap: in SQLite a negative LIMIT means "no limit".
//
// Fix: rows.Err() is now checked after the scan loop — rows.Next can stop
// early because of an iteration error, which was previously swallowed.
func (ms *MetricsStore) QueryNodeMetric(startTime, endTime time.Time, num int) (*QueryNodeMetricsResp, error) {
	rows, err := ms.db.Query(`
	SELECT timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out
	FROM node_metrics
	WHERE timestamp >= ? AND timestamp <= ?
	ORDER BY timestamp DESC
	LIMIT ?
	`, startTime.Unix(), endTime.Unix(), num)
	if err != nil {
		return nil, err
	}
	defer rows.Close() //nolint:errcheck

	var resp QueryNodeMetricsResp
	for rows.Next() {
		var m NodeMetrics
		if err := rows.Scan(&m.Timestamp, &m.CPUUsage, &m.MemoryUsage, &m.DiskUsage, &m.NetworkIn, &m.NetworkOut); err != nil {
			return nil, err
		}
		resp.Data = append(resp.Data, m)
	}
	// rows.Next returning false may hide an iteration error; surface it.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	resp.TOTAL = len(resp.Data)
	return &resp, nil
}

View File

@@ -0,0 +1,85 @@
package cmgr
import (
"context"
"github.com/Ehco1996/ehco/internal/conn"
"github.com/Ehco1996/ehco/internal/constant"
myhttp "github.com/Ehco1996/ehco/pkg/http"
"github.com/Ehco1996/ehco/pkg/metric_reader"
"go.uber.org/zap"
)
// StatsPerRule aggregates traffic and latency counters for all closed
// connections sharing one relay label within a single sync cycle.
type StatsPerRule struct {
	RelayLabel       string `json:"relay_label"`
	Up               int64  `json:"up_bytes"`
	Down             int64  `json:"down_bytes"`
	ConnectionCnt    int    `json:"connection_count"`
	HandShakeLatency int64  `json:"latency_in_ms"` // average over the rule's closed connections
}
// VersionInfo identifies the running build in sync payloads.
type VersionInfo struct {
	Version     string `json:"version"`
	ShortCommit string `json:"short_commit"` // at most the first 7 chars of the git revision
}
// syncReq is the JSON payload POSTed to the sync server on each cycle.
type syncReq struct {
	Version VersionInfo               `json:"version"`
	Node    metric_reader.NodeMetrics `json:"node"` // zero value when metrics are disabled
	Stats   []StatsPerRule            `json:"stats"`
}
// syncOnce flushes the accumulated per-rule stats (and, when metrics are
// enabled, one fresh node sample) and, if a sync URL is configured, POSTs
// the payload to the server. Closed-connection stats are reset each call.
//
// Fix: the locked section now lives in a helper that releases the mutex
// with defer, so a panic inside it can no longer leave the lock held; the
// lock is still released before the network call, as before.
func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
	cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
	req := cm.buildSyncReq(ctx)
	if cm.cfg.NeedSync() {
		cm.l.Debug("syncing data to server", zap.Any("data", req))
		return myhttp.PostJSONWithRetry(cm.cfg.SyncURL, req)
	}
	cm.l.Debugf("remove %d closed connections", len(req.Stats))
	return nil
}

// buildSyncReq snapshots and clears the closed-connection stats under the
// manager lock and, when metrics are enabled, attaches the latest node
// sample (also persisting it to the metrics store).
func (cm *cmgrImpl) buildSyncReq(ctx context.Context) *syncReq {
	// todo: opt lock
	cm.lock.Lock()
	defer cm.lock.Unlock()

	// trim the git revision to the conventional 7-char short hash
	shortCommit := constant.GitRevision
	if len(constant.GitRevision) > 7 {
		shortCommit = constant.GitRevision[:7]
	}
	req := &syncReq{
		Stats:   []StatsPerRule{},
		Version: VersionInfo{Version: constant.Version, ShortCommit: shortCommit},
	}
	if cm.cfg.NeedMetrics() {
		metrics, err := cm.mr.ReadOnce(ctx)
		if err != nil {
			// best-effort: a failed metrics read is logged, not fatal
			cm.l.Errorf("read metrics failed: %v", err)
		} else {
			req.Node = *metrics
			if err := cm.ms.AddNodeMetric(metrics); err != nil {
				cm.l.Errorf("add metrics to store failed: %v", err)
			}
		}
	}
	for label, conns := range cm.closedConnectionsMap {
		s := StatsPerRule{
			RelayLabel: label,
		}
		var totalLatency int64
		for _, c := range conns {
			s.ConnectionCnt++
			s.Up += c.GetStats().Up
			s.Down += c.GetStats().Down
			totalLatency += c.GetStats().HandShakeLatency.Milliseconds()
		}
		if s.ConnectionCnt > 0 {
			// average handshake latency across this rule's closed connections
			s.HandShakeLatency = totalLatency / int64(s.ConnectionCnt)
		}
		req.Stats = append(req.Stats, s)
	}
	// reset so the next cycle reports only newly closed connections
	cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
	return req
}