remove pubsub module

This commit is contained in:
lilo
2022-03-01 18:09:31 +08:00
parent 77c00d05cb
commit 50c1b147cb
4 changed files with 0 additions and 157 deletions

View File

@@ -50,7 +50,6 @@ var (
},
}
mainDev tun.Device
pub *p2p.Publisher
)
func init() {
@@ -170,7 +169,6 @@ func upCommand(cmd *cobra.Command) {
}
// END: DHCP
go p2p.FindPeerIdsViaDHT(host, zone)
pub = p2p.NewPubSub(host, "route")
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
@@ -206,12 +204,6 @@ func upCommand(cmd *cobra.Command) {
tun.NewTun(mainDev)
// avoid create duplicate
close(devChan)
/*
vip := strings.Split(mainDev.Ip, "/")[0]
if pub != nil {
pub.Publish(host.ID().Pretty(), vip, config.Dev.Subnets)
}
*/
_, vipNet, err := net.ParseCIDR(mainDev.Ip)
if err != nil {
logrus.WithFields(logrus.Fields{

1
go.mod
View File

@@ -10,7 +10,6 @@ require (
github.com/libp2p/go-libp2p-discovery v0.6.0
github.com/libp2p/go-libp2p-gorpc v0.1.4
github.com/libp2p/go-libp2p-kad-dht v0.15.0
github.com/libp2p/go-libp2p-pubsub v0.6.1
github.com/liloew/wireguard-go v0.0.0-20220224014633-9cd745e6f114
github.com/multiformats/go-multiaddr v0.4.0
github.com/sevlyar/go-daemon v0.1.5

8
go.sum
View File

@@ -88,8 +88,6 @@ github.com/asergeyev/nradix v0.0.0-20170505151046-3872ab85bb56/go.mod h1:8BhOLuq
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -564,8 +562,6 @@ github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3
github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo=
github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc=
github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA=
github.com/libp2p/go-libp2p-connmgr v0.2.4 h1:TMS0vc0TCBomtQJyWr7fYxcVYYhx+q/2gF++G5Jkl/w=
github.com/libp2p/go-libp2p-connmgr v0.2.4/go.mod h1:YV0b/RIm8NGPnnNWM7hG9Q38OeQiQfKhHCCs1++ufn0=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I=
github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI=
@@ -641,8 +637,6 @@ github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnH
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.6.1 h1:wycbV+f4rreCoVY61Do6g/BUk0RIrbNRcYVbn+QkjGk=
github.com/libp2p/go-libp2p-pubsub v0.6.1/go.mod h1:nJv87QM2cU0w45KPR1rZicq+FmFIOD16zmT+ep1nOmg=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ=
github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc=
@@ -1152,8 +1146,6 @@ github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvX
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=

View File

@@ -16,33 +16,20 @@ limitations under the License.
package p2p
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"strings"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/liloew/gvn/dhcp"
"github.com/liloew/gvn/route"
"github.com/sirupsen/logrus"
"github.com/songgao/water/waterutil"
"github.com/spf13/viper"
)
type MessageType uint
type MessageHandler pubsub.TopicEventHandlerOpt
type Publisher struct {
pub *pubsub.Topic
sub *pubsub.Subscription
}
type Publish interface {
Publish(subnets []string)
@@ -93,133 +80,6 @@ func NewPeer(priKey string, port uint) (host.Host, error) {
return host, nil
}
func NewPubSub(host host.Host, topic string) *Publisher {
ctx, _ := context.WithCancel(context.Background())
// defer cancle()
ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithDiscovery(routingDiscovery))
if err != nil {
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Panic("Create PubSub error")
}
if tc, err := ps.Join(topic); err != nil {
// TODO: re-join
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("Create PubSub error")
} else {
sub, err := tc.Subscribe()
if err != nil {
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("Subscribe error")
}
go func() {
for {
if msg, err := sub.Next(context.Background()); err == nil {
message := new(Message)
if err := json.Unmarshal(msg.Data, message); err != nil {
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("Parse message from topic error")
} else {
logrus.WithFields(logrus.Fields{
"Message": message,
}).Info("Receive message from topic")
if message.MessageType == MessageTypeRoute {
if msg.ReceivedFrom.Pretty() == host.ID().Pretty() {
continue
}
// TODO: add MASQUERADE if self
// for _, subnet := range message.Subnets {
// // Add will override the exist one
// route.EventBus.Publish(route.REFRESH_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: []string{subnet}})
// }
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: message.Id, Subnets: message.Subnets})
} else if message.MessageType == MessageTypeOnline {
// refresh clients
if viper.GetUint("mode") == 1 {
// server
continue
} else {
req := dhcp.Request{}
var ress []dhcp.Response
if err := dhcp.Call("DHCPService", "Clients", req, &ress); err == nil {
for _, r := range ress {
if r.Id == host.ID().Pretty() {
continue
}
logrus.WithFields(logrus.Fields{
"VIP": r.Ip,
"ID": r.Id,
"Subnet": r.Subnets,
}).Info("Refresh local vip table")
subnet := strings.Split(r.Ip, "/")[0] + "/32"
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: []string{subnet}})
if r.Id != host.ID().Pretty() {
route.EventBus.Publish(route.ADD_ROUTE_TOPIC, route.RouteEvent{Id: r.Id, Subnets: r.Subnets})
}
}
}
}
} else if message.MessageType == MessageTypeOffline {
subnet := strings.Split(message.Vip, "/")[0] + "/32"
route.EventBus.Publish(route.REMOVE_ROUTE_TOPIC, route.RouteEvent{Subnets: []string{subnet}})
}
}
} else {
logrus.WithFields(logrus.Fields{
"ERROR": err,
}).Error("Subscribe error")
}
}
}()
return &Publisher{
pub: tc,
sub: sub,
}
}
return nil
}
func (p *Publisher) Publish(peerId string, vip string, subnets []string) {
message := &Message{
Id: peerId,
MessageType: MessageTypeRoute,
Vip: vip,
}
if len(subnets) > 0 {
message.Subnets = subnets
} else {
message.MessageType = MessageTypeOnline
}
if bytes, err := json.Marshal(message); err == nil {
ticker := time.NewTicker(10 * time.Second)
go func(tk *time.Ticker) {
interval := 10
for range tk.C {
logrus.WithFields(logrus.Fields{
"Time": tk.C,
}).Debug("Publish fired")
if err := p.pub.Publish(context.Background(), bytes); err != nil {
logrus.WithFields(logrus.Fields{
"ERROR": err,
"Message": message,
}).Error("Publish message from to topic error")
}
if interval < 30*60 {
// half of hour for the longest
interval *= 2
}
ticker.Reset(time.Duration(interval) * time.Second)
}
}(ticker)
}
}
func ForwardPacket(host host.Host, zone string, packets []byte, vipNet *net.IPNet) {
dst := waterutil.IPv4Destination(packets)
if peerId, found, err := route.Route.Get(dst.String()); err == nil && found {