This commit is contained in:
TenderIronh
2023-10-28 18:03:42 +08:00
parent 52dfe5c938
commit 7e57237ec9
18 changed files with 476 additions and 318 deletions

View File

@@ -1,45 +0,0 @@
package openp2p
import (
"sync"
"time"
)
// BandwidthLimiter ...
type BandwidthLimiter struct {
ts time.Time
bw int // mbps
freeBytes int // bytes
maxFreeBytes int // bytes
mtx sync.Mutex
}
// mbps
func newBandwidthLimiter(bw int) *BandwidthLimiter {
return &BandwidthLimiter{
bw: bw,
ts: time.Now(),
maxFreeBytes: bw * 1024 * 1024 / 8,
freeBytes: bw * 1024 * 1024 / 8,
}
}
// Add ...
func (bl *BandwidthLimiter) Add(bytes int) {
if bl.bw <= 0 {
return
}
bl.mtx.Lock()
defer bl.mtx.Unlock()
// calc free flow 1000*1000/1024/1024=0.954; 1024*1024/1000/1000=1.048
bl.freeBytes += int(time.Since(bl.ts) * time.Duration(bl.bw) / 8 / 954)
if bl.freeBytes > bl.maxFreeBytes {
bl.freeBytes = bl.maxFreeBytes
}
bl.freeBytes -= bytes
bl.ts = time.Now()
if bl.freeBytes < 0 {
// sleep for the overflow
time.Sleep(time.Millisecond * time.Duration(-bl.freeBytes/(bl.bw*1048/8)))
}
}

View File

@@ -13,10 +13,16 @@ var (
ErrorNewUser = errors.New("new user") ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct") ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters") ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrReadDB = errors.New("read db error")
ErrNoUpdate = errors.New("there are currently no updates available")
ErrPeerOffline = errors.New("peer offline") ErrPeerOffline = errors.New("peer offline")
ErrNetwork = errors.New("network error") ErrNetwork = errors.New("network error")
ErrMsgFormat = errors.New("message format wrong") ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible") ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected") ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
ErrConnectRelayNode = errors.New("connect relay node error") ErrConnectRelayNode = errors.New("connect relay node error")
ErrConnectPublicV4 = errors.New("connect public ipv4 error")
ErrMsgChannelNotFound = errors.New("message channel not found")
ErrRelayTunnelNotFound = errors.New("relay tunnel not found")
ErrSymmetricLimit = errors.New("symmetric limit")
) )

View File

@@ -21,7 +21,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
} }
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType { switch subType {
case MsgPushConnectReq: // TODO: handle a msg move to a new function case MsgPushConnectReq:
err = handleConnectReq(pn, subType, msg) err = handleConnectReq(pn, subType, msg)
case MsgPushRsp: case MsgPushRsp:
rsp := PushRsp{} rsp := PushRsp{}
@@ -86,7 +86,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
} }
gConf.setNode(req.NewName) gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth) gConf.setShareBandwidth(req.Bandwidth)
// TODO: hot reload
os.Exit(0) os.Exit(0)
case MsgPushSwitchApp: case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp") gLog.Println(LvINFO, "MsgPushSwitchApp")
@@ -112,10 +111,12 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Println(LvINFO, "retry peerNode ", req.Node) gLog.Println(LvINFO, "retry peerNode ", req.Node)
gConf.retryApp(req.Node) gConf.retryApp(req.Node)
default: default:
pn.msgMapMtx.Lock() i, ok := pn.msgMap.Load(pushHead.From)
ch := pn.msgMap[pushHead.From] if !ok {
pn.msgMapMtx.Unlock() return ErrMsgChannelNotFound
ch <- pushMsg{data: msg, ts: time.Now()} }
ch := i.(chan msgCtx)
ch <- msgCtx{data: msg, ts: time.Now()}
} }
return err return err
} }
@@ -145,9 +146,6 @@ func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gConf.add(newConf, false) gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
return nil return nil
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
} }
func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) { func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) {

View File

@@ -24,13 +24,8 @@ func handshakeC2C(t *P2PTunnel) (err error) {
} }
ra, head, _, _, err := UDPRead(conn, HandshakeTimeout) ra, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil { if err != nil {
time.Sleep(time.Millisecond * 200) gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read") return err
ra, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err
}
} }
t.ra, _ = net.ResolveUDPAddr("udp", ra.String()) t.ra, _ = net.ResolveUDPAddr("udp", ra.String())
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
@@ -57,6 +52,7 @@ func handshakeC2C(t *P2PTunnel) (err error) {
func handshakeC2S(t *P2PTunnel) error { func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeC2S start") gLog.Printf(LvDEBUG, "handshakeC2S start")
defer gLog.Printf(LvDEBUG, "handshakeC2S end") defer gLog.Printf(LvDEBUG, "handshakeC2S end")
startTime := time.Now()
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532) randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la) conn, err := net.ListenUDP("udp", t.la)
@@ -68,7 +64,6 @@ func handshakeC2S(t *P2PTunnel) error {
go func() error { go func() error {
gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort) gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort)
for i := 0; i < SymmetricHandshakeNum; i++ { for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
// time.Sleep(SymmetricHandshakeInterval) // time.Sleep(SymmetricHandshakeInterval)
dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2)) dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2))
if err != nil { if err != nil {
@@ -124,19 +119,19 @@ func handshakeC2S(t *P2PTunnel) error {
} else { } else {
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck") gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
} }
gLog.Printf(LvINFO, "handshakeC2S ok") gLog.Printf(LvINFO, "handshakeC2S ok. cost %d ms", time.Since(startTime)/time.Millisecond)
return nil return nil
} }
func handshakeS2C(t *P2PTunnel) error { func handshakeS2C(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeS2C start") gLog.Printf(LvDEBUG, "handshakeS2C start")
defer gLog.Printf(LvDEBUG, "handshakeS2C end") defer gLog.Printf(LvDEBUG, "handshakeS2C end")
startTime := time.Now()
gotCh := make(chan *net.UDPAddr, 5) gotCh := make(chan *net.UDPAddr, 5)
// sequencely udp send handshake, do not parallel send // sequencely udp send handshake, do not parallel send
gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort) gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort)
gotIt := false gotIt := false
for i := 0; i < SymmetricHandshakeNum; i++ { for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
// time.Sleep(SymmetricHandshakeInterval) // time.Sleep(SymmetricHandshakeInterval)
go func(t *P2PTunnel) error { go func(t *P2PTunnel) error {
conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random? conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random?
@@ -197,7 +192,7 @@ func handshakeS2C(t *P2PTunnel) error {
case la := <-gotCh: case la := <-gotCh:
t.la = la t.la = la
gLog.Println(LvDEBUG, "symmetric handshake ok", la) gLog.Println(LvDEBUG, "symmetric handshake ok", la)
gLog.Printf(LvINFO, "handshakeS2C ok") gLog.Printf(LvINFO, "handshakeS2C ok. cost %dms", time.Since(startTime)/time.Millisecond)
} }
return nil return nil
} }

View File

@@ -76,6 +76,13 @@ func (iptree *IPTree) Contains(ipStr string) bool {
return iptree.ContainsInt(ip) return iptree.ContainsInt(ip)
} }
func IsLocalhost(ipStr string) bool {
if ipStr == "localhost" || ipStr == "127.0.0.1" || ipStr == "::1" {
return true
}
return false
}
func (iptree *IPTree) ContainsInt(ip uint32) bool { func (iptree *IPTree) ContainsInt(ip uint32) bool {
iptree.treeMtx.RLock() iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock() defer iptree.treeMtx.RUnlock()

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"net" "net"
"testing" "testing"
"time"
) )
func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) { func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) {
@@ -128,6 +129,7 @@ func BenchmarkBuildipTree20k(t *testing.B) {
t.Logf("clear. ipTree size:%d\n", iptree.Size()) t.Logf("clear. ipTree size:%d\n", iptree.Size())
} }
func BenchmarkQuery(t *testing.B) { func BenchmarkQuery(t *testing.B) {
ts := time.Now()
iptree := NewIPTree("") iptree := NewIPTree("")
iptree.Clear() iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100") iptree.Add("10.1.5.50", "10.1.5.100")
@@ -145,7 +147,7 @@ func BenchmarkQuery(t *testing.B) {
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP) binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single // insert 10k block ip single
nodeNum := uint32(10000 * 100) nodeNum := uint32(10000 * 1000)
gap := uint32(10) gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap { for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i) iptree.AddIntIP(i, i)
@@ -156,8 +158,9 @@ func BenchmarkQuery(t *testing.B) {
for i := minIP; i < minIP+nodeNum*gap; i += gap { for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5) iptree.AddIntIP(i, i+5)
} }
t.Logf("ipTree size:%d\n", iptree.Size()) t.Logf("ipTree size:%d cost:%dms\n", iptree.Size(), time.Since(ts)/time.Millisecond)
t.ResetTimer() ts = time.Now()
// t.ResetTimer()
queryNum := 100 * 10000 queryNum := 100 * 10000
for i := 0; i < queryNum; i++ { for i := 0; i < queryNum; i++ {
iptree.ContainsInt(minIP + uint32(i)) iptree.ContainsInt(minIP + uint32(i))
@@ -166,6 +169,6 @@ func BenchmarkQuery(t *testing.B) {
wrapBenchmarkContains(t, iptree, "10.1.5.200", false) wrapBenchmarkContains(t, iptree, "10.1.5.200", false)
wrapBenchmarkContains(t, iptree, "200.1.1.1", false) wrapBenchmarkContains(t, iptree, "200.1.1.1", false)
} }
t.Logf("query list:%d\n", queryNum*4) t.Logf("query num:%d cost:%dms\n", queryNum*4, time.Since(ts)/time.Millisecond)
} }

View File

@@ -67,13 +67,13 @@ func (oConn *overlayConn) run() {
writeBytes := append(tunnelHead.Bytes(), payload...) writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 { if oConn.rtid == 0 {
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes) oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
gLog.Printf(LvDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) gLog.Printf(LvDEBUG, "write overlay data to tid:%d,oid:%d bodylen=%d", oConn.tunnel.id, oConn.id, len(writeBytes))
} else { } else {
// write raley data // write raley data
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...) all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
all = append(all, writeBytes...) all = append(all, writeBytes...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all) oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
gLog.Printf(LvDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) gLog.Printf(LvDEBUG, "write relay data to tid:%d,rtid:%d,oid:%d bodylen=%d", oConn.tunnel.id, oConn.rtid, oConn.id, len(writeBytes))
} }
} }
if oConn.connTCP != nil { if oConn.connTCP != nil {

View File

@@ -52,7 +52,11 @@ func (app *p2pApp) listenTCP() error {
gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort) gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort)
defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort) defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort)
var err error var err error
app.listener, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) // support tcp4 and tcp6 listenAddr := ""
if IsLocalhost(app.config.Whitelist) { // not expose port
listenAddr = "127.0.0.1"
}
app.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", listenAddr, app.config.SrcPort))
if err != nil { if err != nil {
gLog.Printf(LvERROR, "listen error:%s", err) gLog.Printf(LvERROR, "listen error:%s", err)
return err return err
@@ -67,8 +71,8 @@ func (app *p2pApp) listenTCP() error {
} }
// check white list // check white list
if app.config.Whitelist != "" { if app.config.Whitelist != "" {
remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0] remoteIP := conn.RemoteAddr().(*net.TCPAddr).IP.String()
if !app.iptree.Contains(remoteIP) { if !app.iptree.Contains(remoteIP) && !IsLocalhost(remoteIP) {
conn.Close() conn.Close()
gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP) gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP)
continue continue
@@ -252,8 +256,8 @@ func (app *p2pApp) close() {
func (app *p2pApp) relayHeartbeatLoop() { func (app *p2pApp) relayHeartbeatLoop() {
app.wg.Add(1) app.wg.Add(1)
defer app.wg.Done() defer app.wg.Done()
gLog.Printf(LvDEBUG, "relayHeartbeat to %d start", app.rtid) gLog.Printf(LvDEBUG, "relayHeartbeat to rtid:%d start", app.rtid)
defer gLog.Printf(LvDEBUG, "relayHeartbeat to %d end", app.rtid) defer gLog.Printf(LvDEBUG, "relayHeartbeat to rtid%d end", app.rtid)
relayHead := new(bytes.Buffer) relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, app.rtid) binary.Write(relayHead, binary.LittleEndian, app.rtid)
req := RelayHeartbeat{RelayTunnelID: app.tunnel.id, req := RelayHeartbeat{RelayTunnelID: app.tunnel.id,
@@ -261,7 +265,12 @@ func (app *p2pApp) relayHeartbeatLoop() {
msg, _ := newMessage(MsgP2P, MsgRelayHeartbeat, &req) msg, _ := newMessage(MsgP2P, MsgRelayHeartbeat, &req)
msgWithHead := append(relayHead.Bytes(), msg...) msgWithHead := append(relayHead.Bytes(), msg...)
for app.tunnel.isRuning() && app.running { for app.tunnel.isRuning() && app.running {
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) err := app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
if err != nil {
gLog.Printf(LvERROR, "%d app write relay tunnel heartbeat error %s", app.rtid, err)
return
}
gLog.Printf(LvDEBUG, "%d app write relay tunnel heartbeat ok", app.rtid)
time.Sleep(TunnelHeartbeatTime) time.Sleep(TunnelHeartbeatTime)
} }
} }

View File

@@ -20,8 +20,10 @@ import (
) )
var ( var (
instance *P2PNetwork v4l *v4Listener
once sync.Once instance *P2PNetwork
once sync.Once
onceV4Listener sync.Once
) )
const ( const (
@@ -32,7 +34,7 @@ const (
// golang not support float64 const // golang not support float64 const
var ( var (
ma10 float64 = 1.0 / 10 ma10 float64 = 1.0 / 10
ma20 float64 = 1.0 / 20 ma5 float64 = 1.0 / 5
) )
type P2PNetwork struct { type P2PNetwork struct {
@@ -44,26 +46,23 @@ type P2PNetwork struct {
writeMtx sync.Mutex writeMtx sync.Mutex
hbTime time.Time hbTime time.Time
// for sync server time // for sync server time
t1 int64 // nanoSeconds t1 int64 // nanoSeconds
dt int64 // client faster then server dt nanoSeconds dt int64 // client faster then server dt nanoSeconds
ddtma int64 ddtma int64
ddt int64 // differential of dt ddt int64 // differential of dt
// msgMap sync.Map msgMap sync.Map //key: nodeID
msgMap map[uint64]chan pushMsg //key: nodeID // msgMap map[uint64]chan pushMsg //key: nodeID
msgMapMtx sync.Mutex
config NetworkConfig config NetworkConfig
allTunnels sync.Map allTunnels sync.Map
apps sync.Map //key: protocol+srcport; value: p2pApp apps sync.Map //key: protocol+srcport; value: p2pApp
limiter *BandwidthLimiter limiter *SpeedLimiter
} }
type pushMsg struct { type msgCtx struct {
data []byte data []byte
ts time.Time ts time.Time
} }
const msgExpiredTime = time.Minute
func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
if instance == nil { if instance == nil {
once.Do(func() { once.Do(func() {
@@ -71,12 +70,11 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
restartCh: make(chan bool, 2), restartCh: make(chan bool, 2),
online: false, online: false,
running: true, running: true,
msgMap: make(map[uint64]chan pushMsg), limiter: newSpeedLimiter(config.ShareBandwidth*1024*1024/8, 1),
limiter: newBandwidthLimiter(config.ShareBandwidth),
dt: 0, dt: 0,
ddt: 0, ddt: 0,
} }
instance.msgMap[0] = make(chan pushMsg) // for gateway instance.msgMap.Store(uint64(0), make(chan msgCtx)) // for gateway
if config != nil { if config != nil {
instance.config = *config instance.config = *config
} }
@@ -183,7 +181,6 @@ func (pn *P2PNetwork) autorunApp() {
func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, string, error) { func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, string, error) {
gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode) gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode)
defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode) defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode)
// request a relay node or specify manually(TODO)
relayConfig := config relayConfig := config
relayMode := "private" relayMode := "private"
if config.RelayNode == "" { if config.RelayNode == "" {
@@ -265,8 +262,6 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
peerNatType = t.config.peerNatType peerNatType = t.config.peerNatType
peerIP = t.config.peerIP peerIP = t.config.peerIP
} }
// TODO: if tcp failed, should try udp punching, nattype should refactor also, when NATNONE and failed we don't know the peerNatType
if err != nil && err == ErrorHandshake { if err != nil && err == ErrorHandshake {
gLog.Println(LvERROR, "direct connect failed, try to relay") gLog.Println(LvERROR, "direct connect failed, try to relay")
t, rtid, relayMode, err = pn.addRelayTunnel(config) t, rtid, relayMode, err = pn.addRelayTunnel(config)
@@ -303,7 +298,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
AppID: appID, AppID: appID,
AppKey: appKey, AppKey: appKey,
} }
gLog.Printf(LvINFO, "sync appkey to %s", config.PeerNode) gLog.Printf(LvDEBUG, "sync appkey to %s", config.PeerNode)
pn.push(config.PeerNode, MsgPushAPPKey, &req) pn.push(config.PeerNode, MsgPushAPPKey, &req)
} }
app := p2pApp{ app := p2pApp{
@@ -317,7 +312,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
relayMode: relayMode, relayMode: relayMode,
hbTime: time.Now()} hbTime: time.Now()}
pn.apps.Store(config.ID(), &app) pn.apps.Store(config.ID(), &app)
gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id) gLog.Printf(LvDEBUG, "%s use tunnel %d", app.config.AppName, app.tunnel.id)
if err == nil { if err == nil {
go app.listen() go app.listen()
} }
@@ -359,18 +354,18 @@ func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) {
} }
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) { func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) {
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort, tid)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort, tid)
isClient := false isClient := false
// client side tid=0, assign random uint64 // client side tid=0, assign random uint64
if tid == 0 { if tid == 0 {
tid = rand.Uint64() tid = rand.Uint64()
isClient = true isClient = true
} }
if _, ok := pn.msgMap.Load(nodeNameToID(config.PeerNode)); !ok {
pn.msgMap.Store(nodeNameToID(config.PeerNode), make(chan msgCtx, 50))
}
pn.msgMapMtx.Lock()
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan pushMsg, 50)
pn.msgMapMtx.Unlock()
// server side // server side
if !isClient { if !isClient {
t, err = pn.newTunnel(config, tid, isClient) t, err = pn.newTunnel(config, tid, isClient)
@@ -407,13 +402,15 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne
} }
if t, err = pn.newTunnel(config, tid, isClient); err == nil { if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil return t, nil
} else if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 { // peer has ipv4 no punching
return nil, ErrConnectPublicV4
} }
} }
// TODO: try UDP4 // TODO: try UDP4
// try TCPPunch // try TCPPunch
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries for i := 0; i < Cone2ConeTCPPunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s if config.peerNatType == NATCone && pn.config.natType == NATCone {
gLog.Println(LvINFO, "try TCP4 Punch") gLog.Println(LvINFO, "try TCP4 Punch")
config.linkMode = LinkModeTCPPunch config.linkMode = LinkModeTCPPunch
config.isUnderlayServer = 0 config.isUnderlayServer = 0
@@ -425,7 +422,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne
} }
// try UDPPunch // try UDPPunch
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries for i := 0; i < Cone2ConeUDPPunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone || pn.config.natType == NATCone { if config.peerNatType == NATCone || pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch") gLog.Println(LvINFO, "try UDP4 Punch")
config.linkMode = LinkModeUDPPunch config.linkMode = LinkModeUDPPunch
@@ -470,7 +467,7 @@ func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t
return return
} }
func (pn *P2PNetwork) init() error { func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start") gLog.Println(LvINFO, "P2PNetwork start")
pn.wgReconnect.Add(1) pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done() defer pn.wgReconnect.Done()
var err error var err error
@@ -495,7 +492,13 @@ func (pn *P2PNetwork) init() error {
gLog.Println(LvDEBUG, "detect NAT type error:", err) gLog.Println(LvDEBUG, "detect NAT type error:", err)
break break
} }
gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) if pn.config.hasIPv4 == 1 || pn.config.hasUPNPorNATPMP == 1 {
onceV4Listener.Do(func() {
v4l = &v4Listener{port: gConf.Network.TCPPort}
go v4l.start()
})
}
gLog.Printf(LvINFO, "hasIPv4:%d, UPNP:%d, NAT type:%d, publicIP:%s", pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, pn.config.natType, pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/api/v1/login" uri := "/api/v1/login"
caCertPool, err := x509.SystemCertPool() caCertPool, err := x509.SystemCertPool()
@@ -510,6 +513,7 @@ func (pn *P2PNetwork) init() error {
RootCAs: caCertPool, RootCAs: caCertPool,
InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config websocket.DefaultDialer.TLSClientConfig = &config
websocket.DefaultDialer.HandshakeTimeout = ClientAPITimeout
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri} u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
q := u.Query() q := u.Query()
q.Add("node", pn.config.Node) q.Add("node", pn.config.Node)
@@ -521,6 +525,7 @@ func (pn *P2PNetwork) init() error {
var ws *websocket.Conn var ws *websocket.Conn
ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil) ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil { if err != nil {
gLog.Println(LvERROR, "Dial error:", err)
break break
} }
pn.online = true pn.online = true
@@ -606,28 +611,27 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
dt := pn.t1 + rtt/2 - t2 dt := pn.t1 + rtt/2 - t2
if pn.dt != 0 { if pn.dt != 0 {
ddt := dt - pn.dt ddt := dt - pn.dt
if pn.ddtma > 0 && (ddt > (pn.ddtma+pn.ddtma/3) || ddt < (pn.ddtma-pn.ddtma/3)) {
newdt := pn.dt + pn.ddtma
gLog.Printf(LvDEBUG, "server time auto adjust dt=%.2fms to %.2fms", float64(dt)/float64(time.Millisecond), float64(newdt)/float64(time.Millisecond))
dt = newdt
}
pn.ddt = ddt pn.ddt = ddt
if pn.ddtma == 0 { if pn.ddtma == 0 {
pn.ddtma = pn.ddt pn.ddtma = pn.ddt
} else { } else {
pn.ddtma = int64(float64(pn.ddtma)*(1-ma10) + float64(pn.ddt)*ma10) // avoid int64 overflow pn.ddtma = int64(float64(pn.ddtma)*(1-ma10) + float64(pn.ddt)*ma10) // avoid int64 overflow
newdt := pn.dt + pn.ddtma
// gLog.Printf(LvDEBUG, "server time auto adjust dt=%.2fms to %.2fms", float64(dt)/float64(time.Millisecond), float64(newdt)/float64(time.Millisecond))
dt = newdt
} }
} }
pn.dt = dt pn.dt = dt
gLog.Printf(LvDEBUG, "synctime dt=%dms ddt=%dns ddtma=%dns rtt=%dms ", pn.dt/int64(time.Millisecond), pn.ddt, pn.ddtma, rtt/int64(time.Millisecond))
gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns ddtma=%dns rtt=%dms ", pn.dt/int64(time.Millisecond), pn.ddt, pn.ddtma, rtt/int64(time.Millisecond))
case MsgPush: case MsgPush:
handlePush(pn, head.SubType, msg) handlePush(pn, head.SubType, msg)
default: default:
pn.msgMapMtx.Lock() i, ok := pn.msgMap.Load(uint64(0))
ch := pn.msgMap[0] if ok {
pn.msgMapMtx.Unlock() ch := i.(chan msgCtx)
ch <- pushMsg{data: msg, ts: time.Now()} ch <- msgCtx{data: msg, ts: time.Now()}
}
return return
} }
} }
@@ -668,17 +672,20 @@ func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{})
} }
func (pn *P2PNetwork) relay(to uint64, body []byte) error { func (pn *P2PNetwork) relay(to uint64, body []byte) error {
gLog.Printf(LvDEBUG, "relay data to %d", to)
i, ok := pn.allTunnels.Load(to) i, ok := pn.allTunnels.Load(to)
if !ok { if !ok {
return nil gLog.Printf(LvERROR, "relay to %d len=%d error:%s", to, len(body), ErrRelayTunnelNotFound)
return ErrRelayTunnelNotFound
} }
tunnel := i.(*P2PTunnel) tunnel := i.(*P2PTunnel)
if tunnel.config.shareBandwidth > 0 { if tunnel.config.shareBandwidth > 0 {
pn.limiter.Add(len(body)) pn.limiter.Add(len(body), true)
} }
tunnel.conn.WriteBuffer(body) var err error
return nil if err = tunnel.conn.WriteBuffer(body); err != nil {
gLog.Printf(LvERROR, "relay to %d len=%d error:%s", to, len(body), err)
}
return err
} }
func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error { func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error {
@@ -717,28 +724,31 @@ func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout
} else { } else {
nodeID = nodeNameToID(node) nodeID = nodeNameToID(node)
} }
pn.msgMapMtx.Lock() i, ok := pn.msgMap.Load(nodeID)
ch := pn.msgMap[nodeID] if !ok {
pn.msgMapMtx.Unlock() return
}
ch := i.(chan msgCtx)
for { for {
select { select {
case <-time.After(timeout): case <-time.After(timeout):
gLog.Printf(LvERROR, "wait msg%d:%d timeout", mainType, subType) gLog.Printf(LvERROR, "wait msg%d:%d timeout", mainType, subType)
return return
case msg := <-ch: case msg := <-ch:
if msg.ts.Before(time.Now().Add(-msgExpiredTime)) {
gLog.Printf(LvDEBUG, "msg expired error %d:%d", head.MainType, head.SubType)
continue
}
head = &openP2PHeader{} head = &openP2PHeader{}
err := binary.Read(bytes.NewReader(msg.data[:openP2PHeaderSize]), binary.LittleEndian, head) err := binary.Read(bytes.NewReader(msg.data[:openP2PHeaderSize]), binary.LittleEndian, head)
if err != nil { if err != nil {
gLog.Println(LvERROR, "read msg error:", err) gLog.Println(LvERROR, "read msg error:", err)
break break
} }
if time.Since(msg.ts) > ReadMsgTimeout {
gLog.Printf(LvDEBUG, "msg expired error %d:%d", head.MainType, head.SubType)
continue
}
if head.MainType != mainType || head.SubType != subType { if head.MainType != mainType || head.SubType != subType {
gLog.Printf(LvDEBUG, "read msg type error %d:%d, requeue it", head.MainType, head.SubType) gLog.Printf(LvDEBUG, "read msg type error %d:%d, requeue it", head.MainType, head.SubType)
ch <- msg ch <- msg
time.Sleep(time.Second)
continue continue
} }
if mainType == MsgPush { if mainType == MsgPush {

View File

@@ -18,7 +18,6 @@ type P2PTunnel struct {
conn underlay conn underlay
hbTime time.Time hbTime time.Time
hbMtx sync.Mutex hbMtx sync.Mutex
hbTimeRelay time.Time
config AppConfig config AppConfig
la *net.UDPAddr // local hole address la *net.UDPAddr // local hole address
ra *net.UDPAddr // remote hole address ra *net.UDPAddr // remote hole address
@@ -35,11 +34,7 @@ type P2PTunnel struct {
func (t *P2PTunnel) initPort() { func (t *P2PTunnel) initPort() {
t.running = true t.running = true
t.hbMtx.Lock() localPort := int(rand.Uint32()%15000 + 50000) // if the process has bug, will add many upnp port. use specify p2p port by param
t.hbTime = time.Now()
t.hbMtx.Unlock()
t.hbTimeRelay = time.Now().Add(time.Second * 600) // TODO: test fake time
localPort := int(rand.Uint32()%15000 + 50000) // if the process has bug, will add many upnp port. use specify p2p port by param
if t.config.linkMode == LinkModeTCP6 || t.config.linkMode == LinkModeTCP4 { if t.config.linkMode == LinkModeTCP6 || t.config.linkMode == LinkModeTCP4 {
t.coneLocalPort = t.pn.config.TCPPort t.coneLocalPort = t.pn.config.TCPPort
t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort
@@ -83,7 +78,7 @@ func (t *P2PTunnel) connect() error {
req.Token = t.pn.config.Token req.Token = t.pn.config.Token
} }
t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) t.pn.push(t.config.PeerNode, MsgPushConnectReq, req)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, HandshakeTimeout*3) head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, UnderlayConnectTimeout*3)
if head == nil { if head == nil {
return errors.New("connect error") return errors.New("connect error")
} }
@@ -184,13 +179,12 @@ func (t *P2PTunnel) handshake() error {
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else { } else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts) time.Sleep(ts)
} }
gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode) gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode)
var err error var err error
// TODO: handle NATNone, nodes with public ip has no punching
if t.pn.config.natType == NATCone && t.config.peerNatType == NATCone { if t.pn.config.natType == NATCone && t.config.peerNatType == NATCone {
err = handshakeC2C(t) err = handshakeC2C(t)
} else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATSymmetric { } else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATSymmetric {
@@ -216,7 +210,7 @@ func (t *P2PTunnel) connectUnderlay() (err error) {
case LinkModeTCP6: case LinkModeTCP6:
t.conn, err = t.connectUnderlayTCP6() t.conn, err = t.connectUnderlayTCP6()
case LinkModeTCP4: case LinkModeTCP4:
t.conn, err = t.connectUnderlayTCP() // TODO: can not listen the same tcp port in pararell t.conn, err = t.connectUnderlayTCP()
case LinkModeTCPPunch: case LinkModeTCPPunch:
t.conn, err = t.connectUnderlayTCP() t.conn, err = t.connectUnderlayTCP()
case LinkModeUDPPunch: case LinkModeUDPPunch:
@@ -238,30 +232,30 @@ func (t *P2PTunnel) connectUnderlay() (err error) {
func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
gLog.Println(LvINFO, "connectUnderlayQuic start") gLog.Println(LvINFO, "connectUnderlayQuic start")
defer gLog.Println(LvINFO, "connectUnderlayQuic end") defer gLog.Println(LvINFO, "connectUnderlayQuic end")
var qConn *underlayQUIC var ul *underlayQUIC
if t.config.isUnderlayServer == 1 { if t.config.isUnderlayServer == 1 {
time.Sleep(time.Millisecond * 10) // punching udp port will need some times in some env time.Sleep(time.Millisecond * 10) // punching udp port will need some times in some env
qConn, err = listenQuic(t.la.String(), TunnelIdleTimeout) ul, err = listenQuic(t.la.String(), TunnelIdleTimeout)
if err != nil { if err != nil {
gLog.Println(LvINFO, "listen quic error:", err, ", retry...") gLog.Println(LvINFO, "listen quic error:", err, ", retry...")
} }
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
err = qConn.Accept() err = ul.Accept()
if err != nil { if err != nil {
qConn.CloseListener() ul.CloseListener()
return nil, fmt.Errorf("accept quic error:%s", err) return nil, fmt.Errorf("accept quic error:%s", err)
} }
_, buff, err := qConn.ReadBuffer() _, buff, err := ul.ReadBuffer()
if err != nil { if err != nil {
qConn.listener.Close() ul.listener.Close()
return nil, fmt.Errorf("read start msg error:%s", err) return nil, fmt.Errorf("read start msg error:%s", err)
} }
if buff != nil { if buff != nil {
gLog.Println(LvDEBUG, string(buff)) gLog.Println(LvDEBUG, string(buff))
} }
qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2"))
gLog.Println(LvDEBUG, "quic connection ok") gLog.Println(LvDEBUG, "quic connection ok")
return qConn, nil return ul, nil
} }
//else //else
@@ -273,17 +267,17 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
return nil, fmt.Errorf("quic listen error:%s", e) return nil, fmt.Errorf("quic listen error:%s", e)
} }
} }
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout)
gLog.Println(LvDEBUG, "quic dial to ", t.ra.String()) gLog.Println(LvDEBUG, "quic dial to ", t.ra.String())
qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout) ul, e = dialQuic(conn, t.ra, TunnelIdleTimeout)
if e != nil { if e != nil {
return nil, fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e) return nil, fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e)
} }
handshakeBegin := time.Now() handshakeBegin := time.Now()
qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) ul.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello"))
_, buff, err := qConn.ReadBuffer() _, buff, err := ul.ReadBuffer()
if e != nil { if e != nil {
qConn.listener.Close() ul.listener.Close()
return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err)
} }
if buff != nil { if buff != nil {
@@ -293,101 +287,92 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin))
gLog.Println(LvDEBUG, "quic connection ok") gLog.Println(LvDEBUG, "quic connection ok")
t.linkModeWeb = LinkModeUDPPunch t.linkModeWeb = LinkModeUDPPunch
return qConn, nil return ul, nil
} }
// websocket // websocket
func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
gLog.Println(LvINFO, "connectUnderlayTCP start") gLog.Println(LvDEBUG, "connectUnderlayTCP start")
defer gLog.Println(LvINFO, "connectUnderlayTCP end") defer gLog.Println(LvDEBUG, "connectUnderlayTCP end")
var qConn *underlayTCP var ul *underlayTCP
if t.config.isUnderlayServer == 1 { if t.config.isUnderlayServer == 1 {
qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t) ul, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen TCP error:%s", err) return nil, fmt.Errorf("listen TCP error:%s", err)
} }
_, buff, err := qConn.ReadBuffer()
if err != nil {
return nil, fmt.Errorf("read start msg error:%s", err)
}
if buff != nil {
gLog.Println(LvDEBUG, string(buff))
}
qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2"))
gLog.Println(LvINFO, "TCP connection ok") gLog.Println(LvINFO, "TCP connection ok")
return qConn, nil return ul, nil
} }
// client side // client side
if t.config.linkMode == LinkModeTCP4 { if t.config.linkMode == LinkModeTCP4 {
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout)
} else { //tcp punch should sleep for punch the same time } else { //tcp punch should sleep for punch the same time
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else { } else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts) time.Sleep(ts)
} }
} }
ul, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", t.coneLocalPort), "-->", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort))
qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
if err != nil { if err != nil {
return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err) return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err)
} }
handshakeBegin := time.Now() handshakeBegin := time.Now()
qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) tidBuff := new(bytes.Buffer)
_, buff, err := qConn.ReadBuffer() binary.Write(tidBuff, binary.LittleEndian, t.id)
ul.WriteBytes(MsgP2P, MsgTunnelHandshake, tidBuff.Bytes()) // tunnelID
_, buff, err := ul.ReadBuffer()
if err != nil { if err != nil {
return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err)
} }
if buff != nil { if buff != nil {
gLog.Println(LvDEBUG, string(buff)) gLog.Println(LvDEBUG, "hello ", string(buff))
} }
gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin))
gLog.Println(LvINFO, "TCP connection ok") gLog.Println(LvINFO, "TCP connection ok")
t.linkModeWeb = LinkModeIPv4 t.linkModeWeb = LinkModeIPv4
return qConn, nil return ul, nil
} }
func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
gLog.Println(LvINFO, "connectUnderlayTCP6 start") gLog.Println(LvINFO, "connectUnderlayTCP6 start")
defer gLog.Println(LvINFO, "connectUnderlayTCP6 end") defer gLog.Println(LvINFO, "connectUnderlayTCP6 end")
var qConn *underlayTCP6 var ul *underlayTCP6
if t.config.isUnderlayServer == 1 { if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
qConn, err = listenTCP6(t.coneNatPort, HandshakeTimeout) ul, err = listenTCP6(t.coneNatPort, UnderlayConnectTimeout)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen TCP6 error:%s", err) return nil, fmt.Errorf("listen TCP6 error:%s", err)
} }
_, buff, err := qConn.ReadBuffer() _, buff, err := ul.ReadBuffer()
if err != nil { if err != nil {
qConn.listener.Close() ul.listener.Close()
return nil, fmt.Errorf("read start msg error:%s", err) return nil, fmt.Errorf("read start msg error:%s", err)
} }
if buff != nil { if buff != nil {
gLog.Println(LvDEBUG, string(buff)) gLog.Println(LvDEBUG, string(buff))
} }
qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2"))
gLog.Println(LvDEBUG, "TCP6 connection ok") gLog.Println(LvDEBUG, "TCP6 connection ok")
return qConn, nil return ul, nil
} }
//else //else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout)
gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6) gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6)
qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) ul, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort)
if err != nil { if err != nil {
return nil, fmt.Errorf("TCP6 dial to %s:%d error:%s", t.config.peerIPv6, t.config.peerConeNatPort, err) return nil, fmt.Errorf("TCP6 dial to %s:%d error:%s", t.config.peerIPv6, t.config.peerConeNatPort, err)
} }
handshakeBegin := time.Now() handshakeBegin := time.Now()
qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) ul.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello"))
_, buff, err := qConn.ReadBuffer() _, buff, err := ul.ReadBuffer()
if err != nil { if err != nil {
qConn.listener.Close() ul.listener.Close()
return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err)
} }
if buff != nil { if buff != nil {
@@ -397,7 +382,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin))
gLog.Println(LvDEBUG, "TCP6 connection ok") gLog.Println(LvDEBUG, "TCP6 connection ok")
t.linkModeWeb = LinkModeIPv6 t.linkModeWeb = LinkModeIPv6
return qConn, nil return ul, nil
} }
func (t *P2PTunnel) readLoop() { func (t *P2PTunnel) readLoop() {
@@ -413,6 +398,7 @@ func (t *P2PTunnel) readLoop() {
break break
} }
if head.MainType != MsgP2P { if head.MainType != MsgP2P {
gLog.Printf(LvWARN, "%d head.MainType != MsgP2P", t.id)
continue continue
} }
switch head.SubType { switch head.SubType {
@@ -427,6 +413,7 @@ func (t *P2PTunnel) readLoop() {
gLog.Printf(LvDEBUG, "%d read tunnel heartbeat ack", t.id) gLog.Printf(LvDEBUG, "%d read tunnel heartbeat ack", t.id)
case MsgOverlayData: case MsgOverlayData:
if len(body) < overlayHeaderSize { if len(body) < overlayHeaderSize {
gLog.Printf(LvWARN, "%d len(body) < overlayHeaderSize", t.id)
continue continue
} }
overlayID := binary.LittleEndian.Uint64(body[:8]) overlayID := binary.LittleEndian.Uint64(body[:8])
@@ -451,19 +438,19 @@ func (t *P2PTunnel) readLoop() {
gLog.Println(LvERROR, "overlay write error:", err) gLog.Println(LvERROR, "overlay write error:", err)
} }
case MsgRelayData: case MsgRelayData:
gLog.Printf(LvDEBUG, "got relay data datalen=%d", head.DataLen)
if len(body) < 8 { if len(body) < 8 {
continue continue
} }
tunnelID := binary.LittleEndian.Uint64(body[:8]) tunnelID := binary.LittleEndian.Uint64(body[:8])
t.pn.relay(tunnelID, body[8:]) gLog.Printf(LvDEBUG, "relay data to %d, len=%d", tunnelID, head.DataLen-RelayHeaderSize)
t.pn.relay(tunnelID, body[RelayHeaderSize:])
case MsgRelayHeartbeat: case MsgRelayHeartbeat:
req := RelayHeartbeat{} req := RelayHeartbeat{}
if err := json.Unmarshal(body, &req); err != nil { if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue continue
} }
gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) gLog.Printf(LvDEBUG, "read MsgRelayHeartbeat from rtid:%d,appid:%d", req.RelayTunnelID, req.AppID)
relayHead := new(bytes.Buffer) relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, req.RelayTunnelID) binary.Write(relayHead, binary.LittleEndian, req.RelayTunnelID)
msg, _ := newMessage(MsgP2P, MsgRelayHeartbeatAck, &req) msg, _ := newMessage(MsgP2P, MsgRelayHeartbeatAck, &req)
@@ -476,7 +463,7 @@ func (t *P2PTunnel) readLoop() {
gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err) gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err)
continue continue
} }
gLog.Printf(LvDEBUG, "got MsgRelayHeartbeatAck to %d", req.AppID) gLog.Printf(LvDEBUG, "read MsgRelayHeartbeatAck to appid:%d", req.AppID)
t.pn.updateAppHeartbeat(req.AppID) t.pn.updateAppHeartbeat(req.AppID)
case MsgOverlayConnectReq: case MsgOverlayConnectReq:
req := OverlayConnectReq{} req := OverlayConnectReq{}
@@ -504,7 +491,7 @@ func (t *P2PTunnel) readLoop() {
if req.Protocol == "udp" { if req.Protocol == "udp" {
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort}) oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
} else { } else {
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), HandshakeTimeout) oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), ReadMsgTimeout)
} }
if err != nil { if err != nil {
@@ -544,6 +531,9 @@ func (t *P2PTunnel) readLoop() {
} }
func (t *P2PTunnel) heartbeatLoop() { func (t *P2PTunnel) heartbeatLoop() {
t.hbMtx.Lock()
t.hbTime = time.Now() // init
t.hbMtx.Unlock()
tc := time.NewTicker(TunnelHeartbeatTime) tc := time.NewTicker(TunnelHeartbeatTime)
defer tc.Stop() defer tc.Stop()
gLog.Printf(LvDEBUG, "%d tunnel heartbeatLoop start", t.id) gLog.Printf(LvDEBUG, "%d tunnel heartbeatLoop start", t.id)

View File

@@ -10,11 +10,12 @@ import (
"time" "time"
) )
const OpenP2PVersion = "3.10.9" const OpenP2PVersion = "3.12.0"
const ProductName string = "openp2p" const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0" const LeastSupportVersion = "3.0.0"
const SyncServerTimeVersion = "3.9.0" const SyncServerTimeVersion = "3.9.0"
const SymmetricSimultaneouslySendVersion = "3.10.7" const SymmetricSimultaneouslySendVersion = "3.10.7"
const PublicIPVersion = "3.11.2"
const ( const (
IfconfigPort1 = 27180 IfconfigPort1 = 27180
@@ -39,6 +40,8 @@ type PushHeader struct {
var PushHeaderSize = binary.Size(PushHeader{}) var PushHeaderSize = binary.Size(PushHeader{})
const RelayHeaderSize = 8
type overlayHeader struct { type overlayHeader struct {
id uint64 id uint64
} }
@@ -134,27 +137,29 @@ const (
) )
const ( const (
ReadBuffLen = 4096 // for UDP maybe not enough ReadBuffLen = 4096 // for UDP maybe not enough
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow NetworkHeartbeatTime = time.Second * 30
TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s
TunnelIdleTimeout = time.Minute TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379 SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510 // SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond SymmetricHandshakeInterval = time.Millisecond
HandshakeTimeout = time.Second * 10 HandshakeTimeout = time.Second * 7
PeerAddRelayTimeount = time.Second * 30 // peer need times PunchTsDelay = time.Second * 3
PeerAddRelayTimeount = time.Second * 30 // peer need times. S2C\TCP\TCP Punch\UDP Punch
CheckActiveTimeout = time.Second * 5 CheckActiveTimeout = time.Second * 5
ReadMsgTimeout = time.Second * 5
PaddingSize = 16 PaddingSize = 16
AESKeySize = 16 AESKeySize = 16
MaxRetry = 10 MaxRetry = 10
Cone2ConePunchMaxRetry = 1 Cone2ConeTCPPunchMaxRetry = 1
Cone2ConeUDPPunchMaxRetry = 1
PublicIPEchoTimeout = time.Second * 1 PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5 NatTestTimeout = time.Second * 5
UDPReadTimeout = time.Second * 5 UDPReadTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10 ClientAPITimeout = time.Second * 10
UnderlayConnectTimeout = time.Second * 10 UnderlayConnectTimeout = time.Second * 10
MaxDirectTry = 3 MaxDirectTry = 3
PunchTsDelay = time.Second * 3
) )
// NATNone has public ip // NATNone has public ip

51
core/speedlimiter.go Normal file
View File

@@ -0,0 +1,51 @@
package openp2p
import (
"fmt"
"sync"
"time"
)
// SpeedLimiter ...
type SpeedLimiter struct {
lastUpdate time.Time
speed int // per second
precision int // seconds
freeCap int
maxFreeCap int
mtx sync.Mutex
}
func newSpeedLimiter(speed int, precision int) *SpeedLimiter {
return &SpeedLimiter{
speed: speed,
precision: precision,
lastUpdate: time.Now(),
maxFreeCap: speed * precision,
freeCap: speed * precision,
}
}
// Add ...
func (sl *SpeedLimiter) Add(increment int, wait bool) bool {
if sl.speed <= 0 {
return true
}
sl.mtx.Lock()
defer sl.mtx.Unlock()
sl.freeCap += int(time.Since(sl.lastUpdate) * time.Duration(sl.speed) / time.Second)
if sl.freeCap > sl.maxFreeCap {
sl.freeCap = sl.maxFreeCap
}
if !wait && sl.freeCap < increment {
return false
}
sl.freeCap -= increment
sl.lastUpdate = time.Now()
if sl.freeCap < 0 {
// sleep for the overflow
fmt.Println("sleep ", time.Millisecond*time.Duration(-sl.freeCap*100)/time.Duration(sl.speed))
time.Sleep(time.Millisecond * time.Duration(-sl.freeCap*1000) / time.Duration(sl.speed)) // sleep ms
}
return true
}

58
core/speedlimiter_test.go Normal file
View File

@@ -0,0 +1,58 @@
package openp2p
import (
"testing"
"time"
)
func TestBandwidth(t *testing.T) {
speed := 10 * 1024 * 1024 / 8 // 10mbps
speedl := newSpeedLimiter(speed, 1)
oneBuffSize := 4096
writeNum := 5000
expectTime := oneBuffSize * writeNum / speed
startTs := time.Now()
for i := 0; i < writeNum; i++ {
speedl.Add(oneBuffSize, true)
}
t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime)
if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second {
t.Error("error")
}
}
func TestSymmetric(t *testing.T) {
speed := 20000 / 180
speedl := newSpeedLimiter(speed, 180)
oneBuffSize := 300
writeNum := 100
expectTime := (oneBuffSize*writeNum - 20000) / speed
startTs := time.Now()
for i := 0; i < writeNum; i++ {
speedl.Add(oneBuffSize, true)
}
t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime)
if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second {
t.Error("error")
}
}
func TestSymmetric2(t *testing.T) {
speed := 30000 / 180
speedl := newSpeedLimiter(speed, 180)
oneBuffSize := 800
writeNum := 50
expectTime := (oneBuffSize*writeNum - 30000) / speed
startTs := time.Now()
for i := 0; i < writeNum; {
if speedl.Add(oneBuffSize, true) {
i++
} else {
time.Sleep(time.Millisecond)
}
}
t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime)
if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second {
t.Error("error")
}
}

View File

@@ -1,16 +1,62 @@
package openp2p package openp2p
import ( import (
"io"
"time" "time"
) )
type underlay interface { type underlay interface {
Read([]byte) (int, error)
Write([]byte) (int, error)
ReadBuffer() (*openP2PHeader, []byte, error) ReadBuffer() (*openP2PHeader, []byte, error)
WriteBytes(uint16, uint16, []byte) error WriteBytes(uint16, uint16, []byte) error
WriteBuffer([]byte) error WriteBuffer([]byte) error
WriteMessage(uint16, uint16, interface{}) error WriteMessage(uint16, uint16, interface{}) error
Close() error Close() error
WLock()
WUnlock()
SetReadDeadline(t time.Time) error SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error SetWriteDeadline(t time.Time) error
Protocol() string Protocol() string
} }
func DefaultReadBuffer(ul underlay) (*openP2PHeader, []byte, error) {
headBuf := make([]byte, openP2PHeaderSize)
_, err := io.ReadFull(ul, headBuf)
if err != nil {
return nil, nil, err
}
head, err := decodeHeader(headBuf)
if err != nil {
return nil, nil, err
}
dataBuf := make([]byte, head.DataLen)
_, err = io.ReadFull(ul, dataBuf)
return head, dataBuf, err
}
func DefaultWriteBytes(ul underlay, mainType, subType uint16, data []byte) error {
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...)
ul.WLock()
_, err := ul.Write(writeBytes)
ul.WUnlock()
return err
}
func DefaultWriteBuffer(ul underlay, data []byte) error {
ul.WLock()
_, err := ul.Write(data)
ul.WUnlock()
return err
}
func DefaultWriteMessage(ul underlay, mainType uint16, subType uint16, packet interface{}) error {
writeBytes, err := newMessage(mainType, subType, packet)
if err != nil {
return err
}
ul.WLock()
_, err = ul.Write(writeBytes)
ul.WUnlock()
return err
}

View File

@@ -6,10 +6,8 @@ import (
"crypto/rsa" "crypto/rsa"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json"
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"io"
"math/big" "math/big"
"net" "net"
"sync" "sync"
@@ -33,46 +31,19 @@ func (conn *underlayQUIC) Protocol() string {
} }
func (conn *underlayQUIC) ReadBuffer() (*openP2PHeader, []byte, error) { func (conn *underlayQUIC) ReadBuffer() (*openP2PHeader, []byte, error) {
headBuf := make([]byte, openP2PHeaderSize) return DefaultReadBuffer(conn)
_, err := io.ReadFull(conn, headBuf)
if err != nil {
return nil, nil, err
}
head, err := decodeHeader(headBuf)
if err != nil {
return nil, nil, err
}
dataBuf := make([]byte, head.DataLen)
_, err = io.ReadFull(conn, dataBuf)
return head, dataBuf, err
} }
func (conn *underlayQUIC) WriteBytes(mainType uint16, subType uint16, data []byte) error { func (conn *underlayQUIC) WriteBytes(mainType uint16, subType uint16, data []byte) error {
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) return DefaultWriteBytes(conn, mainType, subType, data)
conn.writeMtx.Lock()
_, err := conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayQUIC) WriteBuffer(data []byte) error { func (conn *underlayQUIC) WriteBuffer(data []byte) error {
conn.writeMtx.Lock() return DefaultWriteBuffer(conn, data)
_, err := conn.Write(data)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayQUIC) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { func (conn *underlayQUIC) WriteMessage(mainType uint16, subType uint16, packet interface{}) error {
// TODO: call newMessage return DefaultWriteMessage(conn, mainType, subType, packet)
data, err := json.Marshal(packet)
if err != nil {
return err
}
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...)
conn.writeMtx.Lock()
_, err = conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayQUIC) Close() error { func (conn *underlayQUIC) Close() error {
@@ -80,6 +51,12 @@ func (conn *underlayQUIC) Close() error {
conn.Connection.CloseWithError(0, "") conn.Connection.CloseWithError(0, "")
return nil return nil
} }
func (conn *underlayQUIC) WLock() {
conn.writeMtx.Lock()
}
func (conn *underlayQUIC) WUnlock() {
conn.writeMtx.Unlock()
}
func (conn *underlayQUIC) CloseListener() { func (conn *underlayQUIC) CloseListener() {
if conn.listener != nil { if conn.listener != nil {
conn.listener.Close() conn.listener.Close()

View File

@@ -1,9 +1,8 @@
package openp2p package openp2p
import ( import (
"encoding/json" "encoding/binary"
"fmt" "fmt"
"io"
"net" "net"
"sync" "sync"
"time" "time"
@@ -21,51 +20,30 @@ func (conn *underlayTCP) Protocol() string {
} }
func (conn *underlayTCP) ReadBuffer() (*openP2PHeader, []byte, error) { func (conn *underlayTCP) ReadBuffer() (*openP2PHeader, []byte, error) {
headBuf := make([]byte, openP2PHeaderSize) return DefaultReadBuffer(conn)
_, err := io.ReadFull(conn, headBuf)
if err != nil {
return nil, nil, err
}
head, err := decodeHeader(headBuf)
if err != nil {
return nil, nil, err
}
dataBuf := make([]byte, head.DataLen)
_, err = io.ReadFull(conn, dataBuf)
return head, dataBuf, err
} }
func (conn *underlayTCP) WriteBytes(mainType uint16, subType uint16, data []byte) error { func (conn *underlayTCP) WriteBytes(mainType uint16, subType uint16, data []byte) error {
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) return DefaultWriteBytes(conn, mainType, subType, data)
conn.writeMtx.Lock()
_, err := conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayTCP) WriteBuffer(data []byte) error { func (conn *underlayTCP) WriteBuffer(data []byte) error {
conn.writeMtx.Lock() return DefaultWriteBuffer(conn, data)
_, err := conn.Write(data)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayTCP) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { func (conn *underlayTCP) WriteMessage(mainType uint16, subType uint16, packet interface{}) error {
// TODO: call newMessage return DefaultWriteMessage(conn, mainType, subType, packet)
data, err := json.Marshal(packet)
if err != nil {
return err
}
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...)
conn.writeMtx.Lock()
_, err = conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayTCP) Close() error { func (conn *underlayTCP) Close() error {
return conn.Conn.Close() return conn.Conn.Close()
} }
func (conn *underlayTCP) WLock() {
conn.writeMtx.Lock()
}
func (conn *underlayTCP) WUnlock() {
conn.writeMtx.Unlock()
}
func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) { func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) {
if mode == LinkModeTCPPunch { if mode == LinkModeTCPPunch {
@@ -76,34 +54,46 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel)
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts) time.Sleep(ts)
} }
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) gLog.Println(LvDEBUG, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
if err != nil { if err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err) gLog.Println(LvDEBUG, "send tcp punch: ", err)
return nil, err return nil, err
} }
return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}
_, buff, err := utcp.ReadBuffer()
if err != nil {
return nil, fmt.Errorf("read start msg error:%s", err)
}
if buff != nil {
gLog.Println(LvDEBUG, string(buff))
}
utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff)
return utcp, nil
} }
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort)) tid := t.id
l, err := net.ListenTCP("tcp", addr) if compareVersion(t.config.peerVersion, PublicIPVersion) == LESS { // old version
if err != nil { ipBytes := net.ParseIP(t.config.peerIP).To4()
return nil, err tid = uint64(binary.BigEndian.Uint32(ipBytes))
gLog.Println(LvDEBUG, "compatible with old client, use ip as key:", tid)
} }
l.SetDeadline(time.Now().Add(CheckActiveTimeout)) utcp := v4l.getUnderlayTCP(tid)
c, err := l.Accept() if utcp == nil {
defer l.Close() return nil, ErrConnectPublicV4
if err != nil {
return nil, err
} }
return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil return utcp, nil
} }
func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) { func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) {
var c net.Conn var c net.Conn
var err error var err error
if mode == LinkModeTCPPunch { if mode == LinkModeTCPPunch {
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) gLog.Println(LvDEBUG, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
if c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout); err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err)
}
} else { } else {
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
} }

View File

@@ -1,9 +1,7 @@
package openp2p package openp2p
import ( import (
"encoding/json"
"fmt" "fmt"
"io"
"net" "net"
"sync" "sync"
"time" "time"
@@ -20,52 +18,30 @@ func (conn *underlayTCP6) Protocol() string {
} }
func (conn *underlayTCP6) ReadBuffer() (*openP2PHeader, []byte, error) { func (conn *underlayTCP6) ReadBuffer() (*openP2PHeader, []byte, error) {
headBuf := make([]byte, openP2PHeaderSize) return DefaultReadBuffer(conn)
_, err := io.ReadFull(conn, headBuf)
if err != nil {
return nil, nil, err
}
head, err := decodeHeader(headBuf)
if err != nil {
return nil, nil, err
}
dataBuf := make([]byte, head.DataLen)
_, err = io.ReadFull(conn, dataBuf)
return head, dataBuf, err
} }
func (conn *underlayTCP6) WriteBytes(mainType uint16, subType uint16, data []byte) error { func (conn *underlayTCP6) WriteBytes(mainType uint16, subType uint16, data []byte) error {
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) return DefaultWriteBytes(conn, mainType, subType, data)
conn.writeMtx.Lock()
_, err := conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayTCP6) WriteBuffer(data []byte) error { func (conn *underlayTCP6) WriteBuffer(data []byte) error {
conn.writeMtx.Lock() return DefaultWriteBuffer(conn, data)
_, err := conn.Write(data)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayTCP6) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { func (conn *underlayTCP6) WriteMessage(mainType uint16, subType uint16, packet interface{}) error {
// TODO: call newMessage return DefaultWriteMessage(conn, mainType, subType, packet)
data, err := json.Marshal(packet)
if err != nil {
return err
}
writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...)
conn.writeMtx.Lock()
_, err = conn.Write(writeBytes)
conn.writeMtx.Unlock()
return err
} }
func (conn *underlayTCP6) Close() error { func (conn *underlayTCP6) Close() error {
return conn.Conn.Close() return conn.Conn.Close()
} }
func (conn *underlayTCP6) WLock() {
conn.writeMtx.Lock()
}
func (conn *underlayTCP6) WUnlock() {
conn.writeMtx.Unlock()
}
func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
addr, _ := net.ResolveTCPAddr("tcp6", fmt.Sprintf("[::]:%d", port)) addr, _ := net.ResolveTCPAddr("tcp6", fmt.Sprintf("[::]:%d", port))
l, err := net.ListenTCP("tcp6", addr) l, err := net.ListenTCP("tcp6", addr)
@@ -73,7 +49,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
return nil, err return nil, err
} }
defer l.Close() defer l.Close()
l.SetDeadline(time.Now().Add(HandshakeTimeout)) l.SetDeadline(time.Now().Add(UnderlayConnectTimeout))
c, err := l.Accept() c, err := l.Accept()
defer l.Close() defer l.Close()
if err != nil { if err != nil {
@@ -83,7 +59,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
} }
func dialTCP6(host string, port int) (*underlayTCP6, error) { func dialTCP6(host string, port int) (*underlayTCP6, error) {
c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), HandshakeTimeout) c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), UnderlayConnectTimeout)
if err != nil { if err != nil {
gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err) gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err)
return nil, err return nil, err

82
core/v4listener.go Normal file
View File

@@ -0,0 +1,82 @@
package openp2p
import (
"encoding/binary"
"fmt"
"net"
"sync"
"time"
)
type v4Listener struct {
conns sync.Map
port int
acceptCh chan bool
}
func (vl *v4Listener) start() error {
v4l.acceptCh = make(chan bool, 10)
for {
vl.listen()
time.Sleep(time.Second * 5)
}
}
func (vl *v4Listener) listen() error {
gLog.Printf(LvINFO, "listen %d start", vl.port)
defer gLog.Printf(LvINFO, "listen %d end", vl.port)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", vl.port))
l, err := net.ListenTCP("tcp", addr)
if err != nil {
gLog.Printf(LvERROR, "listen %d error:", vl.port, err)
return err
}
defer l.Close()
for {
c, err := l.Accept()
if err != nil {
break
}
go vl.handleConnection(c)
}
return nil
}
func (vl *v4Listener) handleConnection(c net.Conn) {
gLog.Println(LvDEBUG, "v4Listener accept connection: ", c.RemoteAddr().String())
utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}
utcp.SetReadDeadline(time.Now().Add(time.Second * 5))
_, buff, err := utcp.ReadBuffer()
if err != nil {
gLog.Printf(LvERROR, "utcp.ReadBuffer error:", err)
}
utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff)
var tid uint64
if string(buff) == "OpenP2P,hello" { // old client
// save remoteIP as key
remoteAddr := c.RemoteAddr().(*net.TCPAddr).IP
ipBytes := remoteAddr.To4()
tid = uint64(binary.BigEndian.Uint32(ipBytes)) // bytes not enough for uint64
gLog.Println(LvDEBUG, "hello ", string(buff))
} else {
if len(buff) < 8 {
return
}
tid = binary.LittleEndian.Uint64(buff[:8])
gLog.Println(LvDEBUG, "hello ", tid)
}
vl.conns.Store(tid, utcp)
vl.acceptCh <- true
}
func (vl *v4Listener) getUnderlayTCP(tid uint64) *underlayTCP {
for i := 0; i < 100; i++ {
select {
case <-time.After(time.Millisecond * 50):
case <-vl.acceptCh:
}
if u, ok := vl.conns.LoadAndDelete(tid); ok {
return u.(*underlayTCP)
}
}
return nil
}