refactor: refactor the config module

This commit is contained in:
lynx
2024-04-10 21:15:24 +08:00
parent 5c9a8a5e55
commit 0d490cc4db
6 changed files with 107 additions and 110 deletions

View File

@@ -1,29 +1,36 @@
package config
import (
"log"
"net/netip"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/os/gfile"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
mlog "github.com/wlynxg/NetHive/pkgs/log"
)
type Config struct {
path string
TUNName string
MTU int
path string
// tun
TUNName string
MTU int
LocalAddr netip.Prefix
// libp2p
PrivateKey *PrivateKey
PeerID string
PeersRouteTable map[peer.ID][]netip.Prefix
LocalRoute []netip.Prefix
LocalAddr netip.Prefix
Bootstraps []string
PeersRouteTable map[string]netip.Prefix
EnableMDNS bool
// log
LogConfigs []mlog.CoreConfig
}
func (c *Config) Save() error {
if err := gfile.PutBytes(c.path, gjson.New(c).MustToJsonIndent()); err != nil {
log.Fatal(err)
return err
}
return nil
@@ -61,6 +68,10 @@ func defaultConfig(cfg *Config) {
cfg.MTU = 1500
}
if !cfg.LocalAddr.IsValid() {
cfg.LocalAddr = netip.MustParsePrefix("192.168.168.1/24")
}
if cfg.PrivateKey == nil {
cfg.PrivateKey, _ = NewPrivateKey()
key, err := cfg.PrivateKey.PrivKey()
@@ -74,7 +85,9 @@ func defaultConfig(cfg *Config) {
cfg.PeerID = id.String()
}
if !cfg.LocalAddr.IsValid() {
cfg.LocalAddr = netip.MustParsePrefix("192.168.168.1/24")
if len(cfg.Bootstraps) == 0 {
for _, n := range dht.DefaultBootstrapPeers {
cfg.Bootstraps = append(cfg.Bootstraps, n.String())
}
}
}

View File

@@ -16,7 +16,7 @@ func (e *Engine) addConnByDst(dst netip.Addr) (PacketChan, error) {
e.log.Debugf("Try to connect to the corresponding node of %s", dst)
var conn PacketChan
e.routeTable.set.Range(func(id peer.ID, prefix *netipx.IPSet) bool {
e.routeTable.set.Range(func(id string, prefix *netipx.IPSet) bool {
if !prefix.Contains(dst) {
return true
}
@@ -44,8 +44,8 @@ func (e *Engine) addConnByDst(dst netip.Addr) (PacketChan, error) {
return nil, errors.New(fmt.Sprintf("the routing rule corresponding to %s was not found", dst.String()))
}
func (e *Engine) addConnByID(id peer.ID) (PacketChan, error) {
e.log.Debugf("Try to connect to the corresponding node of %s", string(id))
func (e *Engine) addConnByID(id string) (PacketChan, error) {
e.log.Debugf("Try to connect to the corresponding node of %s", id)
if conn, ok := e.routeTable.id.Load(id); ok {
return conn, nil
@@ -59,19 +59,19 @@ func (e *Engine) addConnByID(id peer.ID) (PacketChan, error) {
e.addConn(peerChan, id)
}()
return nil, errors.New(fmt.Sprintf("unknown dst addr: %s", string(id)))
return nil, errors.New(fmt.Sprintf("unknown dst addr: %s", id))
}
func (e *Engine) addConn(peerChan PacketChan, id peer.ID) {
func (e *Engine) addConn(peerChan PacketChan, id string) {
dev := &devWrapper{w: e.devWriter, r: peerChan}
e.log.Infof("start find peer %s", string(id))
e.log.Infof("start find peer %s", id)
var (
stream network.Stream
err error
)
idr, err := base58.Decode(string(id))
idr, err := base58.Decode(id)
if err != nil {
e.log.Infof("base58 decode failed: %s", err)
}
@@ -80,14 +80,14 @@ func (e *Engine) addConn(peerChan PacketChan, id peer.ID) {
if len(info.Addrs) > 0 {
stream, err = e.host.NewStream(e.ctx, info.ID, VPNStreamProtocol)
if err != nil {
peerc, err := e.discovery.FindPeers(e.ctx, string(id))
peerc, err := e.discovery.FindPeers(e.ctx, id)
if err != nil {
e.log.Warnf("Finding node by dht %s failed because %s", string(id), err)
return
}
for info := range peerc {
if info.ID.String() == string(id) && len(info.Addrs) > 0 {
if info.ID.String() == id && len(info.Addrs) > 0 {
stream, err = e.host.NewStream(e.ctx, info.ID, VPNStreamProtocol)
if err == nil {
break

View File

@@ -6,7 +6,6 @@ import (
"io"
"net/netip"
"sync"
"time"
"github.com/wlynxg/NetHive/core/config"
"github.com/wlynxg/NetHive/core/device"
@@ -54,9 +53,9 @@ type Engine struct {
errChan chan error
routeTable struct {
m xsync.Map[peer.ID, []netip.Prefix]
set xsync.Map[peer.ID, *netipx.IPSet]
id xsync.Map[peer.ID, PacketChan]
m xsync.Map[string, netip.Prefix]
set xsync.Map[string, *netipx.IPSet]
id xsync.Map[string, PacketChan]
addr xsync.Map[netip.Addr, PacketChan]
}
}
@@ -71,10 +70,6 @@ func New(cfg *config.Config) (*Engine, error) {
e.devWriter = make(PacketChan, ChanSize)
e.devReader = make(PacketChan, ChanSize)
e.relayChan = make(chan peer.AddrInfo, ChanSize)
e.routeTable.m = xsync.Map[peer.ID, []netip.Prefix]{}
e.routeTable.set = xsync.Map[peer.ID, *netipx.IPSet]{}
e.routeTable.id = xsync.Map[peer.ID, PacketChan]{}
e.routeTable.addr = xsync.Map[netip.Addr, PacketChan]{}
// create tun
tun, err := device.CreateTUN(cfg.TUNName, cfg.MTU)
@@ -88,17 +83,12 @@ func New(cfg *config.Config) (*Engine, error) {
return nil, err
}
for id, prefixes := range e.cfg.PeersRouteTable {
e.routeTable.m.Store(id, prefixes)
for id, prefix := range e.cfg.PeersRouteTable {
e.log.Debugf("r: %s -> %s", id, prefix)
e.routeTable.m.Store(id, prefix)
b := netipx.IPSetBuilder{}
for _, prefix := range prefixes {
err := route.Add(name, prefix)
if err != nil {
return nil, err
}
b.AddPrefix(prefix)
}
err := route.Add(name, prefix)
set, err := b.IPSet()
if err != nil {
return nil, err
@@ -112,7 +102,6 @@ func New(cfg *config.Config) (*Engine, error) {
}
node, err := libp2p.New(
libp2p.Identity(pk),
libp2p.EnableAutoRelayWithPeerSource(func(ctx context.Context, num int) <-chan peer.AddrInfo { return e.relayChan }),
)
if err != nil {
return nil, err
@@ -142,84 +131,25 @@ func (e *Engine) Start() error {
}
wg := sync.WaitGroup{}
for _, info := range dht.GetDefaultBootstrapPeerAddrInfos() {
info := info
for _, info := range cfg.Bootstraps {
addrInfo, err := peer.AddrInfoFromString(info)
if err != nil {
e.log.Debugf("fail to parse '%s': %v", info, err)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
if err := e.host.Connect(e.ctx, info); err != nil {
e.log.Warnf("connection %s fails, because of error :%s", info.String(), err)
if err := e.host.Connect(e.ctx, *addrInfo); err != nil {
e.log.Warnf("connection %s fails, because of error :%s", addrInfo.String(), err)
}
}()
}
wg.Wait()
e.host.SetStreamHandler(VPNStreamProtocol, func(stream network.Stream) {
id := peer.ID(stream.Conn().RemotePeer().String())
if _, ok := e.routeTable.m.Load(id); !ok {
stream.Close()
return
}
peerChan := make(PacketChan, ChanSize)
e.routeTable.id.Store(id, peerChan)
defer e.routeTable.id.Delete(id)
dev := &devWrapper{w: e.devWriter, r: peerChan}
go func() {
defer stream.Close()
_, err := io.Copy(stream, dev)
if err != nil && err != io.EOF {
e.log.Errorf("Peer [%s] stream write error: %s", string(id), err)
}
}()
defer func() {
stream.Close()
set, ok := e.routeTable.set.Load(id)
if !ok {
return
}
var addr []netip.Addr
e.routeTable.addr.Range(func(key netip.Addr, value PacketChan) bool {
if set.Contains(key) {
addr = append(addr, key)
}
return true
})
for _, a := range addr {
e.routeTable.addr.Delete(a)
}
}()
_, err := io.Copy(dev, stream)
if err != nil && err != io.EOF {
e.log.Errorf("Peer [%s] stream read error: %s", string(id), err)
}
})
e.host.SetStreamHandler(VPNStreamProtocol, e.Handler)
util.Advertise(e.ctx, e.discovery, e.host.ID().String())
go func() {
ticker := time.NewTimer(5 * time.Minute)
for {
select {
case <-ticker.C:
peers, err := e.dht.GetClosestPeers(e.ctx, string(e.host.ID()))
if err != nil {
e.log.Warnf("Failed to get nearest node: %s", err)
continue
}
for _, id := range peers {
e.relayChan <- e.host.Peerstore().PeerInfo(id)
}
case <-e.ctx.Done():
ticker.Stop()
return
}
}
}()
if cfg.EnableMDNS {
e.mdns = mdns.NewMdnsService(e.host, "_net._hive", e)
@@ -292,9 +222,10 @@ func (e *Engine) RoutineRouteTableWriter() {
for payload = range e.devReader {
var conn PacketChan
e.log.Debugf("%s -> %s", payload.Src, payload.Dst)
if payload.Dst.IsMulticast() {
e.routeTable.m.Range(func(key peer.ID, value []netip.Prefix) bool {
if key == peer.ID(e.host.ID().String()) {
e.routeTable.m.Range(func(key string, value netip.Prefix) bool {
if key == e.host.ID().String() {
return true
}
@@ -342,3 +273,50 @@ func (e *Engine) RoutineRouteTableWriter() {
}
}
}
func (e *Engine) Handler(stream network.Stream) {
e.log.Debugf("%s connect", stream.Conn().RemotePeer())
id := stream.Conn().RemotePeer().String()
if _, ok := e.routeTable.m.Load(id); !ok {
stream.Close()
return
}
peerChan := make(PacketChan, ChanSize)
e.routeTable.id.Store(id, peerChan)
defer e.routeTable.id.Delete(id)
dev := &devWrapper{w: e.devWriter, r: peerChan}
go func() {
defer stream.Close()
_, err := io.Copy(stream, dev)
if err != nil && err != io.EOF {
e.log.Errorf("Peer [%s] stream write error: %s", string(id), err)
}
}()
defer func() {
stream.Close()
set, ok := e.routeTable.set.Load(id)
if !ok {
return
}
var addr []netip.Addr
e.routeTable.addr.Range(func(key netip.Addr, value PacketChan) bool {
if set.Contains(key) {
addr = append(addr, key)
}
return true
})
for _, a := range addr {
e.routeTable.addr.Delete(a)
}
}()
_, err := io.Copy(dev, stream)
if err != nil && err != io.EOF {
e.log.Errorf("Peer [%s] stream read error: %s", string(id), err)
}
}

4
go.mod
View File

@@ -106,7 +106,7 @@ require (
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.21.0 // indirect
@@ -114,7 +114,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.18.0 // indirect
gonum.org/v1/gonum v0.13.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

4
go.sum
View File

@@ -443,6 +443,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
@@ -585,6 +587,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

View File

@@ -57,6 +57,7 @@ func ConsoleCore(cfg CoreConfig) zapcore.Core {
FunctionKey: zapcore.OmitKey,
MessageKey: "M",
StacktraceKey: "S",
EncodeTime: zapcore.RFC3339TimeEncoder,
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.CapitalLevelEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
@@ -100,6 +101,7 @@ func FileCore(cfg CoreConfig) zapcore.Core {
FunctionKey: zapcore.OmitKey,
MessageKey: "M",
StacktraceKey: "S",
EncodeTime: zapcore.RFC3339TimeEncoder,
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.CapitalLevelEncoder,
EncodeDuration: zapcore.StringDurationEncoder,