mirror of
https://github.com/wlynxg/NetHive.git
synced 2025-12-24 13:08:30 +08:00
refactor: 重构节点连接逻辑
This commit is contained in:
@@ -280,73 +280,52 @@ func (e *Engine) addConn(dst netip.Addr) (PacketChan, error) {
|
||||
peerChan := make(chan Payload, ChanSize)
|
||||
conn = peerChan
|
||||
e.routeTable.addr.Store(dst, peerChan)
|
||||
dev := &devWrapper{w: e.devWriter, r: peerChan}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
peerInfo := make(chan peer.AddrInfo)
|
||||
|
||||
go func() {
|
||||
e.log.Infof(ctx, "start find peer %s by DHT", string(id))
|
||||
peers, err := e.discovery.FindPeers(ctx, string(id))
|
||||
dev := &devWrapper{w: e.devWriter, r: peerChan}
|
||||
e.log.Infof(e.ctx, "start find peer %s", string(id))
|
||||
peerc, err := e.discovery.FindPeers(e.ctx, string(id))
|
||||
if err != nil {
|
||||
e.log.Warningf(ctx, "Finding node %s failed because %s", string(id), err)
|
||||
e.log.Warningf(e.ctx, "Finding node by dht %s failed because %s", string(id), err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case info := <-peers:
|
||||
if info.ID == id && len(info.Addrs) > 0 {
|
||||
peerInfo <- info
|
||||
}
|
||||
var peers []peer.AddrInfo
|
||||
for info := range peerc {
|
||||
if info.ID.String() == string(id) && len(info.Addrs) > 0 {
|
||||
peers = append(peers, info)
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
e.log.Infof(ctx, "start find peer %s by mDNS", string(id))
|
||||
idr, err := base58.Decode(string(id))
|
||||
if err != nil {
|
||||
e.log.Warningf(ctx, "base58 decode failed")
|
||||
return
|
||||
}
|
||||
ticker := time.NewTimer(10 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
info := e.host.Peerstore().PeerInfo(peer.ID(idr))
|
||||
if len(info.Addrs) > 0 {
|
||||
peerInfo <- info
|
||||
}
|
||||
|
||||
if idr, err := base58.Decode(string(id)); err == nil {
|
||||
info := e.host.Peerstore().PeerInfo(peer.ID(idr))
|
||||
if len(info.Addrs) > 0 {
|
||||
peers = append(peers, info)
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for info := range peerInfo {
|
||||
|
||||
var stream network.Stream
|
||||
for _, info := range peers {
|
||||
e.log.Infof(e.ctx, "find peer: %s", info)
|
||||
stream, err := e.host.NewStream(e.ctx, info.ID, VPNStreamProtocol)
|
||||
if err != nil {
|
||||
e.log.Warningf(e.ctx, "Connection establishment with node %s failed due to %s", info, err)
|
||||
continue
|
||||
stream, err = e.host.NewStream(e.ctx, info.ID, VPNStreamProtocol)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
cancel()
|
||||
e.log.Infof(e.ctx, "Peer [%s] connect success")
|
||||
e.log.Warningf(e.ctx, "Connection establishment with node %s failed due to %s", info, err)
|
||||
}
|
||||
e.log.Infof(e.ctx, "Peer [%s] connect success", string(id))
|
||||
|
||||
go func() {
|
||||
_, err := io.Copy(stream, dev)
|
||||
if err != nil && err != io.EOF {
|
||||
e.log.Errorf(e.ctx, "Peer [%s] stream write error: %s", info.ID, err)
|
||||
}
|
||||
}()
|
||||
_, err = io.Copy(dev, stream)
|
||||
go func() {
|
||||
defer stream.Close()
|
||||
_, err := io.Copy(stream, dev)
|
||||
if err != nil && err != io.EOF {
|
||||
e.log.Errorf(e.ctx, "Peer [%s] stream read error: %s", info.ID, err)
|
||||
stream.Close()
|
||||
return
|
||||
e.log.Errorf(e.ctx, "Peer [%s] stream write error: %s", id, err)
|
||||
}
|
||||
stream.Close()
|
||||
return
|
||||
}()
|
||||
|
||||
defer stream.Close()
|
||||
_, err = io.Copy(dev, stream)
|
||||
if err != nil && err != io.EOF {
|
||||
e.log.Errorf(e.ctx, "Peer [%s] stream read error: %s", id, err)
|
||||
}
|
||||
}()
|
||||
return false
|
||||
|
||||
Reference in New Issue
Block a user