refactor: update engine logic

This commit is contained in:
wlynxg
2024-04-10 22:36:08 +08:00
parent 0d490cc4db
commit 2b9e410f1a
6 changed files with 83 additions and 140 deletions

View File

@@ -27,12 +27,12 @@ func main() {
log.Fatal(err)
}
e, err := engine.New(cfg)
e, err := engine.Run(cfg)
if err != nil {
log.Fatal(err)
}
err = e.Start()
err = e.Run()
if err != nil {
log.Fatal(err)
}

View File

@@ -37,7 +37,7 @@ func (c *Config) Save() error {
}
func Load(path string) (*Config, error) {
cfg := &Config{}
cfg := &Config{path: path}
if gfile.Exists(path) {
load, err := gjson.Load(path)
if err != nil {
@@ -49,17 +49,19 @@ func Load(path string) (*Config, error) {
}
}
cfg.path = path
defaultConfig(cfg)
err := defaultConfig(cfg)
if err != nil {
return nil, err
}
err := cfg.Save()
err = cfg.Save()
if err != nil {
return nil, err
}
return cfg, nil
}
func defaultConfig(cfg *Config) {
func defaultConfig(cfg *Config) error {
if cfg.TUNName == "" {
cfg.TUNName = "hive0"
}
@@ -76,11 +78,11 @@ func defaultConfig(cfg *Config) {
cfg.PrivateKey, _ = NewPrivateKey()
key, err := cfg.PrivateKey.PrivKey()
if err != nil {
return
return err
}
id, err := peer.IDFromPrivateKey(key)
if err != nil {
panic(err)
return err
}
cfg.PeerID = id.String()
}
@@ -90,4 +92,5 @@ func defaultConfig(cfg *Config) {
cfg.Bootstraps = append(cfg.Bootstraps, n.String())
}
}
return nil
}

View File

@@ -9,33 +9,28 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mr-tron/base58/base58"
"go4.org/netipx"
)
func (e *Engine) addConnByDst(dst netip.Addr) (PacketChan, error) {
e.log.Debugf("Try to connect to the corresponding node of %s", dst)
var conn PacketChan
e.routeTable.set.Range(func(id string, prefix *netipx.IPSet) bool {
if !prefix.Contains(dst) {
if c, ok := e.routeTable.addr.Load(dst); ok {
conn = c
} else {
e.routeTable.m.Range(func(key string, value netip.Prefix) bool {
if value.Addr().Compare(dst) == 0 {
conn = make(PacketChan, ChanSize)
go func() {
defer e.routeTable.id.Delete(key)
defer e.routeTable.addr.Delete(dst)
e.addConn(conn, key)
}()
return false
}
return true
}
if c, ok := e.routeTable.id.Load(id); ok {
conn = c
e.routeTable.addr.Store(dst, c)
return false
}
peerChan := make(chan Payload, ChanSize)
conn = peerChan
e.routeTable.addr.Store(dst, peerChan)
go func() {
defer e.routeTable.addr.Delete(dst)
e.addConn(peerChan, id)
}()
return false
})
})
}
if conn != nil {
return conn, nil

View File

@@ -3,6 +3,7 @@ package engine
import (
"context"
"fmt"
"github.com/wlynxg/NetHive/core/route"
"io"
"net/netip"
"sync"
@@ -10,7 +11,6 @@ import (
"github.com/wlynxg/NetHive/core/config"
"github.com/wlynxg/NetHive/core/device"
"github.com/wlynxg/NetHive/core/protocol"
"github.com/wlynxg/NetHive/core/route"
mlog "github.com/wlynxg/NetHive/pkgs/log"
"github.com/wlynxg/NetHive/pkgs/xsync"
@@ -22,7 +22,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
"go4.org/netipx"
)
const (
@@ -54,16 +53,17 @@ type Engine struct {
routeTable struct {
m xsync.Map[string, netip.Prefix]
set xsync.Map[string, *netipx.IPSet]
id xsync.Map[string, PacketChan]
addr xsync.Map[netip.Addr, PacketChan]
}
}
func New(cfg *config.Config) (*Engine, error) {
func Run(cfg *config.Config) (*Engine, error) {
var (
e = new(Engine)
e = new(Engine)
err error
)
e.log = mlog.New("engine")
e.cfg = cfg
e.ctx, e.cancel = context.WithCancel(context.Background())
@@ -71,31 +71,6 @@ func New(cfg *config.Config) (*Engine, error) {
e.devReader = make(PacketChan, ChanSize)
e.relayChan = make(chan peer.AddrInfo, ChanSize)
// create tun
tun, err := device.CreateTUN(cfg.TUNName, cfg.MTU)
if err != nil {
return nil, err
}
e.device = tun
name, err := e.device.Name()
if err != nil {
return nil, err
}
for id, prefix := range e.cfg.PeersRouteTable {
e.log.Debugf("r: %s -> %s", id, prefix)
e.routeTable.m.Store(id, prefix)
b := netipx.IPSetBuilder{}
err := route.Add(name, prefix)
set, err := b.IPSet()
if err != nil {
return nil, err
}
e.routeTable.set.Store(id, set)
}
pk, err := cfg.PrivateKey.PrivKey()
if err != nil {
return nil, err
@@ -118,11 +93,33 @@ func New(cfg *config.Config) (*Engine, error) {
return e, nil
}
func (e *Engine) Start() error {
func (e *Engine) Run() error {
var err error
defer e.cancel()
cfg := e.cfg
if err := e.device.AddAddress(cfg.LocalAddr); err != nil {
// create tun
e.device, err = device.CreateTUN(e.cfg.TUNName, e.cfg.MTU)
if err != nil {
return err
}
name, err := e.device.Name()
if err != nil {
return err
}
for id, prefix := range e.cfg.PeersRouteTable {
e.routeTable.m.Store(id, prefix)
err := route.Add(name, prefix)
if err != nil {
e.log.Warnf("fail to add %s's route: %s", id, prefix)
continue
}
e.log.Debugf("successfully add %s's route: %s", id, prefix)
}
if err := e.device.AddAddress(e.cfg.LocalAddr); err != nil {
return err
}
@@ -131,7 +128,7 @@ func (e *Engine) Start() error {
}
wg := sync.WaitGroup{}
for _, info := range cfg.Bootstraps {
for _, info := range e.cfg.Bootstraps {
addrInfo, err := peer.AddrInfoFromString(info)
if err != nil {
e.log.Debugf("fail to parse '%s': %v", info, err)
@@ -148,10 +145,10 @@ func (e *Engine) Start() error {
}
wg.Wait()
e.host.SetStreamHandler(VPNStreamProtocol, e.Handler)
e.host.SetStreamHandler(VPNStreamProtocol, e.VPNHandler)
util.Advertise(e.ctx, e.discovery, e.host.ID().String())
if cfg.EnableMDNS {
if e.cfg.EnableMDNS {
e.mdns = mdns.NewMdnsService(e.host, "_net._hive", e)
if err := e.mdns.Start(); err != nil {
e.log.Warnf("fail to run mdns: %v", err)
@@ -223,58 +220,32 @@ func (e *Engine) RoutineRouteTableWriter() {
var conn PacketChan
e.log.Debugf("%s -> %s", payload.Src, payload.Dst)
if payload.Dst.IsMulticast() {
e.routeTable.m.Range(func(key string, value netip.Prefix) bool {
if key == e.host.ID().String() {
return true
}
if conn, ok := e.routeTable.id.Load(key); ok {
select {
case conn <- payload:
default:
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because the sending queue is already full", payload.Dst)
}
return true
}
conn, err := e.addConnByID(key)
if err != nil {
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because %s", payload.Dst, err)
return true
}
select {
case conn <- payload:
default:
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because the sending queue is already full", payload.Dst)
}
return true
})
c, ok := e.routeTable.addr.Load(payload.Dst)
if ok {
conn = c
} else {
c, ok := e.routeTable.addr.Load(payload.Dst)
if ok {
conn = c
} else {
c, err := e.addConnByDst(payload.Dst)
if err != nil {
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because %s", payload.Dst, err)
continue
}
conn = c
c, err := e.addConnByDst(payload.Dst)
if err != nil {
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because %s", payload.Dst, err)
continue
}
conn = c
}
select {
case conn <- payload:
default:
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because the sending queue is already full", payload.Dst)
}
if conn == nil {
continue
}
select {
case conn <- payload:
default:
e.log.Warnf("[RoutineRouteTableWriter] drop packet: %s, because the sending queue is already full", payload.Dst)
}
}
}
func (e *Engine) Handler(stream network.Stream) {
func (e *Engine) VPNHandler(stream network.Stream) {
e.log.Debugf("%s connect", stream.Conn().RemotePeer())
id := stream.Conn().RemotePeer().String()
@@ -292,31 +263,12 @@ func (e *Engine) Handler(stream network.Stream) {
defer stream.Close()
_, err := io.Copy(stream, dev)
if err != nil && err != io.EOF {
e.log.Errorf("Peer [%s] stream write error: %s", string(id), err)
e.log.Errorf("Peer [%s] stream write error: %s", id, err)
}
}()
defer func() {
stream.Close()
set, ok := e.routeTable.set.Load(id)
if !ok {
return
}
var addr []netip.Addr
e.routeTable.addr.Range(func(key netip.Addr, value PacketChan) bool {
if set.Contains(key) {
addr = append(addr, key)
}
return true
})
for _, a := range addr {
e.routeTable.addr.Delete(a)
}
}()
_, err := io.Copy(dev, stream)
if err != nil && err != io.EOF {
e.log.Errorf("Peer [%s] stream read error: %s", string(id), err)
e.log.Errorf("Peer [%s] stream read error: %s", id, err)
}
}

3
go.mod
View File

@@ -4,12 +4,11 @@ go 1.21
require (
github.com/gogf/gf/v2 v2.7.0
github.com/libp2p/go-libp2p v0.33.2
github.com/libp2p/go-libp2p v0.33.1
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/mr-tron/base58 v1.2.0
github.com/pkg/errors v0.9.1
go.uber.org/zap v1.27.0
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
golang.org/x/sys v0.19.0
)

10
go.sum
View File

@@ -204,8 +204,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.33.2 h1:vCdwnFxoGOXMKmaGHlDSnL4bM3fQeW8pgIa9DECnb40=
github.com/libp2p/go-libp2p v0.33.2/go.mod h1:zTeppLuCvUIkT118pFVzA8xzP/p2dJYOMApCkFh0Yww=
github.com/libp2p/go-libp2p v0.33.1 h1:tvJl9b9M6nSLBtZSXSguq+/lRhRj2oLRkyhBmQNMFLA=
github.com/libp2p/go-libp2p v0.33.1/go.mod h1:zOUTMjG4I7TXwMndNyOBn/CNtVBLlvBlnxfi+8xzx+E=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-kad-dht v0.25.2 h1:FOIk9gHoe4YRWXTu8SY9Z1d0RILol0TrtApsMDPjAVQ=
@@ -429,8 +429,6 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y=
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw=
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -441,8 +439,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -585,8 +581,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=