diff --git a/Makefile b/Makefile index 32aff40..0e4fbb4 100755 --- a/Makefile +++ b/Makefile @@ -68,7 +68,7 @@ env: @gofmt -w -s ./pkg ./cmd ./misc ## linux platform -linux: linux-proxy linux-point linux-switch core +linux: linux-proxy linux-point linux-switch core: env ./3rd/auto.sh build @@ -86,12 +86,15 @@ rpm: env ## build rpm packages cmd: env go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan ./cmd/main.go +openudp: env + go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openudp ./cmd/openudp + linux: linux-point linux-switch linux-proxy ## build all linux binary linux-point: env go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point ./cmd/point_linux -linux-switch: env cmd +linux-switch: env cmd openudp go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-switch ./cmd/switch linux-proxy: env @@ -155,7 +158,7 @@ install: env linux ## install packages windows: windows-point ## build windows binary windows-point: env - GOOS=windows GOARCH=amd64 go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.exe ./cmd/point_windows + GOOS=windows go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.exe ./cmd/point_windows windows-gz: env windows ## build windows packages @rm -rf $(WIN_DIR) && mkdir -p $(WIN_DIR) @@ -175,7 +178,7 @@ windows-syso: ## build windows syso osx: darwin darwin: env ## build darwin binary - GOOS=darwin GOARCH=amd64 go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.darwin ./cmd/point_darwin + GOOS=darwin go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.darwin ./cmd/point_darwin darwin-gz: env darwin ## build darwin packages @rm -rf $(MAC_DIR) && mkdir -p $(MAC_DIR) diff --git a/cmd/api/v6/cmd.go b/cmd/api/v6/cmd.go index d4435f2..16de0c1 100755 --- a/cmd/api/v6/cmd.go +++ b/cmd/api/v6/cmd.go @@ -7,7 +7,7 @@ import ( ) func Before(c *cli.Context) error { - if _, err := database.NewDBClient(nil); err == nil { + if _, err := database.NewConfClient(nil); err == nil { return nil } else { return err diff --git a/cmd/openudp/main.go b/cmd/openudp/main.go new file mode 100644 index 0000000..0e36e2c --- /dev/null +++ b/cmd/openudp/main.go @@ -0,0 +1,183 @@ +package main + +import ( + "flag" + "fmt" + db "github.com/luscis/openlan/pkg/database" + "github.com/luscis/openlan/pkg/libol" + "time" +) + +type Config struct { + UdpPort int + LogLevel int + LogFile string +} + +func (c *Config) Parse() { + flag.IntVar(&c.UdpPort, "port", 4500, "UDP port listen on") + flag.StringVar(&c.LogFile, "log:file", "/dev/null", "File log saved to") + flag.IntVar(&c.LogLevel, "log:level", 20, "Log level value") + flag.Parse() +} + +type UdpServer struct { + stop chan struct{} + out *libol.SubLogger + server *libol.UdpInServer + cfg *Config + links *libol.SafeStrMap +} + +func NewUdpServer(cfg *Config) *UdpServer { + c := &UdpServer{ + out: libol.NewSubLogger("udp"), + stop: make(chan struct{}), + cfg: cfg, + links: libol.NewSafeStrMap(128), + } + return c +} + +func (u *UdpServer) Initialize() { + u.server = &libol.UdpInServer{ + Port: uint16(u.cfg.UdpPort), + } +} + +func (u *UdpServer) Start() { + if err := u.server.Open(); err != nil { + u.out.Error("UdpServer.Start open socket %s", err) + return + } +} + +func (u *UdpServer) Stop() { +} + +func (u *UdpServer) Device2UUID(value string) string { + if link := u.links.Get(value); link != nil { + if older, ok := link.(*db.VirtualLink); ok { + return older.UUID + } + } + return "" +} + +func (u *UdpServer) toStatus(li *db.DBClient, from *libol.UdpInConnection) { + device := fmt.Sprintf("spi:%d", from.Spi) + obj := &db.VirtualLink{ + UUID: u.Device2UUID(device), + } + if err := li.Client.Get(obj); err != nil { + return + } + if obj.Status == nil { + obj.Status = make(map[string]string, 2) + } + obj.Status["remote_connection"] = fmt.Sprintf("udp:%s", from.Connection()) + ops, err := li.Client.Where(obj).Update(obj) + if err != nil { + u.out.Warn("UdpServer.toStatus update %s", err) + return + } + if _, err := li.Client.Transact(ops...); err != nil { + u.out.Warn("UdpServer.toStatus commit %s", err) + return + } + if obj.Connection == "any" { + _ = u.server.Send(from) + } +} + +func (u *UdpServer) toLinkState(li *db.DBClient, from *libol.UdpInConnection) { + device := fmt.Sprintf("spi:%d", from.Spi) + obj := &db.VirtualLink{ + UUID: u.Device2UUID(device), + } + if err := li.Client.Get(obj); err != nil { + return + } + obj.LinkState = "up" + ops, err := li.Client.Where(obj).Update(obj) + if err != nil { + u.out.Warn("UdpServer.toLinkState update %s", err) + return + } + if _, err := li.Client.Transact(ops...); err != nil { + u.out.Warn("UdpServer.toLinkState commit %s", err) + return + } +} + +func (u *UdpServer) Pong() { + li, err := db.NewClient(nil) + if err != nil { + u.out.Error("UdpServer.Pong open db with %s", err) + return + } + + for { + from, _ := u.server.Recv() + u.out.Cmd("UdpServer.Pong received %s", from.String()) + + u.toStatus(li, from) + u.toLinkState(li, from) + } +} + +func (u *UdpServer) toPing(li *db.DBClient, obj *db.VirtualLink) { + addr, port := db.GetAddrPort(obj.Connection[4:]) + if port == 0 { + port = 4500 + } + conn := &libol.UdpInConnection{ + Spi: obj.Spi(), + RemotePort: uint16(port), + RemoteAddr: addr, + } + u.out.Cmd("UdpServer.toPing send to %s", conn.String()) + _ = u.server.Send(conn) +} + +func (u *UdpServer) Ping() { + li, err := db.NewClient(nil) + if err != nil { + u.out.Error("UdpServer.Ping open db with %s", err) + return + } + + for { + var ls []db.VirtualLink + _ = li.Client.List(&ls) + u.links.Clear() + for i := range ls { + obj := &ls[i] + if err := u.links.Mod(obj.Device, obj); err != nil { + u.out.Error("UdpServer.Ping %s", err) + } + if !obj.IsUdpIn() { + continue + } + u.toPing(li, obj) + } + time.Sleep(10 * time.Second) + } +} + +func main() { + c := &Config{} + c.Parse() + + libol.SetLogger(c.LogFile, c.LogLevel) + + srv := NewUdpServer(c) + srv.Initialize() + + srv.Start() + libol.Go(srv.Ping) + libol.Go(srv.Pong) + + libol.Wait() + srv.Stop() +} diff --git a/pkg/database/client.go b/pkg/database/client.go index 6ad6a91..38c3399 100755 --- a/pkg/database/client.go +++ b/pkg/database/client.go @@ -1,5 +1,6 @@ package database +import "C" import ( "context" "github.com/go-logr/logr" @@ -60,8 +61,6 @@ func (o *OvSDB) WhereList(predict interface{}, result interface{}) error { return o.Client.WhereCache(predict).List(o.Context(), result) } -var Client *OvSDB - type DBClient struct { Server string Database string @@ -103,8 +102,7 @@ func (c *DBClient) Open(handler *cache.EventHandlerFuncs) error { if err := ovs.Connect(c.Context()); err != nil { return err } - Client = &OvSDB{Client: ovs} - c.Client = Client + c.Client = &OvSDB{Client: ovs} if handler != nil { processor := ovs.Cache() if processor == nil { @@ -119,16 +117,31 @@ func (c *DBClient) Open(handler *cache.EventHandlerFuncs) error { } var Conf *DBClient +var Client *OvSDB -func NewDBClient(handler *cache.EventHandlerFuncs) (*DBClient, error) { +func NewConfClient(handler *cache.EventHandlerFuncs) (*DBClient, error) { var err error if Conf == nil { - Conf = &DBClient{ + obj := &DBClient{ Server: api.Server, Database: api.Database, Verbose: api.Verbose, } - err = Conf.Open(handler) + err = obj.Open(handler) + if err == nil { + Conf = obj + Client = obj.Client + } } return Conf, err } + +func NewClient(handler *cache.EventHandlerFuncs) (*DBClient, error) { + obj := &DBClient{ + Server: api.Server, + Database: api.Database, + Verbose: api.Verbose, + } + err := obj.Open(handler) + return obj, err +} diff --git a/pkg/database/schema.go b/pkg/database/schema.go index 767b8be..09646de 100755 --- a/pkg/database/schema.go +++ b/pkg/database/schema.go @@ -1,5 +1,9 @@ package database +import ( + "strconv" +) + type Switch struct { UUID string `ovsdb:"_uuid" json:"uuid"` Protocol string `ovsdb:"protocol" json:"protocol"` @@ -32,6 +36,19 @@ type VirtualLink struct { Status map[string]string `ovsdb:"status" json:"status"` } +func (l *VirtualLink) IsUdpIn() bool { + if HasPrefix(l.Device, 4, "spi:") && + HasPrefix(l.Connection, 4, "udp:") { + return true + } + return false +} + +func (l *VirtualLink) Spi() uint32 { + spi, _ := strconv.Atoi(l.Device[4:]) + return uint32(spi) +} + type OpenVPN struct { UUID string `ovsdb:"_uuid" json:"uuid"` Protocol string `ovsdb:"protocol" json:"protocol"` diff --git a/pkg/database/utils.go b/pkg/database/utils.go index c334853..4fd8481 100755 --- a/pkg/database/utils.go +++ b/pkg/database/utils.go @@ -3,6 +3,8 @@ package database import ( "github.com/luscis/openlan/pkg/libol" "github.com/ovn-org/libovsdb/ovsdb" + "strconv" + "strings" ) func PrintError(result []ovsdb.OperationResult) { @@ -17,3 +19,19 @@ func PrintError(result []ovsdb.OperationResult) { func GenUUID() string { return libol.GenString(32) } + +func HasPrefix(value string, index int, dest string) bool { + if len(value) >= index { + return value[:index] == dest + } + return false +} + +func GetAddrPort(conn string) (string, int) { + values := strings.SplitN(conn, ":", 2) + if len(values) == 2 { + port, _ := strconv.Atoi(values[1]) + return values[0], port + } + return values[0], 0 +} diff --git a/pkg/libol/iputils.go b/pkg/libol/iputils.go index 6cd4ed2..f365da8 100755 --- a/pkg/libol/iputils.go +++ b/pkg/libol/iputils.go @@ -1,6 +1,7 @@ package libol import ( + "net" "os/exec" "runtime" "strings" @@ -201,3 +202,10 @@ func IpMetricSet(name, metric string, opts ...string) ([]byte, error) { return nil, NewErr("IpAddrAdd %s notSupport", runtime.GOOS) } } + +func LookupIP(name string) string { + if addr, _ := net.LookupIP(name); len(addr) > 0 { + return addr[0].String() + } + return "" +} diff --git a/pkg/libol/udp_test.go b/pkg/libol/udp_test.go deleted file mode 100644 index d6d64c0..0000000 --- a/pkg/libol/udp_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package libol - -import ( - "testing" -) - -func TestStartUDP_C(t *testing.T) { - StartUDP(84209, 4500, "180.109.49.146") -} diff --git a/pkg/libol/udp.go b/pkg/libol/udpin.go similarity index 50% rename from pkg/libol/udp.go rename to pkg/libol/udpin.go index c684da7..83feff8 100644 --- a/pkg/libol/udp.go +++ b/pkg/libol/udpin.go @@ -20,12 +20,12 @@ typedef struct { u_int32_t padding[2]; u_int32_t spi; u_int32_t seqno; -} udp_message; +} udpin_message; typedef struct { u_int16_t port; int32_t socket; -} udp_server; +} udpin_server; typedef struct { int32_t socket; @@ -33,11 +33,11 @@ typedef struct { const char *remote_addr; u_int32_t spi; u_int32_t seqno; -} udp_connection; +} udpin_connection; int seqno = 0; -int send_ping_once(udp_connection *conn) { +int send_ping_once(udpin_connection *conn) { int retval = 0; struct sockaddr_in dstaddr = { .sin_family = AF_INET, @@ -46,34 +46,29 @@ int send_ping_once(udp_connection *conn) { .s_addr = inet_addr(conn->remote_addr), }, }; - udp_message data = { + udpin_message data = { .padding = {0, 0}, .spi = htonl(conn->spi), }; - data.seqno = htonl(conn->seqno++); + data.seqno = htonl(conn->seqno); retval = sendto(conn->socket, &data, sizeof data, 0, (struct sockaddr *)&dstaddr, sizeof dstaddr); - if (retval <= 0) { - printf("%s: could not send data\n", conn->remote_addr); - } - return retval; } -int recv_ping_once(udp_connection *conn, udp_connection *from) { +int recv_ping_once(udpin_server *srv, udpin_connection *from) { struct sockaddr_in addr; int addrlen = sizeof addr; - udp_message data; + udpin_message data; int datalen = sizeof data; int retval = 0; memset(&data, 0, sizeof data); - retval = recvfrom(conn->socket, &data, datalen, 0, (struct sockaddr *)&addr, &addrlen); + retval = recvfrom(srv->socket, &data, datalen, 0, (struct sockaddr *)&addr, &addrlen); if ( retval <= 0 ) { if (errno == EAGAIN) { return 0; } - printf("recvfrom: %s\n", strerror(errno)); return retval; } @@ -84,7 +79,7 @@ int recv_ping_once(udp_connection *conn, udp_connection *from) { return retval; } -int open_socket(udp_server *srv) { +int open_socket(udpin_server *srv) { int op = 1; struct sockaddr_in addr = { .sin_family = AF_INET, @@ -102,75 +97,91 @@ int open_socket(udp_server *srv) { retval = setsockopt(srv->socket, SOL_SOCKET, SO_REUSEADDR, &op, sizeof op); if (retval < 0) { - return -1; + return retval; } retval = bind(srv->socket, (struct sockaddr *)&addr, sizeof addr); if ( retval == -1) { - return -1; + return retval; } return 0; } -int configure_socket(udp_server *srv) { +int configure_socket(udpin_server *srv) { int encap = UDP_ENCAP_ESPINUDP; - if (setsockopt(srv->socket, IPPROTO_UDP, UDP_ENCAP, &encap, sizeof encap) < 0) { - return -1; - } - return 0; + return setsockopt(srv->socket, IPPROTO_UDP, UDP_ENCAP, &encap, sizeof encap); } */ import "C" import ( "fmt" - "sync" - "time" "unsafe" ) -func StartUDP(spi uint32, port uint16, remote string) { - server := &C.udp_server{ - port: C.ushort(port), - socket : -1, - } - _ = C.open_socket(server) - C.configure_socket(server) +type UdpInServer struct { + Port uint16 + Socket int + server *C.udpin_server + SeqNo uint32 +} - addr := C.CString(remote) +type UdpInConnection struct { + Socket int + RemotePort uint16 + RemoteAddr string + Spi uint32 +} + +func (c *UdpInConnection) Connection() string { + return fmt.Sprintf("%s:%d", c.RemoteAddr, c.RemotePort) +} + +func (c *UdpInConnection) String() string { + return fmt.Sprintf("%d on %s:%d", c.Spi, c.RemoteAddr, c.RemotePort) +} + +func (u *UdpInServer) Open() error { + server := &C.udpin_server{ + port: C.ushort(u.Port), + socket: -1, + } + if ret := C.open_socket(server); ret < 0 { + return NewErr("UdpInServer.Open errno:%d", ret) + } + if ret := C.configure_socket(server); ret < 0 { + return NewErr("UdpInServer.Open errno:%d", ret) + } + u.server = server + u.Socket = int(server.socket) + return nil +} + +func (u *UdpInServer) Send(to *UdpInConnection) error { + u.SeqNo += 1 + addr := C.CString(LookupIP(to.RemoteAddr)) defer C.free(unsafe.Pointer(addr)) - conn := &C.udp_connection { - socket: server.socket, - spi: C.uint(spi), - remote_port: C.ushort(port), + conn := &C.udpin_connection{ + socket: u.server.socket, + spi: C.uint(to.Spi), + remote_port: C.ushort(to.RemotePort), remote_addr: addr, + seqno: C.uint(u.SeqNo), } + if ret := C.send_ping_once(conn); ret < 0 { + return NewErr("UdpInServer.Ping errno:%d", ret) + } + return nil +} - C.send_ping_once(conn) - - w := sync.WaitGroup{} - w.Add(2) - Go(func() { - defer w.Done() - for i := 0; i < 100; i++ { - time.Sleep(time.Second) - C.send_ping_once(conn) - } - }) - - Go(func() { - from := &C.udp_connection{} - C.recv_ping_once(conn, from) - addr := C.GoString(from.remote_addr) - fmt.Printf("receive from %s:%d spi %d\n", addr, from.remote_port, from.spi) - w.Done() - for { - from := &C.udp_connection{} - C.recv_ping_once(conn, from) - addr := C.GoString(from.remote_addr) - fmt.Printf("receive from %s:%d spi %d\n", addr, from.remote_port, from.spi) - } - }) - - w.Wait() -} \ No newline at end of file +func (u *UdpInServer) Recv() (*UdpInConnection, error) { + from := &C.udpin_connection{} + if ret := C.recv_ping_once(u.server, from); ret < 0 { + return nil, NewErr("UdpInServer.Pong errno:%d", ret) + } + return &UdpInConnection{ + RemotePort: uint16(from.remote_port), + RemoteAddr: C.GoString(from.remote_addr), + Spi: uint32(from.spi), + }, nil +} diff --git a/pkg/libol/udpin_test.go b/pkg/libol/udpin_test.go new file mode 100644 index 0000000..39df0a7 --- /dev/null +++ b/pkg/libol/udpin_test.go @@ -0,0 +1,28 @@ +package libol + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestOpenUDP_C(t *testing.T) { + udp := &UdpInServer{Port: 4500} + err := udp.Open() + assert.Equal(t, nil, err, "has not error") + assert.NotEqual(t, -1, udp.Socket, "valid socket") + + go func() { + conn, err := udp.Recv() + fmt.Println(conn, err) + }() + + err = udp.Send(&UdpInConnection{ + Spi: 84209, + RemoteAddr: "180.109.49.146", + RemotePort: 4500, + }) + assert.Equal(t, nil, err, "has not error") + time.Sleep(time.Second * 2) +} diff --git a/pkg/switch/confd.go b/pkg/switch/confd.go index a485152..fd801bc 100755 --- a/pkg/switch/confd.go +++ b/pkg/switch/confd.go @@ -7,8 +7,6 @@ import ( "github.com/luscis/openlan/pkg/libol" "github.com/ovn-org/libovsdb/cache" "github.com/ovn-org/libovsdb/model" - "strconv" - "strings" ) type ConfD struct { @@ -35,7 +33,7 @@ func (c *ConfD) Start() { DeleteFunc: c.Delete, UpdateFunc: c.Update, } - if _, err := database.NewDBClient(handler); err != nil { + if _, err := database.NewConfClient(handler); err != nil { c.out.Error("Confd.Start open db with %s", err) return } @@ -104,15 +102,6 @@ func (c *ConfD) Update(table string, old model.Model, new model.Model) { } } -func GetAddrPort(conn string) (string, int) { - values := strings.SplitN(conn, ":", 2) - if len(values) == 2 { - port, _ := strconv.Atoi(values[1]) - return values[0], port - } - return values[0], 0 -} - func GetRoutes(result *[]database.PrefixRoute, device string) error { if err := database.Client.WhereList( func(l *database.PrefixRoute) bool { @@ -301,14 +290,14 @@ func (l *MemberLink) Add(obj *database.VirtualLink) { if conn == "any" { remoteConn := obj.Status["remote_connection"] if libol.GetPrefix(remoteConn, 4) == "udp:" { - remote, port = GetAddrPort(remoteConn[4:]) + remote, port = database.GetAddrPort(remoteConn[4:]) } else { l.out.Warn("MemberLink.Add %s remote not found.", conn) return } } else if libol.GetPrefix(conn, 4) == "udp:" { remoteConn := obj.Connection - remote, port = GetAddrPort(remoteConn[4:]) + remote, port = database.GetAddrPort(remoteConn[4:]) } else { return } diff --git a/pkg/switch/esp.go b/pkg/switch/esp.go index 73e6e73..2fb453d 100755 --- a/pkg/switch/esp.go +++ b/pkg/switch/esp.go @@ -395,13 +395,11 @@ func (w *EspWorker) downMember() { func OpenUDP() { libol.Go(func() { args := []string{ - "-p", strconv.Itoa(co.EspLocalUdp), - "-vconsole:emer", - "--log-file=/var/openlan/openudp.log", + "-port", strconv.Itoa(co.EspLocalUdp), } cmd := exec.Command(UDPBin, args...) if err := cmd.Run(); err != nil { - libol.Error("esp.init %s", err) + libol.Error("OpenUDP %s", err) } }) }