websocket readmessage hang

This commit is contained in:
TenderIronh
2025-12-10 16:45:34 +08:00
parent 8e303e93f8
commit d3e8ee2a32

View File

@@ -149,13 +149,17 @@ func (pn *P2PNetwork) keepAlive() {
time.Sleep(time.Second * 10)
// Skip check if we're waking from sleep/hibernation
now := time.Now()
if !lastCheckTime.IsZero() && now.Sub(lastCheckTime) > time.Minute*2 {
if !lastCheckTime.IsZero() && now.Sub(lastCheckTime) > NetworkHeartbeatTime*3 {
gLog.i("Detected possible sleep/wake cycle, skipping this check")
lastCheckTime = now
continue
}
lastCheckTime = now
if pn.hbTime.Before(time.Now().Add(-3*time.Minute)) && pn.initTime.Before(time.Now().Add(-3*time.Minute)) {
if pn.hbTime.Before(time.Now().Add(-NetworkHeartbeatTime * 3)) {
if pn.initTime.After(time.Now().Add(-NetworkHeartbeatTime * 3)) {
gLog.d("Init less than 3 mins, skipping this check")
continue
}
gLog.e("P2PNetwork keepAlive error, exit worker")
dumpStack()
os.Exit(9)
@@ -568,7 +572,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne
var primaryPunchFunc func() (*P2PTunnel, error)
var secondaryPunchFunc func() (*P2PTunnel, error)
funcUDP := func() (t *P2PTunnel, err error) {
if thisTunnelForcev6 && config.PunchPriority&PunchPriorityTCPOnly != 0 {
if thisTunnelForcev6 || config.PunchPriority&PunchPriorityTCPOnly != 0 {
return
}
// try UDPPunch
@@ -588,7 +592,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne
return
}
funcTCP := func() (t *P2PTunnel, err error) {
if thisTunnelForcev6 && config.PunchPriority&PunchPriorityUDPOnly != 0 {
if thisTunnelForcev6 || config.PunchPriority&PunchPriorityUDPOnly != 0 {
return
}
// try TCPPunch
@@ -661,6 +665,13 @@ func (pn *P2PNetwork) init() error {
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
var err error
defer func() {
if err != nil {
// init failed, retry
pn.close(true)
gLog.e("P2PNetwork init error:%s", err)
}
}()
ips, err := resolveServerIP(gConf.Network.ServerHost)
if err != nil {
gLog.e("resolve dns failed: %v", err)
@@ -791,11 +802,7 @@ func (pn *P2PNetwork) init() error {
gLog.d("P2PNetwork init ok")
break
}
if err != nil {
// init failed, retry
pn.close(true)
gLog.e("P2PNetwork init error:%s", err)
}
return err
}
@@ -877,23 +884,48 @@ func (pn *P2PNetwork) readLoop() {
gLog.d("P2PNetwork readLoop start")
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
for pn.running {
pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second))
_, msg, err := pn.conn.ReadMessage()
if err != nil {
gLog.e("P2PNetwork read error:%s", err)
if closeErr, ok := err.(*websocket.CloseError); ok {
if closeErr.Code == 1006 {
pn.close(true)
} else {
pn.close(false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 使用带超时的 goroutine 读取
readChan := make(chan []byte, 10)
go func() {
for {
select {
case <-ctx.Done():
return
default:
pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second))
_, msg, err := pn.conn.ReadMessage()
if err != nil {
gLog.e("ReadMessage error:%s", err)
readChan <- nil
return
}
} else {
pn.close(false)
readChan <- msg
}
break
}
pn.handleMessage(msg)
}()
readTimeout := 60 * time.Second
for pn.running {
select {
case result := <-readChan:
if result == nil {
// 处理错误
pn.close(false)
return
}
pn.handleMessage(result)
case <-time.After(readTimeout):
gLog.e("ReadMessage timeout after %v", readTimeout)
cancel()
pn.close(false)
return
}
}
gLog.d("P2PNetwork readLoop end")
}