Update On Tue Aug 27 20:32:22 CEST 2024

This commit is contained in:
github-action[bot]
2024-08-27 20:32:22 +02:00
parent 4afce62e47
commit 66248b55b9
141 changed files with 2871 additions and 1553 deletions

View File

@@ -16,11 +16,6 @@ const (
ConnectionTypeClosed = "closed"
)
// QueryNodeMetricsReq describes a query for buffered node metrics:
// a relative time window plus an optional cap on returned samples.
type QueryNodeMetricsReq struct {
TimeRange string `json:"time_range"` // 15min/30min/1h/6h/12h/24h
Num int `json:"num"` // max number of metric samples to return; 0 means no limit
}
// connection manager interface
// TODO support closed connection
type Cmgr interface {
@@ -53,7 +48,7 @@ type cmgrImpl struct {
closedConnectionsMap map[string][]conn.RelayConn
mr metric_reader.Reader
ms []*metric_reader.NodeMetrics // TODO gc this
ms *MetricsStore
}
func NewCmgr(cfg *Config) Cmgr {
@@ -65,6 +60,10 @@ func NewCmgr(cfg *Config) Cmgr {
}
if cfg.NeedMetrics() {
cmgr.mr = metric_reader.NewReader(cfg.MetricsURL)
// 当前只能存储 24h 的 metrics之后再优化
bufSize := 60 * 60 * 24 / cfg.SyncInterval
cmgr.l.Infof("metrics buffer size: %d", bufSize)
cmgr.ms = NewMetricsStore(bufSize, time.Hour*24)
}
return cmgr
}
@@ -180,13 +179,6 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
cm.l.Infof("Start Cmgr sync interval=%d", cm.cfg.SyncInterval)
ticker := time.NewTicker(time.Second * time.Duration(cm.cfg.SyncInterval))
defer ticker.Stop()
// sync once at the beginning
if err := cm.syncOnce(ctx); err != nil {
cm.l.Errorf("meet non retry error: %s ,exit now", err)
errCH <- err
return
}
for {
select {
case <-ctx.Done():
@@ -200,38 +192,3 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
}
}
}
// QueryNodeMetrics returns the buffered node metrics whose SyncTime falls
// inside the window named by req.TimeRange ("15min"/"30min"/"1h"/"6h"/
// "12h"/"24h"; anything else falls back to 15 minutes). When req.Num > 0,
// at most req.Num samples are returned.
func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *QueryNodeMetricsReq) ([]metric_reader.NodeMetrics, error) {
	cm.lock.RLock()
	defer cm.lock.RUnlock()

	var window time.Duration
	switch req.TimeRange {
	case "30min":
		window = 30 * time.Minute
	case "1h":
		window = 1 * time.Hour
	case "6h":
		window = 6 * time.Hour
	case "12h":
		window = 12 * time.Hour
	case "24h":
		window = 24 * time.Hour
	default: // covers "15min" and any unrecognized value
		window = 15 * time.Minute
	}
	startTime := time.Now().Add(-window)

	res := []metric_reader.NodeMetrics{}
	for _, sample := range cm.ms {
		if sample.SyncTime.After(startTime) {
			res = append(res, *sample)
		}
		if req.Num > 0 && len(res) >= req.Num {
			break
		}
	}
	return res, nil
}

View File

@@ -0,0 +1,180 @@
package cmgr
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/Ehco1996/ehco/internal/conn"
	"github.com/Ehco1996/ehco/internal/constant"
	myhttp "github.com/Ehco1996/ehco/pkg/http"
	"github.com/Ehco1996/ehco/pkg/metric_reader"
	"go.uber.org/zap"
)
// StatsPerRule aggregates closed-connection traffic for a single relay rule.
type StatsPerRule struct {
RelayLabel string `json:"relay_label"` // label of the relay rule the stats belong to
Up int64 `json:"up_bytes"` // total uploaded bytes across counted connections
Down int64 `json:"down_bytes"` // total downloaded bytes across counted connections
ConnectionCnt int `json:"connection_count"` // number of closed connections counted
HandShakeLatency int64 `json:"latency_in_ms"` // average handshake latency in milliseconds
}

// VersionInfo identifies the build that produced a sync payload.
type VersionInfo struct {
Version string `json:"version"`
ShortCommit string `json:"short_commit"` // git revision truncated to 7 characters
}

// syncReq is the JSON payload POSTed to the sync server by syncOnce.
type syncReq struct {
Version VersionInfo `json:"version"`
Node metric_reader.NodeMetrics `json:"node"` // most recent node metrics sample
Stats []StatsPerRule `json:"stats"`
}
// MetricsStore is a mutex-guarded, time-ordered buffer of node metrics
// samples; entries older than clearDuration are evicted on Add.
type MetricsStore struct {
mutex sync.RWMutex // guards metrics
metrics []metric_reader.NodeMetrics // samples in ascending SyncTime order
bufSize int // allocation hint (e.g. 24h / sync interval — see NewCmgr)
clearDuration time.Duration // retention window, e.g. 24h
}
// NewMetricsStore builds a MetricsStore that retains clearDuration worth of
// samples; bufSize is used as the capacity hint for the backing slice.
func NewMetricsStore(bufSize int, clearDuration time.Duration) *MetricsStore {
	if bufSize < 0 {
		bufSize = 0
	}
	return &MetricsStore{
		// BUG FIX: the original used make([]T, bufSize), which pre-filled the
		// store with bufSize zero-valued NodeMetrics entries that Query could
		// then return; allocate zero length with bufSize capacity instead.
		metrics:       make([]metric_reader.NodeMetrics, 0, bufSize),
		clearDuration: clearDuration,
		bufSize:       bufSize,
	}
}
// Add appends m as the newest sample (callers are expected to add samples in
// chronological order) and evicts every sample older than clearDuration.
func (ms *MetricsStore) Add(m *metric_reader.NodeMetrics) {
	ms.mutex.Lock()
	defer ms.mutex.Unlock()

	// Append the new sample; it is assumed to be the most recent one.
	ms.metrics = append(ms.metrics, *m)

	// Find the first sample still inside the retention window. The original
	// only trimmed when it found a fresh entry, so a buffer that was entirely
	// stale was never cleaned; default to dropping everything instead.
	cutoffTime := time.Now().Add(-ms.clearDuration)
	firstFresh := len(ms.metrics)
	for i := range ms.metrics {
		if ms.metrics[i].SyncTime.After(cutoffTime) {
			firstFresh = i
			break
		}
	}
	// Compact in place (rather than re-slicing) so the evicted prefix does
	// not keep the front of the backing array pinned in memory.
	ms.metrics = append(ms.metrics[:0], ms.metrics[firstFresh:]...)
}
// Query returns, in ascending SyncTime order, every buffered sample whose
// SyncTime lies in (startTime, endTime]. It relies on ms.metrics being
// sorted ascending by SyncTime: the backward walk stops at the first sample
// older than startTime.
func (ms *MetricsStore) Query(startTime, endTime time.Time) []metric_reader.NodeMetrics {
	ms.mutex.RLock()
	defer ms.mutex.RUnlock()

	var picked []metric_reader.NodeMetrics
	// Walk newest-to-oldest so we can bail out early on the first stale entry.
	for idx := len(ms.metrics) - 1; idx >= 0; idx-- {
		cur := ms.metrics[idx]
		if cur.SyncTime.Before(startTime) {
			break
		}
		if !cur.SyncTime.After(endTime) {
			picked = append(picked, cur)
		}
	}
	// The backward walk collected newest-first; reverse to ascending order.
	for lo, hi := 0, len(picked)-1; lo < hi; lo, hi = lo+1, hi-1 {
		picked[lo], picked[hi] = picked[hi], picked[lo]
	}
	return picked
}
// QueryNodeMetricsReq describes a query for buffered node metrics: either a
// relative time window, or (when Latest is set) a fresh on-demand read.
type QueryNodeMetricsReq struct {
TimeRange string `json:"time_range"` // 15min/30min/1h/6h/12h/24h
Latest bool `json:"latest"` // whether to refresh the cache and get the latest data
}
// syncOnce aggregates per-rule stats from the closed-connection map,
// optionally samples node metrics into the metrics store, and — when a sync
// URL is configured — POSTs the snapshot to the remote server.
//
// NOTE(review): the mutex is released manually (not deferred) so the HTTP
// round-trip below is not performed while holding the lock.
func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
// todo: opt lock
cm.lock.Lock()
// Shorten the git revision to the conventional 7-char short hash.
shortCommit := constant.GitRevision
if len(constant.GitRevision) > 7 {
shortCommit = constant.GitRevision[:7]
}
req := syncReq{
Stats: []StatsPerRule{},
Version: VersionInfo{Version: constant.Version, ShortCommit: shortCommit},
}
if cm.cfg.NeedMetrics() {
metrics, err := cm.mr.ReadOnce(ctx)
if err != nil {
// Metrics failures are non-fatal: log and continue with stats only.
cm.l.Errorf("read metrics failed: %v", err)
} else {
req.Node = *metrics
cm.ms.Add(metrics)
}
}
// Fold every closed connection into one StatsPerRule per relay label.
for label, conns := range cm.closedConnectionsMap {
s := StatsPerRule{
RelayLabel: label,
}
var totalLatency int64
for _, c := range conns {
s.ConnectionCnt++
s.Up += c.GetStats().Up
s.Down += c.GetStats().Down
totalLatency += c.GetStats().HandShakeLatency.Milliseconds()
}
if s.ConnectionCnt > 0 {
// Average handshake latency (ms) across this rule's connections.
s.HandShakeLatency = totalLatency / int64(s.ConnectionCnt)
}
req.Stats = append(req.Stats, s)
}
// Reset the map so the accounted connections are not reported twice.
cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
cm.lock.Unlock()
if cm.cfg.NeedSync() {
cm.l.Debug("syncing data to server", zap.Any("data", req))
return myhttp.PostJSONWithRetry(cm.cfg.SyncURL, &req)
} else {
cm.l.Debugf("remove %d closed connections", len(req.Stats))
}
return nil
}
// getTimeRangeDuration maps a time-range token ("15min", "30min", "1h",
// "6h", "12h", "24h") to its time.Duration. Unknown tokens fall back to
// the smallest window, 15 minutes.
func getTimeRangeDuration(timeRange string) time.Duration {
	ranges := map[string]time.Duration{
		"15min": 15 * time.Minute,
		"30min": 30 * time.Minute,
		"1h":    1 * time.Hour,
		"6h":    6 * time.Hour,
		"12h":   12 * time.Hour,
		"24h":   24 * time.Hour,
	}
	if d, ok := ranges[timeRange]; ok {
		return d
	}
	// Default window for unrecognized tokens.
	return 15 * time.Minute
}
// QueryNodeMetrics serves node metrics. With req.Latest it performs a fresh
// read, records it in the store, and returns just that sample; otherwise it
// returns the buffered samples inside the req.TimeRange window.
func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *QueryNodeMetricsReq) ([]metric_reader.NodeMetrics, error) {
	// BUG FIX: cm.mr and cm.ms are only initialized when cfg.NeedMetrics()
	// is true (see NewCmgr); without this guard any query on a manager built
	// without metrics support panics with a nil-pointer dereference.
	if cm.mr == nil || cm.ms == nil {
		return nil, errors.New("node metrics are not enabled for this instance")
	}
	if req.Latest {
		// Bypass the buffer: read fresh metrics now and record the sample.
		m, err := cm.mr.ReadOnce(ctx)
		if err != nil {
			return nil, err
		}
		cm.ms.Add(m)
		return []metric_reader.NodeMetrics{*m}, nil
	}
	// Use a single timestamp so the window's start and end are consistent.
	now := time.Now()
	startTime := now.Add(-getTimeRangeDuration(req.TimeRange))
	return cm.ms.Query(startTime, now), nil
}

View File

@@ -1,83 +0,0 @@
package cmgr
import (
"context"
"github.com/Ehco1996/ehco/internal/conn"
"github.com/Ehco1996/ehco/internal/constant"
myhttp "github.com/Ehco1996/ehco/pkg/http"
"github.com/Ehco1996/ehco/pkg/metric_reader"
"go.uber.org/zap"
)
// StatsPerRule aggregates closed-connection traffic for a single relay rule.
type StatsPerRule struct {
RelayLabel string `json:"relay_label"` // label of the relay rule the stats belong to
Up int64 `json:"up_bytes"` // total uploaded bytes across counted connections
Down int64 `json:"down_bytes"` // total downloaded bytes across counted connections
ConnectionCnt int `json:"connection_count"` // number of closed connections counted
HandShakeLatency int64 `json:"latency_in_ms"` // average handshake latency in milliseconds
}

// VersionInfo identifies the build that produced a sync payload.
type VersionInfo struct {
Version string `json:"version"`
ShortCommit string `json:"short_commit"` // git revision truncated to 7 characters
}

// syncReq is the JSON payload POSTed to the sync server by syncOnce.
type syncReq struct {
Version VersionInfo `json:"version"`
Node metric_reader.NodeMetrics `json:"node"` // most recent node metrics sample
Stats []StatsPerRule `json:"stats"`
}
// syncOnce aggregates per-rule stats from the closed-connection map,
// optionally samples node metrics, and — when a sync URL is configured —
// POSTs the snapshot to the remote server.
//
// NOTE(review): the mutex is released manually (not deferred) so the HTTP
// round-trip below is not performed while holding the lock.
func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
// todo: opt lock
cm.lock.Lock()
// Shorten the git revision to the conventional 7-char short hash.
shortCommit := constant.GitRevision
if len(constant.GitRevision) > 7 {
shortCommit = constant.GitRevision[:7]
}
req := syncReq{
Stats: []StatsPerRule{},
Version: VersionInfo{Version: constant.Version, ShortCommit: shortCommit},
}
if cm.cfg.NeedMetrics() {
metrics, err := cm.mr.ReadOnce(ctx)
if err != nil {
// Metrics failures are non-fatal: log and continue with stats only.
cm.l.Errorf("read metrics failed: %v", err)
} else {
req.Node = *metrics
// NOTE(review): cm.ms grows without bound here — nothing ever trims
// this slice (matches the "TODO gc this" on the field declaration).
cm.ms = append(cm.ms, metrics)
}
}
// Fold every closed connection into one StatsPerRule per relay label.
for label, conns := range cm.closedConnectionsMap {
s := StatsPerRule{
RelayLabel: label,
}
var totalLatency int64
for _, c := range conns {
s.ConnectionCnt++
s.Up += c.GetStats().Up
s.Down += c.GetStats().Down
totalLatency += c.GetStats().HandShakeLatency.Milliseconds()
}
if s.ConnectionCnt > 0 {
// Average handshake latency (ms) across this rule's connections.
s.HandShakeLatency = totalLatency / int64(s.ConnectionCnt)
}
req.Stats = append(req.Stats, s)
}
// Reset the map so the accounted connections are not reported twice.
cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
cm.lock.Unlock()
if cm.cfg.NeedSync() {
cm.l.Debug("syncing data to server", zap.Any("data", req))
return myhttp.PostJSONWithRetry(cm.cfg.SyncURL, &req)
} else {
cm.l.Debugf("remove %d closed connections", len(req.Stats))
}
return nil
}

View File

@@ -124,14 +124,15 @@ func (s *Server) ListRules(c echo.Context) error {
func (s *Server) GetNodeMetrics(c echo.Context) error {
req := &cmgr.QueryNodeMetricsReq{TimeRange: c.QueryParam("time_range")}
num := c.QueryParam("num")
if num != "" {
n, err := strconv.Atoi(num)
latest := c.QueryParam("latest")
if latest != "" {
r, err := strconv.ParseBool(latest)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
req.Num = n
req.Latest = r
}
metrics, err := s.connMgr.QueryNodeMetrics(c.Request().Context(), req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())

View File

@@ -25,6 +25,12 @@
</div>
</div>
</div>
<button id="refreshButton" class="button ml-2">
<span class="icon">
<i class="fas fa-sync-alt"></i>
</span>
<span>Auto Refresh</span>
</button>
</div>
</header>
<div class="card-content">
@@ -152,7 +158,7 @@
// Data fetching functions
const fetchLatestMetric = async () => {
try {
const response = await fetch(`${API_BASE_URL}?time_range=15min&num=1`);
const response = await fetch(`${API_BASE_URL}?latest=true`);
if (!response.ok) throw new Error('Network response was not ok');
const data = await response.json();
return data[0];
@@ -264,21 +270,75 @@
charts.memory.options.plugins.title.text = `Total: ${formatBytes(metric.memory_total_bytes)} GB`;
charts.disk.options.plugins.title.text = `Total: ${formatBytes(metric.disk_total_bytes)} GB`;
charts.cpu.update();
charts.memory.update();
charts.disk.update();
// 更新图表
for (const chart of Object.values(charts)) {
chart.update();
}
};
// Append latestMetric as a new data point on every chart (CPU, memory,
// disk, network, ping), then refresh the summary info panels.
const addLatestDataToCharts = (charts, latestMetric) => {
const timestamp = formatDate(latestMetric.SyncTime);
// CPU chart
charts.cpu.data.labels.push(timestamp);
charts.cpu.data.datasets[0].data.push(latestMetric.cpu_usage_percent);
// memory chart
charts.memory.data.labels.push(timestamp);
charts.memory.data.datasets[0].data.push(latestMetric.memory_usage_percent);
// disk chart
charts.disk.data.labels.push(timestamp);
charts.disk.data.datasets[0].data.push(latestMetric.disk_usage_percent);
// network chart (byte rates converted via BYTE_TO_MB)
charts.network.data.labels.push(timestamp);
charts.network.data.datasets[0].data.push(latestMetric.network_receive_bytes_rate / BYTE_TO_MB);
charts.network.data.datasets[1].data.push(latestMetric.network_transmit_bytes_rate / BYTE_TO_MB);
// ping chart: match each dataset to its target's latency (null when the
// target is missing from this sample)
charts.ping.data.labels.push(timestamp);
charts.ping.data.datasets.forEach((dataset) => {
const pingMetric = latestMetric.ping_metrics.find((ping) => ping.target === dataset.label);
dataset.data.push(pingMetric ? pingMetric.latency : null);
});
// refresh the additional info panels
updateAdditionalInfo(charts, latestMetric);
};
// Main execution: wire up the dashboard once the DOM is ready.
$(document).ready(async function () {
let charts = await initializeCharts();
if (!charts) return;
// Re-fetch and redraw when a time-range dropdown item is picked.
$('.dropdown-item').click(async function () {
const timeRange = $(this).data('time');
const metrics = await fetchMetrics(timeRange);
if (metrics) updateCharts(charts, metrics);
});
// Auto-refresh state, toggled by the #refreshButton below.
let autoRefreshInterval;
let isAutoRefreshing = false;
$('#refreshButton').click(function () {
if (isAutoRefreshing) {
// Stop polling and restore the idle button label/style.
clearInterval(autoRefreshInterval);
$(this).removeClass('is-info');
$(this).find('span:last').text('Auto Refresh');
isAutoRefreshing = false;
} else {
// Start polling: refresh immediately, then every 5 seconds.
$(this).addClass('is-info');
$(this).find('span:last').text('Stop Refresh');
isAutoRefreshing = true;
refreshData();
autoRefreshInterval = setInterval(refreshData, 5000);
}
});
// Fetch the latest sample and append it to the charts.
async function refreshData() {
const latestMetric = await fetchLatestMetric();
if (latestMetric) {
addLatestDataToCharts(charts, latestMetric);
}
}
});
</script>
</div>

View File

@@ -11,7 +11,6 @@ import (
// generateRetirableClient returns a retryablehttp client capped at 3
// retries, with logging routed through the project's zap-backed logger.
// NOTE(review): "Retirable" looks like a typo for "Retryable"; renaming
// would touch callers, so it is only flagged here.
func generateRetirableClient() *retryablehttp.Client {
retryClient := retryablehttp.NewClient()
retryClient.RetryMax = 3
retryClient.Logger = log.NewZapLeveledLogger("http")
return retryClient
}
@@ -29,6 +28,9 @@ func PostJSONWithRetry(url string, dataStruct interface{}) error {
}
defer r.Body.Close()
_, err = io.ReadAll(r.Body)
if err != nil {
return err
}
return err
}