mirror of
https://github.com/openp2p-cn/openp2p.git
synced 2025-09-27 04:56:03 +08:00
254 lines
8.6 KiB
Go
254 lines
8.6 KiB
Go
package openp2p
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"time"
|
|
)
|
|
|
|
func handshakeC2C(t *P2PTunnel) (err error) {
|
|
gLog.Printf(LvDEBUG, "handshakeC2C %s:%d:%d to %s:%d", gConf.Network.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort)
|
|
defer gLog.Printf(LvDEBUG, "handshakeC2C end")
|
|
conn, err := net.ListenUDP("udp", t.localHoleAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
_, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err)
|
|
return err
|
|
}
|
|
ra, head, buff, _, err := UDPRead(conn, HandshakeTimeout)
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
|
|
return err
|
|
}
|
|
t.remoteHoleAddr, _ = net.ResolveUDPAddr("udp", ra.String())
|
|
var tunnelID uint64
|
|
if len(buff) > openP2PHeaderSize {
|
|
req := P2PHandshakeReq{}
|
|
if err := json.Unmarshal(buff[openP2PHeaderSize:openP2PHeaderSize+int(head.DataLen)], &req); err == nil {
|
|
tunnelID = req.ID
|
|
}
|
|
} else { // compatible with old version
|
|
tunnelID = t.id
|
|
}
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id {
|
|
gLog.Printf(LvDEBUG, "read %d handshake ", t.id)
|
|
UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
|
|
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
|
|
return err
|
|
}
|
|
}
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck && tunnelID == t.id {
|
|
gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id)
|
|
_, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
|
|
return err
|
|
}
|
|
}
|
|
gLog.Printf(LvINFO, "handshakeC2C ok")
|
|
return nil
|
|
}
|
|
|
|
func handshakeC2S(t *P2PTunnel) error {
|
|
gLog.Printf(LvDEBUG, "handshakeC2S start")
|
|
defer gLog.Printf(LvDEBUG, "handshakeC2S end")
|
|
if !buildTunnelMtx.TryLock() {
|
|
// time.Sleep(time.Second * 3)
|
|
return ErrBuildTunnelBusy
|
|
}
|
|
defer buildTunnelMtx.Unlock()
|
|
startTime := time.Now()
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
randPorts := r.Perm(65532)
|
|
conn, err := net.ListenUDP("udp", t.localHoleAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
go func() error {
|
|
gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort)
|
|
for i := 0; i < SymmetricHandshakeNum; i++ {
|
|
// time.Sleep(SymmetricHandshakeInterval)
|
|
dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeC2S write MsgPunchHandshake error:", err)
|
|
return err
|
|
}
|
|
}
|
|
gLog.Println(LvDEBUG, "send symmetric handshake end")
|
|
return nil
|
|
}()
|
|
err = conn.SetReadDeadline(time.Now().Add(HandshakeTimeout))
|
|
if err != nil {
|
|
gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error")
|
|
return err
|
|
}
|
|
// read response of the punching hole ok port
|
|
buff := make([]byte, 1024)
|
|
_, dst, err := conn.ReadFrom(buff)
|
|
if err != nil {
|
|
gLog.Println(LvERROR, "handshakeC2S wait timeout")
|
|
return err
|
|
}
|
|
head := &openP2PHeader{}
|
|
err = binary.Read(bytes.NewReader(buff[:openP2PHeaderSize]), binary.LittleEndian, head)
|
|
if err != nil {
|
|
gLog.Println(LvERROR, "parse p2pheader error:", err)
|
|
return err
|
|
}
|
|
t.remoteHoleAddr, _ = net.ResolveUDPAddr("udp", dst.String())
|
|
var tunnelID uint64
|
|
if len(buff) > openP2PHeaderSize {
|
|
req := P2PHandshakeReq{}
|
|
if err := json.Unmarshal(buff[openP2PHeaderSize:openP2PHeaderSize+int(head.DataLen)], &req); err == nil {
|
|
tunnelID = req.ID
|
|
}
|
|
} else { // compatible with old version
|
|
tunnelID = t.id
|
|
}
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id {
|
|
gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ", t.id)
|
|
UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
|
|
for {
|
|
_, head, buff, _, err = UDPRead(conn, HandshakeTimeout)
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeC2S handshake error")
|
|
return err
|
|
}
|
|
var tunnelID uint64
|
|
if len(buff) > openP2PHeaderSize {
|
|
req := P2PHandshakeReq{}
|
|
if err := json.Unmarshal(buff[openP2PHeaderSize:openP2PHeaderSize+int(head.DataLen)], &req); err == nil {
|
|
tunnelID = req.ID
|
|
}
|
|
} else { // compatible with old version
|
|
tunnelID = t.id
|
|
}
|
|
// waiting ack
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck && tunnelID == t.id {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
|
|
gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, t.remoteHoleAddr.String())
|
|
_, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
|
|
return err
|
|
} else {
|
|
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
|
|
}
|
|
gLog.Printf(LvINFO, "handshakeC2S ok. cost %d ms", time.Since(startTime)/time.Millisecond)
|
|
return nil
|
|
}
|
|
|
|
func handshakeS2C(t *P2PTunnel) error {
|
|
gLog.Printf(LvDEBUG, "handshakeS2C start")
|
|
defer gLog.Printf(LvDEBUG, "handshakeS2C end")
|
|
if !buildTunnelMtx.TryLock() {
|
|
// time.Sleep(time.Second * 3)
|
|
return ErrBuildTunnelBusy
|
|
}
|
|
defer buildTunnelMtx.Unlock()
|
|
startTime := time.Now()
|
|
gotCh := make(chan *net.UDPAddr, 5)
|
|
// sequencely udp send handshake, do not parallel send
|
|
gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort)
|
|
gotIt := false
|
|
for i := 0; i < SymmetricHandshakeNum; i++ {
|
|
// time.Sleep(SymmetricHandshakeInterval)
|
|
go func(t *P2PTunnel) error {
|
|
conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random?
|
|
if err != nil {
|
|
gLog.Printf(LvDEBUG, "listen error")
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
|
|
_, head, buff, _, err := UDPRead(conn, HandshakeTimeout)
|
|
if err != nil {
|
|
// gLog.Println(LevelDEBUG, "one of the handshake error:", err)
|
|
return err
|
|
}
|
|
if gotIt {
|
|
return nil
|
|
}
|
|
var tunnelID uint64
|
|
if len(buff) >= openP2PHeaderSize+8 {
|
|
req := P2PHandshakeReq{}
|
|
if err := json.Unmarshal(buff[openP2PHeaderSize:openP2PHeaderSize+int(head.DataLen)], &req); err == nil {
|
|
tunnelID = req.ID
|
|
}
|
|
} else { // compatible with old version
|
|
tunnelID = t.id
|
|
}
|
|
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id {
|
|
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id)
|
|
UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
|
|
// may read several MsgPunchHandshake
|
|
for {
|
|
_, head, buff, _, err = UDPRead(conn, HandshakeTimeout)
|
|
if err != nil {
|
|
gLog.Println(LvDEBUG, "handshakeS2C handshake error")
|
|
return err
|
|
}
|
|
if len(buff) > openP2PHeaderSize {
|
|
req := P2PHandshakeReq{}
|
|
if err := json.Unmarshal(buff[openP2PHeaderSize:openP2PHeaderSize+int(head.DataLen)], &req); err == nil {
|
|
tunnelID = req.ID
|
|
}
|
|
} else { // compatible with old version
|
|
tunnelID = t.id
|
|
}
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck && tunnelID == t.id {
|
|
break
|
|
} else {
|
|
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
|
|
}
|
|
}
|
|
}
|
|
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
|
|
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String())
|
|
UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
|
|
gotIt = true
|
|
la, _ := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
|
|
gotCh <- la
|
|
return nil
|
|
} else {
|
|
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
|
|
}
|
|
return nil
|
|
}(t)
|
|
}
|
|
gLog.Printf(LvDEBUG, "send symmetric handshake end")
|
|
if compareVersion(t.config.peerVersion, SymmetricSimultaneouslySendVersion) < 0 { // compatible with old client
|
|
gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect")
|
|
GNetwork.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
|
|
}
|
|
|
|
select {
|
|
case <-time.After(HandshakeTimeout):
|
|
return fmt.Errorf("wait handshake timeout")
|
|
case la := <-gotCh:
|
|
t.localHoleAddr = la
|
|
gLog.Println(LvDEBUG, "symmetric handshake ok", la)
|
|
gLog.Printf(LvINFO, "handshakeS2C ok. cost %dms", time.Since(startTime)/time.Millisecond)
|
|
}
|
|
return nil
|
|
}
|