This commit is contained in:
TenderIronh
2025-12-10 16:47:16 +08:00
parent d3e8ee2a32
commit 47220fe38b
5 changed files with 70 additions and 38 deletions

View File

@@ -191,3 +191,9 @@ func (l *logger) e(format string, params ...interface{}) {
func (l *logger) dev(format string, params ...interface{}) {
l.Printf(LvDev, format, params...)
}
func InitForUnitTest(lv LogLevel) {
baseDir := filepath.Dir(os.Args[0])
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProductName, lv, 1024*1024, LogFile|LogConsole)
}

View File

@@ -13,8 +13,8 @@ import (
)
func natDetectTCP(serverHost string, serverPort int, lp int) (publicIP string, publicPort int, localPort int, err error) {
gLog.d("natDetectTCP start")
defer gLog.d("natDetectTCP end")
gLog.dev("natDetectTCP start")
defer gLog.dev("natDetectTCP end")
conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("0.0.0.0:%d", lp), fmt.Sprintf("%s:%d", serverHost, serverPort), NatDetectTimeout)
if err != nil {
err = fmt.Errorf("dial tcp4 %s:%d error: %w", serverHost, serverPort, err)
@@ -56,8 +56,8 @@ func natDetectTCP(serverHost string, serverPort int, lp int) (publicIP string, p
}
func natDetectUDP(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int, err error) {
gLog.d("natDetectUDP start")
defer gLog.d("natDetectUDP end")
gLog.dev("natDetectUDP start")
defer gLog.dev("natDetectUDP end")
conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", localPort))
if err != nil {
gLog.e("natDetectUDP listen udp error:%s", err)

View File

@@ -16,14 +16,14 @@ import (
"github.com/quic-go/quic-go"
)
// quic.DialContext do not support version 44,disable it
var quicVersion []quic.VersionNumber
// quic.Dial do not support version 44, disable it
var quicVersion []quic.Version
type underlayQUIC struct {
listener quic.Listener
listener *quic.Listener
writeMtx *sync.Mutex
quic.Stream
quic.Connection
*quic.Stream
*quic.Conn
}
func (conn *underlayQUIC) Protocol() string {
@@ -47,8 +47,17 @@ func (conn *underlayQUIC) WriteMessage(mainType uint16, subType uint16, packet i
}
func (conn *underlayQUIC) Close() error {
conn.Stream.CancelRead(1)
conn.Connection.CloseWithError(0, "")
// CancelRead expects a StreamErrorCode; using 1 as before (application-defined)
if conn.Stream != nil {
conn.Stream.CancelRead(1)
// close send-side of stream
_ = conn.Stream.Close()
}
if conn.Conn != nil {
// CloseWithError expects an ApplicationErrorCode and a description.
// 0 is zero-value; keep behavior similar to old CloseWithError(0,"")
_ = conn.Conn.CloseWithError(0, "")
}
conn.CloseListener()
return nil
}
@@ -60,7 +69,7 @@ func (conn *underlayQUIC) WUnlock() {
}
func (conn *underlayQUIC) CloseListener() {
if conn.listener != nil {
conn.listener.Close()
_ = conn.listener.Close()
}
}
@@ -76,7 +85,7 @@ func (conn *underlayQUIC) Accept() error {
return err
}
conn.Stream = stream
conn.Connection = sess
conn.Conn = sess
return nil
}
@@ -96,23 +105,25 @@ func listenQuic(addr string, idleTimeout time.Duration) (*underlayQUIC, error) {
return ul, nil
}
func dialQuic(conn *net.UDPConn, remoteAddr *net.UDPAddr, timeout time.Duration) (*underlayQUIC, error) {
func dialQuic(pconn *net.UDPConn, remoteAddr *net.UDPAddr, timeout time.Duration) (*underlayQUIC, error) {
tlsConf := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"openp2pv1"},
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Connection, err := quic.DialContext(ctx, conn, remoteAddr, conn.LocalAddr().String(), tlsConf,
// New API: quic.Dial(ctx, packetConn, remoteAddr, tlsConf, quicConfig)
connection, err := quic.Dial(ctx, pconn, remoteAddr, tlsConf,
&quic.Config{Versions: quicVersion, MaxIdleTimeout: TunnelIdleTimeout, DisablePathMTUDiscovery: true})
if err != nil {
return nil, fmt.Errorf("quic.DialContext error:%s", err)
return nil, fmt.Errorf("quic.Dial error:%s", err)
}
stream, err := Connection.OpenStreamSync(context.Background())
stream, err := connection.OpenStreamSync(context.Background())
if err != nil {
return nil, fmt.Errorf("OpenStreamSync error:%s", err)
}
qConn := &underlayQUIC{nil, &sync.Mutex{}, stream, Connection}
qConn := &underlayQUIC{nil, &sync.Mutex{}, stream, connection}
return qConn, nil
}

View File

@@ -1,15 +1,11 @@
package openp2p
import (
"os"
"path/filepath"
"testing"
)
func TestDialTCP(t *testing.T) {
baseDir := filepath.Dir(os.Args[0])
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFile|LogConsole)
InitForUnitTest(LvDEBUG)
// ul, err := dialTCP("[240e:3b1:6f6:d14:1c0b:9605:554d:351c]", 3389, 0, LinkModeTCP6)
// if err != nil || ul == nil {
// t.Error("dialTCP error:", err)

View File

@@ -17,13 +17,13 @@ type v4Listener struct {
acceptCh chan bool
running bool
tcpListener *net.TCPListener
udpListener quic.Listener
udpListener *quic.Listener
wg sync.WaitGroup
}
func (vl *v4Listener) start() {
vl.running = true
v4l.acceptCh = make(chan bool, 500)
vl.acceptCh = make(chan bool, 500)
vl.wg.Add(1)
go func() {
defer vl.wg.Done()
@@ -56,11 +56,11 @@ func (vl *v4Listener) stop() {
func (vl *v4Listener) listenTCP() error {
gLog.d("v4Listener listenTCP %d start", vl.port)
defer gLog.d("v4Listener listenTCP %d end", vl.port)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", vl.port)) // system will auto listen both v4 and v6
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", vl.port))
var err error
vl.tcpListener, err = net.ListenTCP("tcp", addr)
if err != nil {
gLog.e("v4Listener listen %d error:", vl.port, err)
gLog.e("v4Listener listen %d error:%s", vl.port, err)
return err
}
defer vl.tcpListener.Close()
@@ -69,7 +69,11 @@ func (vl *v4Listener) listenTCP() error {
if err != nil {
break
}
utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c, connectTime: time.Now()}
utcp := &underlayTCP{
writeMtx: &sync.Mutex{},
Conn: c,
connectTime: time.Now(),
}
go vl.handleConnection(utcp)
}
vl.tcpListener = nil
@@ -80,8 +84,15 @@ func (vl *v4Listener) listenUDP() error {
gLog.d("v4Listener listenUDP %d start", vl.port)
defer gLog.d("v4Listener listenUDP %d end", vl.port)
var err error
vl.udpListener, err = quic.ListenAddr(fmt.Sprintf("0.0.0.0:%d", vl.port), generateTLSConfig(),
&quic.Config{Versions: quicVersion, MaxIdleTimeout: TunnelIdleTimeout, DisablePathMTUDiscovery: true})
vl.udpListener, err = quic.ListenAddr(
fmt.Sprintf("0.0.0.0:%d", vl.port),
generateTLSConfig(),
&quic.Config{
Versions: quicVersion,
MaxIdleTimeout: TunnelIdleTimeout,
DisablePathMTUDiscovery: true,
},
)
if err != nil {
return err
}
@@ -97,7 +108,14 @@ func (vl *v4Listener) listenUDP() error {
if err != nil {
break
}
ul := &underlayQUIC{writeMtx: &sync.Mutex{}, Stream: stream, Connection: sess}
ul := &underlayQUIC{
listener: nil,
writeMtx: &sync.Mutex{},
Stream: stream,
Conn: sess,
}
go vl.handleConnection(ul)
}
vl.udpListener = nil
@@ -110,14 +128,14 @@ func (vl *v4Listener) handleConnection(ul underlay) {
_, buff, err := ul.ReadBuffer()
if err != nil || buff == nil {
gLog.e("v4Listener read MsgTunnelHandshake error:%s", err)
return
}
ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff)
var tid uint64
if string(buff) == "OpenP2P,hello" { // old client
// save remoteIP as key
remoteAddr := ul.RemoteAddr().(*net.TCPAddr).IP
ipBytes := remoteAddr.To4()
tid = uint64(binary.BigEndian.Uint32(ipBytes)) // bytes not enough for uint64
tid = uint64(binary.BigEndian.Uint32(ipBytes))
gLog.d("hello %s", string(buff))
} else {
if len(buff) < 8 {
@@ -126,11 +144,12 @@ func (vl *v4Listener) handleConnection(ul underlay) {
tid = binary.LittleEndian.Uint64(buff[:8])
gLog.d("hello %d", tid)
}
// clear timeout connection
// clear timeout connections
vl.conns.Range(func(idx, i interface{}) bool {
ut := i.(*underlayTCP)
if ut.connectTime.Before(time.Now().Add(-UnderlayTCPConnectTimeout)) {
vl.conns.Delete(idx)
if ut, ok := i.(*underlayTCP); ok {
if ut.connectTime.Before(time.Now().Add(-UnderlayTCPConnectTimeout)) {
vl.conns.Delete(idx)
}
}
return true
})
@@ -145,7 +164,7 @@ func (vl *v4Listener) handleConnection(ul underlay) {
func (vl *v4Listener) getUnderlay(tid uint64) underlay {
for i := 0; i < 100; i++ {
select {
case <-time.After(time.Millisecond * 50):
case <-time.After(50 * time.Millisecond):
case <-vl.acceptCh:
}
if u, ok := vl.conns.LoadAndDelete(tid); ok {