fea: openudp: update link state

This commit is contained in:
Daniel Ding
2022-10-07 18:56:06 +08:00
parent f354e6d85f
commit c03c099dfb
12 changed files with 362 additions and 103 deletions

View File

@@ -68,7 +68,7 @@ env:
@gofmt -w -s ./pkg ./cmd ./misc
## linux platform
linux: linux-proxy linux-point linux-switch core
linux: linux-proxy linux-point linux-switch
core: env
./3rd/auto.sh build
@@ -86,12 +86,15 @@ rpm: env ## build rpm packages
cmd: env
go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan ./cmd/main.go
openudp: env
go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openudp ./cmd/openudp
linux: linux-point linux-switch linux-proxy ## build all linux binary
linux-point: env
go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point ./cmd/point_linux
linux-switch: env cmd
linux-switch: env cmd openudp
go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-switch ./cmd/switch
linux-proxy: env
@@ -155,7 +158,7 @@ install: env linux ## install packages
windows: windows-point ## build windows binary
windows-point: env
GOOS=windows GOARCH=amd64 go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.exe ./cmd/point_windows
GOOS=windows go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.exe ./cmd/point_windows
windows-gz: env windows ## build windows packages
@rm -rf $(WIN_DIR) && mkdir -p $(WIN_DIR)
@@ -175,7 +178,7 @@ windows-syso: ## build windows syso
osx: darwin
darwin: env ## build darwin binary
GOOS=darwin GOARCH=amd64 go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.darwin ./cmd/point_darwin
GOOS=darwin go build -mod=vendor -ldflags "$(LDFLAGS)" -o $(BD)/openlan-point.darwin ./cmd/point_darwin
darwin-gz: env darwin ## build darwin packages
@rm -rf $(MAC_DIR) && mkdir -p $(MAC_DIR)

View File

@@ -7,7 +7,7 @@ import (
)
func Before(c *cli.Context) error {
if _, err := database.NewDBClient(nil); err == nil {
if _, err := database.NewConfClient(nil); err == nil {
return nil
} else {
return err

183
cmd/openudp/main.go Normal file
View File

@@ -0,0 +1,183 @@
package main
import (
"flag"
"fmt"
db "github.com/luscis/openlan/pkg/database"
"github.com/luscis/openlan/pkg/libol"
"time"
)
type Config struct {
UdpPort int
LogLevel int
LogFile string
}
func (c *Config) Parse() {
flag.IntVar(&c.UdpPort, "port", 4500, "UDP port listen on")
flag.StringVar(&c.LogFile, "log:file", "/dev/null", "File log saved to")
flag.IntVar(&c.LogLevel, "log:level", 20, "Log level value")
flag.Parse()
}
type UdpServer struct {
stop chan struct{}
out *libol.SubLogger
server *libol.UdpInServer
cfg *Config
links *libol.SafeStrMap
}
func NewUdpServer(cfg *Config) *UdpServer {
c := &UdpServer{
out: libol.NewSubLogger("udp"),
stop: make(chan struct{}),
cfg: cfg,
links: libol.NewSafeStrMap(128),
}
return c
}
func (u *UdpServer) Initialize() {
u.server = &libol.UdpInServer{
Port: uint16(u.cfg.UdpPort),
}
}
func (u *UdpServer) Start() {
if err := u.server.Open(); err != nil {
u.out.Error("UdpServer.Start open socket %s", err)
return
}
}
func (u *UdpServer) Stop() {
}
func (u *UdpServer) Device2UUID(value string) string {
if link := u.links.Get(value); link != nil {
if older, ok := link.(*db.VirtualLink); ok {
return older.UUID
}
}
return ""
}
func (u *UdpServer) toStatus(li *db.DBClient, from *libol.UdpInConnection) {
device := fmt.Sprintf("spi:%d", from.Spi)
obj := &db.VirtualLink{
UUID: u.Device2UUID(device),
}
if err := li.Client.Get(obj); err != nil {
return
}
if obj.Status == nil {
obj.Status = make(map[string]string, 2)
}
obj.Status["remote_connection"] = fmt.Sprintf("udp:%s", from.Connection())
ops, err := li.Client.Where(obj).Update(obj)
if err != nil {
u.out.Warn("UdpServer.toStatus update %s", err)
return
}
if _, err := li.Client.Transact(ops...); err != nil {
u.out.Warn("UdpServer.toStatus commit %s", err)
return
}
if obj.Connection == "any" {
_ = u.server.Send(from)
}
}
func (u *UdpServer) toLinkState(li *db.DBClient, from *libol.UdpInConnection) {
device := fmt.Sprintf("spi:%d", from.Spi)
obj := &db.VirtualLink{
UUID: u.Device2UUID(device),
}
if err := li.Client.Get(obj); err != nil {
return
}
obj.LinkState = "up"
ops, err := li.Client.Where(obj).Update(obj)
if err != nil {
u.out.Warn("UdpServer.toLinkState update %s", err)
return
}
if _, err := li.Client.Transact(ops...); err != nil {
u.out.Warn("UdpServer.toLinkState commit %s", err)
return
}
}
func (u *UdpServer) Pong() {
li, err := db.NewClient(nil)
if err != nil {
u.out.Error("UdpServer.Pong open db with %s", err)
return
}
for {
from, _ := u.server.Recv()
u.out.Cmd("UdpServer.Pong received %s", from.String())
u.toStatus(li, from)
u.toLinkState(li, from)
}
}
func (u *UdpServer) toPing(li *db.DBClient, obj *db.VirtualLink) {
addr, port := db.GetAddrPort(obj.Connection[4:])
if port == 0 {
port = 4500
}
conn := &libol.UdpInConnection{
Spi: obj.Spi(),
RemotePort: uint16(port),
RemoteAddr: addr,
}
u.out.Cmd("UdpServer.toPing send to %s", conn.String())
_ = u.server.Send(conn)
}
func (u *UdpServer) Ping() {
li, err := db.NewClient(nil)
if err != nil {
u.out.Error("UdpServer.Ping open db with %s", err)
return
}
for {
var ls []db.VirtualLink
_ = li.Client.List(&ls)
u.links.Clear()
for i := range ls {
obj := &ls[i]
if err := u.links.Mod(obj.Device, obj); err != nil {
u.out.Error("UdpServer.Ping %s", err)
}
if !obj.IsUdpIn() {
continue
}
u.toPing(li, obj)
}
time.Sleep(10 * time.Second)
}
}
func main() {
c := &Config{}
c.Parse()
libol.SetLogger(c.LogFile, c.LogLevel)
srv := NewUdpServer(c)
srv.Initialize()
srv.Start()
libol.Go(srv.Ping)
libol.Go(srv.Pong)
libol.Wait()
srv.Stop()
}

View File

@@ -1,5 +1,6 @@
package database
import "C"
import (
"context"
"github.com/go-logr/logr"
@@ -60,8 +61,6 @@ func (o *OvSDB) WhereList(predict interface{}, result interface{}) error {
return o.Client.WhereCache(predict).List(o.Context(), result)
}
var Client *OvSDB
type DBClient struct {
Server string
Database string
@@ -103,8 +102,7 @@ func (c *DBClient) Open(handler *cache.EventHandlerFuncs) error {
if err := ovs.Connect(c.Context()); err != nil {
return err
}
Client = &OvSDB{Client: ovs}
c.Client = Client
c.Client = &OvSDB{Client: ovs}
if handler != nil {
processor := ovs.Cache()
if processor == nil {
@@ -119,16 +117,31 @@ func (c *DBClient) Open(handler *cache.EventHandlerFuncs) error {
}
var Conf *DBClient
var Client *OvSDB
func NewDBClient(handler *cache.EventHandlerFuncs) (*DBClient, error) {
func NewConfClient(handler *cache.EventHandlerFuncs) (*DBClient, error) {
var err error
if Conf == nil {
Conf = &DBClient{
obj := &DBClient{
Server: api.Server,
Database: api.Database,
Verbose: api.Verbose,
}
err = Conf.Open(handler)
err = obj.Open(handler)
if err == nil {
Conf = obj
Client = obj.Client
}
}
return Conf, err
}
func NewClient(handler *cache.EventHandlerFuncs) (*DBClient, error) {
obj := &DBClient{
Server: api.Server,
Database: api.Database,
Verbose: api.Verbose,
}
err := obj.Open(handler)
return obj, err
}

View File

@@ -1,5 +1,9 @@
package database
import (
"strconv"
)
type Switch struct {
UUID string `ovsdb:"_uuid" json:"uuid"`
Protocol string `ovsdb:"protocol" json:"protocol"`
@@ -32,6 +36,19 @@ type VirtualLink struct {
Status map[string]string `ovsdb:"status" json:"status"`
}
func (l *VirtualLink) IsUdpIn() bool {
if HasPrefix(l.Device, 4, "spi:") &&
HasPrefix(l.Connection, 4, "udp:") {
return true
}
return false
}
func (l *VirtualLink) Spi() uint32 {
spi, _ := strconv.Atoi(l.Device[4:])
return uint32(spi)
}
type OpenVPN struct {
UUID string `ovsdb:"_uuid" json:"uuid"`
Protocol string `ovsdb:"protocol" json:"protocol"`

View File

@@ -3,6 +3,8 @@ package database
import (
"github.com/luscis/openlan/pkg/libol"
"github.com/ovn-org/libovsdb/ovsdb"
"strconv"
"strings"
)
func PrintError(result []ovsdb.OperationResult) {
@@ -17,3 +19,19 @@ func PrintError(result []ovsdb.OperationResult) {
func GenUUID() string {
return libol.GenString(32)
}
func HasPrefix(value string, index int, dest string) bool {
if len(value) >= index {
return value[:index] == dest
}
return false
}
func GetAddrPort(conn string) (string, int) {
values := strings.SplitN(conn, ":", 2)
if len(values) == 2 {
port, _ := strconv.Atoi(values[1])
return values[0], port
}
return values[0], 0
}

View File

@@ -1,6 +1,7 @@
package libol
import (
"net"
"os/exec"
"runtime"
"strings"
@@ -201,3 +202,10 @@ func IpMetricSet(name, metric string, opts ...string) ([]byte, error) {
return nil, NewErr("IpAddrAdd %s notSupport", runtime.GOOS)
}
}
func LookupIP(name string) string {
if addr, _ := net.LookupIP(name); len(addr) > 0 {
return addr[0].String()
}
return ""
}

View File

@@ -1,9 +0,0 @@
package libol
import (
"testing"
)
func TestStartUDP_C(t *testing.T) {
StartUDP(84209, 4500, "180.109.49.146")
}

View File

@@ -20,12 +20,12 @@ typedef struct {
u_int32_t padding[2];
u_int32_t spi;
u_int32_t seqno;
} udp_message;
} udpin_message;
typedef struct {
u_int16_t port;
int32_t socket;
} udp_server;
} udpin_server;
typedef struct {
int32_t socket;
@@ -33,11 +33,11 @@ typedef struct {
const char *remote_addr;
u_int32_t spi;
u_int32_t seqno;
} udp_connection;
} udpin_connection;
int seqno = 0;
int send_ping_once(udp_connection *conn) {
int send_ping_once(udpin_connection *conn) {
int retval = 0;
struct sockaddr_in dstaddr = {
.sin_family = AF_INET,
@@ -46,34 +46,29 @@ int send_ping_once(udp_connection *conn) {
.s_addr = inet_addr(conn->remote_addr),
},
};
udp_message data = {
udpin_message data = {
.padding = {0, 0},
.spi = htonl(conn->spi),
};
data.seqno = htonl(conn->seqno++);
data.seqno = htonl(conn->seqno);
retval = sendto(conn->socket, &data, sizeof data, 0, (struct sockaddr *)&dstaddr, sizeof dstaddr);
if (retval <= 0) {
printf("%s: could not send data\n", conn->remote_addr);
}
return retval;
}
int recv_ping_once(udp_connection *conn, udp_connection *from) {
int recv_ping_once(udpin_server *srv, udpin_connection *from) {
struct sockaddr_in addr;
int addrlen = sizeof addr;
udp_message data;
udpin_message data;
int datalen = sizeof data;
int retval = 0;
memset(&data, 0, sizeof data);
retval = recvfrom(conn->socket, &data, datalen, 0, (struct sockaddr *)&addr, &addrlen);
retval = recvfrom(srv->socket, &data, datalen, 0, (struct sockaddr *)&addr, &addrlen);
if ( retval <= 0 ) {
if (errno == EAGAIN) {
return 0;
}
printf("recvfrom: %s\n", strerror(errno));
return retval;
}
@@ -84,7 +79,7 @@ int recv_ping_once(udp_connection *conn, udp_connection *from) {
return retval;
}
int open_socket(udp_server *srv) {
int open_socket(udpin_server *srv) {
int op = 1;
struct sockaddr_in addr = {
.sin_family = AF_INET,
@@ -102,75 +97,91 @@ int open_socket(udp_server *srv) {
retval = setsockopt(srv->socket, SOL_SOCKET, SO_REUSEADDR, &op, sizeof op);
if (retval < 0) {
return -1;
return retval;
}
retval = bind(srv->socket, (struct sockaddr *)&addr, sizeof addr);
if ( retval == -1) {
return -1;
return retval;
}
return 0;
}
int configure_socket(udp_server *srv) {
int configure_socket(udpin_server *srv) {
int encap = UDP_ENCAP_ESPINUDP;
if (setsockopt(srv->socket, IPPROTO_UDP, UDP_ENCAP, &encap, sizeof encap) < 0) {
return -1;
}
return 0;
return setsockopt(srv->socket, IPPROTO_UDP, UDP_ENCAP, &encap, sizeof encap);
}
*/
import "C"
import (
"fmt"
"sync"
"time"
"unsafe"
)
func StartUDP(spi uint32, port uint16, remote string) {
server := &C.udp_server{
port: C.ushort(port),
socket : -1,
}
_ = C.open_socket(server)
C.configure_socket(server)
addr := C.CString(remote)
defer C.free(unsafe.Pointer(addr))
conn := &C.udp_connection {
socket: server.socket,
spi: C.uint(spi),
remote_port: C.ushort(port),
remote_addr: addr,
}
C.send_ping_once(conn)
w := sync.WaitGroup{}
w.Add(2)
Go(func() {
defer w.Done()
for i := 0; i < 100; i++ {
time.Sleep(time.Second)
C.send_ping_once(conn)
}
})
Go(func() {
from := &C.udp_connection{}
C.recv_ping_once(conn, from)
addr := C.GoString(from.remote_addr)
fmt.Printf("receive from %s:%d spi %d\n", addr, from.remote_port, from.spi)
w.Done()
for {
from := &C.udp_connection{}
C.recv_ping_once(conn, from)
addr := C.GoString(from.remote_addr)
fmt.Printf("receive from %s:%d spi %d\n", addr, from.remote_port, from.spi)
}
})
w.Wait()
type UdpInServer struct {
Port uint16
Socket int
server *C.udpin_server
SeqNo uint32
}
type UdpInConnection struct {
Socket int
RemotePort uint16
RemoteAddr string
Spi uint32
}
func (c *UdpInConnection) Connection() string {
return fmt.Sprintf("%s:%d", c.RemoteAddr, c.RemotePort)
}
func (c *UdpInConnection) String() string {
return fmt.Sprintf("%d on %s:%d", c.Spi, c.RemoteAddr, c.RemotePort)
}
func (u *UdpInServer) Open() error {
server := &C.udpin_server{
port: C.ushort(u.Port),
socket: -1,
}
if ret := C.open_socket(server); ret < 0 {
return NewErr("UdpInServer.Open errno:%d", ret)
}
if ret := C.configure_socket(server); ret < 0 {
return NewErr("UdpInServer.Open errno:%d", ret)
}
u.server = server
u.Socket = int(server.socket)
return nil
}
func (u *UdpInServer) Send(to *UdpInConnection) error {
u.SeqNo += 1
addr := C.CString(LookupIP(to.RemoteAddr))
defer C.free(unsafe.Pointer(addr))
conn := &C.udpin_connection{
socket: u.server.socket,
spi: C.uint(to.Spi),
remote_port: C.ushort(to.RemotePort),
remote_addr: addr,
seqno: C.uint(u.SeqNo),
}
if ret := C.send_ping_once(conn); ret < 0 {
return NewErr("UdpInServer.Ping errno:%d", ret)
}
return nil
}
func (u *UdpInServer) Recv() (*UdpInConnection, error) {
from := &C.udpin_connection{}
if ret := C.recv_ping_once(u.server, from); ret < 0 {
return nil, NewErr("UdpInServer.Pong errno:%d", ret)
}
return &UdpInConnection{
RemotePort: uint16(from.remote_port),
RemoteAddr: C.GoString(from.remote_addr),
Spi: uint32(from.spi),
}, nil
}

28
pkg/libol/udpin_test.go Normal file
View File

@@ -0,0 +1,28 @@
package libol
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestOpenUDP_C(t *testing.T) {
udp := &UdpInServer{Port: 4500}
err := udp.Open()
assert.Equal(t, nil, err, "has not error")
assert.NotEqual(t, -1, udp.Socket, "valid socket")
go func() {
conn, err := udp.Recv()
fmt.Println(conn, err)
}()
err = udp.Send(&UdpInConnection{
Spi: 84209,
RemoteAddr: "180.109.49.146",
RemotePort: 4500,
})
assert.Equal(t, nil, err, "has not error")
time.Sleep(time.Second * 2)
}

View File

@@ -7,8 +7,6 @@ import (
"github.com/luscis/openlan/pkg/libol"
"github.com/ovn-org/libovsdb/cache"
"github.com/ovn-org/libovsdb/model"
"strconv"
"strings"
)
type ConfD struct {
@@ -35,7 +33,7 @@ func (c *ConfD) Start() {
DeleteFunc: c.Delete,
UpdateFunc: c.Update,
}
if _, err := database.NewDBClient(handler); err != nil {
if _, err := database.NewConfClient(handler); err != nil {
c.out.Error("Confd.Start open db with %s", err)
return
}
@@ -104,15 +102,6 @@ func (c *ConfD) Update(table string, old model.Model, new model.Model) {
}
}
func GetAddrPort(conn string) (string, int) {
values := strings.SplitN(conn, ":", 2)
if len(values) == 2 {
port, _ := strconv.Atoi(values[1])
return values[0], port
}
return values[0], 0
}
func GetRoutes(result *[]database.PrefixRoute, device string) error {
if err := database.Client.WhereList(
func(l *database.PrefixRoute) bool {
@@ -301,14 +290,14 @@ func (l *MemberLink) Add(obj *database.VirtualLink) {
if conn == "any" {
remoteConn := obj.Status["remote_connection"]
if libol.GetPrefix(remoteConn, 4) == "udp:" {
remote, port = GetAddrPort(remoteConn[4:])
remote, port = database.GetAddrPort(remoteConn[4:])
} else {
l.out.Warn("MemberLink.Add %s remote not found.", conn)
return
}
} else if libol.GetPrefix(conn, 4) == "udp:" {
remoteConn := obj.Connection
remote, port = GetAddrPort(remoteConn[4:])
remote, port = database.GetAddrPort(remoteConn[4:])
} else {
return
}

View File

@@ -395,13 +395,11 @@ func (w *EspWorker) downMember() {
func OpenUDP() {
libol.Go(func() {
args := []string{
"-p", strconv.Itoa(co.EspLocalUdp),
"-vconsole:emer",
"--log-file=/var/openlan/openudp.log",
"-port", strconv.Itoa(co.EspLocalUdp),
}
cmd := exec.Command(UDPBin, args...)
if err := cmd.Run(); err != nil {
libol.Error("esp.init %s", err)
libol.Error("OpenUDP %s", err)
}
})
}