optimization route logical

This commit is contained in:
lilo
2022-03-01 17:53:37 +08:00
parent 504d531d72
commit 77c00d05cb
4 changed files with 106 additions and 115 deletions

107
cmd/up.go
View File

@@ -40,14 +40,12 @@ import (
"github.com/spf13/viper"
)
// upCmd represents the up command
var (
upCmd = &cobra.Command{
Use: "up",
Short: "run gvn",
Long: `Run gvn using the configure file (gvn.yaml)`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("up called")
upCommand(cmd)
},
}
@@ -102,7 +100,7 @@ func upCommand(cmd *cobra.Command) {
for _, addr := range host.Addrs() {
bootstraps = append(bootstraps, fmt.Sprintf("%s/p2p/%s", addr.String(), host.ID().Pretty()))
}
dhcp.NewRPCServer(host, rpcZone, viper.GetString("dev.vip"), viper.GetInt("dev.mtu"))
dhcp.NewRPCServer(host, rpcZone, viper.GetString("dev.vip"), viper.GetInt("dev.mtu"), viper.GetStringSlice("dev.subnets"))
// auto config in server mode
devChan <- tun.Device{
Name: viper.GetString("dev.name"),
@@ -139,41 +137,30 @@ func upCommand(cmd *cobra.Command) {
ServerVIP: res.ServerVIP,
Port: viper.GetUint("port"),
}
}
var ress []dhcp.Response
if err := dhcp.Call("DHCPService", "Clients", req, &ress); err == nil {
// subnets := make([]string, 0)
for _, r := range ress {
logrus.WithFields(logrus.Fields{
"VIP": r.Ip,
"ID": r.Id,
"Subnet": r.Subnets,
}).Info("Refresh local vip table")
// route.Route.Add(strings.Split(r.Ip, "/")[0]+"/32", r.Id)
subnet := strings.Split(r.Ip, "/")[0] + "/32"
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: []string{subnet}})
if r.Id != host.ID().Pretty() {
// does not change route via local ethernet
// subnets = append(subnets, r.Subnets...)
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: r.Subnets})
}
}
// logrus.WithFields(logrus.Fields{
// "subnets": subnets,
// }).Info("Refresh subnets")
// tun.RefreshRoute(subnets)
} else {
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("Request clients error")
}
ticker := time.NewTicker(INTERVAL * time.Second)
for range ticker.C {
// TODO: sleep or make sure call after tun.New
// refresh local VIP table
var ress []dhcp.Response
if err := dhcp.Call("DHCPService", "Clients", req, &ress); err == nil {
for _, r := range ress {
logrus.WithFields(logrus.Fields{
"VIP": r.Ip,
"ID": r.Id,
"Subnet": r.Subnets,
}).Debug("Refresh local vip table")
subnet := strings.Split(r.Ip, "/")[0] + "/32"
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: []string{subnet}})
if r.Id != host.ID().Pretty() {
// does not change route via local ethernet
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: r.Subnets})
}
}
} else {
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("Request clients error")
}
if err := dhcp.Call("DHCPService", "Ping", req, nil); err != nil {
// TODO: log error
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("RPC - Ping error")
@@ -219,10 +206,12 @@ func upCommand(cmd *cobra.Command) {
tun.NewTun(mainDev)
// avoid create duplicate
close(devChan)
vip := strings.Split(mainDev.Ip, "/")[0]
if pub != nil {
pub.Publish(host.ID().Pretty(), vip, config.Dev.Subnets)
}
/*
vip := strings.Split(mainDev.Ip, "/")[0]
if pub != nil {
pub.Publish(host.ID().Pretty(), vip, config.Dev.Subnets)
}
*/
_, vipNet, err := net.ParseCIDR(mainDev.Ip)
if err != nil {
logrus.WithFields(logrus.Fields{
@@ -309,45 +298,3 @@ func readData(stream network.Stream) {
}
}
}
/*
func contains(elems []string, v string) bool {
for _, s := range elems {
if v == s {
return true
}
}
return false
}
// check pre contains pos elements, eg:
// pre: [1, 3, 5, 6], pos: [1, 2, 3, 4]
// result should be: [2, 4]
func contains(left, right []string) []string {
sort.Strings(left)
sort.Strings(right)
tmp := make([]string, 0)
for _, item := range right {
if !contains(left, item) {
tmp = append(tmp, item)
}
}
return tmp
}
func mins(left, right *[]string) {
for _, item := range *right {
left = remove(*left, item)
}
}
func remove(items []string, item string) []string {
newitems := []string{}
for _, i := range items {
if i != item {
newitems = append(newitems, i)
}
}
return newitems
}
*/

View File

@@ -126,6 +126,7 @@ func (s *DHCPService) DHCP(ctx context.Context, req Request, res *Response) erro
// route.Route.Add(strings.Split(data.Ip, "/")[0]+"/32", data.Id)
subnet := strings.Split(data.Ip, "/")[0] + "/32"
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: data.Id, Subnets: []string{subnet}})
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: req.Id, Subnets: req.Subnets})
mu.Unlock()
return nil
}
@@ -165,7 +166,7 @@ func (s *DHCPService) Ping(ctx context.Context, req Request, res *Response) erro
return nil
}
func NewRPCServer(host host.Host, zone string, cidr string, mtu int) {
func NewRPCServer(host host.Host, zone string, cidr string, mtu int, subnets []string) {
server := rpc.NewServer(host, protocol.ID(zone))
service := DHCPService{KV: map[string]Response{}, Cidr: cidr, Mtu: mtu}
if err := server.Register(&service); err != nil {
@@ -196,6 +197,7 @@ func NewRPCServer(host host.Host, zone string, cidr string, mtu int) {
Mtu: mtu,
ServerVIP: cidr,
Mode: 1,
Subnets: subnets,
LoginTime: time.Now().Unix(),
Ttl: 10 * 60, // 10 min
}

View File

@@ -132,12 +132,11 @@ func NewPubSub(host host.Host, topic string) *Publisher {
continue
}
// TODO: add MASQUERADE if self
// tun.RefreshRoute(message.Subnets)
for _, subnet := range message.Subnets {
// Add will override the exist one
// route.Route.Add(strings.TrimSpace(subnet), message.Id)
route.EventBus.Publish(route.REFRESH_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: []string{subnet}})
}
// for _, subnet := range message.Subnets {
// // Add will override the exist one
// route.EventBus.Publish(route.REFRESH_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: []string{subnet}})
// }
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: message.Subnets})
} else if message.MessageType == MessageTypeOnline {
// refresh clients
if viper.GetUint("mode") == 1 {
@@ -147,7 +146,6 @@ func NewPubSub(host host.Host, topic string) *Publisher {
req := dhcp.Request{}
var ress []dhcp.Response
if err := dhcp.Call("DHCPService", "Clients", req, &ress); err == nil {
// subnets := make([]string, 0)
for _, r := range ress {
if r.Id == host.ID().Pretty() {
continue
@@ -157,23 +155,16 @@ func NewPubSub(host host.Host, topic string) *Publisher {
"ID": r.Id,
"Subnet": r.Subnets,
}).Info("Refresh local vip table")
// route.Route.Add(strings.Split(r.Ip, "/")[0]+"/32", r.Id)
subnet := strings.Split(r.Ip, "/")[0] + "/32"
route.EventBus.Publish(route.REFRESH_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: []string{subnet}})
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: []string{subnet}})
if r.Id != host.ID().Pretty() {
// subnets = append(subnets, r.Subnets...)
route.EventBus.Publish(route.REFRESH_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: r.Subnets})
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: r.Subnets})
}
}
// logrus.WithFields(logrus.Fields{
// "subnets": subnets,
// }).Info("Refresh subnets")
// tun.RefreshRoute(subnets)
}
}
} else if message.MessageType == MessageTypeOffline {
// route.Route.Remove(strings.Split(message.Vip, "/")[0] + "/32")
subnet := strings.Split(message.Vip, "/")[0] + "/32"
route.EventBus.Publish(route.REMOVE_ROUTE_TOPIC, route.RouteEvent{Subnets: []string{subnet}})
}
@@ -209,7 +200,10 @@ func (p *Publisher) Publish(peerId string, vip string, subnets []string) {
ticker := time.NewTicker(10 * time.Second)
go func(tk *time.Ticker) {
interval := 10
for _ = range tk.C {
for range tk.C {
logrus.WithFields(logrus.Fields{
"Time": tk.C,
}).Debug("Publish fired")
if err := p.pub.Publish(context.Background(), bytes); err != nil {
logrus.WithFields(logrus.Fields{
"ERROR": err,
@@ -220,9 +214,6 @@ func (p *Publisher) Publish(peerId string, vip string, subnets []string) {
// half of hour for the longest
interval *= 2
}
logrus.WithFields(logrus.Fields{
"Interval": interval,
}).Info("")
ticker.Reset(time.Duration(interval) * time.Second)
}
}(ticker)

View File

@@ -54,11 +54,12 @@ func init() {
removeRouteCh := make(chan eventbus.DataEvent)
refreshRouteCh := make(chan eventbus.DataEvent)
onlineCh := make(chan eventbus.DataEvent)
// TODO: ofline channel
offlineCh := make(chan eventbus.DataEvent)
EventBus.Subscribe(ADD_ROUTE_TOPIC, addRouteCh)
EventBus.Subscribe(REMOVE_ROUTE_TOPIC, removeRouteCh)
EventBus.Subscribe(REFRESH_ROUTE_TOPIC, refreshRouteCh)
EventBus.Subscribe(ONLINE_TOPIC, onlineCh)
EventBus.Subscribe(OFFLINE_TOPIC, offlineCh)
for {
select {
case data := <-addRouteCh:
@@ -82,15 +83,21 @@ func init() {
"Data": data.Data,
"Topic": data.Topic,
}).Debug("Refresh Route Channel")
for _, subnet := range data.Data.(RouteEvent).Subnets {
Route.remove(subnet)
Route.add(subnet, data.Data.(RouteEvent).Id)
}
// for _, subnet := range data.Data.(RouteEvent).Subnets {
// Route.remove(subnet)
// Route.add(subnet, data.Data.(RouteEvent).Id)
// }
Route.refresh(data.Data.(RouteEvent).Subnets, data.Data.(RouteEvent).Id)
case data := <-onlineCh:
logrus.WithFields(logrus.Fields{
"Data": data.Data,
"Topic": data.Topic,
}).Debug("Online Channel")
case data := <-offlineCh:
logrus.WithFields(logrus.Fields{
"Data": data.Data,
"Topic": data.Topic,
}).Debug("Offline Channel")
}
}
}()
@@ -101,18 +108,28 @@ type RouteTable struct {
rm sync.RWMutex
}
// subnet - 192.168.1.0/24
// peerId - P2P node ID
func (r *RouteTable) refresh(subnet string, peerId string) {
r.remove(subnet)
r.add(subnet, peerId)
func (r *RouteTable) refresh(subnets []string, peerId string) {
removed := oneSideSlice(LocalRoute, subnets)
added := oneSideSlice(subnets, LocalRoute)
logrus.WithFields(logrus.Fields{
"Removed": removed,
"Added": added,
"LocalRoute": LocalRoute,
"Subnets": subnets,
}).Debug("Remove and Added subnets")
for _, v := range removed {
r.remove(v)
}
// for _, v := range mergeSlice(LocalRoute, subnets) {
for _, v := range added {
r.add(v, peerId)
}
}
func (r *RouteTable) remove(subnet string) {
// TODO: LocalRoute remove
r.rm.Lock()
r.tree.DeleteByString(subnet)
if err := tun.RemoveRoute([]string{subnet}); err == nil {
r.tree.DeleteByString(subnet)
LocalRoute = remove(LocalRoute, subnet)
} else {
logrus.WithFields(logrus.Fields{
@@ -128,12 +145,12 @@ func (r *RouteTable) add(subnet string, peerId string) {
logrus.WithFields(logrus.Fields{
"LocalRoute": LocalRoute,
"Subnet": subnet,
}).Info("Ignore the subnet becuase of existence")
}).Debug("Ignore the subnet becuase of existence")
r.rm.Unlock()
return
}
r.tree.AddByString(subnet, peerId)
if err := tun.AddRoute([]string{subnet}); err == nil {
r.tree.AddByString(subnet, peerId)
LocalRoute = append(LocalRoute, subnet)
} else {
logrus.WithFields(logrus.Fields{
@@ -152,6 +169,8 @@ func (r *RouteTable) Clean() {
r.tree = iptree.New()
}
////////////////////////////////////////////////////////////////////////////////
func contains(elems []string, v string) bool {
for _, s := range elems {
if v == s {
@@ -170,3 +189,35 @@ func remove(items []string, item string) []string {
}
return newitems
}
// return the items co-exists in left and right
func mergeSlice(left, right []string) []string {
tmp := make([]string, 0)
for _, v := range left {
if contains(right, v) && !contains(tmp, v) {
tmp = append(tmp, v)
}
}
for _, v := range right {
if contains(left, v) && !contains(tmp, v) {
tmp = append(tmp, v)
}
}
return tmp
}
// return the items only exists left not exists right
// left: [1, 3, 5, 7, 9]
// right: [1, 2, 3, 4]
// result: [5, 7, 9]
func oneSideSlice(left, right []string) []string {
tmp := make([]string, 0)
for _, v := range left {
if !contains(right, v) {
tmp = append(tmp, v)
}
}
return tmp
}
////////////////////////////////////////////////////////////////////////////////