diff --git a/core/common_test.go b/core/common_test.go index 66ac093..6348dec 100644 --- a/core/common_test.go +++ b/core/common_test.go @@ -127,3 +127,59 @@ func TestNodeID(t *testing.T) { fmt.Printf("%s >= %s\n", node1, node2) } } + +func TestCalcRetryTime(t *testing.T) { + // 0-2 < 13s + // 3-5:300 + // 6-10:600 + tests := []struct { + retryNum float64 + want float64 + }{ + {1.0, 10}, + {5.0, 13}, + {10.0, 180}, + {15.0, 9000}, + {18.0, 90000}, + // 可以添加更多测试用例 + } + + for _, tt := range tests { + got := calcRetryTimeRelay(tt.retryNum) + if got < tt.want*0.85 || got > tt.want*1.15 { + t.Errorf("calcRetryTime(%f) = %f, want %f", tt.retryNum, got, tt.want) + } + } + + for i := 0; i < 20; i++ { + log.Printf("%d retryTime=%fs", i, calcRetryTimeRelay(float64(i))) + } +} + +func TestCalcRetryTimeDirect(t *testing.T) { + // 0-2 < 13s + // 3-5:300 + // 6-10:600 + tests := []struct { + retryNum float64 + want float64 + }{ + {1.0, 10}, + {5.0, 13}, + {10.0, 180}, + {15.0, 9000}, + {18.0, 90000}, + // 可以添加更多测试用例 + } + + for _, tt := range tests { + got := calcRetryTimeRelay(tt.retryNum) + if got < tt.want*0.85 || got > tt.want*1.15 { + t.Errorf("calcRetryTime(%f) = %f, want %f", tt.retryNum, got, tt.want) + } + } + + for i := 0; i < 20; i++ { + log.Printf("%d retryTime=%fs", i, calcRetryTimeDirect(float64(i))) + } +} diff --git a/core/daemon.go b/core/daemon.go index 306d58a..5ba7bbc 100644 --- a/core/daemon.go +++ b/core/daemon.go @@ -3,6 +3,7 @@ package openp2p import ( "os" "path/filepath" + "runtime" "time" "github.com/openp2p-cn/service" @@ -14,27 +15,30 @@ type daemon struct { } func (d *daemon) Start(s service.Service) error { - gLog.Println(LvINFO, "system service start") + gLog.i("system service start") return nil } func (d *daemon) Stop(s service.Service) error { - gLog.Println(LvINFO, "system service stop") + gLog.i("system service stop") d.running = false if d.proc != nil { - gLog.Println(LvINFO, "stop worker") + gLog.i("stop worker") d.proc.Kill() } if service.Interactive() { - gLog.Println(LvINFO, "stop daemon") + gLog.i("stop daemon") os.Exit(0) } return nil } func (d *daemon) run() { - gLog.Println(LvINFO, "daemon run start") - defer gLog.Println(LvINFO, "daemon run end") + gLog.close() + baseDir := filepath.Dir(os.Args[0]) + gLog = NewLogger(baseDir, "daemon", LogLevel(gConf.LogLevel), 1024*1024, LogFile|LogConsole) + gLog.i("daemon run start") + defer gLog.i("daemon run end") d.running = true binPath, _ := os.Executable() conf := &service.Config{ @@ -58,34 +62,55 @@ func (d *daemon) run() { args = append(args, "-nv") for { // start worker - tmpDump := filepath.Join("log", "dump.log.tmp") - dumpFile := filepath.Join("log", "dump.log") - f, err := os.Create(filepath.Join(tmpDump)) + tmpDump := filepath.Join(filepath.Dir(binPath), "log", "dump.log.tmp") + dumpFile := filepath.Join(filepath.Dir(binPath), "log", "dump.log") + // f, err := os.Create(filepath.Join(tmpDump)) + f, err := os.OpenFile(filepath.Join(tmpDump), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0775) if err != nil { - gLog.Printf(LvERROR, "create file %s error:%s", tmpDump, err) + gLog.e("OpenFile %s error:%s", tmpDump, err) return } - gLog.Println(LvINFO, "start worker process, args:", args) + gLog.i("start worker process, args:%v", args) execSpec := &os.ProcAttr{Env: append(os.Environ(), "GOTRACEBACK=crash"), Files: []*os.File{os.Stdin, os.Stdout, f}} lastRebootTime := time.Now() p, err := os.StartProcess(binPath, args, execSpec) if err != nil { - gLog.Printf(LvERROR, "start worker error:%s", err) + gLog.e("start worker error:%s", err) return } d.proc = p - _, _ = p.Wait() + processState, err := p.Wait() + if err != nil { + gLog.e("wait process error:%s", err) + } + + if processState != nil { + exitCode := processState.ExitCode() + gLog.i("worker process exited with code: %d", exitCode) + + if exitCode == 9 { + gLog.i("worker process update with code: %d", exitCode) + // os.Exit(9) // old client installed system service will not auto restart. fuck + } + } + // Write the current time to the end of the dump file + currentTime := time.Now().Format("2006-01-02 15:04:05") + _, err = f.WriteString("\nProcess ended at: " + currentTime + "\n") + if err != nil { + gLog.e("Failed to write time to dump file: %s", err) + } + f.Close() time.Sleep(time.Second) err = os.Rename(tmpDump, dumpFile) if err != nil { - gLog.Printf(LvERROR, "rename dump error:%s", err) + gLog.e("rename dump error:%s", err) } if !d.running { return } if time.Since(lastRebootTime) < time.Second*10 { - gLog.Printf(LvERROR, "worker stop, restart it after 10s") + gLog.e("worker stop, restart it after 10s") time.Sleep(time.Second * 10) } @@ -93,13 +118,7 @@ func (d *daemon) run() { } func (d *daemon) Control(ctrlComm string, exeAbsPath string, args []string) error { - svcConfig := &service.Config{ - Name: ProductName, - DisplayName: ProductName, - Description: ProductName, - Executable: exeAbsPath, - Arguments: args, - } + svcConfig := getServiceConfig(exeAbsPath, args) s, e := service.New(d, svcConfig) if e != nil { @@ -112,3 +131,56 @@ func (d *daemon) Control(ctrlComm string, exeAbsPath string, args []string) erro return nil } + +func getServiceConfig(exeAbsPath string, args []string) *service.Config { + config := &service.Config{ + Name: ProductName, + DisplayName: ProductName, + Description: ProductName, + Executable: exeAbsPath, + Arguments: args, + Option: make(map[string]interface{}), + } + + if runtime.GOOS == "windows" { + setupWindowsConfig(config) + } else { + setupLinuxConfig(config) + } + + return config +} + +func setupWindowsConfig(config *service.Config) { + failureActions := []map[string]interface{}{ + { + "Type": "restart", + "Delay": "10000", + }, + { + "Type": "restart", + "Delay": "10000", + }, + { + "Type": "restart", + "Delay": "10000", + }, + } + + config.Option = map[string]interface{}{ + "OnFailure": "restart", + "OnFailureDelay": "10s", + "OnFailureResetPeriod": "3600", + "FailureActions": failureActions, + "DelayedAutoStart": true, + } +} + +func setupLinuxConfig(config *service.Config) { + config.Option = map[string]interface{}{ + "Restart": "always", + "RestartSec": "10", + "StartLimitBurst": 64, + "SuccessExitStatus": "1 2 8 SIGKILL", + } +} diff --git a/core/holepunch.go b/core/holepunch.go index 59ac770..52c2752 100644 --- a/core/holepunch.go +++ b/core/holepunch.go @@ -11,8 +11,8 @@ import ( ) func handshakeC2C(t *P2PTunnel) (err error) { - gLog.Printf(LvDEBUG, "handshakeC2C %s:%d:%d to %s:%d", gConf.Network.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort) - defer gLog.Printf(LvDEBUG, "handshakeC2C end") + gLog.d("handshakeC2C %s:%d:%d to %s:%d", gConf.Network.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort) + defer gLog.d("handshakeC2C end") conn, err := net.ListenUDP("udp", t.localHoleAddr) if err != nil { return err @@ -20,12 +20,12 @@ func handshakeC2C(t *P2PTunnel) (err error) { defer conn.Close() _, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) if err != nil { - gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) + gLog.d("handshakeC2C write MsgPunchHandshake error:%s", err) return err } ra, head, buff, _, err := UDPRead(conn, HandshakeTimeout) if err != nil { - gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) + gLog.d("handshakeC2C read MsgPunchHandshake error:%s", err) return err } t.remoteHoleAddr, _ = net.ResolveUDPAddr("udp", ra.String()) @@ -39,29 +39,29 @@ func handshakeC2C(t *P2PTunnel) (err error) { tunnelID = t.id } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id { - gLog.Printf(LvDEBUG, "read tunnelid:%d handshake ", t.id) + gLog.d("read tunnelid:%d handshake ", t.id) UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) _, head, _, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { - gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error:", err) + gLog.d("handshakeC2C write MsgPunchHandshakeAck error:", err) return err } } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck && tunnelID == t.id { - gLog.Printf(LvDEBUG, "read tunnelID:%d handshake ack ", t.id) + gLog.d("read tunnelID:%d handshake ack ", t.id) _, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) if err != nil { - gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error:", err) + gLog.d("handshakeC2C write MsgPunchHandshakeAck error:%s", err) return err } } - gLog.Printf(LvINFO, "handshakeC2C ok") + gLog.i("handshakeC2C ok") return nil } func handshakeC2S(t *P2PTunnel) error { - gLog.Printf(LvDEBUG, "tid:%d handshakeC2S start", t.id) - defer gLog.Printf(LvDEBUG, "tid:%d handshakeC2S end", t.id) + gLog.d("tid:%d handshakeC2S start", t.id) + defer gLog.d("tid:%d handshakeC2S end", t.id) if !buildTunnelMtx.TryLock() { // time.Sleep(time.Second * 3) return ErrBuildTunnelBusy @@ -77,7 +77,7 @@ func handshakeC2S(t *P2PTunnel) error { defer conn.Close() go func() error { - gLog.Printf(LvDEBUG, "tid:%d send symmetric handshake to %s from %d:%d start", t.id, t.config.peerIP, t.coneLocalPort, t.coneNatPort) + gLog.d("tid:%d send symmetric handshake to %s from %d:%d start", t.id, t.config.peerIP, t.coneLocalPort, t.coneNatPort) for i := 0; i < SymmetricHandshakeNum; i++ { // time.Sleep(SymmetricHandshakeInterval) dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2)) @@ -86,29 +86,29 @@ func handshakeC2S(t *P2PTunnel) error { } _, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) if err != nil { - gLog.Printf(LvDEBUG, "tid:%d handshakeC2S write MsgPunchHandshake error:%s", t.id, err) + gLog.d("tid:%d handshakeC2S write MsgPunchHandshake error:%s", t.id, err) return err } } - gLog.Printf(LvDEBUG, "tid:%d send symmetric handshake end", t.id) + gLog.d("tid:%d send symmetric handshake end", t.id) return nil }() err = conn.SetReadDeadline(time.Now().Add(HandshakeTimeout)) if err != nil { - gLog.Println(LvERROR, "tid:%d SymmetricHandshakeAckTimeout SetReadDeadline error", t.id) + gLog.d("tid:%d SymmetricHandshakeAckTimeout SetReadDeadline error", t.id) return err } // read response of the punching hole ok port buff := make([]byte, 1024) _, dst, err := conn.ReadFrom(buff) if err != nil { - gLog.Println(LvERROR, "tid:%d handshakeC2S wait timeout", t.id) + gLog.d("tid:%d handshakeC2S wait timeout", t.id) return err } head := &openP2PHeader{} err = binary.Read(bytes.NewReader(buff[:openP2PHeaderSize]), binary.LittleEndian, head) if err != nil { - gLog.Printf(LvERROR, "tid:%d parse p2pheader error:%s", t.id, err) + gLog.e("tid:%d parse p2pheader error:%s", t.id, err) return err } t.remoteHoleAddr, _ = net.ResolveUDPAddr("udp", dst.String()) @@ -122,12 +122,12 @@ func handshakeC2S(t *P2PTunnel) error { tunnelID = t.id } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id { - gLog.Printf(LvDEBUG, "tid:%d handshakeC2S read handshake ", t.id) + gLog.d("tid:%d handshakeC2S read handshake ", t.id) UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) for { _, head, buff, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { - gLog.Printf(LvDEBUG, "tid:%d handshakeC2S handshake error", t.id) + gLog.d("tid:%d handshakeC2S handshake error", t.id) return err } var tunnelID uint64 @@ -146,35 +146,35 @@ func handshakeC2S(t *P2PTunnel) error { } } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LvDEBUG, "tid:%d handshakeC2S read handshake ack %s", t.id, t.remoteHoleAddr.String()) + gLog.d("tid:%d handshakeC2S read handshake ack %s", t.id, t.remoteHoleAddr.String()) _, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) return err } else { - gLog.Printf(LvDEBUG, "tid:%d handshakeS2C read msg but not MsgPunchHandshakeAck", t.id) + gLog.d("tid:%d handshakeS2C read msg but not MsgPunchHandshakeAck", t.id) } - gLog.Printf(LvINFO, "tid:%d handshakeC2S ok. cost %d ms", t.id, time.Since(startTime)/time.Millisecond) + gLog.i("tid:%d handshakeC2S ok. cost %d ms", t.id, time.Since(startTime)/time.Millisecond) return nil } func handshakeS2C(t *P2PTunnel) error { - gLog.Printf(LvDEBUG, "tid:%d handshakeS2C start", t.id) - defer gLog.Printf(LvDEBUG, "tid:%d handshakeS2C end", t.id) + gLog.d("tid:%d handshakeS2C start", t.id) + defer gLog.d("tid:%d handshakeS2C end", t.id) if !buildTunnelMtx.TryLock() { // time.Sleep(time.Second * 3) return ErrBuildTunnelBusy } defer buildTunnelMtx.Unlock() startTime := time.Now() - gotCh := make(chan *net.UDPAddr, 5) + gotCh := make(chan *net.UDPAddr, 50) // sequencely udp send handshake, do not parallel send - gLog.Printf(LvDEBUG, "tid:%d send symmetric handshake to %s:%d start", t.id, t.config.peerIP, t.config.peerConeNatPort) + gLog.d("tid:%d send symmetric handshake to %s:%d start", t.id, t.config.peerIP, t.config.peerConeNatPort) gotIt := false for i := 0; i < SymmetricHandshakeNum; i++ { // time.Sleep(SymmetricHandshakeInterval) go func(t *P2PTunnel) error { conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random? if err != nil { - gLog.Printf(LvDEBUG, "tid:%d listen error", t.id) + gLog.d("tid:%d listen error", t.id) return err } defer conn.Close() @@ -198,13 +198,13 @@ func handshakeS2C(t *P2PTunnel) error { } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id { - gLog.Printf(LvDEBUG, "tid:%d handshakeS2C read handshake ", t.id) + gLog.d("tid:%d handshakeS2C read handshake ", t.id) UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) // may read several MsgPunchHandshake for { _, head, buff, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { - gLog.Println(LvDEBUG, "tid:%d handshakeS2C handshake error", t.id) + gLog.d("tid:%d handshakeS2C handshake error", t.id) return err } if len(buff) > openP2PHeaderSize { @@ -218,26 +218,26 @@ func handshakeS2C(t *P2PTunnel) error { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck && tunnelID == t.id { break } else { - gLog.Println(LvDEBUG, "tid:%d handshakeS2C read msg but not MsgPunchHandshakeAck", t.id) + gLog.d("tid:%d handshakeS2C read msg but not MsgPunchHandshakeAck", t.id) } } } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LvDEBUG, "tid:%d handshakeS2C read handshake ack %s", t.id, conn.LocalAddr().String()) + gLog.d("tid:%d handshakeS2C read handshake ack %s", t.id, conn.LocalAddr().String()) UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) gotIt = true la, _ := net.ResolveUDPAddr("udp", conn.LocalAddr().String()) gotCh <- la return nil } else { - gLog.Printf(LvDEBUG, "tid:%d handshakeS2C read msg but not MsgPunchHandshakeAck", t.id) + gLog.d("tid:%d handshakeS2C read msg but not MsgPunchHandshakeAck", t.id) } return nil }(t) } - gLog.Printf(LvDEBUG, "tid:%d send symmetric handshake end", t.id) + gLog.d("tid:%d send symmetric handshake end", t.id) if compareVersion(t.config.peerVersion, SymmetricSimultaneouslySendVersion) < 0 { // compatible with old client - gLog.Printf(LvDEBUG, "tid:%d handshakeS2C ready, notify peer connect", t.id) + gLog.d("tid:%d handshakeS2C ready, notify peer connect", t.id) GNetwork.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) } @@ -246,7 +246,7 @@ func handshakeS2C(t *P2PTunnel) error { return fmt.Errorf("tid:%d wait handshake timeout", t.id) case la := <-gotCh: t.localHoleAddr = la - gLog.Printf(LvINFO, "tid:%d handshakeS2C ok. cost %dms", t.id, time.Since(startTime)/time.Millisecond) + gLog.i("tid:%d handshakeS2C ok. cost %dms", t.id, time.Since(startTime)/time.Millisecond) } return nil } diff --git a/core/nat.go b/core/nat.go index 9cc8c71..50fb9dd 100644 --- a/core/nat.go +++ b/core/nat.go @@ -12,50 +12,55 @@ import ( reuse "github.com/openp2p-cn/go-reuseport" ) -func natTCP(serverHost string, serverPort int) (publicIP string, publicPort int, localPort int) { - // dialer := &net.Dialer{ - // LocalAddr: &net.TCPAddr{ - // IP: net.ParseIP("0.0.0.0"), - // Port: localPort, - // }, - // } - conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", 0), fmt.Sprintf("%s:%d", serverHost, serverPort), NatTestTimeout) - // conn, err := net.Dial("tcp4", fmt.Sprintf("%s:%d", serverHost, serverPort)) - // log.Println(LvINFO, conn.LocalAddr()) +func natDetectTCP(serverHost string, serverPort int, lp int) (publicIP string, publicPort int, localPort int, err error) { + gLog.d("natDetectTCP start") + defer gLog.d("natDetectTCP end") + conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("0.0.0.0:%d", lp), fmt.Sprintf("%s:%d", serverHost, serverPort), NatDetectTimeout) if err != nil { - fmt.Printf("Dial tcp4 %s:%d error:%s", serverHost, serverPort, err) + err = fmt.Errorf("dial tcp4 %s:%d error: %w", serverHost, serverPort, err) return } defer conn.Close() - localPort, _ = strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1]) - _, wrerr := conn.Write([]byte("1")) - if wrerr != nil { - fmt.Printf("Write error: %s\n", wrerr) - return - } - b := make([]byte, 1000) - conn.SetReadDeadline(time.Now().Add(NatTestTimeout)) - n, rderr := conn.Read(b) - if rderr != nil { - fmt.Printf("Read error: %s\n", rderr) - return - } - arr := strings.Split(string(b[:n]), ":") - if len(arr) < 2 { - return - } - publicIP = arr[0] - port, _ := strconv.ParseInt(arr[1], 10, 32) - publicPort = int(port) - return + localAddr := conn.LocalAddr().(*net.TCPAddr) + localPort = localAddr.Port + + if _, err = conn.Write([]byte("1")); err != nil { + err = fmt.Errorf("write error: %w", err) + return + } + + b := make([]byte, 1000) + conn.SetReadDeadline(time.Now().Add(NatDetectTimeout)) + n, err := conn.Read(b) + if err != nil { + err = fmt.Errorf("read error: %w", err) + return + } + + response := strings.Split(string(b[:n]), ":") + if len(response) < 2 { + err = fmt.Errorf("invalid response format: %s", string(b[:n])) + return + } + + publicIP = response[0] + port, err := strconv.Atoi(response[1]) + if err != nil { + err = fmt.Errorf("invalid port format: %w", err) + return + } + publicPort = port + + return } -func natTest(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int, err error) { - gLog.Println(LvDEBUG, "natTest start") - defer gLog.Println(LvDEBUG, "natTest end") + +func natDetectUDP(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int, err error) { + gLog.d("natDetectUDP start") + defer gLog.d("natDetectUDP end") conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", localPort)) if err != nil { - gLog.Println(LvERROR, "natTest listen udp error:", err) + gLog.e("natDetectUDP listen udp error:%s", err) return "", 0, err } defer conn.Close() @@ -71,7 +76,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string, if err != nil { return "", 0, err } - deadline := time.Now().Add(NatTestTimeout) + deadline := time.Now().Add(NatDetectTimeout) err = conn.SetReadDeadline(deadline) if err != nil { return "", 0, err @@ -79,7 +84,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string, buffer := make([]byte, 1024) nRead, _, err := conn.ReadFrom(buffer) if err != nil { - gLog.Println(LvERROR, "NAT detect error:", err) + gLog.e("NAT detect error:%s", err) return "", 0, err } natRsp := NatDetectRsp{} @@ -88,19 +93,27 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string, return natRsp.IP, natRsp.Port, nil } -func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, err error) { +func getNATType(host string, detectPort1 int, detectPort2 int) (publicIP string, NATType int, err error) { + setUPNP(gConf.Network.PublicIPPort) // the random local port may be used by other. localPort := int(rand.Uint32()%15000 + 50000) - ip1, port1, err := natTest(host, udp1, localPort) + ip1, port1, err := natDetectUDP(host, detectPort1, localPort) if err != nil { - return "", 0, err + // udp block try tcp + gLog.w("udp block, try tcp nat detect") + if ip1, port1, _, err = natDetectTCP(host, detectPort1, localPort); err != nil { + return "", 0, err + } } - _, port2, err := natTest(host, udp2, localPort) // 2rd nat test not need testing publicip - gLog.Printf(LvDEBUG, "local port:%d nat port:%d", localPort, port2) + _, port2, err := natDetectUDP(host, detectPort2, localPort) // 2rd nat test not need testing publicip if err != nil { - return "", 0, err + gLog.w("udp block, try tcp nat detect") + if _, port2, _, err = natDetectTCP(host, detectPort2, localPort); err != nil { + return "", 0, err + } } + gLog.d("local port:%d nat port:%d", localPort, port2) natType := NATSymmetric if port1 == port2 { natType = NATCone @@ -113,11 +126,11 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP return } var echoConn *net.UDPConn - gLog.Println(LvDEBUG, "echo server start") + gLog.d("echo server start") var err error echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort}) if err != nil { // listen error - gLog.Println(LvERROR, "echo server listen error:", err) + gLog.e("echo server listen error:%s", err) return } defer echoConn.Close() @@ -125,34 +138,18 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP for i := 0; i < 2; i++ { if i == 1 { // test upnp or nat-pmp - gLog.Println(LvDEBUG, "upnp test start") - nat, err := Discover() - if err != nil || nat == nil { - gLog.Println(LvDEBUG, "could not perform UPNP discover:", err) - break - } - ext, err := nat.GetExternalAddress() - if err != nil { - gLog.Println(LvDEBUG, "could not perform UPNP external address:", err) - break - } - gLog.Println(LvINFO, "PublicIP:", ext) - - externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30) // 30 seconds fot upnp testing - if err != nil { - gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort) - break - } else { - nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days for tcp connection - } + gLog.d("upnp test start") + // 7 days for udp connection + // 7 days for tcp connection + setUPNP(echoPort) } - gLog.Printf(LvDEBUG, "public ip test start %s:%d", publicIP, echoPort) + gLog.d("public ip test start %s:%d", publicIP, echoPort) conn, err := net.ListenUDP("udp", nil) if err != nil { break } defer conn.Close() - dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", gConf.Network.ServerHost, gConf.Network.ServerPort)) + dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", gConf.Network.ServerIP, gConf.Network.ServerPort)) if err != nil { break } @@ -169,21 +166,21 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP echoConn.SetReadDeadline(time.Now().Add(PublicIPEchoTimeout)) nRead, _, err := echoConn.ReadFromUDP(buf) if err != nil { - gLog.Println(LvDEBUG, "PublicIP detect error:", err) + gLog.d("publicIPTest echoConn read timeout:%s", err) continue } natRsp := NatDetectRsp{} err = json.Unmarshal(buf[openP2PHeaderSize:nRead], &natRsp) if err != nil { - gLog.Println(LvDEBUG, "PublicIP detect error:", err) + gLog.d("publicIPTest Unmarshal error:%s", err) continue } if natRsp.Port == echoPort { if i == 1 { - gLog.Println(LvDEBUG, "UPNP or NAT-PMP:YES") + gLog.d("UPNP or NAT-PMP:YES") hasUPNPorNATPMP = 1 } else { - gLog.Println(LvDEBUG, "public ip:YES") + gLog.d("public ip:YES") hasPublicIP = 1 } break @@ -191,3 +188,25 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP } return } + +func setUPNP(echoPort int) { + nat, err := Discover() + if err != nil || nat == nil { + gLog.d("could not perform UPNP discover:%s", err) + return + } + ext, err := nat.GetExternalAddress() + if err != nil { + gLog.d("could not perform UPNP external address:%s", err) + return + } + gLog.i("PublicIP:%v", ext) + + externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 604800) + if err != nil { + gLog.d("could not add udp UPNP port mapping %d", externalPort) + return + } else { + nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) + } +} diff --git a/core/speedlimiter_test.go b/core/speedlimiter_test.go index 9a1c053..a5cb8d7 100644 --- a/core/speedlimiter_test.go +++ b/core/speedlimiter_test.go @@ -1,6 +1,7 @@ package openp2p import ( + "log" "testing" "time" ) @@ -15,7 +16,7 @@ func TestBandwidth(t *testing.T) { for i := 0; i < writeNum; i++ { speedl.Add(oneBuffSize, true) } - t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) + log.Printf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second { t.Error("error") } @@ -27,12 +28,12 @@ func TestSymmetric(t *testing.T) { oneBuffSize := 300 writeNum := 70 expectTime := (oneBuffSize*writeNum - 20000) / speed - t.Logf("expect %ds", expectTime) + log.Printf("expect %ds", expectTime) startTs := time.Now() for i := 0; i < writeNum; i++ { speedl.Add(oneBuffSize, true) } - t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) + log.Printf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second { t.Error("error") } @@ -44,6 +45,7 @@ func TestSymmetric2(t *testing.T) { oneBuffSize := 800 writeNum := 40 expectTime := (oneBuffSize*writeNum - 30000) / speed + log.Printf("expect %ds", expectTime) startTs := time.Now() for i := 0; i < writeNum; { if speedl.Add(oneBuffSize, true) { @@ -52,7 +54,7 @@ func TestSymmetric2(t *testing.T) { time.Sleep(time.Millisecond) } } - t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) + log.Printf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second { t.Error("error") } diff --git a/core/underlay_kcp.go b/core/underlay_kcp.go index b93c3b2..0a84c7c 100644 --- a/core/underlay_kcp.go +++ b/core/underlay_kcp.go @@ -66,7 +66,7 @@ func (conn *underlayKCP) Accept() error { } func listenKCP(addr string, idleTimeout time.Duration) (*underlayKCP, error) { - gLog.Println(LvDEBUG, "kcp listen on ", addr) + gLog.d("kcp listen on %s", addr) listener, err := kcp.ListenWithOptions(addr, nil, 0, 0) if err != nil { return nil, fmt.Errorf("quic.ListenAddr error:%s", err) @@ -81,6 +81,7 @@ func listenKCP(addr string, idleTimeout time.Duration) (*underlayKCP, error) { } func dialKCP(conn *net.UDPConn, remoteAddr *net.UDPAddr, idleTimeout time.Duration) (*underlayKCP, error) { + conn.SetDeadline(time.Now().Add(idleTimeout)) kConn, err := kcp.NewConn(remoteAddr.String(), nil, 0, 0, conn) if err != nil { return nil, fmt.Errorf("quic.DialContext error:%s", err) diff --git a/core/underlay_quic.go b/core/underlay_quic.go index 3e61e99..a9bb585 100644 --- a/core/underlay_quic.go +++ b/core/underlay_quic.go @@ -81,7 +81,7 @@ func (conn *underlayQUIC) Accept() error { } func listenQuic(addr string, idleTimeout time.Duration) (*underlayQUIC, error) { - gLog.Println(LvDEBUG, "quic listen on ", addr) + gLog.d("quic listen on %s", addr) listener, err := quic.ListenAddr(addr, generateTLSConfig(), &quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true}) if err != nil { @@ -96,13 +96,15 @@ func listenQuic(addr string, idleTimeout time.Duration) (*underlayQUIC, error) { return ul, nil } -func dialQuic(conn *net.UDPConn, remoteAddr *net.UDPAddr, idleTimeout time.Duration) (*underlayQUIC, error) { +func dialQuic(conn *net.UDPConn, remoteAddr *net.UDPAddr, timeout time.Duration) (*underlayQUIC, error) { tlsConf := &tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"openp2pv1"}, } - Connection, err := quic.DialContext(context.Background(), conn, remoteAddr, conn.LocalAddr().String(), tlsConf, - &quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true}) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + Connection, err := quic.DialContext(ctx, conn, remoteAddr, conn.LocalAddr().String(), tlsConf, + &quic.Config{Versions: quicVersion, MaxIdleTimeout: TunnelIdleTimeout, DisablePathMTUDiscovery: true}) if err != nil { return nil, fmt.Errorf("quic.DialContext error:%s", err) } diff --git a/docker/Dockerfile b/docker/Dockerfile index 82e9bf7..815eab7 100755 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -6,7 +6,9 @@ RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/* COPY get-client.sh / -ARG DOCKER_VER="latest" +ARG VERSION +LABEL version=${VERSION} +# ARG DOCKER_VER="latest" RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh -ENTRYPOINT ["/openp2p"] +ENTRYPOINT ["/usr/local/openp2p/openp2p"] diff --git a/docker/get-client.sh b/docker/get-client.sh index 687642e..443b6cc 100755 --- a/docker/get-client.sh +++ b/docker/get-client.sh @@ -1,7 +1,7 @@ #!/bin/sh -echo "Building version:${DOCKER_VER}" +echo "Building version:${VERSION}" echo "Running on platform: $TARGETPLATFORM" # TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/') echo "Running on platform: $TARGETPLATFORM" @@ -25,7 +25,7 @@ sysType="linux-amd64" sysType="linux-mipsbe" fi fi -url="https://openp2p.cn/download/v1/${DOCKER_VER}/openp2p-latest.$sysType.tar.gz" +url="https://console.openpxp.com/download/v1/${VERSION}/openp2p-${VERSION}.$sysType.tar.gz" echo "download $url start" if [ -f /usr/bin/curl ]; then @@ -38,8 +38,9 @@ if [ $? -ne 0 ]; then exit 9 fi echo "download ok" -tar -xzvf openp2p.tar.gz -chmod +x openp2p +mkdir -p /usr/local/openp2p/ +tar -xzvf openp2p.tar.gz -C /usr/local/openp2p/ +chmod +x /usr/local/openp2p/openp2p pwd ls -l exit 0