intranet support udp

This commit is contained in:
TenderIronh
2025-11-18 17:07:24 +08:00
parent a4c6668760
commit 471aa5e6ea
6 changed files with 229 additions and 136 deletions

View File

@@ -10,6 +10,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"runtime/pprof"
"time"
"github.com/openp2p-cn/totp"
@@ -21,58 +22,60 @@ func handlePush(subType uint16, msg []byte) error {
if err != nil {
return err
}
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
// gLog.d("handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType {
case MsgPushConnectReq:
err = handleConnectReq(msg)
case MsgPushRsp:
rsp := PushRsp{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "Unmarshal pushRsp:%s", err)
gLog.e("Unmarshal pushRsp:%s", err)
return err
}
if rsp.Error == 0 {
gLog.Printf(LvDEBUG, "push ok, detail:%s", rsp.Detail)
gLog.dev("push ok, detail:%s", rsp.Detail)
} else {
gLog.Printf(LvERROR, "push error:%d, detail:%s", rsp.Error, rsp.Detail)
gLog.e("push error:%d, detail:%s", rsp.Error, rsp.Detail)
}
case MsgPushAddRelayTunnelReq:
req := AddRelayTunnelReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err)
gLog.e("Unmarshal %v:%s", reflect.TypeOf(req), err)
return err
}
config := AppConfig{}
config.PeerNode = req.RelayName
config.peerToken = req.RelayToken
config.relayMode = req.RelayMode
config.PunchPriority = req.PunchPriority
config.UnderlayProtocol = req.UnderlayProtocol
go func(r AddRelayTunnelReq) {
t, errDt := GNetwork.addDirectTunnel(config, 0)
if errDt == nil {
if errDt == nil && t != nil {
// notify peer relay ready
msg := TunnelMsg{ID: t.id}
GNetwork.push(r.From, MsgPushAddRelayTunnelRsp, msg)
appConfig := config
appConfig.PeerNode = req.From
} else {
gLog.Printf(LvERROR, "addDirectTunnel error:%s", errDt)
gLog.w("addDirectTunnel error:%s", errDt)
GNetwork.push(r.From, MsgPushAddRelayTunnelRsp, "error") // compatible with old version client, trigger unmarshal error
}
}(req)
case MsgPushServerSideSaveMemApp:
req := ServerSideSaveMemApp{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err)
gLog.e("Unmarshal %v:%s", reflect.TypeOf(req), err)
return err
}
gLog.Println(LvDEBUG, "handle MsgPushServerSideSaveMemApp:", prettyJson(req))
var existTunnel *P2PTunnel
i, ok := GNetwork.allTunnels.Load(req.TunnelID)
if !ok {
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 3000)
i, ok = GNetwork.allTunnels.Load(req.TunnelID) // retry sometimes will receive MsgPushServerSideSaveMemApp but p2ptunnel not store yet.
if !ok {
gLog.Println(LvERROR, "handle MsgPushServerSideSaveMemApp error:", ErrMemAppTunnelNotFound)
gLog.e("handle MsgPushServerSideSaveMemApp error:%s", ErrMemAppTunnelNotFound)
return ErrMemAppTunnelNotFound
}
}
@@ -91,10 +94,10 @@ func handlePush(subType uint16, msg []byte) error {
} else {
app.setRelayTunnel(existTunnel)
}
gLog.Println(LvDEBUG, "found existing memapp, update it")
gLog.d("found existing memapp, update it")
} else {
appConfig := existTunnel.config
appConfig.SrcPort = 0
appConfig.SrcPort = int(req.SrcPort)
appConfig.Protocol = ""
appConfig.AppName = fmt.Sprintf("%d", peerID)
appConfig.PeerNode = req.From
@@ -118,22 +121,15 @@ func handlePush(subType uint16, msg []byte) error {
}
return nil
case MsgPushAPPKey:
req := APPKeySync{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err)
return err
}
SaveKey(req.AppID, req.AppKey)
case MsgPushUpdate:
gLog.Println(LvINFO, "MsgPushUpdate")
gLog.i("MsgPushUpdate")
err := update(gConf.Network.ServerHost, gConf.Network.ServerPort)
if err == nil {
os.Exit(0)
os.Exit(9) // 9 tell daemon this exit because of update
}
return err
case MsgPushRestart:
gLog.Println(LvINFO, "MsgPushRestart")
gLog.i("MsgPushRestart")
os.Exit(0)
return err
case MsgPushReportApps:
@@ -144,29 +140,31 @@ func handlePush(subType uint16, msg []byte) error {
err = handleLog(msg)
case MsgPushReportGoroutine:
err = handleReportGoroutine()
case MsgPushReportHeap:
err = handleReportHeap()
case MsgPushCheckRemoteService:
err = handleCheckRemoteService(msg)
case MsgPushEditApp:
err = handleEditApp(msg)
case MsgPushEditNode:
gLog.Println(LvINFO, "MsgPushEditNode")
gLog.i("MsgPushEditNode")
req := EditNode{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth)
os.Exit(0)
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
gLog.i("MsgPushSwitchApp")
app := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:]))
gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:]))
return err
}
config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
gLog.Println(LvINFO, app.AppName, " switch to ", app.Enabled)
config := AppConfig{PeerNode: app.PeerNode, Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
gLog.i("%s switch to %d", app.AppName, app.Enabled)
gConf.switchApp(config, app.Enabled)
if app.Enabled == 0 {
// disable APP
@@ -175,10 +173,10 @@ func handlePush(subType uint16, msg []byte) error {
case MsgPushDstNodeOnline:
req := PushDstNodeOnline{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Printf(LvINFO, "%s online, retryApp", req.Node)
gLog.i("%s online, retryApp", req.Node)
gConf.retryApp(req.Node)
default:
i, ok := GNetwork.msgMap.Load(pushHead.From)
@@ -192,10 +190,10 @@ func handlePush(subType uint16, msg []byte) error {
}
func handleEditApp(msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushEditApp")
gLog.i("MsgPushEditApp")
newApp := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:]))
gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
@@ -211,13 +209,16 @@ func handleEditApp(msg []byte) (err error) {
gConf.delete(oldConf)
}
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
newConf.RelayNode = newApp.SpecRelayNode
newConf.PunchPriority = newApp.PunchPriority
gConf.add(newConf, false)
if newApp.SrcPort != 0 { // delete app
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
newConf.RelayNode = newApp.SpecRelayNode
newConf.PunchPriority = newApp.PunchPriority
gConf.add(newConf, false)
}
if newApp.Protocol0 != "" && newApp.SrcPort0 != 0 { // not edit
GNetwork.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
}
@@ -227,12 +228,12 @@ func handleEditApp(msg []byte) (err error) {
func handleConnectReq(msg []byte) (err error) {
req := PushConnectReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err)
gLog.e("Unmarshal %v:%s", reflect.TypeOf(req), err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting... push connect response", req.From)
gLog.d("%s is connecting... push connect response", req.From)
if compareVersion(req.Version, LeastSupportVersion) < 0 {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
gLog.e("%s:%s", ErrVersionNotCompatible.Error(), req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
@@ -245,7 +246,7 @@ func handleConnectReq(msg []byte) (err error) {
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, gConf.Network.Token, time.Now().Unix()-GNetwork.dt/int64(time.Second)) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "handleConnectReq Access Granted")
gLog.d("handleConnectReq Access Granted")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
@@ -261,7 +262,7 @@ func handleConnectReq(msg []byte) (err error) {
config.UnderlayProtocol = req.UnderlayProtocol
// share relay node will limit bandwidth
if req.Token != gConf.Network.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", gConf.Network.ShareBandwidth)
gLog.i("set share bandwidth %d mbps", gConf.Network.ShareBandwidth)
config.shareBandwidth = gConf.Network.ShareBandwidth
}
// go GNetwork.AddTunnel(config, req.ID)
@@ -270,7 +271,7 @@ func handleConnectReq(msg []byte) (err error) {
}()
return nil
}
gLog.Println(LvERROR, "handleConnectReq Access Denied:", req.From)
gLog.e("handleConnectReq Access Denied:%s", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", gConf.Network.Node),
@@ -281,10 +282,10 @@ func handleConnectReq(msg []byte) (err error) {
}
func handleReportApps() (err error) {
gLog.Println(LvINFO, "MsgPushReportApps")
gLog.i("MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
gConf.mtx.RLock()
defer gConf.mtx.RUnlock()
for _, config := range gConf.Apps {
appActive := 0
@@ -346,10 +347,8 @@ func handleReportApps() (err error) {
}
func handleReportMemApps() (err error) {
gLog.Println(LvINFO, "handleReportMemApps")
gLog.i("handleReportMemApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
GNetwork.sdwan.sysRoute.Range(func(key, value interface{}) bool {
node := value.(*sdwanNode)
appActive := 0
@@ -405,12 +404,12 @@ func handleReportMemApps() (err error) {
}
func handleLog(msg []byte) (err error) {
gLog.Println(LvDEBUG, "MsgPushReportLog")
gLog.d("MsgPushReportLog")
const defaultLen = 1024 * 128
const maxLen = 1024 * 1024
req := ReportLogReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
@@ -418,9 +417,12 @@ func handleLog(msg []byte) (err error) {
} else {
req.FileName = sanitizeFileName(req.FileName)
}
if req.IsSetLogLevel == 1 {
gLog.setLevel(LogLevel(req.LogLevel))
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
gLog.e("read log file error:%s", err)
return err
}
fi, err := f.Stat()
@@ -443,7 +445,7 @@ func handleLog(msg []byte) (err error) {
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
gLog.e("read log content error:%s", err)
return err
}
rsp := ReportLogRsp{}
@@ -455,17 +457,27 @@ func handleLog(msg []byte) (err error) {
}
func handleReportGoroutine() (err error) {
gLog.Println(LvDEBUG, "handleReportGoroutine")
gLog.d("handleReportGoroutine")
buf := make([]byte, 1024*128)
stackLen := runtime.Stack(buf, true)
return GNetwork.write(MsgReport, MsgPushReportLog, string(buf[:stackLen]))
return GNetwork.write(MsgReport, MsgReportResponse, string(buf[:stackLen]))
}
func handleReportHeap() error {
var buf bytes.Buffer
err := pprof.Lookup("heap").WriteTo(&buf, 1)
if err != nil {
return err
}
return GNetwork.write(MsgReport, MsgReportResponse, buf.String())
}
func handleCheckRemoteService(msg []byte) (err error) {
gLog.Println(LvDEBUG, "handleCheckRemoteService")
gLog.d("handleCheckRemoteService")
req := CheckRemoteService{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
rsp := PushRsp{Error: 0}

View File

@@ -331,7 +331,7 @@ func (t *P2PTunnel) connectUnderlayUDP() (c underlay, err error) {
func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
gLog.Printf(LvDEBUG, "connectUnderlayTCP %s start ", t.config.LogPeerNode())
defer gLog.Printf(LvDEBUG, "connectUnderlayTCP %s end ", t.config.LogPeerNode())
var ul *underlayTCP
var ul underlay
peerIP := t.config.peerIP
if t.config.linkMode == LinkModeIntranet {
peerIP = t.config.peerLanIP

View File

@@ -2,6 +2,7 @@ package openp2p
import (
"io"
"net"
"time"
)
@@ -18,6 +19,7 @@ type underlay interface {
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
Protocol() string
RemoteAddr() net.Addr
}
func DefaultReadBuffer(ul underlay) (*openP2PHeader, []byte, error) {
@@ -28,6 +30,7 @@ func DefaultReadBuffer(ul underlay) (*openP2PHeader, []byte, error) {
}
head, err := decodeHeader(headBuf)
if err != nil || head.MainType > 16 {
gLog.d("DefaultReadBuffer error:%v, %d", err, head.MainType)
return nil, nil, err
}
dataBuf := make([]byte, head.DataLen)

View File

@@ -46,19 +46,25 @@ func (conn *underlayTCP) WUnlock() {
conn.writeMtx.Unlock()
}
func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) {
if mode == LinkModeTCPPunch {
func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (underlay, error) {
if mode == LinkModeTCPPunch || mode == LinkModeTCP6 {
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) < 0 {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
gLog.d("peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + GNetwork.dt - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
gLog.d("sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
// gLog.d(" send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
var c net.Conn
var err error
if mode == LinkModeTCPPunch {
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
} else {
c, err = reuse.DialTimeout("tcp6", fmt.Sprintf("[::]:%d", localPort), fmt.Sprintf("[%s]:%d", t.config.peerIPv6, port), CheckActiveTimeout)
}
if err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err)
// gLog.d("send tcp punch: ", err)
return nil, err
}
utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}
@@ -67,7 +73,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel)
return nil, fmt.Errorf("read start msg error:%s", err)
}
if buff != nil {
gLog.Println(LvDEBUG, string(buff))
gLog.d("handshake flag:%s", string(buff))
}
utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff)
return utcp, nil
@@ -77,59 +83,46 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel)
if compareVersion(t.config.peerVersion, PublicIPVersion) < 0 { // old version
ipBytes := net.ParseIP(t.config.peerIP).To4()
tid = uint64(binary.BigEndian.Uint32(ipBytes))
gLog.Println(LvDEBUG, "compatible with old client, use ip as key:", tid)
gLog.d("compatible with old client, use ip as key:%d", tid)
}
var utcp *underlayTCP
if mode == LinkModeIntranet && gConf.Network.hasIPv4 == 0 && gConf.Network.hasUPNPorNATPMP == 0 {
addr, _ := net.ResolveTCPAddr("tcp4", fmt.Sprintf("0.0.0.0:%d", localPort))
l, err := net.ListenTCP("tcp4", addr)
if err != nil {
gLog.Printf(LvERROR, "listen %d error:", localPort, err)
return nil, err
}
defer l.Close()
err = l.SetDeadline(time.Now().Add(UnderlayTCPConnectTimeout))
if err != nil {
gLog.Printf(LvERROR, "set listen timeout:", err)
return nil, err
}
c, err := l.Accept()
if err != nil {
return nil, err
}
utcp = &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}
} else {
if v4l != nil {
utcp = v4l.getUnderlayTCP(tid)
}
var ul underlay
if v4l != nil {
ul = v4l.getUnderlay(tid)
}
if utcp == nil {
if ul == nil {
return nil, ErrConnectPublicV4
}
return utcp, nil
return ul, nil
}
func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) {
var c net.Conn
var err error
if mode == LinkModeTCPPunch {
gLog.Println(LvDev, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
if c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout); err != nil {
gLog.Println(LvDev, "send tcp punch: ", err)
}
network := "tcp"
localAddr := fmt.Sprintf("0.0.0.0:%d", localPort)
remoteAddr := fmt.Sprintf("%s:%d", host, port)
if mode == LinkModeTCP6 { // address need [ip]
network = "tcp6"
localAddr = fmt.Sprintf("[::]:%d", localPort)
remoteAddr = fmt.Sprintf("[%s]:%d", host, port)
}
if mode == LinkModeTCP4 || mode == LinkModeIntranet { // random port
localAddr = fmt.Sprintf("0.0.0.0:%d", 0)
}
gLog.dev("send tcp punch: %s --> %s", localAddr, remoteAddr)
} else {
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
c, err = reuse.DialTimeout(network, localAddr, remoteAddr, CheckActiveTimeout)
if err != nil {
gLog.dev("send tcp punch: %v", err)
}
if err != nil {
gLog.Printf(LvDev, "Dial %s:%d error:%s", host, port, err)
gLog.dev("Dial %s:%d error:%s", host, port, err)
return nil, err
}
tc := c.(*net.TCPConn)
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(UnderlayTCPKeepalive)
gLog.Printf(LvDEBUG, "Dial %s:%d OK", host, port)
gLog.d("Dial %s:%d OK", host, port)
return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil
}

21
core/underlay_tcp_test.go Normal file
View File

@@ -0,0 +1,21 @@
package openp2p
import (
"os"
"path/filepath"
"testing"
)
func TestDialTCP(t *testing.T) {
baseDir := filepath.Dir(os.Args[0])
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFile|LogConsole)
// ul, err := dialTCP("[240e:3b1:6f6:d14:1c0b:9605:554d:351c]", 3389, 0, LinkModeTCP6)
// if err != nil || ul == nil {
// t.Error("dialTCP error:", err)
// }
ul, err := dialTCP("192.168.3.9", 3389, 0, LinkModeTCP6)
if err != nil || ul == nil {
t.Error("dialTCP error:", err)
}
}

View File

@@ -1,68 +1,130 @@
package openp2p
import (
"context"
"encoding/binary"
"fmt"
"net"
"sync"
"time"
"github.com/quic-go/quic-go"
)
type v4Listener struct {
conns sync.Map
port int
acceptCh chan bool
conns sync.Map
port int
acceptCh chan bool
running bool
tcpListener *net.TCPListener
udpListener quic.Listener
wg sync.WaitGroup
}
func (vl *v4Listener) start() error {
func (vl *v4Listener) start() {
vl.running = true
v4l.acceptCh = make(chan bool, 500)
for {
vl.listen()
time.Sleep(UnderlayTCPConnectTimeout)
}
vl.wg.Add(1)
go func() {
defer vl.wg.Done()
for vl.running {
vl.listenTCP()
time.Sleep(UnderlayTCPConnectTimeout)
}
}()
vl.wg.Add(1)
go func() {
defer vl.wg.Done()
for vl.running {
vl.listenUDP()
time.Sleep(UnderlayTCPConnectTimeout)
}
}()
}
func (vl *v4Listener) listen() error {
gLog.Printf(LvINFO, "v4Listener listen %d start", vl.port)
defer gLog.Printf(LvINFO, "v4Listener listen %d end", vl.port)
addr, _ := net.ResolveTCPAddr("tcp4", fmt.Sprintf("0.0.0.0:%d", vl.port))
l, err := net.ListenTCP("tcp4", addr)
func (vl *v4Listener) stop() {
vl.running = false
if vl.tcpListener != nil {
vl.tcpListener.Close()
}
if vl.udpListener != nil {
vl.udpListener.Close()
}
vl.wg.Wait()
}
func (vl *v4Listener) listenTCP() error {
gLog.d("v4Listener listenTCP %d start", vl.port)
defer gLog.d("v4Listener listenTCP %d end", vl.port)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", vl.port)) // system will auto listen both v4 and v6
var err error
vl.tcpListener, err = net.ListenTCP("tcp", addr)
if err != nil {
gLog.Printf(LvERROR, "v4Listener listen %d error:", vl.port, err)
gLog.e("v4Listener listen %d error:", vl.port, err)
return err
}
defer l.Close()
defer vl.tcpListener.Close()
for {
c, err := l.Accept()
c, err := vl.tcpListener.Accept()
if err != nil {
break
}
go vl.handleConnection(c)
utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c, connectTime: time.Now()}
go vl.handleConnection(utcp)
}
vl.tcpListener = nil
return nil
}
func (vl *v4Listener) handleConnection(c net.Conn) {
gLog.Println(LvDEBUG, "v4Listener accept connection: ", c.RemoteAddr().String())
utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c, connectTime: time.Now()}
utcp.SetReadDeadline(time.Now().Add(UnderlayTCPConnectTimeout))
_, buff, err := utcp.ReadBuffer()
func (vl *v4Listener) listenUDP() error {
gLog.d("v4Listener listenUDP %d start", vl.port)
defer gLog.d("v4Listener listenUDP %d end", vl.port)
var err error
vl.udpListener, err = quic.ListenAddr(fmt.Sprintf("0.0.0.0:%d", vl.port), generateTLSConfig(),
&quic.Config{Versions: quicVersion, MaxIdleTimeout: TunnelIdleTimeout, DisablePathMTUDiscovery: true})
if err != nil {
gLog.Println(LvERROR, "utcp.ReadBuffer error:", err)
return err
}
utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff)
ctx, cancel := context.WithTimeout(context.Background(), UnderlayConnectTimeout)
defer cancel()
defer vl.udpListener.Close()
for {
sess, err := vl.udpListener.Accept(context.Background())
if err != nil {
break
}
stream, err := sess.AcceptStream(ctx)
if err != nil {
break
}
ul := &underlayQUIC{writeMtx: &sync.Mutex{}, Stream: stream, Connection: sess}
go vl.handleConnection(ul)
}
vl.udpListener = nil
return err
}
func (vl *v4Listener) handleConnection(ul underlay) {
gLog.d("v4Listener accept connection: %s", ul.RemoteAddr().String())
ul.SetReadDeadline(time.Now().Add(UnderlayTCPConnectTimeout))
_, buff, err := ul.ReadBuffer()
if err != nil || buff == nil {
gLog.e("v4Listener read MsgTunnelHandshake error:%s", err)
}
ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff)
var tid uint64
if string(buff) == "OpenP2P,hello" { // old client
// save remoteIP as key
remoteAddr := c.RemoteAddr().(*net.TCPAddr).IP
remoteAddr := ul.RemoteAddr().(*net.TCPAddr).IP
ipBytes := remoteAddr.To4()
tid = uint64(binary.BigEndian.Uint32(ipBytes)) // bytes not enough for uint64
gLog.Println(LvDEBUG, "hello ", string(buff))
gLog.d("hello %s", string(buff))
} else {
if len(buff) < 8 {
return
}
tid = binary.LittleEndian.Uint64(buff[:8])
gLog.Println(LvDEBUG, "hello ", tid)
gLog.d("hello %d", tid)
}
// clear timeout connection
vl.conns.Range(func(idx, i interface{}) bool {
@@ -72,20 +134,22 @@ func (vl *v4Listener) handleConnection(c net.Conn) {
}
return true
})
vl.conns.Store(tid, utcp)
if len(vl.acceptCh) == 0 {
vl.acceptCh <- true
vl.conns.Store(tid, ul)
select {
case vl.acceptCh <- true:
default:
gLog.e("msgQueue full, drop it")
}
}
func (vl *v4Listener) getUnderlayTCP(tid uint64) *underlayTCP {
func (vl *v4Listener) getUnderlay(tid uint64) underlay {
for i := 0; i < 100; i++ {
select {
case <-time.After(time.Millisecond * 50):
case <-vl.acceptCh:
}
if u, ok := vl.conns.LoadAndDelete(tid); ok {
return u.(*underlayTCP)
return u.(underlay)
}
}
return nil