mirror of
https://github.com/sigcn/pg.git
synced 2025-09-27 01:05:51 +08:00
disco/udp: make relayprotocol works better
This commit is contained in:
@@ -2,6 +2,7 @@ package udp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/sigcn/pg/disco"
|
||||
)
|
||||
@@ -11,6 +12,19 @@ var (
|
||||
MAGIC_FROM_RELAY = []byte{'_', 'p', 'g', 3}
|
||||
)
|
||||
|
||||
type RelayToPeerError struct {
|
||||
PeerID disco.PeerID
|
||||
err error
|
||||
}
|
||||
|
||||
func (e RelayToPeerError) Error() string {
|
||||
return fmt.Errorf("relay to %s : %w", e.PeerID, e.err).Error()
|
||||
}
|
||||
|
||||
func (e RelayToPeerError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
type relayProtocol struct {
|
||||
}
|
||||
|
||||
|
@@ -32,7 +32,7 @@ type UDPConn struct {
|
||||
udpConnsMutex sync.RWMutex
|
||||
udpConns []*net.UDPConn
|
||||
|
||||
closedSig chan int
|
||||
closeChan chan int
|
||||
closedWG sync.WaitGroup
|
||||
|
||||
cfg UDPConfig
|
||||
@@ -40,6 +40,7 @@ type UDPConn struct {
|
||||
datagrams chan *disco.Datagram
|
||||
natEvents chan *disco.NATInfo
|
||||
endpoints chan *disco.Endpoint
|
||||
errChan chan error
|
||||
relayProtocol relayProtocol
|
||||
upnpPortMapping upnpPortMapping
|
||||
stunRoundTripper stunRoundTripper
|
||||
@@ -54,7 +55,7 @@ type UDPConn struct {
|
||||
|
||||
func (c *UDPConn) Close() error {
|
||||
c.upnpPortMapping.close()
|
||||
close(c.closedSig)
|
||||
close(c.closeChan)
|
||||
c.udpConnsMutex.RLock()
|
||||
for _, conn := range c.udpConns {
|
||||
conn.Close()
|
||||
@@ -64,6 +65,7 @@ func (c *UDPConn) Close() error {
|
||||
close(c.natEvents)
|
||||
close(c.datagrams)
|
||||
close(c.endpoints)
|
||||
close(c.errChan)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -101,6 +103,10 @@ func (c *UDPConn) Endpoints() <-chan *disco.Endpoint {
|
||||
return c.endpoints
|
||||
}
|
||||
|
||||
func (c *UDPConn) Errors() <-chan error {
|
||||
return c.errChan
|
||||
}
|
||||
|
||||
func (c *UDPConn) GenerateLocalAddrsSends(peerID disco.PeerID, stunServers []string) {
|
||||
// UPnP
|
||||
go func() {
|
||||
@@ -234,7 +240,7 @@ func (c *UDPConn) RunDiscoMessageSendLoop(udpAddr disco.Endpoint) {
|
||||
for i := 0; i < defaultDiscoConfig.ChallengesRetry; i++ {
|
||||
time.Sleep(interval)
|
||||
select {
|
||||
case <-c.closedSig:
|
||||
case <-c.closeChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -251,7 +257,7 @@ func (c *UDPConn) RunDiscoMessageSendLoop(udpAddr disco.Endpoint) {
|
||||
rl := rate.NewLimiter(rate.Limit(256), 256)
|
||||
for range 2000 {
|
||||
select {
|
||||
case <-c.closedSig:
|
||||
case <-c.closeChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -315,7 +321,7 @@ func (c *UDPConn) RunDiscoMessageSendLoop(udpAddr disco.Endpoint) {
|
||||
rl := rate.NewLimiter(rate.Limit(limit), limit)
|
||||
for port := udpAddr.Addr.Port + defaultDiscoConfig.PortScanOffset; port <= udpAddr.Addr.Port+defaultDiscoConfig.PortScanCount; port++ {
|
||||
select {
|
||||
case <-c.closedSig:
|
||||
case <-c.closeChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -454,7 +460,7 @@ func (c *UDPConn) udpRead(udpConn *net.UDPConn) {
|
||||
buf := make([]byte, 65535)
|
||||
for {
|
||||
select {
|
||||
case <-c.closedSig:
|
||||
case <-c.closeChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -492,15 +498,16 @@ func (c *UDPConn) udpRead(udpConn *net.UDPConn) {
|
||||
c.tryGetPeerkeeper(udpConn, peerID).heartbeat(peerAddr)
|
||||
slog.Log(context.Background(), -3, "[UDP] ReadFrom", "peer", peerID, "addr", peerAddr)
|
||||
if pkt, dst := c.relayProtocol.tryToDst(buf[:n], peerID); pkt != nil {
|
||||
c.WriteTo(pkt, dst) // relay to dest
|
||||
if _, err := c.WriteTo(pkt, dst); err != nil {
|
||||
c.errChan <- RelayToPeerError{PeerID: dst, err: err}
|
||||
} // relay to dest
|
||||
continue
|
||||
}
|
||||
if pkt, src := c.relayProtocol.tryRecv(buf[:n]); pkt != nil {
|
||||
c.datagrams <- &disco.Datagram{PeerID: src, Data: pkt} // recv from relay
|
||||
continue
|
||||
}
|
||||
b := append([]byte(nil), buf[:n]...)
|
||||
c.datagrams <- &disco.Datagram{PeerID: peerID, Data: b}
|
||||
c.datagrams <- &disco.Datagram{PeerID: peerID, Data: slices.Clone(buf[:n])}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -508,7 +515,7 @@ func (c *UDPConn) runPeersHealthcheckLoop() {
|
||||
ticker := time.NewTicker(c.cfg.PeerKeepaliveInterval/2 + time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-c.closedSig:
|
||||
case <-c.closeChan:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
@@ -581,7 +588,8 @@ func ListenUDP(cfg UDPConfig) (*UDPConn, error) {
|
||||
udpConn := UDPConn{
|
||||
cfg: cfg,
|
||||
disco: &disco.Disco{Magic: cfg.DiscoMagic},
|
||||
closedSig: make(chan int),
|
||||
closeChan: make(chan int),
|
||||
errChan: make(chan error),
|
||||
natEvents: make(chan *disco.NATInfo, 3),
|
||||
datagrams: make(chan *disco.Datagram),
|
||||
endpoints: make(chan *disco.Endpoint, 10),
|
||||
|
@@ -436,6 +436,13 @@ func (c *PacketConn) eventsHandle() {
|
||||
return
|
||||
}
|
||||
go sendEndpoint(endpoint)
|
||||
case err, ok := <-c.udpConn.Errors():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if relayToErr, ok := err.(udp.RelayToPeerError); ok && !errors.Is(relayToErr.Unwrap(), udp.ErrUDPConnInactive) {
|
||||
go c.TryLeadDisco(relayToErr.PeerID) // peer not found, and trying to discover it.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user