mirror of
https://github.com/liloew/gvn.git
synced 2025-12-24 13:38:00 +08:00
release 1.0.0
This commit is contained in:
24
cmd/init.go
24
cmd/init.go
@@ -43,14 +43,14 @@ type Device struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Id string `yaml:"id,omitempty"`
|
||||
Port uint `yaml:"port,omitempty"`
|
||||
Mode MODE `yaml:"mode,omitempty"`
|
||||
Server string `yaml:"server,omitempty"`
|
||||
Dev Device `yaml:"dev,omitempty"`
|
||||
Protol string `yaml:"protocol"`
|
||||
PriKey string `yaml:"priKey,omitempty"`
|
||||
PubKey string `yaml:"pubKey,omitempty"`
|
||||
Id string `yaml:"id,omitempty"`
|
||||
Port uint `yaml:"port,omitempty"`
|
||||
Mode MODE `yaml:"mode,omitempty"`
|
||||
Server string `yaml:"server,omitempty"`
|
||||
Dev Device `yaml:"dev,omitempty"`
|
||||
Version string `yaml:"protocol"`
|
||||
PriKey string `yaml:"priKey,omitempty"`
|
||||
PubKey string `yaml:"pubKey,omitempty"`
|
||||
}
|
||||
|
||||
// initCmd represents the init command
|
||||
@@ -98,7 +98,7 @@ func init() {
|
||||
initCmd.Flags().BoolP("force", "f", false, "force overide the file")
|
||||
initCmd.Flags().BoolP("server", "s", false, "server mode")
|
||||
initCmd.Flags().UintP("port", "", 6543, "the port all the other nodes connect to")
|
||||
initCmd.Flags().StringP("protocol", "", "/gvn/1.0.0", "the protocol support currently")
|
||||
initCmd.Flags().StringP("version", "", "1.0.0", "the version current in use")
|
||||
initCmd.Flags().StringP("devname", "", "", "the TUN device name, recommend using utun[\\d] for cross platform, utun3 for example")
|
||||
initCmd.Flags().StringP("subnets", "", "", "the subnets traffice through this node")
|
||||
initCmd.Flags().UintP("mtu", "", 1500, "the MUT will be used in TUN device")
|
||||
@@ -142,12 +142,10 @@ func parseConfig(cmd cobra.Command, config *Config) {
|
||||
dev.Mtu = mtu
|
||||
}
|
||||
if subnets, err := cmd.Flags().GetString("subnets"); err == nil {
|
||||
// sbs := make([]string, 0)
|
||||
// sbs = append(sbs, strings.Split(subnets, ",")...)
|
||||
dev.Subnets = append(dev.Subnets, strings.Split(subnets, ",")...)
|
||||
}
|
||||
if protol, err := cmd.Flags().GetString("protocol"); err == nil {
|
||||
config.Protol = protol
|
||||
if version, err := cmd.Flags().GetString("version"); err == nil {
|
||||
config.Version = version
|
||||
}
|
||||
config.Dev = dev
|
||||
}
|
||||
|
||||
240
cmd/up.go
240
cmd/up.go
@@ -19,8 +19,10 @@ import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -53,6 +55,7 @@ to quickly create a Cobra application.`,
|
||||
},
|
||||
}
|
||||
mainDev tun.Device
|
||||
pub *p2p.Publisher
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -79,8 +82,9 @@ func upCommand(cmd *cobra.Command) {
|
||||
"ID": host.ID().Pretty(),
|
||||
"Addrs": host.Addrs(),
|
||||
}).Info("Peer info")
|
||||
zone := viper.GetString("protocol")
|
||||
// BEGIN: STREAM HANDLER
|
||||
|
||||
zone := fmt.Sprintf("/gvn/%s", viper.GetString("version"))
|
||||
rpcZone := fmt.Sprintf("/rpc/%s", viper.GetString("version"))
|
||||
host.SetStreamHandler(protocol.ID(zone), func(stream network.Stream) {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"LocalPeer": stream.Conn().LocalPeer(),
|
||||
@@ -92,7 +96,7 @@ func upCommand(cmd *cobra.Command) {
|
||||
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
|
||||
go readData(stream, rw)
|
||||
})
|
||||
// END: STREAM HANDLER
|
||||
|
||||
var bootstraps []string
|
||||
if MODE(viper.GetUint("mode")) == MODECLIENT {
|
||||
// start dht in server mode
|
||||
@@ -102,19 +106,20 @@ func upCommand(cmd *cobra.Command) {
|
||||
for _, addr := range host.Addrs() {
|
||||
bootstraps = append(bootstraps, fmt.Sprintf("%s/p2p/%s", addr.String(), host.ID().Pretty()))
|
||||
}
|
||||
dhcp.NewRPCServer(host, zone, viper.GetString("dev.vip"), viper.GetInt("dev.mtu"))
|
||||
dhcp.NewRPCServer(host, rpcZone, viper.GetString("dev.vip"), viper.GetInt("dev.mtu"))
|
||||
// auto config in server mode
|
||||
devChan <- tun.Device{
|
||||
Name: viper.GetString("dev.name"),
|
||||
Ip: viper.GetString("dev.vip"),
|
||||
Mtu: viper.GetInt("dev.mtu"),
|
||||
Subnets: viper.GetStringSlice("dev.subnets"),
|
||||
Name: viper.GetString("dev.name"),
|
||||
Ip: viper.GetString("dev.vip"),
|
||||
Mtu: viper.GetInt("dev.mtu"),
|
||||
// Subnets: viper.GetStringSlice("dev.subnets"),
|
||||
ServerVIP: viper.GetString("dev.vip"),
|
||||
Port: viper.GetUint("port"),
|
||||
}
|
||||
}
|
||||
p2p.NewDHT(host, zone, bootstraps)
|
||||
// BEGIN: DHCP for client mode
|
||||
|
||||
// DHCP for client mode
|
||||
if MODE(viper.GetUint("mode")) == MODECLIENT {
|
||||
go func() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
@@ -124,116 +129,79 @@ func upCommand(cmd *cobra.Command) {
|
||||
Name: viper.GetString("dev.name"),
|
||||
Subnets: viper.GetStringSlice("dev.subnets"),
|
||||
}
|
||||
res := dhcp.NewRPCClient(host, zone, viper.GetString("server"), req)
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"RES": res,
|
||||
}).Info("DHCP - Got IP")
|
||||
// TODO: if rs OK and push to chan
|
||||
ticker.Stop()
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"res": res,
|
||||
"req": req,
|
||||
}).Info("RPC - Client received data")
|
||||
devChan <- tun.Device{
|
||||
Name: req.Name,
|
||||
Ip: res.Ip,
|
||||
Mtu: res.Mtu,
|
||||
Subnets: res.Subnets,
|
||||
ServerVIP: res.ServerVIP,
|
||||
if client, res := dhcp.NewRPCClient(host, rpcZone, viper.GetString("server"), req); client != nil {
|
||||
ticker.Stop()
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"res": res,
|
||||
"req": req,
|
||||
}).Info("RPC - Client received data")
|
||||
devChan <- tun.Device{
|
||||
Name: req.Name,
|
||||
Ip: res.Ip,
|
||||
Mtu: res.Mtu,
|
||||
// ignore subnets because of self did't forward it to TUN
|
||||
// Subnets: res.Subnets,
|
||||
ServerVIP: res.ServerVIP,
|
||||
}
|
||||
|
||||
// refresh local VIP table
|
||||
var ress []dhcp.Response
|
||||
if err := dhcp.Call("DHCPService", "Clients", req, &ress); err == nil {
|
||||
for _, r := range ress {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"VIP": r.Ip,
|
||||
"ID": r.Id,
|
||||
}).Debug("Refresh local vip table")
|
||||
p2p.RouteTable.AddByString(strings.Split(r.Ip, "/")[0]+"/32", r.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
// END: DHCP
|
||||
go p2p.FindPeerIdsViaDHT(host, zone)
|
||||
// TODO: find peers
|
||||
// peerIds := p2p.FindPeerIds()
|
||||
// peerIds, _ := cmd.Flags().GetString("peers")
|
||||
// logrus.WithFields(logrus.Fields{
|
||||
// "PEERS": peerIds,
|
||||
// }).Info("")
|
||||
// streams := p2p.NewStreams(host, zone, strings.Split(peerIds, ","))
|
||||
// BEGIN: DEBUG
|
||||
// for _, peerId := range strings.Split(peerIds, ",") {
|
||||
// if stream, ok := streams[peerId]; ok {
|
||||
// for i := 0; i < 3; i++ {
|
||||
// // read until new line
|
||||
// bytes := []byte(fmt.Sprintf("%d", i))
|
||||
// bytes = append(bytes, "\n"...)
|
||||
// stream.Write(bytes)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
if pub := p2p.NewPubSub(host, "route"); pub != nil {
|
||||
pub.Publish(host.ID().Pretty(), config.Dev.Subnets)
|
||||
}
|
||||
/*
|
||||
// BEGIN: handler route PubSub
|
||||
topic, sub := p2p.NewPubSub(host, "route")
|
||||
if topic != nil && sub != nil {
|
||||
message := &p2p.Message{
|
||||
Id: host.ID().Pretty(),
|
||||
MessageType: p2p.MessageTypeOnline,
|
||||
Subnets: config.Dev.Subnets,
|
||||
}
|
||||
if bytes, err := json.Marshal(message); err == nil {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
go func(tk *time.Ticker) {
|
||||
interval := 10
|
||||
for {
|
||||
select {
|
||||
case <-tk.C:
|
||||
if err := topic.Publish(context.Background(), bytes); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
"Message": message,
|
||||
}).Error("Publish message from to topic error")
|
||||
}
|
||||
if interval < 30*60 {
|
||||
// half of hour for the longest
|
||||
interval *= 2
|
||||
}
|
||||
ticker = time.NewTicker(time.Duration(interval) * time.Second)
|
||||
}
|
||||
}
|
||||
}(ticker)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
if msg, err := sub.Next(context.Background()); err == nil {
|
||||
message := new(p2p.Message)
|
||||
if err := json.Unmarshal(msg.Data, message); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Error("Parse message from topic error")
|
||||
} else {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"Message": message,
|
||||
}).Info("Receive message from topic")
|
||||
if message.MessageType == p2p.MessageTypeRoute {
|
||||
// TODO: add MASQUERADE if self
|
||||
// refresh route table - delete exist route if match then add table
|
||||
tun.RefreshRoute(message.Subnets)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Error("Subscribe error")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
// END: handler route PubSub
|
||||
*/
|
||||
pub = p2p.NewPubSub(host, "route")
|
||||
|
||||
select {
|
||||
case dev := <-devChan:
|
||||
// BEGIN: TUN
|
||||
mainDev = dev
|
||||
tun.NewTun(dev)
|
||||
// avoid create duplicate
|
||||
close(devChan)
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
go func() {
|
||||
for sig := range c {
|
||||
switch sig {
|
||||
case syscall.SIGINT:
|
||||
// exit when receive ctrl+c
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SIG": sig,
|
||||
}).Info("Exit for SIGINT")
|
||||
tun.Close(mainDev)
|
||||
os.Exit(0)
|
||||
case syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT:
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SIG": sig,
|
||||
}).Info("Receive SIGHUP/SIGTERM/SIGQUIT but ignore currently")
|
||||
default:
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SIG": sig,
|
||||
}).Info("Default ignore current SIG")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
mainDev = <-devChan
|
||||
tun.NewTun(mainDev)
|
||||
// avoid create duplicate
|
||||
close(devChan)
|
||||
vip := strings.Split(mainDev.Ip, "/")[0]
|
||||
if pub != nil {
|
||||
pub.Publish(host.ID().Pretty(), vip, config.Dev.Subnets)
|
||||
}
|
||||
_, vipNet, err := net.ParseCIDR(mainDev.Ip)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Fatal("Device VIP error")
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
var frame ethernet.Frame
|
||||
frame.Resize(int(config.Dev.Mtu))
|
||||
@@ -250,51 +218,16 @@ func upCommand(cmd *cobra.Command) {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SRC": waterutil.IPv4Source(frame).String(),
|
||||
"DST": waterutil.IPv4Destination(frame).String(),
|
||||
}).Info("TUN - Packet SRC and DST")
|
||||
if waterutil.IPv4Source(frame).String() == waterutil.IPv4Destination(frame).String() {
|
||||
// FIXME: need't check src and dst ?
|
||||
} else {
|
||||
// TODO: froward to exactlly socket
|
||||
// for id, stream := range streams {
|
||||
// // TODO: check id and route
|
||||
// if id != "" {
|
||||
// bytes := append(frame, "\n"...)
|
||||
// stream.Write(bytes)
|
||||
// }
|
||||
// }
|
||||
p2p.ForwardPacket(host, zone, frame)
|
||||
}).Debug("TUN - Packet SRC and DST")
|
||||
if waterutil.IPv4Source(frame).String() != waterutil.IPv4Destination(frame).String() {
|
||||
p2p.ForwardPacket(host, zone, frame, vipNet)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// END: TUN
|
||||
}
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
go func() {
|
||||
for sig := range c {
|
||||
switch sig {
|
||||
case syscall.SIGINT:
|
||||
// TODO: ctrl+c - 退出
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SIG": sig,
|
||||
}).Info("Exit for SIGINT")
|
||||
tun.Close(mainDev)
|
||||
os.Exit(0)
|
||||
case syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT:
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SIG": sig,
|
||||
}).Info("Receive SIGHUP/SIGTERM/SIGQUIT but ignore currently")
|
||||
default:
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"SIG": sig,
|
||||
}).Info("默认信号")
|
||||
}
|
||||
}
|
||||
}()
|
||||
ch := make(chan int, 1)
|
||||
<-ch
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
func readData(stream network.Stream, rw *bufio.ReadWriter) {
|
||||
@@ -310,6 +243,9 @@ func readData(stream network.Stream, rw *bufio.ReadWriter) {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Error("Read data error")
|
||||
if err.Error() == "EOF" {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
rpc "github.com/libp2p/go-libp2p-gorpc"
|
||||
"github.com/liloew/altgvn/p2p"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -19,8 +20,13 @@ var (
|
||||
MaxCIDR string
|
||||
Mtu int
|
||||
mu sync.Mutex
|
||||
client *rpc.Client
|
||||
serverId peer.ID
|
||||
)
|
||||
|
||||
type RPC struct {
|
||||
}
|
||||
|
||||
type Request struct {
|
||||
Id string
|
||||
Name string
|
||||
@@ -47,7 +53,7 @@ type DHCPService struct {
|
||||
func (s *DHCPService) DHCP(ctx context.Context, req Request, res *Response) error {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"Request": req,
|
||||
}).Info("RPC - request")
|
||||
}).Info("RPC call Clients")
|
||||
mu.Lock()
|
||||
data, ok := s.KV[req.Id]
|
||||
if !ok {
|
||||
@@ -92,6 +98,26 @@ func (s *DHCPService) DHCP(ctx context.Context, req Request, res *Response) erro
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"res": res,
|
||||
}).Info("RPC - Client requested data")
|
||||
|
||||
// vip/mask -> vip/32
|
||||
p2p.RouteTable.AddByString(strings.Split(data.Ip, "/")[0]+"/32", data.Id)
|
||||
mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DHCPService) Clients(ctx context.Context, req Request, res *[]Response) error {
|
||||
mu.Lock()
|
||||
for _, v := range s.KV {
|
||||
r := Response{
|
||||
Id: v.Id,
|
||||
Name: v.Name,
|
||||
Ip: v.Ip,
|
||||
Mtu: v.Mtu,
|
||||
Subnets: v.Subnets,
|
||||
ServerVIP: v.ServerVIP,
|
||||
}
|
||||
*res = append(*res, r)
|
||||
}
|
||||
mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
@@ -104,20 +130,40 @@ func NewRPCServer(host host.Host, zone string, cidr string, mtu int) {
|
||||
"ERROR": err,
|
||||
}).Panic("RPC - build RPC service error")
|
||||
}
|
||||
// server register
|
||||
mu.Lock()
|
||||
service.KV[host.ID().Pretty()] = Response{
|
||||
Id: host.ID().Pretty(),
|
||||
Ip: cidr,
|
||||
Mtu: mtu,
|
||||
ServerVIP: cidr,
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func NewRPCClient(host host.Host, zone string, server string, req Request) Response {
|
||||
client := rpc.NewClient(host, protocol.ID(zone))
|
||||
func NewRPCClient(host host.Host, zone string, server string, req Request) (*rpc.Client, Response) {
|
||||
client = rpc.NewClient(host, protocol.ID(zone))
|
||||
var res Response
|
||||
if ma, err := multiaddr.NewMultiaddr(server); err == nil {
|
||||
if addr, err := peer.AddrInfoFromP2pAddr(ma); err == nil {
|
||||
serverId = addr.ID
|
||||
if err := client.Call(addr.ID, "DHCPService", "DHCP", req, &res); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Error("RPC - call RPC serveice error")
|
||||
}
|
||||
return res
|
||||
return client, res
|
||||
}
|
||||
}
|
||||
return res
|
||||
return nil, res
|
||||
}
|
||||
|
||||
func Call(svcName string, svcMethod string, req Request, res interface{}) error {
|
||||
if err := client.Call(serverId, svcName, svcMethod, req, &res); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Error("RPC - call RPC serveice error")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
80
p2p/dht.go
80
p2p/dht.go
@@ -144,7 +144,6 @@ func NewStreams(host host.Host, zone string, peerIds []string) map[string]networ
|
||||
}
|
||||
|
||||
func NewStream(host host.Host, zone string, peerId string) network.Stream {
|
||||
// TODO: streams := make(map[string][]network.Stream)
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
// defer cancel()
|
||||
if id, err := peer.Decode(peerId); err == nil {
|
||||
@@ -165,32 +164,6 @@ func FindPeerIdsViaDHT(host host.Host, zone string) []string {
|
||||
// TODO: check kdht nil
|
||||
// TODO: multiplex the connection
|
||||
peerIds := make([]string, 0)
|
||||
/*
|
||||
peers := kdht.Host().Network().Peers()
|
||||
for _, peer := range peers {
|
||||
peerIds = append(peerIds, peer.Pretty())
|
||||
}
|
||||
*/
|
||||
// BEGIN: DEBUG
|
||||
ticker := time.NewTicker(20 * time.Second)
|
||||
go func(tk *time.Ticker) {
|
||||
for _ = range tk.C {
|
||||
// TODO:
|
||||
peers, err := routingDiscovery.FindPeers(context.Background(), zone)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
}).Error("Error at timer")
|
||||
continue
|
||||
}
|
||||
for peer := range peers {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"PEER": peer.ID.Pretty(),
|
||||
}).Debug("Peers at timer")
|
||||
}
|
||||
}
|
||||
}(ticker)
|
||||
// END: DEBUG
|
||||
if routingDiscovery != nil {
|
||||
peers, err := routingDiscovery.FindPeers(context.Background(), zone)
|
||||
if err != nil {
|
||||
@@ -205,57 +178,4 @@ func FindPeerIdsViaDHT(host host.Host, zone string) []string {
|
||||
return peerIds
|
||||
}
|
||||
return peerIds
|
||||
/*
|
||||
if host.Network().Connectedness(peerId) != network.Connected {
|
||||
addrs, err := kdht.FindPeer(ctx, peerId)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ID": id,
|
||||
"addrs": addrs,
|
||||
}).Error("unable match peer")
|
||||
continue
|
||||
}
|
||||
stream, err := host.NewStream(ctx, addrs.ID, protocol.ID(zone))
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ID": id,
|
||||
"addrs": addrs,
|
||||
"protocol": zone,
|
||||
}).Error("new stream to peer error")
|
||||
continue
|
||||
}
|
||||
streams[id] = stream
|
||||
} else if _, ok := streams[id]; !ok {
|
||||
adrs1 := host.Network().Peerstore().Addrs(peerId)
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ID": id,
|
||||
"addrs1": adrs1,
|
||||
"protocol": zone,
|
||||
}).Error("new stream to peer error")
|
||||
for _, addrs := range adrs1 {
|
||||
adr, err := multiaddr.NewMultiaddr(fmt.Sprintf("%s%s%s", addrs.String(), protocol.ID(zone), id))
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ID": id,
|
||||
"addrs": addrs,
|
||||
"protocol": zone,
|
||||
"ERROR": err,
|
||||
"ADDR": adr,
|
||||
}).Error("Addr error")
|
||||
}
|
||||
stream, err := host.NewStream(ctx, peerId, protocol.ID(zone))
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ID": id,
|
||||
"addrs": addrs,
|
||||
"protocol": zone,
|
||||
"ERROR": err,
|
||||
}).Error("new stream to peer error")
|
||||
continue
|
||||
}
|
||||
streams[id] = stream
|
||||
break
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
20
p2p/peer.go
20
p2p/peer.go
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -47,6 +48,7 @@ type Message struct {
|
||||
var (
|
||||
RouteTable = iptree.New()
|
||||
Streams = make(map[string]network.Stream)
|
||||
VIP string
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -98,13 +100,11 @@ func NewPubSub(host host.Host, topic string) *Publisher {
|
||||
}).Error("Subscribe error")
|
||||
}
|
||||
|
||||
// BEGIN: publish routes broadcast and subscribe other peers
|
||||
go func() {
|
||||
for {
|
||||
if msg, err := sub.Next(context.Background()); err == nil {
|
||||
message := new(Message)
|
||||
if msg.ReceivedFrom.Pretty() == host.ID().Pretty() {
|
||||
// does not handler self route
|
||||
continue
|
||||
}
|
||||
if err := json.Unmarshal(msg.Data, message); err != nil {
|
||||
@@ -115,10 +115,8 @@ func NewPubSub(host host.Host, topic string) *Publisher {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"Message": message,
|
||||
}).Info("Receive message from topic")
|
||||
// refresh local table
|
||||
if message.MessageType == MessageTypeRoute {
|
||||
// TODO: add MASQUERADE if self
|
||||
// refresh route table - delete exist route if match then add table
|
||||
tun.RefreshRoute(message.Subnets)
|
||||
for _, subnet := range message.Subnets {
|
||||
// Add will override the exist one
|
||||
@@ -142,11 +140,12 @@ func NewPubSub(host host.Host, topic string) *Publisher {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Publisher) Publish(peerId string, subnets []string) {
|
||||
func (p *Publisher) Publish(peerId string, vip string, subnets []string) {
|
||||
if len(subnets) > 0 {
|
||||
message := &Message{
|
||||
Id: peerId,
|
||||
MessageType: MessageTypeRoute,
|
||||
Vip: vip,
|
||||
Subnets: subnets,
|
||||
}
|
||||
if bytes, err := json.Marshal(message); err == nil {
|
||||
@@ -174,10 +173,9 @@ func (p *Publisher) Publish(peerId string, subnets []string) {
|
||||
}
|
||||
}
|
||||
|
||||
func ForwardPacket(host host.Host, zone string, packets []byte) {
|
||||
dst := waterutil.IPv4Destination(packets).String()
|
||||
if peerId, found, err := RouteTable.GetByString(dst); err == nil && found {
|
||||
// TODO: fetch stream using peerId
|
||||
func ForwardPacket(host host.Host, zone string, packets []byte, vipNet *net.IPNet) {
|
||||
dst := waterutil.IPv4Destination(packets)
|
||||
if peerId, found, err := RouteTable.GetByString(dst.String()); err == nil && found {
|
||||
if stream, ok := Streams[peerId.(string)]; ok {
|
||||
binary.Write(stream, binary.LittleEndian, uint16(len(packets)))
|
||||
if n, err := stream.Write(packets); n != len(packets) || err != nil {
|
||||
@@ -186,17 +184,15 @@ func ForwardPacket(host host.Host, zone string, packets []byte) {
|
||||
"SIZE": n,
|
||||
}).Error("Forward to stream error")
|
||||
if err.Error() == "stream reset" {
|
||||
// TODO:
|
||||
stream.Close()
|
||||
delete(Streams, peerId.(string))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// make new stream
|
||||
// if s, ok := NewStreams(host, zone, strings.Split(peerId.(string), ","))[peerId.(string)]; ok {
|
||||
if s := NewStream(host, zone, peerId.(string)); s != nil {
|
||||
Streams[peerId.(string)] = s
|
||||
ForwardPacket(host, zone, packets)
|
||||
ForwardPacket(host, zone, packets, vipNet)
|
||||
} else {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"ERROR": err,
|
||||
|
||||
Reference in New Issue
Block a user