Update On Wed Jul 23 20:41:44 CEST 2025

This commit is contained in:
github-action[bot]
2025-07-23 20:41:44 +02:00
parent e72b5a9691
commit ef90160412
239 changed files with 1895 additions and 1457 deletions

View File

@@ -73,8 +73,8 @@ func (c *Client) Run() {
// start 启动客户端服务
func (c *Client) start() error {
// 初始化基本信息
c.initBackground()
// 初始化上下文
c.initContext()
// 通过是否监听成功判断单端转发或双端握手
if err := c.initTunnelListener(); err == nil {

View File

@@ -6,7 +6,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"os"
@@ -46,7 +45,6 @@ type Common struct {
semaphore chan struct{} // 信号量通道
bufReader *bufio.Reader // 缓冲读取器
signalChan chan string // 信号通道
errChan chan error // 错误通道
checkPoint time.Time // 检查点时间
ctx context.Context // 上下文
cancel context.CancelFunc // 取消函数
@@ -167,13 +165,12 @@ func (c *Common) getAddress(parsedURL *url.URL) {
}
}
// initBackground 初始化基本信息
func (c *Common) initBackground() {
// initContext 初始化上下文
func (c *Common) initContext() {
if c.cancel != nil {
c.cancel()
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.errChan = make(chan error, 3)
}
// initTargetListener 初始化目标监听器
@@ -226,6 +223,17 @@ func (c *Common) initTunnelListener() error {
return nil
}
// drain 清空通道中的所有元素
func drain[T any](ch <-chan T) {
for {
select {
case <-ch:
default:
return
}
}
}
// stop 共用停止服务
func (c *Common) stop() {
// 取消上下文
@@ -285,14 +293,9 @@ func (c *Common) stop() {
c.logger.Debug("Tunnel listener closed: %v", c.tunnelListener.Addr())
}
// 清空信号通道
for {
select {
case <-c.signalChan:
default:
return
}
}
// 清空通道
drain(c.semaphore)
drain(c.signalChan)
}
// shutdown 共用优雅关闭
@@ -313,15 +316,17 @@ func (c *Common) shutdown(ctx context.Context, stopFunc func()) error {
// commonControl 共用控制逻辑
func (c *Common) commonControl() error {
errChan := make(chan error, 3)
// 信号消纳、信号队列和健康检查
go func() { c.errChan <- c.commonOnce() }()
go func() { c.errChan <- c.commonQueue() }()
go func() { c.errChan <- c.healthCheck() }()
go func() { errChan <- c.commonOnce() }()
go func() { errChan <- c.commonQueue() }()
go func() { errChan <- c.healthCheck() }()
select {
case <-c.ctx.Done():
return c.ctx.Err()
case err := <-c.errChan:
case err := <-errChan:
return err
}
}
@@ -345,6 +350,7 @@ func (c *Common) commonQueue() error {
case c.signalChan <- signal:
default:
c.logger.Debug("Queue limit reached: %v", semaphoreLimit)
time.Sleep(50 * time.Millisecond)
}
}
}
@@ -361,7 +367,7 @@ func (c *Common) healthCheck() error {
default:
// 尝试获取锁
if !c.mu.TryLock() {
time.Sleep(time.Millisecond)
time.Sleep(50 * time.Millisecond)
continue
}
@@ -406,7 +412,7 @@ func (c *Common) commonLoop() {
go c.commonUDPLoop()
return
}
time.Sleep(time.Millisecond)
time.Sleep(50 * time.Millisecond)
}
}
}
@@ -421,6 +427,8 @@ func (c *Common) commonTCPLoop() {
// 接受来自目标的TCP连接
targetConn, err := c.targetListener.Accept()
if err != nil {
c.logger.Error("Accept failed: %v", err)
time.Sleep(50 * time.Millisecond)
continue
}
@@ -454,6 +462,14 @@ func (c *Common) commonTCPLoop() {
c.logger.Debug("Tunnel connection: %v <-> %v", remoteConn.LocalAddr(), remoteConn.RemoteAddr())
// 监听上下文,避免泄漏
go func() {
<-c.ctx.Done()
if remoteConn != nil {
remoteConn.Close()
}
}()
// 构建并发送启动URL到客户端
launchURL := &url.URL{
Host: id,
@@ -495,6 +511,8 @@ func (c *Common) commonUDPLoop() {
// 读取来自目标的UDP数据
n, clientAddr, err := c.targetUDPConn.ReadFromUDP(buffer)
if err != nil {
c.logger.Error("Read failed: %v", err)
time.Sleep(50 * time.Millisecond)
continue
}
@@ -535,6 +553,14 @@ func (c *Common) commonUDPLoop() {
<-c.semaphore
}()
// 监听上下文,避免泄漏
go func() {
<-c.ctx.Done()
if remoteConn != nil {
remoteConn.Close()
}
}()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: tcpReadTimeout}
@@ -548,10 +574,6 @@ func (c *Common) commonUDPLoop() {
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
@@ -561,7 +583,7 @@ func (c *Common) commonUDPLoop() {
// 将数据写入目标UDP连接
tx, err := c.targetUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("WriteToUDP failed: %v", err)
c.logger.Error("Write failed: %v", err)
return
}
// 传输完成,广播统计信息
@@ -612,7 +634,7 @@ func (c *Common) commonOnce() error {
for {
// 等待连接池准备就绪
if !c.tunnelPool.Ready() {
time.Sleep(time.Millisecond)
time.Sleep(50 * time.Millisecond)
continue
}
@@ -741,6 +763,15 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
go func() {
defer func() { done <- struct{}{} }()
// 监听上下文,避免泄漏
go func() {
<-c.ctx.Done()
if remoteConn != nil {
remoteConn.Close()
}
}()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: tcpReadTimeout}
for {
@@ -753,10 +784,6 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
@@ -779,6 +806,15 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
go func() {
defer func() { done <- struct{}{} }()
// 监听上下文,避免泄漏
go func() {
<-c.ctx.Done()
if targetConn != nil {
targetConn.Close()
}
}()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: udpReadTimeout}
for {
@@ -791,10 +827,6 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
@@ -832,18 +864,20 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
// singleLoop 单端转发处理循环
func (c *Common) singleLoop() error {
errChan := make(chan error, 2)
for {
select {
case <-c.ctx.Done():
return context.Canceled
default:
go func() {
c.errChan <- c.singleTCPLoop()
errChan <- c.singleTCPLoop()
}()
go func() {
c.errChan <- c.singleUDPLoop()
errChan <- c.singleUDPLoop()
}()
return <-c.errChan
return <-errChan
}
}
}
@@ -858,6 +892,8 @@ func (c *Common) singleTCPLoop() error {
// 接受来自隧道的TCP连接
tunnelConn, err := c.tunnelListener.Accept()
if err != nil {
c.logger.Error("Accept failed: %v", err)
time.Sleep(50 * time.Millisecond)
continue
}
@@ -875,11 +911,19 @@ func (c *Common) singleTCPLoop() error {
<-c.semaphore
}()
// 监听上下文,避免泄漏
go func() {
<-c.ctx.Done()
if tunnelConn != nil {
tunnelConn.Close()
}
}()
// 从连接池中获取连接
targetConn := c.tunnelPool.ClientGet("")
if targetConn == nil {
c.logger.Error("Get failed: no target connection available")
time.Sleep(100 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
return
}
@@ -919,6 +963,8 @@ func (c *Common) singleUDPLoop() error {
// 读取来自隧道的UDP数据
rx, clientAddr, err := c.tunnelUDPConn.ReadFromUDP(buffer)
if err != nil {
c.logger.Error("Read failed: %v", err)
time.Sleep(50 * time.Millisecond)
continue
}
@@ -955,6 +1001,14 @@ func (c *Common) singleUDPLoop() error {
<-c.semaphore
}()
// 监听上下文,避免泄漏
go func() {
<-c.ctx.Done()
if targetConn != nil {
targetConn.Close()
}
}()
buffer := make([]byte, udpDataBufSize)
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: udpReadTimeout}
@@ -968,10 +1022,6 @@ func (c *Common) singleUDPLoop() error {
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.logger.Debug("UDP session abort: %v", err)
} else if err == io.EOF {
c.logger.Debug("UDP session close: %v", err)
} else if strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Debug("UDP session close: %v", err)
} else {
c.logger.Error("Read failed: %v", err)
}
@@ -985,7 +1035,7 @@ func (c *Common) singleUDPLoop() error {
// 将响应写回隧道UDP连接
tx, err := c.tunnelUDPConn.WriteToUDP(buffer[:x], clientAddr)
if err != nil {
c.logger.Error("WriteToUDP failed: %v", err)
c.logger.Error("Write failed: %v", err)
c.targetUDPSession.Delete(sessionKey)
if targetConn != nil {
targetConn.Close()

View File

@@ -145,8 +145,6 @@ func (w *InstanceLogWriter) Write(p []byte) (n int, err error) {
}
}
w.master.instances.Store(w.instanceID, w.instance)
// 发送流量更新事件
w.master.sendSSEEvent("update", w.instance)
// 过滤统计日志
continue
}

View File

@@ -77,8 +77,8 @@ func (s *Server) Run() {
// start 启动服务端
func (s *Server) start() error {
// 初始化基本信息
s.initBackground()
// 初始化上下文
s.initContext()
// 初始化隧道监听器
if err := s.initTunnelListener(); err != nil {