Update On Mon Apr 29 20:27:42 CEST 2024

This commit is contained in:
github-action[bot]
2024-04-29 20:27:42 +02:00
parent 584bec4f3d
commit 3e452bc337
106 changed files with 1355 additions and 663 deletions

View File

@@ -17,7 +17,7 @@ ehco 现在提供 SaaS软件即服务版本这是一个全托管的解
- tcp/udp relay
- tunnel relay (ws/wss/mwss/mtcp)
- proxy server (内嵌了完整版本的 xray)
- 监控报警 (prometheus/grafana)
- proxy server (内嵌了完整版本的 xray)
- 监控报警 (Prometheus/Grafana)
- WebAPI (http://web_host:web_port)
- [更多功能请探索文档](https://docs.ehco-relay.cc/)

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/Ehco1996/ehco/internal/conn"
"github.com/Ehco1996/ehco/pkg/node_metric"
"go.uber.org/zap"
)
@@ -36,6 +37,7 @@ type cmgrImpl struct {
lock sync.RWMutex
cfg *Config
l *zap.SugaredLogger
mr node_metric.Reader
// k: relay label, v: connection list
activeConnectionsMap map[string][]conn.RelayConn
@@ -43,12 +45,16 @@ type cmgrImpl struct {
}
func NewCmgr(cfg *Config) Cmgr {
return &cmgrImpl{
cmgr := &cmgrImpl{
cfg: cfg,
l: zap.S().Named("cmgr"),
activeConnectionsMap: make(map[string][]conn.RelayConn),
closedConnectionsMap: make(map[string][]conn.RelayConn),
}
if cfg.NeedMetrics() {
cmgr.mr = node_metric.NewReader(cfg.MetricsURL)
}
return cmgr
}
func (cm *cmgrImpl) ListConnections(connType string, page, pageSize int) []conn.RelayConn {
@@ -163,7 +169,7 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
cm.l.Info("sync stop")
return
case <-ticker.C:
if err := cm.syncOnce(); err != nil {
if err := cm.syncOnce(ctx); err != nil {
cm.l.Errorf("meet non retry error: %s ,exit now", err)
errCH <- err
}

View File

@@ -4,6 +4,7 @@ var DummyConfig = &Config{}
type Config struct {
SyncURL string `json:"sync_url,omitempty"`
MetricsURL string `json:"metrics_url,omitempty"`
SyncInterval int `json:"sync_interval,omitempty"` // in seconds
}
@@ -11,6 +12,10 @@ func (c *Config) NeedSync() bool {
return c.SyncURL != ""
}
// NeedMetrics reports whether a node-metrics endpoint has been configured.
func (c *Config) NeedMetrics() bool {
	return len(c.MetricsURL) > 0
}
func (c *Config) Adjust() {
if c.SyncInterval <= 0 {
c.SyncInterval = 60

View File

@@ -1,11 +1,14 @@
package cmgr
import (
"context"
"net/http"
"github.com/Ehco1996/ehco/internal/conn"
"github.com/Ehco1996/ehco/internal/constant"
myhttp "github.com/Ehco1996/ehco/pkg/http"
"github.com/Ehco1996/ehco/pkg/node_metric"
"go.uber.org/zap"
)
type StatsPerRule struct {
@@ -23,11 +26,12 @@ type VersionInfo struct {
}
type syncReq struct {
Version VersionInfo `json:"version"`
Stats []StatsPerRule `json:"stats"`
Version VersionInfo `json:"version"`
Node node_metric.NodeMetrics `json:"node"`
Stats []StatsPerRule `json:"stats"`
}
func (cm *cmgrImpl) syncOnce() error {
func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
// todo: opt lock
cm.lock.Lock()
@@ -40,6 +44,16 @@ func (cm *cmgrImpl) syncOnce() error {
Stats: []StatsPerRule{},
Version: VersionInfo{Version: constant.Version, ShortCommit: shorCommit},
}
if cm.cfg.NeedMetrics() {
metrics, err := cm.mr.ReadOnce(ctx)
if err != nil {
cm.l.Errorf("read metrics failed: %v", err)
} else {
req.Node = *metrics
}
}
for label, conns := range cm.closedConnectionsMap {
s := StatsPerRule{
RelayLabel: label,
@@ -59,6 +73,7 @@ func (cm *cmgrImpl) syncOnce() error {
cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
cm.lock.Unlock()
if cm.cfg.NeedSync() {
cm.l.Debug("syncing data to server", zap.Any("data", req))
return myhttp.PostJson(http.DefaultClient, cm.cfg.SyncURL, &req)
} else {
cm.l.Debugf("remove %d closed connections", len(req.Stats))

View File

@@ -149,6 +149,10 @@ func (c *Config) GetMetricURL() string {
if c.WebToken != "" {
url += fmt.Sprintf("?token=%s", c.WebToken)
}
// for basic auth
if c.WebAuthUser != "" && c.WebAuthPass != "" {
url = fmt.Sprintf("http://%s:%s@%s:%d/metrics/", c.WebAuthUser, c.WebAuthPass, c.WebHost, c.WebPort)
}
return url
}

View File

@@ -32,6 +32,7 @@ func NewServer(cfg *config.Config) (*Server, error) {
cmgrCfg := &cmgr.Config{
SyncURL: cfg.RelaySyncURL,
SyncInterval: cfg.RelaySyncInterval,
MetricsURL: cfg.GetMetricURL(),
}
cmgrCfg.Adjust()
s := &Server{

View File

@@ -0,0 +1,269 @@
package node_metric
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// Reader scrapes a Prometheus-format metrics endpoint and turns the samples
// into a NodeMetrics snapshot.
type Reader interface {
	ReadOnce(ctx context.Context) (*NodeMetrics, error)
}

type readerImpl struct {
	metricsURL  string
	httpClient  *http.Client
	lastMetrics *NodeMetrics // previous snapshot, kept to derive network rates
}

// NewReader returns a reader that scrapes metricsURL with a 30s HTTP timeout.
func NewReader(metricsURL string) *readerImpl {
	return &readerImpl{
		metricsURL: metricsURL,
		httpClient: &http.Client{Timeout: 30 * time.Second},
	}
}
// parseCpuInfo fills nm's CPU fields (core count, usage percent, load string)
// from the scraped metric families.
func (b *readerImpl) parseCpuInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
	// handleMetric feeds every counter sample of metricName, together with its
	// "mode" label (empty if absent), to handleValue.
	handleMetric := func(metricName string, handleValue func(float64, string)) error {
		metric, ok := metricMap[metricName]
		if !ok {
			return fmt.Errorf("%s not found", metricName)
		}
		for _, m := range metric.Metric {
			g := m.GetCounter()
			mode := ""
			for _, label := range m.GetLabel() {
				if label.GetName() == "mode" {
					mode = label.GetValue()
				}
			}
			handleValue(g.GetValue(), mode)
		}
		return nil
	}

	var (
		totalIdleTime float64
		totalCpuTime  float64
		cpuCores      int
	)
	err := handleMetric("node_cpu_seconds_total", func(val float64, mode string) {
		totalCpuTime += val
		// node_exporter emits exactly one "idle" sample per core, so counting
		// idle samples doubles as counting cores.
		if mode == "idle" {
			totalIdleTime += val
			cpuCores++
		}
	})
	if err != nil {
		return err
	}
	nm.CpuCoreCount = cpuCores
	// Guard the denominator: an exporter with no samples yet would otherwise
	// produce NaN, which poisons the JSON payload downstream.
	if totalCpuTime > 0 {
		nm.CpuUsagePercent = 100 * (totalCpuTime - totalIdleTime) / totalCpuTime
	}

	// Load averages: node_load1 / node_load5 / node_load15 joined as "a|b|c".
	for _, load := range []string{"1", "5", "15"} {
		loadMetricName := fmt.Sprintf("node_load%s", load)
		loadMetric, ok := metricMap[loadMetricName]
		if !ok {
			return fmt.Errorf("%s not found", loadMetricName)
		}
		for _, m := range loadMetric.Metric {
			g := m.GetGauge()
			nm.CpuLoadInfo += fmt.Sprintf("%.2f|", g.GetValue())
		}
	}
	nm.CpuLoadInfo = strings.TrimRight(nm.CpuLoadInfo, "|")
	return nil
}
// parseMemoryInfo fills nm's memory fields from the scraped metric families.
// macOS node_exporter exposes node_memory_total_bytes (usage = active + wired);
// Linux exposes node_memory_MemTotal_bytes (usage = total - available).
func (b *readerImpl) parseMemoryInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
	// applyGauges feeds every gauge sample of name to apply.
	applyGauges := func(name string, apply func(float64)) error {
		family, ok := metricMap[name]
		if !ok {
			return fmt.Errorf("%s not found", name)
		}
		for _, sample := range family.Metric {
			apply(sample.GetGauge().GetValue())
		}
		return nil
	}

	// The mac-flavoured metric name is the platform discriminator.
	if _, isMac := metricMap["node_memory_total_bytes"]; isMac {
		if err := applyGauges("node_memory_total_bytes", func(v float64) {
			nm.MemoryTotalBytes = v
		}); err != nil {
			return err
		}
		if err := applyGauges("node_memory_active_bytes", func(v float64) {
			nm.MemoryUsageBytes += v
		}); err != nil {
			return err
		}
		if err := applyGauges("node_memory_wired_bytes", func(v float64) {
			nm.MemoryUsageBytes += v
		}); err != nil {
			return err
		}
	} else {
		if err := applyGauges("node_memory_MemTotal_bytes", func(v float64) {
			nm.MemoryTotalBytes = v
		}); err != nil {
			return err
		}
		if err := applyGauges("node_memory_MemAvailable_bytes", func(v float64) {
			nm.MemoryUsageBytes = nm.MemoryTotalBytes - v
		}); err != nil {
			return err
		}
	}

	if nm.MemoryTotalBytes != 0 {
		nm.MemoryUsagePercent = 100 * nm.MemoryUsageBytes / nm.MemoryTotalBytes
	}
	return nil
}
// parseDiskInfo fills nm's disk fields by summing node_filesystem_size_bytes
// and node_filesystem_avail_bytes across filesystems, deduplicating apfs
// volumes on macOS that share one physical disk.
func (b *readerImpl) parseDiskInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
	// handleMetric feeds each (per-disk, deduplicated) gauge value of
	// metricName to handleValue.
	handleMetric := func(metricName string, handleValue func(float64)) error {
		forMac := false
		diskMap := make(map[string]float64) // k: disk name (from device label), v: gauge value
		metric, ok := metricMap[metricName]
		if !ok {
			return fmt.Errorf("%s not found", metricName)
		}
		for _, m := range metric.Metric {
			g := m.GetGauge()
			disk := ""
			for _, label := range m.GetLabel() {
				if label.GetName() == "device" {
					disk = getDiskName(label.GetValue())
				}
				// Any apfs filesystem marks the whole host as macOS for dedup below.
				if label.GetName() == "fstype" && label.GetValue() == "apfs" {
					forMac = true
				}
			}
			// NOTE(review): several devices mapping to the same disk name keep
			// only the last sample seen — presumably intended, confirm.
			diskMap[disk] = g.GetValue()
		}
		// On macOS, apfs volumes share the physical disk, so multiple volumes
		// can report the same size; count each distinct value only once there.
		seenVal := map[float64]bool{}
		for _, val := range diskMap {
			if seenVal[val] && forMac {
				continue
			}
			handleValue(val)
			seenVal[val] = true
		}
		return nil
	}
	err := handleMetric("node_filesystem_size_bytes", func(val float64) {
		nm.DiskTotalBytes += val
	})
	if err != nil {
		return err
	}
	var availBytes float64
	err = handleMetric("node_filesystem_avail_bytes", func(val float64) {
		availBytes += val
	})
	if err != nil {
		return err
	}
	nm.DiskUsageBytes = nm.DiskTotalBytes - availBytes
	if nm.DiskTotalBytes != 0 {
		nm.DiskUsagePercent = 100 * nm.DiskUsageBytes / nm.DiskTotalBytes
	}
	return nil
}
// parseNetworkInfo fills nm's network totals from the scraped counters and,
// when a previous snapshot exists, derives byte rates from the elapsed time.
func (b *readerImpl) parseNetworkInfo(metricMap map[string]*dto.MetricFamily, nm *NodeMetrics) error {
	now := time.Now()
	// handleMetric feeds every counter sample of metricName to handleValue.
	handleMetric := func(metricName string, handleValue func(float64)) error {
		metric, ok := metricMap[metricName]
		if !ok {
			return fmt.Errorf("%s not found", metricName)
		}
		for _, m := range metric.Metric {
			g := m.GetCounter()
			handleValue(g.GetValue())
		}
		return nil
	}
	err := handleMetric("node_network_receive_bytes_total", func(val float64) {
		nm.NetworkReceiveBytesTotal += val
	})
	if err != nil {
		return err
	}
	err = handleMetric("node_network_transmit_bytes_total", func(val float64) {
		nm.NetworkTransmitBytesTotal += val
	})
	if err != nil {
		return err
	}
	if b.lastMetrics != nil {
		passedTime := now.Sub(b.lastMetrics.syncTime).Seconds()
		// Guard against a zero or negative interval (back-to-back reads or a
		// clock jump), which would otherwise produce Inf/NaN rates.
		if passedTime > 0 {
			nm.NetworkReceiveBytesRate = (nm.NetworkReceiveBytesTotal - b.lastMetrics.NetworkReceiveBytesTotal) / passedTime
			nm.NetworkTransmitBytesRate = (nm.NetworkTransmitBytesTotal - b.lastMetrics.NetworkTransmitBytesTotal) / passedTime
		}
	}
	return nil
}
// ReadOnce scrapes the metrics endpoint once, parses the Prometheus text
// exposition format, and returns the derived node metrics. The snapshot is
// stored so the next call can compute network rates from the delta.
func (b *readerImpl) ReadOnce(ctx context.Context) (*NodeMetrics, error) {
	// Bind the request to ctx so cancellation/deadlines are honored; the
	// previous bare httpClient.Get silently ignored the caller's context.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, b.metricsURL, nil)
	if err != nil {
		return nil, err
	}
	response, err := b.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()
	// Don't try to parse error pages (401/404/...) as metrics.
	if response.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s from %s", response.Status, b.metricsURL)
	}
	body, err := io.ReadAll(response.Body)
	if err != nil {
		return nil, err
	}
	var parser expfmt.TextParser
	parsed, err := parser.TextToMetricFamilies(strings.NewReader(string(body)))
	if err != nil {
		return nil, err
	}
	nm := &NodeMetrics{syncTime: time.Now()}
	if err := b.parseCpuInfo(parsed, nm); err != nil {
		return nil, err
	}
	if err := b.parseMemoryInfo(parsed, nm); err != nil {
		return nil, err
	}
	if err := b.parseDiskInfo(parsed, nm); err != nil {
		return nil, err
	}
	if err := b.parseNetworkInfo(parsed, nm); err != nil {
		return nil, err
	}
	// Keep this snapshot for rate computation on the next scrape.
	b.lastMetrics = nm
	return nm, nil
}

View File

@@ -0,0 +1,28 @@
package node_metric
import "time"
// NodeMetrics is a point-in-time snapshot of host-level metrics derived from
// a node_exporter scrape. All byte quantities are raw bytes; percents are 0-100.
type NodeMetrics struct {
	// cpu
	CpuCoreCount    int     `json:"cpu_core_count"`
	CpuLoadInfo     string  `json:"cpu_load_info"` // "load1|load5|load15", each formatted %.2f
	CpuUsagePercent float64 `json:"cpu_usage_percent"`

	// memory
	MemoryTotalBytes   float64 `json:"memory_total_bytes"`
	MemoryUsageBytes   float64 `json:"memory_usage_bytes"`
	MemoryUsagePercent float64 `json:"memory_usage_percent"`

	// disk
	DiskTotalBytes   float64 `json:"disk_total_bytes"`
	DiskUsageBytes   float64 `json:"disk_usage_bytes"`
	DiskUsagePercent float64 `json:"disk_usage_percent"`

	// network; totals are cumulative counters summed over interfaces,
	// rates are bytes/second derived from the previous snapshot
	NetworkReceiveBytesTotal  float64 `json:"network_receive_bytes_total"`
	NetworkTransmitBytesTotal float64 `json:"network_transmit_bytes_total"`
	NetworkReceiveBytesRate   float64 `json:"network_receive_bytes_rate"`
	NetworkTransmitBytesRate  float64 `json:"network_transmit_bytes_rate"`

	// syncTime records when the snapshot was taken; unexported, so it is
	// never serialized to JSON.
	syncTime time.Time
}

View File

@@ -0,0 +1,24 @@
package node_metric
import "regexp"
var (
	// diskNameRegex extracts the bare disk name from a device path, e.g.:
	//   /dev/disk1s1   -> disk1
	//   /dev/disk1s2   -> disk1
	//   ntfs://disk1s1 -> disk1
	//   /dev/sda1      -> sda
	//   /dev/sda2      -> sda
	diskNameRegex = regexp.MustCompile(`(?:/dev/|ntfs://)(disk\d+|sd[a-zA-Z])`)
)

// getDiskName returns the physical disk name embedded in devicePath, or ""
// when the path does not look like a known disk device.
//
// The previous implementation iterated FindStringSubmatch starting at index 0,
// which is the full match and is always non-empty — so it returned strings
// like "/dev/disk1" instead of the documented "disk1" and the capture groups
// were dead code. Returning capture group 1 restores the documented behavior.
func getDiskName(devicePath string) string {
	matches := diskNameRegex.FindStringSubmatch(devicePath)
	if len(matches) > 1 {
		return matches[1]
	}
	return ""
}