mirror of
https://github.com/bolucat/Archive.git
synced 2025-12-24 13:28:37 +08:00
Update On Thu May 30 20:33:30 CEST 2024
This commit is contained in:
@@ -1,22 +1,11 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Ehco1996/ehco/internal/config"
|
||||
"github.com/alecthomas/kingpin/v2"
|
||||
"github.com/go-ping/ping"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/promlog"
|
||||
"github.com/prometheus/common/version"
|
||||
"github.com/prometheus/node_exporter/collector"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -104,125 +93,6 @@ var (
|
||||
}, []string{METRIC_LABEL_REMOTE})
|
||||
)
|
||||
|
||||
func (pg *PingGroup) newPinger(addr string) (*ping.Pinger, error) {
|
||||
pinger := ping.New(addr)
|
||||
if err := pinger.Resolve(); err != nil {
|
||||
pg.logger.Error("failed to resolve pinger", zap.String("addr", addr), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
pinger.Interval = pingInterval
|
||||
pinger.Timeout = time.Duration(math.MaxInt64)
|
||||
pinger.RecordRtts = false
|
||||
if runtime.GOOS != "darwin" {
|
||||
pinger.SetPrivileged(true)
|
||||
}
|
||||
return pinger, nil
|
||||
}
|
||||
|
||||
type PingGroup struct {
|
||||
logger *zap.Logger
|
||||
|
||||
// k: addr
|
||||
Pingers map[string]*ping.Pinger
|
||||
|
||||
// k: addr v:relay rule label joined by ","
|
||||
PingerLabels map[string]string
|
||||
}
|
||||
|
||||
func extractHost(input string) (string, error) {
|
||||
// Check if the input string has a scheme, if not, add "http://"
|
||||
if !strings.Contains(input, "://") {
|
||||
input = "http://" + input
|
||||
}
|
||||
// Parse the URL
|
||||
u, err := url.Parse(input)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return u.Hostname(), nil
|
||||
}
|
||||
|
||||
func NewPingGroup(cfg *config.Config) *PingGroup {
|
||||
logger := zap.L().Named("pinger")
|
||||
|
||||
pg := &PingGroup{
|
||||
logger: logger,
|
||||
Pingers: make(map[string]*ping.Pinger),
|
||||
PingerLabels: map[string]string{},
|
||||
}
|
||||
|
||||
// parse addr from rule
|
||||
for _, relayCfg := range cfg.RelayConfigs {
|
||||
// NOTE for (https/ws/wss)://xxx.com -> xxx.com
|
||||
for _, remote := range relayCfg.TCPRemotes {
|
||||
addr, err := extractHost(remote)
|
||||
if err != nil {
|
||||
pg.logger.Error("try parse host error", zap.Error(err))
|
||||
}
|
||||
if _, ok := pg.Pingers[addr]; ok {
|
||||
// append rule label when remote host is same
|
||||
pg.PingerLabels[addr] += fmt.Sprintf(",%s", relayCfg.Label)
|
||||
continue
|
||||
}
|
||||
if pinger, err := pg.newPinger(addr); err != nil {
|
||||
pg.logger.Error("new pinger meet error", zap.Error(err))
|
||||
} else {
|
||||
pg.Pingers[pinger.Addr()] = pinger
|
||||
pg.PingerLabels[addr] = relayCfg.Label
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update metrics
|
||||
for addr, pinger := range pg.Pingers {
|
||||
pinger.OnRecv = func(pkt *ping.Packet) {
|
||||
PingResponseDurationSeconds.WithLabelValues(
|
||||
pkt.IPAddr.String(), pkt.Addr, pg.PingerLabels[addr]).Observe(pkt.Rtt.Seconds())
|
||||
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v",
|
||||
pkt.Nbytes, pkt.Addr, pkt.Seq, pkt.Rtt, pkt.Ttl)
|
||||
}
|
||||
pinger.OnDuplicateRecv = func(pkt *ping.Packet) {
|
||||
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v (DUP!)",
|
||||
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
|
||||
}
|
||||
}
|
||||
return pg
|
||||
}
|
||||
|
||||
func (pg *PingGroup) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- PingRequestTotal
|
||||
}
|
||||
|
||||
func (pg *PingGroup) Collect(ch chan<- prometheus.Metric) {
|
||||
for addr, pinger := range pg.Pingers {
|
||||
stats := pinger.Statistics()
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
PingRequestTotal,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.PacketsSent),
|
||||
stats.IPAddr.String(),
|
||||
stats.Addr,
|
||||
pg.PingerLabels[addr],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *PingGroup) Run() {
|
||||
if len(pg.Pingers) <= 0 {
|
||||
return
|
||||
}
|
||||
pg.logger.Sugar().Infof("Start Ping Group now total pinger: %d", len(pg.Pingers))
|
||||
splay := time.Duration(pingInterval.Nanoseconds() / int64(len(pg.Pingers)))
|
||||
for addr, pinger := range pg.Pingers {
|
||||
go func() {
|
||||
if err := pinger.Run(); err != nil {
|
||||
pg.logger.Error("Starting pinger meet err", zap.String("addr", addr), zap.Error(err))
|
||||
}
|
||||
}()
|
||||
time.Sleep(splay)
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterEhcoMetrics(cfg *config.Config) error {
|
||||
// traffic
|
||||
prometheus.MustRegister(EhcoAlive)
|
||||
@@ -241,27 +111,3 @@ func RegisterEhcoMetrics(cfg *config.Config) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func RegisterNodeExporterMetrics(cfg *config.Config) error {
|
||||
level := &promlog.AllowedLevel{}
|
||||
// mute node_exporter logger
|
||||
if err := level.Set("error"); err != nil {
|
||||
return err
|
||||
}
|
||||
promlogConfig := &promlog.Config{Level: level}
|
||||
logger := promlog.New(promlogConfig)
|
||||
// see this https://github.com/prometheus/node_exporter/pull/2463
|
||||
if _, err := kingpin.CommandLine.Parse([]string{}); err != nil {
|
||||
return err
|
||||
}
|
||||
nc, err := collector.NewNodeCollector(logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create collector: %s", err)
|
||||
}
|
||||
// nc.Collectors = collectors
|
||||
prometheus.MustRegister(
|
||||
nc,
|
||||
version.NewCollector("node_exporter"),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
32
echo/internal/metrics/node.go
Normal file
32
echo/internal/metrics/node.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/Ehco1996/ehco/internal/config"
|
||||
"github.com/alecthomas/kingpin/v2"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/promlog"
|
||||
"github.com/prometheus/node_exporter/collector"
|
||||
)
|
||||
|
||||
func RegisterNodeExporterMetrics(cfg *config.Config) error {
|
||||
level := &promlog.AllowedLevel{}
|
||||
// mute node_exporter logger
|
||||
if err := level.Set("error"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger := promlog.New(&promlog.Config{Level: level})
|
||||
// node_exporter relay on `kingpin` to enable default node collector
|
||||
// see https://github.com/prometheus/node_exporter/pull/2463
|
||||
if _, err := kingpin.CommandLine.Parse([]string{}); err != nil {
|
||||
return err
|
||||
}
|
||||
nc, err := collector.NewNodeCollector(logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create collector: %s", err)
|
||||
}
|
||||
prometheus.MustRegister(nc)
|
||||
return nil
|
||||
}
|
||||
134
echo/internal/metrics/ping.go
Normal file
134
echo/internal/metrics/ping.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Ehco1996/ehco/internal/config"
|
||||
"github.com/go-ping/ping"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (pg *PingGroup) newPinger(addr string) (*ping.Pinger, error) {
|
||||
pinger := ping.New(addr)
|
||||
if err := pinger.Resolve(); err != nil {
|
||||
pg.logger.Error("failed to resolve pinger", zap.String("addr", addr), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
pinger.Interval = pingInterval
|
||||
pinger.Timeout = time.Duration(math.MaxInt64)
|
||||
pinger.RecordRtts = false
|
||||
if runtime.GOOS != "darwin" {
|
||||
pinger.SetPrivileged(true)
|
||||
}
|
||||
return pinger, nil
|
||||
}
|
||||
|
||||
type PingGroup struct {
|
||||
logger *zap.Logger
|
||||
|
||||
// k: addr
|
||||
Pingers map[string]*ping.Pinger
|
||||
|
||||
// k: addr v:relay rule label joined by ","
|
||||
PingerLabels map[string]string
|
||||
}
|
||||
|
||||
func extractHost(input string) (string, error) {
|
||||
// Check if the input string has a scheme, if not, add "http://"
|
||||
if !strings.Contains(input, "://") {
|
||||
input = "http://" + input
|
||||
}
|
||||
// Parse the URL
|
||||
u, err := url.Parse(input)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return u.Hostname(), nil
|
||||
}
|
||||
|
||||
func NewPingGroup(cfg *config.Config) *PingGroup {
|
||||
logger := zap.L().Named("pinger")
|
||||
|
||||
pg := &PingGroup{
|
||||
logger: logger,
|
||||
Pingers: make(map[string]*ping.Pinger),
|
||||
PingerLabels: map[string]string{},
|
||||
}
|
||||
|
||||
// parse addr from rule
|
||||
for _, relayCfg := range cfg.RelayConfigs {
|
||||
// NOTE for (https/ws/wss)://xxx.com -> xxx.com
|
||||
for _, remote := range relayCfg.TCPRemotes {
|
||||
addr, err := extractHost(remote)
|
||||
if err != nil {
|
||||
pg.logger.Error("try parse host error", zap.Error(err))
|
||||
}
|
||||
if _, ok := pg.Pingers[addr]; ok {
|
||||
// append rule label when remote host is same
|
||||
pg.PingerLabels[addr] += fmt.Sprintf(",%s", relayCfg.Label)
|
||||
continue
|
||||
}
|
||||
if pinger, err := pg.newPinger(addr); err != nil {
|
||||
pg.logger.Error("new pinger meet error", zap.Error(err))
|
||||
} else {
|
||||
pg.Pingers[pinger.Addr()] = pinger
|
||||
pg.PingerLabels[addr] = relayCfg.Label
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update metrics
|
||||
for addr, pinger := range pg.Pingers {
|
||||
pinger.OnRecv = func(pkt *ping.Packet) {
|
||||
PingResponseDurationSeconds.WithLabelValues(
|
||||
pkt.IPAddr.String(), pkt.Addr, pg.PingerLabels[addr]).Observe(pkt.Rtt.Seconds())
|
||||
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v",
|
||||
pkt.Nbytes, pkt.Addr, pkt.Seq, pkt.Rtt, pkt.Ttl)
|
||||
}
|
||||
pinger.OnDuplicateRecv = func(pkt *ping.Packet) {
|
||||
pg.logger.Sugar().Infof("%d bytes from %s icmp_seq=%d time=%v ttl=%v (DUP!)",
|
||||
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
|
||||
}
|
||||
}
|
||||
return pg
|
||||
}
|
||||
|
||||
func (pg *PingGroup) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- PingRequestTotal
|
||||
}
|
||||
|
||||
func (pg *PingGroup) Collect(ch chan<- prometheus.Metric) {
|
||||
for addr, pinger := range pg.Pingers {
|
||||
stats := pinger.Statistics()
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
PingRequestTotal,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.PacketsSent),
|
||||
stats.IPAddr.String(),
|
||||
stats.Addr,
|
||||
pg.PingerLabels[addr],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *PingGroup) Run() {
|
||||
if len(pg.Pingers) <= 0 {
|
||||
return
|
||||
}
|
||||
pg.logger.Sugar().Infof("Start Ping Group now total pinger: %d", len(pg.Pingers))
|
||||
splay := time.Duration(pingInterval.Nanoseconds() / int64(len(pg.Pingers)))
|
||||
for addr, pinger := range pg.Pingers {
|
||||
go func() {
|
||||
if err := pinger.Run(); err != nil {
|
||||
pg.logger.Error("Starting pinger meet err", zap.String("addr", addr), zap.Error(err))
|
||||
}
|
||||
}()
|
||||
time.Sleep(splay)
|
||||
}
|
||||
}
|
||||
@@ -43,7 +43,8 @@ func (s *WsClient) TCPHandShake(remote *lb.Node) (net.Conn, error) {
|
||||
latency := time.Since(t1)
|
||||
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
|
||||
remote.HandShakeDuration = latency
|
||||
return wsc, nil
|
||||
c := newWsConn(wsc, false)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
type WsServer struct {
|
||||
@@ -90,7 +91,8 @@ func (s *WsServer) HandleRequest(w http.ResponseWriter, req *http.Request) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err := s.RelayTCPConn(wsc, s.relayer.TCPHandShake); err != nil {
|
||||
c := newWsConn(wsc, true)
|
||||
if err := s.RelayTCPConn(c, s.relayer.TCPHandShake); err != nil {
|
||||
s.l.Errorf("RelayTCPConn error: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
82
echo/internal/transporter/ws_conn.go
Normal file
82
echo/internal/transporter/ws_conn.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package transporter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/Ehco1996/ehco/pkg/buffer"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
)
|
||||
|
||||
type wsConn struct {
|
||||
conn net.Conn
|
||||
isServer bool
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newWsConn(conn net.Conn, isServer bool) *wsConn {
|
||||
return &wsConn{conn: conn, isServer: isServer, buf: buffer.BufferPool.Get()}
|
||||
}
|
||||
|
||||
func (c *wsConn) Read(b []byte) (n int, err error) {
|
||||
header, err := ws.ReadHeader(c.conn)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if header.Length > int64(cap(c.buf)) {
|
||||
c.buf = make([]byte, header.Length)
|
||||
}
|
||||
payload := c.buf[:header.Length]
|
||||
_, err = io.ReadFull(c.conn, payload)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if header.Masked {
|
||||
ws.Cipher(payload, header.Mask, 0)
|
||||
}
|
||||
if len(payload) > len(b) {
|
||||
return 0, fmt.Errorf("buffer too small to transport ws msg")
|
||||
}
|
||||
copy(b, payload)
|
||||
return len(payload), nil
|
||||
}
|
||||
|
||||
func (c *wsConn) Write(b []byte) (n int, err error) {
|
||||
if c.isServer {
|
||||
err = wsutil.WriteServerBinary(c.conn, b)
|
||||
} else {
|
||||
err = wsutil.WriteClientBinary(c.conn, b)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (c *wsConn) Close() error {
|
||||
defer buffer.BufferPool.Put(c.buf)
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *wsConn) LocalAddr() net.Addr {
|
||||
return c.conn.LocalAddr()
|
||||
}
|
||||
|
||||
func (c *wsConn) RemoteAddr() net.Addr {
|
||||
return c.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (c *wsConn) SetDeadline(t time.Time) error {
|
||||
return c.conn.SetDeadline(t)
|
||||
}
|
||||
|
||||
func (c *wsConn) SetReadDeadline(t time.Time) error {
|
||||
return c.conn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (c *wsConn) SetWriteDeadline(t time.Time) error {
|
||||
return c.conn.SetWriteDeadline(t)
|
||||
}
|
||||
70
echo/internal/transporter/ws_conn_test.go
Normal file
70
echo/internal/transporter/ws_conn_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package transporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestClientConn_ReadWrite(t *testing.T) {
|
||||
data := []byte("hello")
|
||||
|
||||
// Create a WebSocket server
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, _, _, err := ws.UpgradeHTTP(r, w)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
wsc := newWsConn(conn, true)
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := wsc.Read(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, len(data), n)
|
||||
assert.Equal(t, "hello", string(buf[:n]))
|
||||
_, err = wsc.Write(buf[:n])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
// Create a WebSocket client
|
||||
addr, err := url.Parse(server.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn, _, _, err := ws.DefaultDialer.Dial(context.TODO(), "ws://"+addr.Host)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
wsClientConn := newWsConn(conn, false)
|
||||
for i := 0; i < 3; i++ {
|
||||
// test write
|
||||
n, err := wsClientConn.Write(data)
|
||||
assert.NoError(t, err, "test cnt %d", i)
|
||||
assert.Equal(t, len(data), n, "test cnt %d", i)
|
||||
|
||||
// test read
|
||||
buf := make([]byte, 100)
|
||||
n, err = wsClientConn.Read(buf)
|
||||
assert.NoError(t, err, "test cnt %d", i)
|
||||
assert.Equal(t, len(data), n, "test cnt %d", i)
|
||||
assert.Equal(t, "hello", string(buf[:n]), "test cnt %d", i)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user