From 50c1b147cb92b47ac1768dc3473ae202d9bce758 Mon Sep 17 00:00:00 2001 From: lilo Date: Tue, 1 Mar 2022 18:09:31 +0800 Subject: [PATCH] remove pubsub module --- cmd/up.go | 8 --- go.mod | 1 - go.sum | 8 --- p2p/peer.go | 140 ---------------------------------------------------- 4 files changed, 157 deletions(-) diff --git a/cmd/up.go b/cmd/up.go index 467e114..9bb8a33 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -50,7 +50,6 @@ var ( }, } mainDev tun.Device - pub *p2p.Publisher ) func init() { @@ -170,7 +169,6 @@ func upCommand(cmd *cobra.Command) { } // END: DHCP go p2p.FindPeerIdsViaDHT(host, zone) - pub = p2p.NewPubSub(host, "route") c := make(chan os.Signal) signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) @@ -206,12 +204,6 @@ func upCommand(cmd *cobra.Command) { tun.NewTun(mainDev) // avoid create duplicate close(devChan) - /* - vip := strings.Split(mainDev.Ip, "/")[0] - if pub != nil { - pub.Publish(host.ID().Pretty(), vip, config.Dev.Subnets) - } - */ _, vipNet, err := net.ParseCIDR(mainDev.Ip) if err != nil { logrus.WithFields(logrus.Fields{ diff --git a/go.mod b/go.mod index 8408238..55819ce 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/libp2p/go-libp2p-discovery v0.6.0 github.com/libp2p/go-libp2p-gorpc v0.1.4 github.com/libp2p/go-libp2p-kad-dht v0.15.0 - github.com/libp2p/go-libp2p-pubsub v0.6.1 github.com/liloew/wireguard-go v0.0.0-20220224014633-9cd745e6f114 github.com/multiformats/go-multiaddr v0.4.0 github.com/sevlyar/go-daemon v0.1.5 diff --git a/go.sum b/go.sum index 1f68bf1..f944245 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,6 @@ github.com/asergeyev/nradix v0.0.0-20170505151046-3872ab85bb56/go.mod h1:8BhOLuq github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= -github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= -github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -564,8 +562,6 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3 github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo= github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc= github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA= -github.com/libp2p/go-libp2p-connmgr v0.2.4 h1:TMS0vc0TCBomtQJyWr7fYxcVYYhx+q/2gF++G5Jkl/w= -github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I= github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI= @@ -641,8 +637,6 @@ github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnH github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.6.1 h1:wycbV+f4rreCoVY61Do6g/BUk0RIrbNRcYVbn+QkjGk= -github.com/libp2p/go-libp2p-pubsub v0.6.1/go.mod h1:nJv87QM2cU0w45KPR1rZicq+FmFIOD16zmT+ep1nOmg= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ= github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc= @@ -1152,8 +1146,6 @@ github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvX github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/p2p/peer.go b/p2p/peer.go index 1a1989c..6de1f62 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -16,33 +16,20 @@ limitations under the License. package p2p import ( - "context" "encoding/binary" - "encoding/json" "fmt" "net" - "strings" - "time" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/liloew/gvn/dhcp" "github.com/liloew/gvn/route" "github.com/sirupsen/logrus" "github.com/songgao/water/waterutil" - "github.com/spf13/viper" ) type MessageType uint -type MessageHandler pubsub.TopicEventHandlerOpt - -type Publisher struct { - pub *pubsub.Topic - sub *pubsub.Subscription -} type Publish interface { Publish(subnets []string) @@ -93,133 +80,6 @@ func NewPeer(priKey string, port uint) (host.Host, error) { return host, nil } -func NewPubSub(host host.Host, topic string) *Publisher { - ctx, _ := context.WithCancel(context.Background()) - // defer cancle() - ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithDiscovery(routingDiscovery)) - if err != nil { - logrus.WithFields(logrus.Fields{ - "ERROR": err, - }).Panic("Create PubSub error") - } - if tc, err := ps.Join(topic); err != nil { - // TODO: re-join - logrus.WithFields(logrus.Fields{ - "ERROR": err, - }).Error("Create PubSub error") - } else { - sub, err := tc.Subscribe() - if err != nil { - logrus.WithFields(logrus.Fields{ - "ERROR": err, - }).Error("Subscribe error") - } - - go func() { - for { - if msg, err := sub.Next(context.Background()); err == nil { - message := new(Message) - if err := json.Unmarshal(msg.Data, message); err != nil { - logrus.WithFields(logrus.Fields{ - "ERROR": err, - }).Error("Parse message from topic error") - } else { - logrus.WithFields(logrus.Fields{ - "Message": message, - }).Info("Receive message from topic") - if message.MessageType == MessageTypeRoute { - if msg.ReceivedFrom.Pretty() == host.ID().Pretty() { - continue - } - // TODO: add MASQUERADE if self - // for _, subnet := range message.Subnets { - // // Add will override the exist one - // route.EventBus.Publish(route.REFRESH_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: []string{subnet}}) - // } - route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: message.Subnets}) - } else if message.MessageType == MessageTypeOnline { - // refresh clients - if viper.GetUint("mode") == 1 { - // server - continue - } else { - req := dhcp.Request{} - var ress []dhcp.Response - if err := dhcp.Call("DHCPService", "Clients", req, &ress); err == nil { - for _, r := range ress { - if r.Id == host.ID().Pretty() { - continue - } - logrus.WithFields(logrus.Fields{ - "VIP": r.Ip, - "ID": r.Id, - "Subnet": r.Subnets, - }).Info("Refresh local vip table") - subnet := strings.Split(r.Ip, "/")[0] + "/32" - route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: []string{subnet}}) - if r.Id != host.ID().Pretty() { - route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: r.Subnets}) - } - } - } - - } - } else if message.MessageType == MessageTypeOffline { - subnet := strings.Split(message.Vip, "/")[0] + "/32" - route.EventBus.Publish(route.REMOVE_ROUTE_TOPIC, route.RouteEvent{Subnets: []string{subnet}}) - } - } - } else { - logrus.WithFields(logrus.Fields{ - "ERROR": err, - }).Error("Subscribe error") - } - } - }() - - return &Publisher{ - pub: tc, - sub: sub, - } - } - return nil -} - -func (p *Publisher) Publish(peerId string, vip string, subnets []string) { - message := &Message{ - Id: peerId, - MessageType: MessageTypeRoute, - Vip: vip, - } - if len(subnets) > 0 { - message.Subnets = subnets - } else { - message.MessageType = MessageTypeOnline - } - if bytes, err := json.Marshal(message); err == nil { - ticker := time.NewTicker(10 * time.Second) - go func(tk *time.Ticker) { - interval := 10 - for range tk.C { - logrus.WithFields(logrus.Fields{ - "Time": tk.C, - }).Debug("Publish fired") - if err := p.pub.Publish(context.Background(), bytes); err != nil { - logrus.WithFields(logrus.Fields{ - "ERROR": err, - "Message": message, - }).Error("Publish message from to topic error") - } - if interval < 30*60 { - // half of hour for the longest - interval *= 2 - } - ticker.Reset(time.Duration(interval) * time.Second) - } - }(ticker) - } -} - func ForwardPacket(host host.Host, zone string, packets []byte, vipNet *net.IPNet) { dst := waterutil.IPv4Destination(packets) if peerId, found, err := route.Route.Get(dst.String()); err == nil && found {