Update On Fri Oct 24 20:39:48 CEST 2025

This commit is contained in:
github-action[bot]
2025-10-24 20:39:49 +02:00
parent e8b76761e1
commit bf9e802a78
558 changed files with 7599 additions and 27751 deletions

View File

@@ -163,6 +163,9 @@ nodepass "client://server.example.com:10101/192.168.1.100:8080?log=debug&min=128
# Resource-constrained configuration with forced mode
nodepass "client://server.example.com:10101/127.0.0.1:8080?mode=2&min=16&log=info"
# Resource-constrained configuration - Small connection pool
nodepass "client://server.example.com:10101/127.0.0.1:8080?min=16&log=info"
```
### Master Mode (API)

View File

@@ -72,10 +72,7 @@ func (c *Client) Run() {
// 启动客户端服务并处理重启
go func() {
for {
if ctx.Err() != nil {
return
}
for ctx.Err() == nil {
// 启动客户端
if err := c.start(); err != nil && err != io.EOF {
c.logger.Error("Client error: %v", err)
@@ -113,10 +110,11 @@ func (c *Client) start() error {
// 运行模式判断
switch c.runMode {
case "1": // 单端模式
if err := c.initTunnelListener(); err != nil {
if err := c.initTunnelListener(); err == nil {
return c.singleStart()
} else {
return fmt.Errorf("start: initTunnelListener failed: %w", err)
}
return c.singleStart()
case "2": // 双端模式
return c.commonStart()
default: // 自动判断

View File

@@ -9,6 +9,7 @@ import (
"encoding/hex"
"fmt"
"hash/fnv"
"io"
"net"
"net/url"
"os"
@@ -474,46 +475,54 @@ func (c *Common) initContext() {
// initTunnelListener 初始化隧道监听器
func (c *Common) initTunnelListener() error {
if c.tunnelTCPAddr == nil || c.tunnelUDPAddr == nil {
if c.tunnelTCPAddr == nil && c.tunnelUDPAddr == nil {
return fmt.Errorf("initTunnelListener: nil tunnel address")
}
// 初始化隧道TCP监听器
tunnelListener, err := net.ListenTCP("tcp", c.tunnelTCPAddr)
if err != nil {
return fmt.Errorf("initTunnelListener: listenTCP failed: %w", err)
if c.tunnelTCPAddr != nil {
tunnelListener, err := net.ListenTCP("tcp", c.tunnelTCPAddr)
if err != nil {
return fmt.Errorf("initTunnelListener: listenTCP failed: %w", err)
}
c.tunnelListener = tunnelListener
}
c.tunnelListener = tunnelListener
// 初始化隧道UDP监听器
tunnelUDPConn, err := net.ListenUDP("udp", c.tunnelUDPAddr)
if err != nil {
return fmt.Errorf("initTunnelListener: listenUDP failed: %w", err)
if c.tunnelUDPAddr != nil {
tunnelUDPConn, err := net.ListenUDP("udp", c.tunnelUDPAddr)
if err != nil {
return fmt.Errorf("initTunnelListener: listenUDP failed: %w", err)
}
c.tunnelUDPConn = &conn.StatConn{Conn: tunnelUDPConn, RX: &c.udpRX, TX: &c.udpTX, Rate: c.rateLimiter}
}
c.tunnelUDPConn = &conn.StatConn{Conn: tunnelUDPConn, RX: &c.udpRX, TX: &c.udpTX, Rate: c.rateLimiter}
return nil
}
// initTargetListener 初始化目标监听器
func (c *Common) initTargetListener() error {
if len(c.targetTCPAddrs) == 0 || len(c.targetUDPAddrs) == 0 {
if len(c.targetTCPAddrs) == 0 && len(c.targetUDPAddrs) == 0 {
return fmt.Errorf("initTargetListener: no target address")
}
// 初始化目标TCP监听器
targetListener, err := net.ListenTCP("tcp", c.targetTCPAddrs[0])
if err != nil {
return fmt.Errorf("initTargetListener: listenTCP failed: %w", err)
if len(c.targetTCPAddrs) > 0 {
targetListener, err := net.ListenTCP("tcp", c.targetTCPAddrs[0])
if err != nil {
return fmt.Errorf("initTargetListener: listenTCP failed: %w", err)
}
c.targetListener = targetListener
}
c.targetListener = targetListener
// 初始化目标UDP监听器
targetUDPConn, err := net.ListenUDP("udp", c.targetUDPAddrs[0])
if err != nil {
return fmt.Errorf("initTargetListener: listenUDP failed: %w", err)
if len(c.targetUDPAddrs) > 0 {
targetUDPConn, err := net.ListenUDP("udp", c.targetUDPAddrs[0])
if err != nil {
return fmt.Errorf("initTargetListener: listenUDP failed: %w", err)
}
c.targetUDPConn = &conn.StatConn{Conn: targetUDPConn, RX: &c.udpRX, TX: &c.udpTX, Rate: c.rateLimiter}
}
c.targetUDPConn = &conn.StatConn{Conn: targetUDPConn, RX: &c.udpRX, TX: &c.udpTX, Rate: c.rateLimiter}
return nil
}
@@ -626,11 +635,7 @@ func (c *Common) commonControl() error {
// commonQueue 共用信号队列
func (c *Common) commonQueue() error {
for {
if c.ctx.Err() != nil {
return fmt.Errorf("commonQueue: context error: %w", c.ctx.Err())
}
for c.ctx.Err() == nil {
// 读取原始信号
rawSignal, err := c.bufReader.ReadBytes('\n')
if err != nil {
@@ -662,15 +667,16 @@ func (c *Common) commonQueue() error {
}
}
}
return fmt.Errorf("commonQueue: context error: %w", c.ctx.Err())
}
// healthCheck 共用健康度检查
func (c *Common) healthCheck() error {
for {
if c.ctx.Err() != nil {
return fmt.Errorf("healthCheck: context error: %w", c.ctx.Err())
}
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for c.ctx.Err() == nil {
// 尝试获取锁
if !c.mu.TryLock() {
continue
@@ -707,7 +713,7 @@ func (c *Common) healthCheck() error {
select {
case <-c.ctx.Done():
return fmt.Errorf("healthCheck: context error: %w", c.ctx.Err())
case <-time.After(reportInterval):
case <-ticker.C:
}
c.logger.Debug("Tunnel pool flushed: %v active connections", c.tunnelPool.Active())
@@ -727,18 +733,16 @@ func (c *Common) healthCheck() error {
select {
case <-c.ctx.Done():
return fmt.Errorf("healthCheck: context error: %w", c.ctx.Err())
case <-time.After(reportInterval):
case <-ticker.C:
}
}
return fmt.Errorf("healthCheck: context error: %w", c.ctx.Err())
}
// commonLoop 共用处理循环
func (c *Common) commonLoop() {
for {
if c.ctx.Err() != nil {
return
}
for c.ctx.Err() == nil {
// 等待连接池准备就绪
if c.tunnelPool.Ready() {
go c.commonTCPLoop()
@@ -756,11 +760,7 @@ func (c *Common) commonLoop() {
// commonTCPLoop 共用TCP请求处理循环
func (c *Common) commonTCPLoop() {
for {
if c.ctx.Err() != nil {
return
}
for c.ctx.Err() == nil {
// 接受来自目标的TCP连接
targetConn, err := c.targetListener.Accept()
if err != nil {
@@ -855,11 +855,7 @@ func (c *Common) commonTCPLoop() {
// commonUDPLoop 共用UDP请求处理循环
func (c *Common) commonUDPLoop() {
for {
if c.ctx.Err() != nil {
return
}
for c.ctx.Err() == nil {
buffer := c.getUDPBuffer()
// 读取来自目标的UDP数据
@@ -931,13 +927,13 @@ func (c *Common) commonUDPLoop() {
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: udpReadTimeout}
for {
for c.ctx.Err() == nil {
// 从池连接读取数据
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else {
} else if err != io.EOF {
c.logger.Error("commonUDPLoop: read from tunnel failed: %v", err)
}
return
@@ -946,7 +942,9 @@ func (c *Common) commonUDPLoop() {
// 将数据写入目标UDP连接
_, err = c.targetUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("commonUDPLoop: writeToUDP failed: %v", err)
if err != io.EOF {
c.logger.Error("commonUDPLoop: writeToUDP failed: %v", err)
}
return
}
// 传输完成
@@ -979,7 +977,9 @@ func (c *Common) commonUDPLoop() {
// 将原始数据写入池连接
_, err = remoteConn.Write(buffer[:x])
if err != nil {
c.logger.Error("commonUDPLoop: write to tunnel failed: %v", err)
if err != io.EOF {
c.logger.Error("commonUDPLoop: write to tunnel failed: %v", err)
}
c.targetUDPSession.Delete(sessionKey)
remoteConn.Close()
c.putUDPBuffer(buffer)
@@ -994,7 +994,7 @@ func (c *Common) commonUDPLoop() {
// commonOnce 共用处理单个请求
func (c *Common) commonOnce() error {
for {
for c.ctx.Err() == nil {
// 等待连接池准备就绪
if !c.tunnelPool.Ready() {
select {
@@ -1073,6 +1073,8 @@ func (c *Common) commonOnce() error {
}
}
}
return fmt.Errorf("commonOnce: context error: %w", c.ctx.Err())
}
// commonTCPOnce 共用处理单个TCP请求
@@ -1238,13 +1240,13 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: udpReadTimeout}
for {
for c.ctx.Err() == nil {
// 从隧道连接读取数据
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else {
} else if err != io.EOF {
c.logger.Error("commonUDPOnce: read from tunnel failed: %v", err)
}
return
@@ -1253,7 +1255,9 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
// 将数据写入目标UDP连接
_, err = targetConn.Write(buffer[:x])
if err != nil {
c.logger.Error("commonUDPOnce: write to target failed: %v", err)
if err != io.EOF {
c.logger.Error("commonUDPOnce: write to target failed: %v", err)
}
return
}
@@ -1269,13 +1273,13 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: udpReadTimeout}
for {
for c.ctx.Err() == nil {
// 从目标UDP连接读取数据
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else {
} else if err != io.EOF {
c.logger.Error("commonUDPOnce: read from target failed: %v", err)
}
return
@@ -1284,7 +1288,9 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
// 将数据写回隧道连接
_, err = remoteConn.Write(buffer[:x])
if err != nil {
c.logger.Error("commonUDPOnce: write to tunnel failed: %v", err)
if err != io.EOF {
c.logger.Error("commonUDPOnce: write to tunnel failed: %v", err)
}
return
}
@@ -1302,9 +1308,15 @@ func (c *Common) singleControl() error {
errChan := make(chan error, 3)
// 启动单端控制、TCP和UDP处理循环
go func() { errChan <- c.singleEventLoop() }()
go func() { errChan <- c.singleTCPLoop() }()
go func() { errChan <- c.singleUDPLoop() }()
if len(c.targetTCPAddrs) > 0 {
go func() { errChan <- c.singleEventLoop() }()
}
if c.tunnelListener != nil {
go func() { errChan <- c.singleTCPLoop() }()
}
if c.tunnelUDPConn != nil {
go func() { errChan <- c.singleUDPLoop() }()
}
select {
case <-c.ctx.Done():
@@ -1316,11 +1328,10 @@ func (c *Common) singleControl() error {
// singleEventLoop 单端转发事件循环
func (c *Common) singleEventLoop() error {
for {
if c.ctx.Err() != nil {
return fmt.Errorf("singleEventLoop: context error: %w", c.ctx.Err())
}
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for c.ctx.Err() == nil {
ping := 0
now := time.Now()
@@ -1340,18 +1351,16 @@ func (c *Common) singleEventLoop() error {
select {
case <-c.ctx.Done():
return fmt.Errorf("singleEventLoop: context error: %w", c.ctx.Err())
case <-time.After(reportInterval):
case <-ticker.C:
}
}
return fmt.Errorf("singleEventLoop: context error: %w", c.ctx.Err())
}
// singleTCPLoop 单端转发TCP处理循环
func (c *Common) singleTCPLoop() error {
for {
if c.ctx.Err() != nil {
return fmt.Errorf("singleTCPLoop: context error: %w", c.ctx.Err())
}
for c.ctx.Err() == nil {
// 接受来自隧道的TCP连接
tunnelConn, err := c.tunnelListener.Accept()
if err != nil {
@@ -1418,15 +1427,13 @@ func (c *Common) singleTCPLoop() error {
c.logger.Debug("Exchange complete: %v", conn.DataExchange(tunnelConn, targetConn, c.readTimeout, buffer1, buffer2))
}(tunnelConn)
}
return fmt.Errorf("singleTCPLoop: context error: %w", c.ctx.Err())
}
// singleUDPLoop 单端转发UDP处理循环
func (c *Common) singleUDPLoop() error {
for {
if c.ctx.Err() != nil {
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
}
for c.ctx.Err() == nil {
buffer := c.getUDPBuffer()
// 读取来自隧道的UDP数据
@@ -1489,17 +1496,13 @@ func (c *Common) singleUDPLoop() error {
defer c.putUDPBuffer(buffer)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: udpReadTimeout}
for {
if c.ctx.Err() != nil {
return
}
for c.ctx.Err() == nil {
// 从UDP读取响应
x, err := reader.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else {
} else if err != io.EOF {
c.logger.Error("singleUDPLoop: read from target failed: %v", err)
}
c.targetUDPSession.Delete(sessionKey)
@@ -1512,7 +1515,9 @@ func (c *Common) singleUDPLoop() error {
// 将响应写回隧道UDP连接
_, err = c.tunnelUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("singleUDPLoop: writeToUDP failed: %v", err)
if err != io.EOF {
c.logger.Error("singleUDPLoop: writeToUDP failed: %v", err)
}
c.targetUDPSession.Delete(sessionKey)
if targetConn != nil {
targetConn.Close()
@@ -1529,7 +1534,9 @@ func (c *Common) singleUDPLoop() error {
c.logger.Debug("Starting transfer: %v <-> %v", targetConn.LocalAddr(), c.tunnelUDPConn.LocalAddr())
_, err = targetConn.Write(buffer[:x])
if err != nil {
c.logger.Error("singleUDPLoop: write to target failed: %v", err)
if err != io.EOF {
c.logger.Error("singleUDPLoop: write to target failed: %v", err)
}
c.targetUDPSession.Delete(sessionKey)
if targetConn != nil {
targetConn.Close()
@@ -1542,4 +1549,6 @@ func (c *Common) singleUDPLoop() error {
c.logger.Debug("Transfer complete: %v <-> %v", targetConn.LocalAddr(), c.tunnelUDPConn.LocalAddr())
c.putUDPBuffer(buffer)
}
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
}

View File

@@ -569,102 +569,95 @@ func (m *Master) Shutdown(ctx context.Context) error {
// startPeriodicTasks 启动所有定期任务
func (m *Master) startPeriodicTasks() {
go m.startPeriodicBackup()
go m.startPeriodicCleanup()
go m.startPeriodicRestart()
}
ticker := time.NewTicker(ReloadInterval)
defer ticker.Stop()
// startPeriodicBackup 启动定期备份
func (m *Master) startPeriodicBackup() {
for {
select {
case <-time.After(ReloadInterval):
// 固定备份文件名
backupPath := fmt.Sprintf("%s.backup", m.statePath)
if err := m.saveStateToPath(backupPath); err != nil {
m.logger.Error("startPeriodicBackup: backup state failed: %v", err)
} else {
m.logger.Info("State backup saved: %v", backupPath)
}
case <-ticker.C:
// 执行定期备份
m.performPeriodicBackup()
// 执行定期清理
m.performPeriodicCleanup()
// 执行定期重启
m.performPeriodicRestart()
case <-m.periodicDone:
ticker.Stop()
return
}
}
}
// startPeriodicCleanup 启动定期清理重复ID的实例
func (m *Master) startPeriodicCleanup() {
for {
select {
case <-time.After(reportInterval):
// 收集实例并按ID分组
idInstances := make(map[string][]*Instance)
m.instances.Range(func(key, value any) bool {
if id := key.(string); id != apiKeyID {
idInstances[id] = append(idInstances[id], value.(*Instance))
}
return true
})
// performPeriodicBackup 定期备份任务
func (m *Master) performPeriodicBackup() {
// 固定备份文件名
backupPath := fmt.Sprintf("%s.backup", m.statePath)
// 清理重复实例
for _, instances := range idInstances {
if len(instances) <= 1 {
continue
}
if err := m.saveStateToPath(backupPath); err != nil {
m.logger.Error("performPeriodicBackup: backup state failed: %v", err)
} else {
m.logger.Info("State backup saved: %v", backupPath)
}
}
// 选择保留实例
keepIdx := 0
for i, inst := range instances {
if inst.Status == "running" && instances[keepIdx].Status != "running" {
keepIdx = i
}
}
// performPeriodicCleanup 定期清理重复ID的实例
func (m *Master) performPeriodicCleanup() {
// 收集实例并按ID分组
idInstances := make(map[string][]*Instance)
m.instances.Range(func(key, value any) bool {
if id := key.(string); id != apiKeyID {
idInstances[id] = append(idInstances[id], value.(*Instance))
}
return true
})
// 清理多余实例
for i, inst := range instances {
if i == keepIdx {
continue
}
inst.deleted = true
if inst.Status != "stopped" {
m.stopInstance(inst)
}
m.instances.Delete(inst.ID)
}
// 清理重复实例
for _, instances := range idInstances {
if len(instances) <= 1 {
continue
}
// 选择保留实例
keepIdx := 0
for i, inst := range instances {
if inst.Status == "running" && instances[keepIdx].Status != "running" {
keepIdx = i
}
case <-m.periodicDone:
return
}
// 清理多余实例
for i, inst := range instances {
if i == keepIdx {
continue
}
inst.deleted = true
if inst.Status != "stopped" {
m.stopInstance(inst)
}
m.instances.Delete(inst.ID)
}
}
}
// startPeriodicRestart 启动定期错误实例重启
func (m *Master) startPeriodicRestart() {
for {
select {
case <-time.After(reportInterval):
// 收集所有error状态的实例
var errorInstances []*Instance
m.instances.Range(func(key, value any) bool {
if id := key.(string); id != apiKeyID {
instance := value.(*Instance)
if instance.Status == "error" && !instance.deleted {
errorInstances = append(errorInstances, instance)
}
}
return true
})
// 重启所有error状态的实例
for _, instance := range errorInstances {
m.stopInstance(instance)
time.Sleep(baseDuration)
m.startInstance(instance)
// performPeriodicRestart 定期错误实例重启
func (m *Master) performPeriodicRestart() {
// 收集所有error状态的实例
var errorInstances []*Instance
m.instances.Range(func(key, value any) bool {
if id := key.(string); id != apiKeyID {
instance := value.(*Instance)
if instance.Status == "error" && !instance.deleted {
errorInstances = append(errorInstances, instance)
}
case <-m.periodicDone:
return
}
return true
})
// 重启所有error状态的实例
for _, instance := range errorInstances {
m.stopInstance(instance)
time.Sleep(baseDuration)
m.startInstance(instance)
}
}
@@ -775,6 +768,11 @@ func (m *Master) loadState() {
for id, instance := range persistentData {
instance.stopped = make(chan struct{})
// 重置实例状态
if instance.ID != apiKeyID {
instance.Status = "stopped"
}
// 生成完整配置
if instance.Config == "" && instance.ID != apiKeyID {
instance.Config = m.generateConfigURL(instance)
@@ -934,9 +932,8 @@ func getLinuxSysInfo() SystemInfo {
idle1, total1 := readStat()
time.Sleep(baseDuration)
idle2, total2 := readStat()
numCPU := runtime.NumCPU()
if deltaIdle, deltaTotal := idle2-idle1, total2-total1; deltaTotal > 0 && numCPU > 0 {
info.CPU = min(int((deltaTotal-deltaIdle)*100/deltaTotal/uint64(numCPU)), 100)
if deltaIdle, deltaTotal := idle2-idle1, total2-total1; deltaTotal > 0 {
info.CPU = min(int((deltaTotal-deltaIdle)*100/deltaTotal), 100)
}
// RAM占用解析/proc/meminfo
@@ -1690,6 +1687,10 @@ func (m *Master) stopInstance(instance *Instance) {
instance.Status = "stopped"
instance.stopped = make(chan struct{})
instance.cancelFunc = nil
instance.Ping = 0
instance.Pool = 0
instance.TCPS = 0
instance.UDPS = 0
m.instances.Store(instance.ID, instance)
// 保存状态变更

View File

@@ -74,10 +74,7 @@ func (s *Server) Run() {
// 启动服务端并处理重启
go func() {
for {
if ctx.Err() != nil {
return
}
for ctx.Err() == nil {
// 启动服务端
if err := s.start(); err != nil && err != io.EOF {
s.logger.Error("Server error: %v", err)
@@ -167,11 +164,7 @@ func (s *Server) start() error {
// tunnelHandshake 与客户端进行握手
func (s *Server) tunnelHandshake() error {
// 接受隧道连接
for {
if s.ctx.Err() != nil {
return fmt.Errorf("tunnelHandshake: context error: %w", s.ctx.Err())
}
for s.ctx.Err() == nil {
tunnelTCPConn, err := s.tunnelListener.Accept()
if err != nil {
s.logger.Error("tunnelHandshake: accept error: %v", err)
@@ -235,6 +228,10 @@ func (s *Server) tunnelHandshake() error {
break
}
if s.ctx.Err() != nil {
return fmt.Errorf("tunnelHandshake: context error: %w", s.ctx.Err())
}
// 发送客户端配置
tunnelURL := &url.URL{
Scheme: "np",