Use error channel to get goroutine errors

This commit is contained in:
lucheng
2024-07-12 11:35:45 +08:00
parent c4db99d32a
commit a08e9802e4
4 changed files with 127 additions and 105 deletions

View File

@@ -22,7 +22,7 @@ func main() {
}
if err := app.Run(os.Args); err != nil {
log.Panic(err)
log.Error(err)
os.Exit(1)
}
}

View File

@@ -83,13 +83,13 @@ func SendKeepalive(conn *net.UDPConn, addr string) error {
return nil
}
func (c *Client) DoKeepalive(interval int) {
func (c *Client) DoKeepalive(interval int) error {
ticker := time.NewTicker(time.Second * time.Duration(interval))
for {
err := SendKeepalive(c.Conn, c.IPAddr)
if err != nil {
log.Errorf("send keepalive to %s %s", c.Conn.RemoteAddr(), err.Error())
return fmt.Errorf("send keepalive to %s %s", c.Conn.RemoteAddr(), err.Error())
}
<-ticker.C
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/lucheng0127/virtuallan/pkg/utils"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/sys/unix"
)
@@ -75,28 +74,30 @@ func (c *Client) Launch() error {
c.Conn = conn
// Use errgroup check goruntine error
g := new(errgroup.Group)
// Use errChan capture goroutine error
errChan := make(chan error)
// Handle signal
sigChan := make(chan os.Signal, 8)
signal.Notify(sigChan, unix.SIGTERM, unix.SIGINT)
g.Go(func() error {
return c.HandleSignal(sigChan)
})
go func() {
if err := c.HandleSignal(sigChan); err != nil {
errChan <- err
}
}()
// Do auth
ipChan := make(chan string)
netToIface := make(chan *packet.VLPkt, 1024)
// Handle udp packet
g.Go(func() error {
go func() {
for {
var buf [65535]byte
n, _, err := conn.ReadFromUDP(buf[:])
if err != nil {
return fmt.Errorf("read from conn %s", err.Error())
errChan <- fmt.Errorf("read from conn %s", err.Error())
}
if n < 2 {
@@ -113,11 +114,11 @@ func (c *Client) Launch() error {
case packet.P_RESPONSE:
switch pkt.VLBody.(*packet.RspBody).Code {
case packet.RSP_AUTH_REQUIRED:
return errors.New("auth failed")
errChan <- errors.New("auth failed")
case packet.RSP_IP_NOT_MATCH:
return errors.New("ip not match")
errChan <- errors.New("ip not match")
case packet.RSP_USER_LOGGED:
return errors.New("user already logged by other endpoint")
errChan <- errors.New("user already logged by other endpoint")
default:
continue
}
@@ -131,7 +132,7 @@ func (c *Client) Launch() error {
continue
}
}
})
}()
// Auth
authPkt := packet.NewAuthPkt(c.user, c.password)
@@ -146,61 +147,72 @@ func (c *Client) Launch() error {
}
authChan := make(chan string, 1)
g.Go(func() error {
return checkLoginTimeout(authChan)
})
go func() {
if err := checkLoginTimeout(authChan); err != nil {
errChan <- err
}
}()
// Waiting for dhcp ip
ipAddr := <-ipChan
authChan <- "ok"
log.Infof("auth with %s succeed, endpoint ip %s\n", c.user, ipAddr)
c.IPAddr = ipAddr
select {
case err := <-errChan:
return err
case ipAddr := <-ipChan:
// Waiting for dhcp ip
authChan <- "ok"
log.Infof("auth with %s succeed, endpoint ip %s\n", c.user, ipAddr)
c.IPAddr = ipAddr
iface, err := utils.NewTap("")
if err != nil {
iface, err := utils.NewTap("")
if err != nil {
return err
}
c.Iface = iface
// Set tap mac address according to ipv4 address,
// it will make sure each ip with a fixed mac address,
// so the arp entry will always be correct even when
// tap interface has been recreate
if err := utils.SetMacToTap(c.Iface.Name(), strings.Split(c.IPAddr, "/")[0]); err != nil {
return err
}
if err := utils.AsignAddrToLink(c.Iface.Name(), c.IPAddr, true); err != nil {
return err
}
// Add multicast route 224.0.0.1 dev tap
tapIface, err := net.InterfaceByName(c.Iface.Name())
if err != nil {
return fmt.Errorf("get tap interface %s", err.Error())
}
if err := utils.AddMulticastRouteToIface(fmt.Sprintf("%s/32", packet.MULTICAST_ADDR), tapIface.Index); err != nil {
return err
}
// XXX: Sometime when client restart too fast will not reveice the first multicast pkt
// Monitor multicast for route bordcast
go func() {
if err := packet.MonitorRouteMulticast(tapIface, strings.Split(c.IPAddr, "/")[0]); err != nil {
errChan <- err
}
}()
// Send keepalive
go func() {
if err := c.DoKeepalive(10); err != nil {
errChan <- err
}
}()
// Switch io between udp net and tap interface
go func() {
if err := c.HandleConn(netToIface); err != nil {
errChan <- err
}
}()
err = <-errChan
return err
}
c.Iface = iface
// Set tap mac address according to ipv4 address,
// it will make sure each ip with a fixed mac address,
// so the arp entry will always be correct even when
// tap interface has been recreate
if err := utils.SetMacToTap(c.Iface.Name(), strings.Split(c.IPAddr, "/")[0]); err != nil {
return err
}
if err := utils.AsignAddrToLink(c.Iface.Name(), c.IPAddr, true); err != nil {
return err
}
// Add multicast route 224.0.0.1 dev tap
tapIface, err := net.InterfaceByName(c.Iface.Name())
if err != nil {
return fmt.Errorf("get tap interface %s", err.Error())
}
if err := utils.AddMulticastRouteToIface(fmt.Sprintf("%s/32", packet.MULTICAST_ADDR), tapIface.Index); err != nil {
return err
}
// XXX: Sometime when client restart too fast will not reveice the first multicast pkt
// Monitor multicast for route bordcast
g.Go(func() error {
return packet.MonitorRouteMulticast(tapIface, strings.Split(c.IPAddr, "/")[0])
})
// Send keepalive
go c.DoKeepalive(10)
// Switch io between udp net and tap interface
g.Go(func() error {
return c.HandleConn(netToIface)
})
if err := g.Wait(); err != nil {
return err
}
return nil
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/lucheng0127/virtuallan/pkg/utils"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)
func Run(cCtx *cli.Context) error {
@@ -69,28 +68,30 @@ func (c *Client) Launch() error {
c.Conn = conn
// Use errgroup check goruntine error
g := new(errgroup.Group)
// Use errChan capture goroutine error
errChan := make(chan error)
// Handle signal
sigChan := make(chan os.Signal, 8)
signal.Notify(sigChan, os.Interrupt)
g.Go(func() error {
return c.HandleSignal(sigChan)
})
go func() {
if err := c.HandleSignal(sigChan); err != nil {
errChan <- err
}
}()
// Do auth
ipChan := make(chan string)
netToIface := make(chan *packet.VLPkt, 1024)
// Handle udp packet
g.Go(func() error {
go func() {
for {
var buf [65535]byte
n, _, err := conn.ReadFromUDP(buf[:])
if err != nil {
return fmt.Errorf("read from conn %s", err)
errChan <- fmt.Errorf("read from conn %s", err)
}
if n < 2 {
@@ -107,11 +108,11 @@ func (c *Client) Launch() error {
case packet.P_RESPONSE:
switch pkt.VLBody.(*packet.RspBody).Code {
case packet.RSP_AUTH_REQUIRED:
return errors.New("auth failed")
errChan <- errors.New("auth failed")
case packet.RSP_IP_NOT_MATCH:
return errors.New("ip not match")
errChan <- errors.New("ip not match")
case packet.RSP_USER_LOGGED:
return errors.New("user already logged by other endpoint")
errChan <- errors.New("user already logged by other endpoint")
default:
continue
}
@@ -125,7 +126,7 @@ func (c *Client) Launch() error {
continue
}
}
})
}()
// Auth
authPkt := packet.NewAuthPkt(c.user, c.password)
@@ -140,33 +141,42 @@ func (c *Client) Launch() error {
}
authChan := make(chan string, 1)
g.Go(func() error {
return checkLoginTimeout(authChan)
})
go func() {
if err := checkLoginTimeout(authChan); err != nil {
errChan <- err
}
}()
// Waiting for dhcp ip
ipAddr := <-ipChan
authChan <- "ok"
log.Infof("auth with %s succeed, endpoint ip %s\n", c.user, ipAddr)
c.IPAddr = ipAddr
select {
case err := <-errChan:
return err
case ipAddr := <-ipChan:
// Waiting for dhcp ip
authChan <- "ok"
log.Infof("auth with %s succeed, endpoint ip %s\n", c.user, ipAddr)
c.IPAddr = ipAddr
iface, err := utils.NewTap(ipAddr)
if err != nil {
iface, err := utils.NewTap(ipAddr)
if err != nil {
return err
}
c.Iface = iface
// Send keepalive
go func() {
if err := c.DoKeepalive(10); err != nil {
errChan <- err
}
}()
// Switch io between udp net and tap interface
go func() {
if err := c.HandleConn(netToIface); err != nil {
errChan <- err
}
}()
err = <-errChan
return err
}
c.Iface = iface
// Send keepalive
go c.DoKeepalive(10)
// Switch io between udp net and tap interface
g.Go(func() error {
return c.HandleConn(netToIface)
})
if err := g.Wait(); err != nil {
return err
}
return nil
}