package openp2p

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)
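
// p2pApp is one peer-to-peer application instance: its configuration, the
// local TCP/UDP listener that accepts client traffic, and the direct and relay
// tunnels used to reach the peer node. The relay retry counters and the
// errMsg/connectTime fields below track the relay tunnel; the direct tunnel
// keeps its retry state in AppConfig.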
type p2pApp struct {
	config             AppConfig
	listener           net.Listener
	listenerUDP        *net.UDPConn
	directTunnel       *P2PTunnel
	relayTunnel        *P2PTunnel
	tunnelMtx          sync.Mutex
	iptree             *IPTree // for whitelist
	rtid               uint64  // relay tunnelID
	relayNode          string
	relayMode          string // public/private
	hbTimeRelay        time.Time
	hbMtx              sync.Mutex
	running            bool
	id                 uint64
	key                uint64 // aes
	wg                 sync.WaitGroup
	relayHead          *bytes.Buffer
	once               sync.Once
	// for relayTunnel
	retryRelayNum      int
	retryRelayTime     time.Time
	nextRetryRelayTime time.Time
	errMsg             string
	connectTime        time.Time
}
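
// Tunnel returns the tunnel currently used for forwarding, preferring the
// direct tunnel over the relay tunnel when both exist.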
func (app *p2pApp) Tunnel() *P2PTunnel {
	app.tunnelMtx.Lock()
	defer app.tunnelMtx.Unlock()
	if app.directTunnel != nil {
		return app.directTunnel
	}
	return app.relayTunnel
}

func (app *p2pApp) DirectTunnel() *P2PTunnel {
	app.tunnelMtx.Lock()
	defer app.tunnelMtx.Unlock()
	return app.directTunnel
}

func (app *p2pApp) setDirectTunnel(t *P2PTunnel) {
	app.tunnelMtx.Lock()
	defer app.tunnelMtx.Unlock()
	app.directTunnel = t
}

func (app *p2pApp) RelayTunnel() *P2PTunnel {
	app.tunnelMtx.Lock()
	defer app.tunnelMtx.Unlock()
	return app.relayTunnel
}

func (app *p2pApp) setRelayTunnel(t *P2PTunnel) {
	app.tunnelMtx.Lock()
	defer app.tunnelMtx.Unlock()
	app.relayTunnel = t
}

func (app *p2pApp) isDirect() bool {
	return app.directTunnel != nil
}
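
// RelayTunnelID returns the relay tunnel ID (rtid) used to reach the peer
// through the relay node, or 0 when the app runs over a direct tunnel.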
func (app *p2pApp) RelayTunnelID() uint64 {
	if app.isDirect() {
		return 0
	}
	return app.rtid
}

func (app *p2pApp) ConnectTime() time.Time {
	if app.isDirect() {
		return app.config.connectTime
	}
	return app.connectTime
}

func (app *p2pApp) RetryTime() time.Time {
	if app.isDirect() {
		return app.config.retryTime
	}
	return app.retryRelayTime
}
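
// checkP2PTunnel is the app's maintenance loop: every 3 seconds it checks and,
// if needed, rebuilds the direct and relay tunnels until the app stops running.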
func (app *p2pApp) checkP2PTunnel() error {
	for app.running {
		app.checkDirectTunnel()
		app.checkRelayTunnel()
		time.Sleep(time.Second * 3)
	}
	return nil
}
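
// directRetryLimit decides how many direct-connection attempts are worth
// making for this peer: the full retryLimit when an intranet path, IPv6,
// public IPv4, UPnP/NAT-PMP, or cone-to-cone NAT punching is available; zero
// when both sides sit behind symmetric NAT; otherwise a tenth of retryLimit.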
func (app *p2pApp) directRetryLimit() int {
	if app.config.peerIP == gConf.Network.publicIP && compareVersion(app.config.peerVersion, SupportIntranetVersion) >= 0 {
		return retryLimit
	}
	if IsIPv6(app.config.peerIPv6) && IsIPv6(gConf.IPv6()) {
		return retryLimit
	}
	if app.config.hasIPv4 == 1 || gConf.Network.hasIPv4 == 1 || app.config.hasUPNPorNATPMP == 1 || gConf.Network.hasUPNPorNATPMP == 1 {
		return retryLimit
	}
	if gConf.Network.natType == NATCone && app.config.peerNatType == NATCone {
		return retryLimit
	}
	if app.config.peerNatType == NATSymmetric && gConf.Network.natType == NATSymmetric {
		return 0
	}
	return retryLimit / 10 // c2s or s2c
}
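
// checkDirectTunnel rebuilds the direct tunnel when it is down, honoring
// ForceRelay, the Enabled flag, and the retry counters kept in app.config
// (capped by directRetryLimit). Once any tunnel exists it starts the local
// listener and the relay heartbeat loop exactly once (app.once).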
func (app *p2pApp) checkDirectTunnel() error {
	if app.config.ForceRelay == 1 && app.config.RelayNode != app.config.PeerNode {
		return nil
	}
	if app.DirectTunnel() != nil && app.DirectTunnel().isActive() {
		return nil
	}
	if app.config.nextRetryTime.After(time.Now()) || app.config.Enabled == 0 || app.config.retryNum >= app.directRetryLimit() {
		return nil
	}
	if time.Now().Add(-time.Minute * 15).After(app.config.retryTime) { // ran normally for 15min, reset retryNum
		app.config.retryNum = 1
	}
	if app.config.retryNum > 0 { // skip the reconnect log on the first attempt
		gLog.Printf(LvINFO, "detected app %s appid:%d disconnected, reconnecting (attempt %d)...", app.config.LogPeerNode(), app.id, app.config.retryNum)
	}
	app.config.retryNum++
	app.config.retryTime = time.Now()
	app.config.nextRetryTime = time.Now().Add(retryInterval)
	app.config.connectTime = time.Now()
	err := app.buildDirectTunnel()
	if err != nil {
		app.config.errMsg = err.Error()
		if err == ErrPeerOffline && app.config.retryNum > 2 { // stop retrying, wait for the peer to come online
			app.config.retryNum = retryLimit
			gLog.Printf(LvINFO, "%s offline, will auto reconnect when the peer node comes online", app.config.LogPeerNode())
		}
		if err == ErrBuildTunnelBusy {
			app.config.retryNum--
		}
	}
	if app.Tunnel() != nil {
		app.once.Do(func() {
			go app.listen()
			// memapp also needs these
			go app.relayHeartbeatLoop()
		})
	}
	return nil
}
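
// buildDirectTunnel refreshes peer info from the server, tries to add a direct
// tunnel, reports the attempt with MsgReportConnect, then syncs the app key to
// the peer and installs the tunnel. For memapps (SrcPort == 0) it also pushes
// ServerSideSaveMemApp so the peer registers the app on its side.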
func (app *p2pApp) buildDirectTunnel() error {
	relayNode := ""
	peerNatType := NATUnknown
	peerIP := ""
	errMsg := ""
	var t *P2PTunnel
	var err error
	pn := GNetwork
	initErr := pn.requestPeerInfo(&app.config)
	if initErr != nil {
		gLog.Printf(LvERROR, "%s requestPeerInfo error:%s", app.config.LogPeerNode(), initErr)
		return initErr
	}
	t, err = pn.addDirectTunnel(app.config, 0)
	if t != nil {
		peerNatType = t.config.peerNatType
		peerIP = t.config.peerIP
	}
	if err != nil {
		errMsg = err.Error()
	}
	req := ReportConnect{
		Error:          errMsg,
		Protocol:       app.config.Protocol,
		SrcPort:        app.config.SrcPort,
		NatType:        gConf.Network.natType,
		PeerNode:       app.config.PeerNode,
		DstPort:        app.config.DstPort,
		DstHost:        app.config.DstHost,
		PeerNatType:    peerNatType,
		PeerIP:         peerIP,
		ShareBandwidth: gConf.Network.ShareBandwidth,
		RelayNode:      relayNode,
		Version:        OpenP2PVersion,
	}
	pn.write(MsgReport, MsgReportConnect, &req)
	if err != nil {
		return err
	}
	// if rtid != 0 || t.conn.Protocol() == "tcp" {
	// sync appkey
	if t == nil {
		return err
	}
	syncKeyReq := APPKeySync{
		AppID:  app.id,
		AppKey: app.key,
	}
	gLog.Printf(LvDEBUG, "sync appkey direct to %s", app.config.LogPeerNode())
	pn.push(app.config.PeerNode, MsgPushAPPKey, &syncKeyReq)
	app.setDirectTunnel(t)

	// for memapp, notify the peer to add the memapp
	if app.config.SrcPort == 0 {
		req := ServerSideSaveMemApp{From: gConf.Network.Node, Node: gConf.Network.Node, TunnelID: t.id, RelayTunnelID: 0, AppID: app.id}
		pn.push(app.config.PeerNode, MsgPushServerSideSaveMemApp, &req)
		gLog.Printf(LvDEBUG, "push %s ServerSideSaveMemApp: %s", app.config.LogPeerNode(), prettyJson(req))
	}
	gLog.Printf(LvDEBUG, "%s use tunnel %d", app.config.AppName, t.id)
	return nil
}
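
// checkRelayTunnel mirrors checkDirectTunnel for the relay path: it skips
// memapps whose peer or local node is the SDWAN central node, treats the relay
// tunnel as healthy while relay heartbeats are fresh, and otherwise rebuilds
// the relay tunnel using its own retry counters.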
func (app *p2pApp) checkRelayTunnel() error {
	// if app.config.ForceRelay == 1 && (gConf.sdwan.CentralNode == app.config.PeerNode && compareVersion(app.config.peerVersion, SupportDualTunnelVersion) < 0) {
	if app.config.SrcPort == 0 && (gConf.sdwan.CentralNode == app.config.PeerNode || gConf.sdwan.CentralNode == gConf.Network.Node) { // memapp: the central node does not build a relay tunnel
		return nil
	}
	app.hbMtx.Lock()
	if app.RelayTunnel() != nil && time.Now().Before(app.hbTimeRelay.Add(TunnelHeartbeatTime*2)) { // must check app.hbTimeRelay instead of relayTunnel
		app.hbMtx.Unlock()
		return nil
	}
	app.hbMtx.Unlock()
	if app.nextRetryRelayTime.After(time.Now()) || app.config.Enabled == 0 || app.retryRelayNum >= retryLimit {
		return nil
	}
	if time.Now().Add(-time.Minute * 15).After(app.retryRelayTime) { // ran normally for 15min, reset retryRelayNum
		app.retryRelayNum = 1
	}
	if app.retryRelayNum > 0 { // skip the reconnect log on the first attempt
		gLog.Printf(LvINFO, "detected app %s appid:%d relay disconnected, reconnecting (attempt %d)...", app.config.LogPeerNode(), app.id, app.retryRelayNum)
	}
	app.setRelayTunnel(nil) // reset relayTunnel
	app.retryRelayNum++
	app.retryRelayTime = time.Now()
	app.nextRetryRelayTime = time.Now().Add(retryInterval)
	app.connectTime = time.Now()
	err := app.buildRelayTunnel()
	if err != nil {
		app.errMsg = err.Error()
		if err == ErrPeerOffline && app.retryRelayNum > 2 { // stop retrying, wait for the peer to come online
			app.retryRelayNum = retryLimit
			gLog.Printf(LvINFO, "%s offline, will auto reconnect when the peer node comes online", app.config.LogPeerNode())
		}
	}
	if app.Tunnel() != nil {
		app.once.Do(func() {
			go app.listen()
			// memapp also needs these
			go app.relayHeartbeatLoop()
		})
	}
	return nil
}
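
// buildRelayTunnel refreshes peer info, asks the network layer for a relay
// tunnel (addRelayTunnel also returns the relay tunnel ID and relay mode),
// reports the attempt with MsgReportConnect, syncs the app key to the peer,
// and records the relay node, relay mode and heartbeat time. Memapps
// (SrcPort == 0) additionally push ServerSideSaveMemApp to the peer.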
func (app *p2pApp) buildRelayTunnel() error {
	var rtid uint64
	relayNode := ""
	relayMode := ""
	peerNatType := NATUnknown
	peerIP := ""
	errMsg := ""
	var t *P2PTunnel
	var err error
	pn := GNetwork
	config := app.config
	initErr := pn.requestPeerInfo(&config)
	if initErr != nil {
		gLog.Printf(LvERROR, "%s init error:%s", config.LogPeerNode(), initErr)
		return initErr
	}

	t, rtid, relayMode, err = pn.addRelayTunnel(config)
	if t != nil {
		relayNode = t.config.PeerNode
	}

	if err != nil {
		errMsg = err.Error()
	}
	req := ReportConnect{
		Error:          errMsg,
		Protocol:       config.Protocol,
		SrcPort:        config.SrcPort,
		NatType:        gConf.Network.natType,
		PeerNode:       config.PeerNode,
		DstPort:        config.DstPort,
		DstHost:        config.DstHost,
		PeerNatType:    peerNatType,
		PeerIP:         peerIP,
		ShareBandwidth: gConf.Network.ShareBandwidth,
		RelayNode:      relayNode,
		Version:        OpenP2PVersion,
	}
	pn.write(MsgReport, MsgReportConnect, &req)
	if err != nil {
		return err
	}
	// if rtid != 0 || t.conn.Protocol() == "tcp" {
	// sync appkey
	if t == nil { // guard against a nil tunnel without an error, same as buildDirectTunnel
		return err
	}
	syncKeyReq := APPKeySync{
		AppID:  app.id,
		AppKey: app.key,
	}
	gLog.Printf(LvDEBUG, "sync appkey relay to %s", config.LogPeerNode())
	pn.push(config.PeerNode, MsgPushAPPKey, &syncKeyReq)
	app.setRelayTunnelID(rtid)
	app.setRelayTunnel(t)
	app.relayNode = relayNode
	app.relayMode = relayMode
	app.hbTimeRelay = time.Now()

	// for memapp, notify the peer to add the memapp
	if config.SrcPort == 0 {
		req := ServerSideSaveMemApp{From: gConf.Network.Node, Node: relayNode, TunnelID: rtid, RelayTunnelID: t.id, AppID: app.id, RelayMode: relayMode}
		pn.push(config.PeerNode, MsgPushServerSideSaveMemApp, &req)
		gLog.Printf(LvDEBUG, "push %s relay ServerSideSaveMemApp: %s", config.LogPeerNode(), prettyJson(req))
	}
	gLog.Printf(LvDEBUG, "%s use tunnel %d", app.config.AppName, t.id)
	return nil
}

func (app *p2pApp) buildOfficialTunnel() error {
	return nil
}

// cache relayHead; refresh it when rtid changes
func (app *p2pApp) RelayHead() *bytes.Buffer {
	if app.relayHead == nil {
		app.relayHead = new(bytes.Buffer)
		binary.Write(app.relayHead, binary.LittleEndian, app.rtid)
	}
	return app.relayHead
}
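
// setRelayTunnelID stores the new rtid and refreshes the cached relayHead,
// the 8-byte little-endian encoding of the relay tunnel ID.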
func (app *p2pApp) setRelayTunnelID(rtid uint64) {
	app.rtid = rtid
	app.relayHead = new(bytes.Buffer)
	binary.Write(app.relayHead, binary.LittleEndian, app.rtid)
}
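
// isActive reports whether the app currently has a usable tunnel: in direct
// mode that is the tunnel's own liveness; in relay mode it means a relay
// heartbeat arrived within the last two heartbeat intervals.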
func (app *p2pApp) isActive() bool {
	if app.Tunnel() == nil {
		// gLog.Printf(LvDEBUG, "isActive app.tunnel==nil")
		return false
	}
	if app.isDirect() { // in direct mode the app heartbeat equals the tunnel heartbeat
		return app.Tunnel().isActive()
	}
	// in relay mode, calculate the app heartbeat from hbTimeRelay
	app.hbMtx.Lock()
	defer app.hbMtx.Unlock()
	res := time.Now().Before(app.hbTimeRelay.Add(TunnelHeartbeatTime * 2))
	// if !res {
	// 	gLog.Printf(LvDEBUG, "%d app isActive false. peer=%s", app.id, app.config.PeerNode)
	// }
	return res
}

func (app *p2pApp) updateHeartbeat() {
	app.hbMtx.Lock()
	defer app.hbMtx.Unlock()
	app.hbTimeRelay = time.Now()
}
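
// listenTCP accepts local TCP connections on SrcPort (bound to 127.0.0.1 when
// the whitelist is localhost-only), enforces the IP whitelist, wraps each
// connection in an overlayConn bound to the current tunnel, and asks the peer
// to open the matching connection with OverlayConnectReq.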
func (app *p2pApp) listenTCP() error {
	gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort)
	defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort)
	var err error
	listenAddr := ""
	if IsLocalhost(app.config.Whitelist) { // do not expose the port
		listenAddr = "127.0.0.1"
	}
	app.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", listenAddr, app.config.SrcPort))
	if err != nil {
		gLog.Printf(LvERROR, "listen error:%s", err)
		return err
	}
	defer app.listener.Close()
	for app.running {
		conn, err := app.listener.Accept()
		if err != nil {
			if app.running {
				gLog.Printf(LvERROR, "%d accept error:%s", app.id, err)
			}
			break
		}
		if app.Tunnel() == nil {
			gLog.Printf(LvDEBUG, "srcPort=%d, app.Tunnel()==nil, not ready", app.config.SrcPort)
			conn.Close() // the tunnel is not ready; close the conn instead of leaking it
			time.Sleep(time.Second)
			continue
		}
		// check whitelist
		if app.config.Whitelist != "" {
			remoteIP := conn.RemoteAddr().(*net.TCPAddr).IP.String()
			if !app.iptree.Contains(remoteIP) && !IsLocalhost(remoteIP) {
				conn.Close()
				gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP)
				continue
			}
		}
		oConn := overlayConn{
			tunnel:   app.Tunnel(),
			app:      app,
			connTCP:  conn,
			id:       rand.Uint64(),
			isClient: true,
			appID:    app.id,
			appKey:   app.key,
			running:  true,
		}
		if !app.isDirect() {
			oConn.rtid = app.rtid
		}
		// pre-calculate key bytes for encryption
		if oConn.appKey != 0 {
			encryptKey := make([]byte, AESKeySize)
			binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
			binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
			oConn.appKeyBytes = encryptKey
		}
		app.Tunnel().overlayConns.Store(oConn.id, &oConn)
		gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d, %s", oConn.id, oConn.connTCP.RemoteAddr())
		// tell the peer to connect
		req := OverlayConnectReq{ID: oConn.id,
			Token:    gConf.Network.Token,
			DstIP:    app.config.DstHost,
			DstPort:  app.config.DstPort,
			Protocol: app.config.Protocol,
			AppID:    app.id,
		}
		if !app.isDirect() {
			req.RelayTunnelID = app.Tunnel().id
		}
		app.Tunnel().WriteMessage(app.RelayTunnelID(), MsgP2P, MsgOverlayConnectReq, &req)
		// TODO: wait for OverlayConnectRsp instead of sleeping
		time.Sleep(time.Second) // wait for the remote node's connection to be ready
		go oConn.run()
	}
	return nil
}
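
// listenUDP serves local UDP on SrcPort. Each remote ip:port is packed into a
// 64-bit overlay ID (4 IPv4 octets plus 2 port bytes, little-endian), so
// datagrams from a known source are forwarded to its existing overlayConn,
// while unknown sources get a new overlayConn and an OverlayConnectReq.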
func (app *p2pApp) listenUDP() error {
	gLog.Printf(LvDEBUG, "udp accept on port %d start", app.config.SrcPort)
	defer gLog.Printf(LvDEBUG, "udp accept on port %d end", app.config.SrcPort)
	var err error
	app.listenerUDP, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: app.config.SrcPort})
	if err != nil {
		gLog.Printf(LvERROR, "listen error:%s", err)
		return err
	}
	defer app.listenerUDP.Close()
	buffer := make([]byte, 64*1024+PaddingSize)
	udpID := make([]byte, 8)
	for {
		app.listenerUDP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
		n, remoteAddr, err := app.listenerUDP.ReadFrom(buffer)
		if err != nil {
			if ne, ok := err.(net.Error); ok && ne.Timeout() {
				continue
			} else {
				gLog.Printf(LvERROR, "udp read failed:%s", err)
				break
			}
		} else {
			if app.Tunnel() == nil {
				gLog.Printf(LvDEBUG, "srcPort=%d, app.Tunnel()==nil, not ready", app.config.SrcPort)
				time.Sleep(time.Second)
				continue
			}
			dupData := bytes.Buffer{} // should use a memory pool
			dupData.Write(buffer[:n+PaddingSize])
			remoteIP := strings.Split(remoteAddr.String(), ":")[0]
			port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
			a := net.ParseIP(remoteIP)
			if ip4 := a.To4(); ip4 != nil {
				a = ip4 // ParseIP returns the 16-byte form; use the 4-byte form so these are the real IPv4 octets
			}
			udpID[0] = a[0]
			udpID[1] = a[1]
			udpID[2] = a[2]
			udpID[3] = a[3]
			udpID[4] = byte(port)
			udpID[5] = byte(port >> 8)
			id := binary.LittleEndian.Uint64(udpID) // pack remoteIP:port into a uint64
			s, ok := app.Tunnel().overlayConns.Load(id)
			if !ok {
				// no overlayConn for this remoteAddr yet: new udp connection
				oConn := overlayConn{
					tunnel:     app.Tunnel(),
					connUDP:    app.listenerUDP,
					remoteAddr: remoteAddr,
					udpData:    make(chan []byte, 1000),
					id:         id,
					isClient:   true,
					appID:      app.id,
					appKey:     app.key,
					running:    true,
				}
				if !app.isDirect() {
					oConn.rtid = app.rtid
				}
				// pre-calculate key bytes for encryption
				if oConn.appKey != 0 {
					encryptKey := make([]byte, AESKeySize)
					binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
					binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
					oConn.appKeyBytes = encryptKey
				}
				app.Tunnel().overlayConns.Store(oConn.id, &oConn)
				gLog.Printf(LvDEBUG, "Accept UDP overlayID:%d", oConn.id)
				// tell the peer to connect
				req := OverlayConnectReq{ID: oConn.id,
					Token:    gConf.Network.Token,
					DstIP:    app.config.DstHost,
					DstPort:  app.config.DstPort,
					Protocol: app.config.Protocol,
					AppID:    app.id,
				}
				if !app.isDirect() {
					req.RelayTunnelID = app.Tunnel().id
				}
				app.Tunnel().WriteMessage(app.RelayTunnelID(), MsgP2P, MsgOverlayConnectReq, &req)
				// TODO: wait for OverlayConnectRsp instead of sleeping
				time.Sleep(time.Second) // wait for the remote node's connection to be ready
				go oConn.run()
				oConn.udpData <- dupData.Bytes()
			}

			// an overlayConn already exists for this remoteAddr: forward the data to it
			overlayConn, ok := s.(*overlayConn)
			if !ok {
				continue
			}
			overlayConn.udpData <- dupData.Bytes()
		}
	}
	return nil
}
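
// listen runs the local listener matching the app protocol and restarts it
// after a 10s pause if it exits while the app is still running. Memapps
// (SrcPort == 0) have nothing to listen on.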
func (app *p2pApp) listen() error {
	if app.config.SrcPort == 0 {
		return nil
	}
	gLog.Printf(LvINFO, "LISTEN ON PORT %s:%d START", app.config.Protocol, app.config.SrcPort)
	defer gLog.Printf(LvINFO, "LISTEN ON PORT %s:%d END", app.config.Protocol, app.config.SrcPort)
	app.wg.Add(1)
	defer app.wg.Done()
	for app.running {
		if app.config.Protocol == "udp" {
			app.listenUDP()
		} else {
			app.listenTCP()
		}
		if !app.running {
			break
		}
		time.Sleep(time.Second * 10)
	}
	return nil
}
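
// close stops the app: it clears the running flag, closes the local listeners,
// closes this app's overlay connections on both tunnels, and waits for the
// listen and heartbeat goroutines to finish.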
func (app *p2pApp) close() {
	app.running = false
	if app.listener != nil {
		app.listener.Close()
	}
	if app.listenerUDP != nil {
		app.listenerUDP.Close()
	}
	if app.DirectTunnel() != nil {
		app.DirectTunnel().closeOverlayConns(app.id)
	}
	if app.RelayTunnel() != nil {
		app.RelayTunnel().closeOverlayConns(app.id)
	}
	app.wg.Wait()
}

// TODO: many relay apps on the same P2PTunnel will send a lot of relay heartbeats
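// relayHeartbeatLoop keeps the relay path alive: while the app runs it sends a
// RelayHeartbeat through the relay tunnel every TunnelHeartbeatTime and exits
// if the write fails.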
func (app *p2pApp) relayHeartbeatLoop() {
	app.wg.Add(1)
	defer app.wg.Done()
	gLog.Printf(LvDEBUG, "%s appid:%d relayHeartbeat to rtid:%d start", app.config.LogPeerNode(), app.id, app.rtid)
	defer gLog.Printf(LvDEBUG, "%s appid:%d relayHeartbeat to rtid:%d end", app.config.LogPeerNode(), app.id, app.rtid)

	for app.running {
		if app.RelayTunnel() == nil || !app.RelayTunnel().isRuning() {
			time.Sleep(TunnelHeartbeatTime)
			continue
		}
		req := RelayHeartbeat{From: gConf.Network.Node, RelayTunnelID: app.RelayTunnel().id,
			AppID: app.id}
		err := app.RelayTunnel().WriteMessage(app.rtid, MsgP2P, MsgRelayHeartbeat, &req)
		if err != nil {
			gLog.Printf(LvERROR, "%s appid:%d rtid:%d write relay tunnel heartbeat error %s", app.config.LogPeerNode(), app.id, app.rtid, err)
			return
		}
		// TODO: debug relay heartbeat
		gLog.Printf(LvDEBUG, "%s appid:%d rtid:%d write relay tunnel heartbeat ok", app.config.LogPeerNode(), app.id, app.rtid)
		time.Sleep(TunnelHeartbeatTime)
	}
}