Update On Fri May 17 20:30:02 CEST 2024

This commit is contained in:
github-action[bot]
2024-05-17 20:30:03 +02:00
parent 697b6cd222
commit 0dbdd1669c
73 changed files with 1554 additions and 1362 deletions

View File

@@ -29,6 +29,8 @@ type Cmgr interface {
// CountConnection returns the number of active connections.
CountConnection(connType string) int
GetActiveConnectCntByRelayLabel(label string) int
// Start starts the connection manager.
Start(ctx context.Context, errCH chan error)
}
@@ -158,6 +160,12 @@ func (cm *cmgrImpl) countClosedConnection() int {
return cnt
}
func (cm *cmgrImpl) GetActiveConnectCntByRelayLabel(label string) int {
cm.lock.RLock()
defer cm.lock.RUnlock()
return len(cm.activeConnectionsMap[label])
}
func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
cm.l.Infof("start sync interval=%d", cm.cfg.SyncInterval)
ticker := time.NewTicker(time.Second * time.Duration(cm.cfg.SyncInterval))

View File

@@ -17,7 +17,8 @@ type Config struct {
TCPRemotes []string `json:"tcp_remotes"`
UDPRemotes []string `json:"udp_remotes"`
Label string `json:"label,omitempty"`
Label string `json:"label,omitempty"`
MaxConnection int `json:"max_connection,omitempty"`
}
func (r *Config) Validate() error {

View File

@@ -41,11 +41,18 @@ func (b *baseTransporter) RelayTCPConn(c net.Conn, handshakeF TCPHandShakeF) err
metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_TCP).Inc()
defer metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_TCP).Dec()
// check limit
if b.cfg.MaxConnection > 0 && b.cmgr.CountConnection(cmgr.ConnectionTypeActive) >= b.cfg.MaxConnection {
b.l.Warnf("Relay %s active connection count exceed limit", remote.Label)
c.Close()
}
clonedRemote := remote.Clone()
rc, err := handshakeF(clonedRemote)
if err != nil {
return err
}
b.l.Infof("RelayTCPConn from %s to %s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(
b.cfg.Label, c, rc, conn.WithHandshakeDuration(clonedRemote.HandShakeDuration))

View File

@@ -2,16 +2,14 @@ package node_metric
import "regexp"
var (
// parse disk name from device path,such as:
// e.g. /dev/disk1s1 -> disk1
// e.g. /dev/disk1s2 -> disk1
// e.g. ntfs://disk1s1 -> disk1
// e.g. ntfs://disk1s2 -> disk1
// e.g. /dev/sda1 -> sda
// e.g. /dev/sda2 -> sda
diskNameRegex = regexp.MustCompile(`/dev/disk(\d+)|ntfs://disk(\d+)|/dev/sd[a-zA-Z]`)
)
// parse disk name from device path,such as:
// e.g. /dev/disk1s1 -> disk1
// e.g. /dev/disk1s2 -> disk1
// e.g. ntfs://disk1s1 -> disk1
// e.g. ntfs://disk1s2 -> disk1
// e.g. /dev/sda1 -> sda
// e.g. /dev/sda2 -> sda
var diskNameRegex = regexp.MustCompile(`/dev/disk(\d+)|ntfs://disk(\d+)|/dev/sd[a-zA-Z]`)
func getDiskName(devicePath string) string {
matches := diskNameRegex.FindStringSubmatch(devicePath)

View File

@@ -8,25 +8,28 @@ import (
"os"
"strconv"
"time"
"go.uber.org/zap"
)
func echo(conn net.Conn) {
logger := zap.S().Named(("echo-test-server"))
defer conn.Close()
defer fmt.Println("conn closed", conn.RemoteAddr().String())
buf := make([]byte, 10)
for {
i, err := conn.Read(buf)
if err == io.EOF {
fmt.Println("read eof")
logger.Info("conn closed,read eof ", conn.RemoteAddr().String())
return
}
if err != nil {
fmt.Println(err.Error())
logger.Error(err.Error())
return
}
_, err = conn.Write(buf[:i])
if err != nil {
fmt.Println(err.Error())
logger.Error(err.Error())
return
}
}
@@ -114,6 +117,33 @@ func SendTcpMsg(msg []byte, address string) []byte {
return buf[:n]
}
func EchoTcpMsgLong(msg []byte, sleepTime time.Duration, address string) error {
logger := zap.S()
buf := make([]byte, len(msg))
conn, err := net.Dial("tcp", address)
if err != nil {
return err
}
defer conn.Close()
logger.Infof("conn start %s %s", conn.RemoteAddr().String(), conn.LocalAddr().String())
for i := 0; i < 10; i++ {
if _, err := conn.Write(msg); err != nil {
return err
}
n, err := conn.Read(buf)
if err != nil {
return err
}
if string(buf[:n]) != string(msg) {
return fmt.Errorf("msg not equal")
}
// to fake a long connection
time.Sleep(sleepTime)
}
logger.Infof("conn closed %s %s", conn.RemoteAddr().String(), conn.LocalAddr().String())
return nil
}
func SendUdpMsg(msg []byte, address string) []byte {
conn, err := net.Dial("udp", address)
if err != nil {

View File

@@ -23,7 +23,8 @@ const (
ECHO_PORT = 9002
ECHO_SERVER = "0.0.0.0:9002"
RAW_LISTEN = "0.0.0.0:1234"
RAW_LISTEN = "0.0.0.0:1234"
RAW_LISTEN_WITH_MAX_CONNECTION = "0.0.0.0:2234"
WS_LISTEN = "0.0.0.0:1235"
WS_REMOTE = "ws://0.0.0.0:2000"
@@ -61,6 +62,15 @@ func init() {
UDPRemotes: []string{ECHO_SERVER},
TransportType: constant.RelayTypeRaw,
},
// raw cfg with max connection
{
Listen: RAW_LISTEN_WITH_MAX_CONNECTION,
ListenType: constant.RelayTypeRaw,
TCPRemotes: []string{ECHO_SERVER},
UDPRemotes: []string{ECHO_SERVER},
TransportType: constant.RelayTypeRaw,
MaxConnection: 1,
},
// ws
{
@@ -155,6 +165,24 @@ func TestRelayOverRaw(t *testing.T) {
// t.Log("test udp done!")
}
func TestRelayWithMaxConnectionCount(t *testing.T) {
msg := []byte("hello")
// first connection will be accepted
go func() {
err := echo.EchoTcpMsgLong(msg, time.Second, RAW_LISTEN_WITH_MAX_CONNECTION)
if err != nil {
t.Error(err)
}
}()
// second connection will be rejected
time.Sleep(time.Second) // wait for first connection
if err := echo.EchoTcpMsgLong(msg, time.Second, RAW_LISTEN_WITH_MAX_CONNECTION); err == nil {
t.Fatal("need error here")
}
}
func TestRelayWithDeadline(t *testing.T) {
logger, _ := zap.NewDevelopment()
msg := []byte("hello")